#!/usr/bin/python
# -*- coding: utf-8 -*-

# Author: Erickson Silva
# Email:
# LAViD - Laboratório de Aplicações de Vídeo Digital

"""Worker that turns translated glosses into a rendered video.

Messages are consumed from the "translations" queue.  Their glosses are
streamed to a player over TCP while the external video creator runs inside
a virtual display; the frames found under PATH_SCREENS are then encoded
with ffmpeg and a message carrying the resulting video path is published
to the "libras" or "videos" queue.

The file targets Python 2 (``thread`` module, byte-string concatenation).
"""

import os
import sys
import pika
import pysrt
import socket
import json
import subprocess

from time import sleep
from thread import start_new_thread
from pyvirtualdisplay import Display
from operator import itemgetter
from shutil import rmtree, move


def _connect():
    """Open a new blocking AMQP connection with the settings used by this file."""
    return pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost', heartbeat_interval=0))


# Alternative remote configuration, kept for reference:
# credentials = pika.PlainCredentials('test', 'test')
# conn_send = pika.BlockingConnection(pika.ConnectionParameters(host='150.165.205.10', credentials=credentials))
# conn_receive = pika.BlockingConnection(pika.ConnectionParameters(host='150.165.205.10', credentials=credentials))

conn_send = _connect()
conn_receive = _connect()

# State of the job currently being assembled.
running = False          # True while messages of one correlation id are collected
contents = []            # messages collected for the current job
correlation_id = None    # correlation id of the current job

TCP_IP = '0.0.0.0'
TCP_PORT = 5555
PATH_SCREENS = "/storage/frames/"
PATH_LIBRAS = "/storage/libras/"
PATH_VIDEO = "/storage/videos/"
VIDEO_CREATOR = "/root/unityVideo/videoCreator.x86_64"


def run(ch, method, properties, body):
    """Consumer callback: collect the messages of one job and render it.

    While a job is running, only messages carrying the same correlation id
    are accepted; others are rejected with ``requeue=True``.  Messages
    without a "control-message" key are stored.  A "FINALIZE" control
    message whose "index" is one more than the number of stored messages
    triggers video creation and publication of the result.

    When no job is running, a message with ``index == 1`` starts a job and a
    message with ``type == "text"`` is rendered immediately as a one-message
    job; anything else is rejected with ``requeue=True``.

    Args:
        ch: channel the message arrived on (used to ack/reject).
        method: delivery metadata; only ``delivery_tag`` is read.
        properties: message properties; only ``correlation_id`` is read.
        body: JSON-encoded message payload.
    """
    global running, correlation_id

    body = json.loads(body)
    tag = method.delivery_tag

    if running:
        if properties.correlation_id.encode("utf-8") != correlation_id:
            ch.basic_reject(delivery_tag=tag, requeue=True)
            return

        # Only the key lookups are guarded.  Previously the whole FINALIZE
        # path sat inside the try, so a KeyError raised after the ack (for
        # example by send_to_queue reading body["type"]) fell into the
        # handler, which stored the message again and acked it a second time.
        try:
            is_finalize = body["control-message"] == "FINALIZE"
            size = body["index"] if is_finalize else None
        except KeyError:
            # Ordinary content message: store it.
            contents.append(body)
            ch.basic_ack(delivery_tag=tag)
            return

        if not is_finalize:
            # NOTE(review): a control message other than FINALIZE is neither
            # acked nor rejected, exactly as before -- confirm this is
            # intended.
            return

        if len(contents) != size - 1:
            # Not every message has arrived yet; requeue flag is left at the
            # library default, as in the original call.
            ch.basic_reject(delivery_tag=tag)
            return

        contents.append(body)
        ch.basic_ack(delivery_tag=tag)
        make_video(correlation_id)
        body['libras_video'] = os.path.join(PATH_LIBRAS,
                                            correlation_id + ".mp4")
        running = False
        correlation_id = ""
        send_to_queue(body, properties)
        return

    if "index" in body and body["index"] == 1:
        # First message of a new job.
        running = True
        correlation_id = properties.correlation_id.encode("utf-8")
        contents.append(body)
        ch.basic_ack(delivery_tag=tag)
    elif "type" in body and body["type"] == "text":
        # Single text message: build a complete job around it and render
        # it right away.
        body['pts'] = -1
        body['index'] = 0
        contents.append(body)
        contents.append({'control-message': "FINALIZE", 'pts': -1, 'index': 1})
        ch.basic_ack(delivery_tag=tag)

        job_id = properties.correlation_id.encode("utf-8")
        make_video(job_id)
        path_libras = os.path.join(PATH_LIBRAS, job_id + ".mp4")
        path_video = os.path.join(PATH_VIDEO, job_id + ".mp4")
        move(path_libras, path_video)
        body['libras_video'] = path_video
        send_to_queue(body, properties)
    else:
        ch.basic_reject(delivery_tag=tag, requeue=True)


