renderer.py 5.5 KB
#!/usr/bin/python
# -*- coding: utf-8 -*-

#Author: Erickson Silva
#Email: <erickson.silva@lavid.ufpb.br>

#LAViD - Laboratório de Aplicações de Vídeo Digital

import os
import sys
import pika
import pysrt
import socket
import json
import subprocess
from time import sleep
from thread import start_new_thread
from pyvirtualdisplay import Display
from operator import itemgetter
from shutil import rmtree, move

# credentials = pika.PlainCredentials('test', 'test')
# conn_send = pika.BlockingConnection(pika.ConnectionParameters(host='150.165.205.10', credentials=credentials))
# conn_receive = pika.BlockingConnection(pika.ConnectionParameters(host='150.165.205.10', credentials=credentials))

# Two separate blocking connections to the RabbitMQ broker on localhost:
# conn_send is used by send_to_queue() for publishing, conn_receive by
# receive_from_queue() for consuming.
# NOTE(review): heartbeat_interval=0 presumably disables AMQP heartbeats -
# confirm against the installed pika version.
conn_send = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',heartbeat_interval=0))
conn_receive = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',heartbeat_interval=0))

# State shared between the deliveries handled by run().
# True while a multi-message job (opened by a message with index == 1) is
# being collected.
running = False
# Messages collected for the job in progress; emptied by send_to_player().
contents = []
# Correlation id of the job in progress; run() compares each delivery to it.
correlation_id = None

# Address open_socket() connects to (the socket is used by send_to_player()).
TCP_IP = '0.0.0.0'
TCP_PORT = 5555

# Directory holding one frame folder per job (read by render(), removed by clean()).
PATH_SCREENS="/storage/frames/"
# Directory where render() writes "<id>.mp4".
PATH_LIBRAS="/storage/libras/"
# Directory to which run() moves the finished video of a "text" message.
PATH_VIDEO="/storage/videos/"
# Executable launched by capture().
VIDEO_CREATOR="/root/unityVideo/videoCreator.x86_64"

def run(ch, method, properties, body):
	"""Handle one delivery from the "translations" queue.

	While no job is in progress:
	  * a message with ``index == 1`` opens a multi-message job: it is stored
	    and its correlation id becomes the current one;
	  * a message with ``type == "text"`` is rendered immediately as a
	    self-contained job and the resulting video is moved to PATH_VIDEO;
	  * anything else is rejected and requeued.

	While a job is in progress:
	  * messages with a different correlation id are rejected and requeued;
	  * messages without a "control-message" key are stored as content;
	  * a "FINALIZE" control message whose ``index`` is exactly one more than
	    the number of stored messages triggers rendering and publishing;
	    a FINALIZE that arrives too early is requeued;
	  * any other control message (or a FINALIZE without ``index``) is
	    rejected without requeue.

	Args:
		ch: channel the delivery arrived on; used to ack/reject.
		method: delivery metadata; only ``delivery_tag`` is read.
		properties: message properties; only ``correlation_id`` is read.
		body: JSON text of the message.
	"""
	global running, correlation_id
	body = json.loads(body)
	tag = method.delivery_tag

	if running:
		if properties.correlation_id.encode("utf-8") != correlation_id:
			# Belongs to another job: hand it back until the current one is done.
			ch.basic_reject(delivery_tag=tag, requeue=True)
			return

		# The key is tested on its own so that a KeyError raised later, while
		# rendering or publishing, is never mistaken for "plain content"
		# (which used to append and ack the same delivery a second time).
		if "control-message" not in body:
			contents.append(body)
			ch.basic_ack(delivery_tag=tag)
			return

		size = body.get("index")
		if body["control-message"] != "FINALIZE" or size is None:
			# Nothing here can ever process this message. Leaving it
			# unacknowledged would block the consumer, because
			# receive_from_queue() sets prefetch_count=1, so discard it.
			ch.basic_reject(delivery_tag=tag, requeue=False)
			return

		if len(contents) != size-1:
			# FINALIZE overtook some content messages: retry it later.
			ch.basic_reject(delivery_tag=tag, requeue=True)
			return

		contents.append(body)
		ch.basic_ack(delivery_tag=tag)
		make_video(correlation_id)
		body['libras_video'] = os.path.join(PATH_LIBRAS, correlation_id+".mp4")
		running = False
		correlation_id = ""
		send_to_queue(body, properties)
		return

	if "index" in body and body["index"] == 1:
		# First message of a multi-message job.
		running = True
		correlation_id = properties.correlation_id.encode("utf-8")
		contents.append(body)
		ch.basic_ack(delivery_tag=tag)
	elif "type" in body and body["type"] == "text":
		# Single-message job: synthesize the FINALIZE marker locally.
		job_id = properties.correlation_id.encode("utf-8")
		body['pts'] = -1
		body['index'] = 0
		contents.append(body)
		contents.append({'control-message': "FINALIZE", 'pts': -1,  'index': 1})
		ch.basic_ack(delivery_tag=tag)
		make_video(job_id)
		path_video = os.path.join(PATH_VIDEO, job_id+".mp4")
		move(os.path.join(PATH_LIBRAS, job_id+".mp4"), path_video)
		body['libras_video'] = path_video
		send_to_queue(body, properties)
	else:
		ch.basic_reject(delivery_tag=tag, requeue=True)

def make_video(id):
	"""Produce the video for job ``id``.

	A background thread streams the collected messages through
	send_to_player() while this thread runs the capture, render and clean
	stages one after another.
	"""
	start_new_thread(send_to_player, ())
	for stage in (capture, render, clean):
		stage(id)

def send_to_player():
	"""Stream the collected messages to the player, ordered by ``index``.

	Each message is sent as ``<text>#<pts>``, where ``<text>`` is the
	message's "gloss" or, when that key is absent, its "control-message".
	One second is waited between messages.

	The socket is closed and ``contents`` is emptied even when sending
	fails, so a broken job cannot leak the socket or leave stale messages
	behind for the next job.

	Raises:
		KeyError: a message lacks "index", "pts", or both "gloss" and
			"control-message".
	"""
	player = open_socket()
	try:
		for message in sorted(contents, key=itemgetter('index')):
			try:
				text = message["gloss"].encode('utf-8')
			except KeyError:
				text = message["control-message"].encode('utf-8')
			# sendall() keeps writing until the whole payload is out;
			# send() may transmit only part of it.
			player.sendall(text+"#"+str(message["pts"]))
			sleep(1)
	finally:
		del contents[:]
		player.close()

def open_socket():
	"""Return a TCP socket connected to (TCP_IP, TCP_PORT).

	Connection attempts are repeated every two seconds until one succeeds,
	so this call blocks for as long as nothing is listening.

	Only socket errors trigger a retry; anything else (including
	KeyboardInterrupt) propagates instead of being retried forever.
	"""
	while True:
		# A fresh socket per attempt: the state of a socket after a failed
		# connect() is unspecified, so it must not be reused.
		s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
		try:
			s.connect((TCP_IP, TCP_PORT))
			return s
		except socket.error:
			s.close()
			sleep(2)

def render(id):
	"""Encode the captured frames of job ``id`` into ``<id>.mp4`` with ffmpeg.

	Frames are read from ``frame_%d.png`` inside the job's folder under
	PATH_SCREENS and the video is written under PATH_LIBRAS. The exit status
	of ffmpeg is not inspected.
	"""
	print("Rendering...")
	frames = os.path.join(PATH_SCREENS, id+"/frame_%d.png")
	target = PATH_LIBRAS+id+".mp4"
	command = ["ffmpeg", "-y", "-loglevel", "quiet", "-framerate", "30", "-i", frames,
		"-vcodec", "libx264", "-pix_fmt", "yuv420p", target]
	subprocess.call(command, shell=False)
	print("OK")

def capture(id):
	"""Run the video creator for job ``id`` inside a hidden 800x600 display.

	The virtual display is started before VIDEO_CREATOR is launched and is
	always stopped afterwards, even when launching the executable raises,
	so a failed job does not leave the display running. The exit status of
	the executable is not inspected.
	"""
	print("Capture...")
	display = Display(visible=0, size=(800, 600))
	display.start()
	try:
		subprocess.call([VIDEO_CREATOR, id, "0", "30", "20", "25", "-screen-fullscreen", "1", "-screen-quality", "Fantastic", "-force-opengl"], shell=False)
	finally:
		display.stop()
	print("OK")