def make_video(id):
    """Produce the video for job ``id``: feed the player, capture, encode, clean.

    The messages are sent to the player from a background thread while the
    capture runs in the calling thread.
    """
    start_new_thread(send_to_player, ())
    capture(id)
    render(id)
    clean(id)


def send_to_player():
    """Send the collected messages to the player, ordered by "index".

    Each message is sent as ``<gloss or control-message>#<pts>`` with a
    one-second pause after it.  The shared ``contents`` list is emptied once
    everything has been sent.
    """
    # NOTE(review): this runs in a background thread and clears ``contents``
    # at the end; confirm no new job can start appending before that point.
    sock = open_socket()
    try:
        for message in sorted(contents, key=itemgetter('index')):
            try:
                text = message["gloss"]
            except KeyError:
                text = message["control-message"]
            sock.send(text.encode('utf-8') + "#" + str(message["pts"]))
            sleep(1)
    finally:
        # Always release the socket, even if a message is malformed.
        sock.close()
    del contents[:]


def open_socket():
    """Return a TCP socket connected to (TCP_IP, TCP_PORT).

    Retries every two seconds until the connection succeeds.
    """
    while True:
        # A fresh socket is created per attempt: the state of a socket after
        # a failed connect() is unspecified, so it must not be reused.
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.connect((TCP_IP, TCP_PORT))
            return s
        except socket.error:
            s.close()
            sleep(2)


def render(id):
    """Encode the frames of job ``id`` into ``PATH_LIBRAS/<id>.mp4`` with ffmpeg."""
    print("Rendering...")
    subprocess.call(
        ["ffmpeg", "-y", "-loglevel", "quiet", "-framerate", "30",
         "-i", os.path.join(PATH_SCREENS, id + "/frame_%d.png"),
         "-vcodec", "libx264", "-pix_fmt", "yuv420p",
         os.path.join(PATH_LIBRAS, id + ".mp4")],
        shell=False)
    print("OK")


def capture(id):
    """Run the video creator for job ``id`` inside an 800x600 virtual display."""
    print("Capture...")
    display = Display(visible=0, size=(800, 600))
    display.start()
    try:
        subprocess.call(
            [VIDEO_CREATOR, id, "0", "30", "20", "25",
             "-screen-fullscreen", "1", "-screen-quality", "Fantastic",
             "-force-opengl"],
            shell=False)
    finally:
        # Do not leave the virtual display running if the call fails.
        display.stop()
    print("OK")


def clean(id):
    """Delete the frame directory of job ``id``, ignoring errors."""
    path = os.path.join(PATH_SCREENS, id)
    rmtree(path, ignore_errors=True)


def send_to_queue(body, props):
    """Publish ``body`` as JSON, keeping the incoming correlation id.

    Messages whose "type" is "video" go to the "libras" queue, all others
    to "videos".  If a channel cannot be opened, the sending connection is
    re-created once and the attempt repeated.
    """
    try:
        channel = conn_send.channel()
    except (KeyError, pika.exceptions.AMQPError):
        # KeyError is kept from the original handler; AMQPError is added so
        # that a closed or broken connection also triggers the reconnect.
        reload_connection_send()
        channel = conn_send.channel()

    queue = "libras" if body["type"].encode("UTF-8") == "video" else "videos"
    channel.basic_publish(
        exchange='',
        routing_key=queue,
        properties=pika.BasicProperties(correlation_id=props.correlation_id),
        body=json.dumps(body))
    channel.close()


def receive_from_queue():
    """Consume the "translations" queue with ``run``, one message at a time.

    Blocks in ``start_consuming``.  If a channel cannot be opened, the
    receiving connection is re-created once and the attempt repeated.
    """
    try:
        channel = conn_receive.channel()
    except (KeyError, pika.exceptions.AMQPError):
        # See send_to_queue for why both exception types are caught.
        reload_connection_receive()
        channel = conn_receive.channel()

    queue = "translations"
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(run, queue=queue)
    channel.start_consuming()
    channel.close()


def reload_connection_send():
    """Replace the global sending connection with a new one."""
    global conn_send
    try:
        conn_send.close()
    except Exception:
        # Best effort: the old connection may already be closed.
        pass
    conn_send = _connect()


def reload_connection_receive():
    """Replace the global receiving connection with a new one."""
    global conn_receive
    try:
        conn_receive.close()
    except Exception:
        # Best effort: the old connection may already be closed.
        pass
    conn_receive = _connect()


def keep_alive(conn_send, conn_receive):
    """Call ``process_data_events`` on both connections every 30 seconds.

    Errors are ignored and the loop never returns.
    """
    # NOTE(review): the connections are received as arguments, so after a
    # reload_connection_*() call this loop still holds the old objects.
    while True:
        sleep(30)
        try:
            conn_send.process_data_events()
            conn_receive.process_data_events()
        except Exception:
            continue


start_new_thread(keep_alive, (conn_send, conn_receive))

# Consume forever; any failure restarts the consumer, Ctrl-C shuts down.
while True:
    try:
        receive_from_queue()
    except KeyboardInterrupt:
        conn_send.close()
        conn_receive.close()
        os._exit(0)
    except Exception:
        continue