def clean(id):
	"""Delete the frame folder of job ``id`` under PATH_SCREENS, ignoring errors."""
	rmtree(os.path.join(PATH_SCREENS, id), ignore_errors=True)

def send_to_queue(body, props):
	"""Publish ``body`` as JSON, keeping the correlation id of ``props``.

	Messages whose "type" is "video" go to the "libras" queue, everything
	else to "videos". If a channel cannot be opened on the send connection,
	the connection is re-created once and the channel is requested again.

	Args:
		body: dict to publish; must contain a "type" key.
		props: properties of the originating message; only
			``correlation_id`` is read.

	Raises:
		KeyError: ``body`` has no "type" key.
	"""
	# Everything that can fail without the broker is done first, so that a
	# bad message does not leave an opened channel behind.
	queue = "libras" if body["type"].encode("UTF-8") == "video" else "videos"
	payload = json.dumps(body)

	try:
		channel = conn_send.channel()
	except (KeyError, pika.exceptions.AMQPError):
		# pika reports a closed connection/channel through its own
		# exception hierarchy; KeyError is kept for backward compatibility.
		reload_connection_send()
		channel = conn_send.channel()

	channel.basic_publish(exchange='',
					  routing_key=queue,
					  properties=pika.BasicProperties(correlation_id = props.correlation_id),
					  body=payload)
	channel.close()
	
def receive_from_queue():
	"""Consume the "translations" queue, passing every delivery to run().

	At most one unacknowledged message is prefetched. If a channel cannot be
	opened on the receive connection, the connection is re-created once and
	the channel is requested again. The call blocks inside
	``start_consuming()``; the channel is closed when consuming returns
	normally.

	NOTE(review): when start_consuming() raises, close() is skipped and the
	channel stays open - confirm whether that is intended.
	"""
	try:
		channel = conn_receive.channel()
	except (KeyError, pika.exceptions.AMQPError):
		# pika reports a closed connection/channel through its own
		# exception hierarchy; KeyError is kept for backward compatibility.
		reload_connection_receive()
		channel = conn_receive.channel()
	queue = "translations"
	channel.basic_qos(prefetch_count=1)
	channel.basic_consume(run,
				  queue=queue)
	channel.start_consuming()
	channel.close()

def reload_connection_send():
	"""Replace the module-level ``conn_send`` with a fresh connection.

	The old connection is closed on a best-effort basis: ordinary errors
	while closing are ignored, but KeyboardInterrupt and SystemExit are no
	longer swallowed.
	"""
	global conn_send
	try:
		conn_send.close()
	except Exception:
		# The old connection is being discarded anyway.
		pass
	conn_send = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',heartbeat_interval=0))

def reload_connection_receive():
	"""Replace the module-level ``conn_receive`` with a fresh connection.

	The old connection is closed on a best-effort basis: ordinary errors
	while closing are ignored, but KeyboardInterrupt and SystemExit are no
	longer swallowed.
	"""
	global conn_receive
	try:
		conn_receive.close()
	except Exception:
		# The old connection is being discarded anyway.
		pass
	conn_receive = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',heartbeat_interval=0))
	
def keep_alive(conn_send, conn_receive):
	"""Call ``process_data_events()`` on both connections every 30 seconds, forever.

	Each connection is serviced independently, so an error on one no longer
	prevents the other from being serviced in the same round.

	Args:
		conn_send: connection used for publishing.
		conn_receive: connection used for consuming.

	NOTE(review): the two objects are bound when the thread is started;
	connections created later by reload_connection_send() /
	reload_connection_receive() rebind the module globals and are not seen
	by this loop - confirm whether that is acceptable.
	"""
	while True:
		sleep(30)
		for connection in (conn_send, conn_receive):
			try:
				connection.process_data_events()
			except Exception:
				# Best effort: try again on the next round.
				pass

# Service both connections from a background thread, then consume forever.
start_new_thread(keep_alive, (conn_send, conn_receive))
while True:
	try:
		receive_from_queue()
	except KeyboardInterrupt:
		# Ctrl-C: close both connections and terminate immediately.
		conn_send.close()
		conn_receive.close()
		os._exit(0)
	except Exception:
		# Any other failure restarts consumption. The short pause keeps a
		# persistent failure (e.g. broker unreachable) from spinning the CPU.
		sleep(1)