renderer.py 8.04 KB
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import json
import logging
import os
import pika
import PikaManager
import signal
import socket
import subprocess

from operator import itemgetter
from pyvirtualdisplay import Display
from thread import start_new_thread
from time import sleep

#Temporary
from shutil import rmtree

#Temporary
def make_dir_if_exists(path):
    if not os.path.exists(path):
        os.makedirs(path)

# Logging configuration.
logger = logging.getLogger('renderer')
logger.setLevel(logging.DEBUG)

fh = logging.FileHandler('/home/vlibras/log/renderer.log')
fh.setLevel(logging.DEBUG)

ch = logging.StreamHandler()
ch.setLevel(logging.INFO)

formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
ch.setFormatter(formatter)

logger.addHandler(fh)
logger.addHandler(ch)

# Manager of queues connections.
#manager = PikaManager.PikaManager("150.165.205.10", "test", "test")

manager = PikaManager.PikaManager("rabbit")

TCP_IP = '0.0.0.0'
TCP_PORT = 5555

PATH_LIBRAS = os.getenv("VLIBRAS_VIDEO_LIBRAS")
VIDEO_CREATOR = os.getenv("VLIBRAS_VIDEO_CREATOR")
PATH_SCREENS = os.getenv("VLIBRAS_VIDEO_SCREENS")

#Create Paths if needed
make_dir_if_exists(PATH_LIBRAS)
make_dir_if_exists(VIDEO_CREATOR)
make_dir_if_exists(PATH_SCREENS)

# Status of renderer to process new requests. Answer one request at a time.
worker_available = True
# Identification to indicate the request being processed.
correlation_id = None
# Array that stores gloss and pts in json format to be sent to videoCreator.
gloss_buffer = []
# pyvirtualdisplay instance
display = None
# ffmpeg process instance
ffmpeg = None

def start_video_creator(id):
    """
    Start video creator server.

    Parameters
    ----------
    id : string
        Identification of request.
    """
    global display, ffmpeg
    logger.info("Starting video creator server")
    display = Display(visible=0, size=(800,600))
    display.start()
    subprocess.call(
        [
            VIDEO_CREATOR,
            id,
            "0",
            "30",
            "20",
            "25",
            "-screen-fullscreen", "1",
            "-screen-quality", "Fantastic",
            "-force-opengl"
        ],
        shell=False
    )
    ffmpeg.send_signal(signal.SIGQUIT)
    ffmpeg.communicate()
    display.stop()

def start_ffmpeg(id):
    """
    Start FFmpeg to capture the video creator display.

    Parameters
    ----------
    id : string
        Identification of request.
    """
    global ffmpeg, display
    logger.info("Starting ffmpeg")
    libras_video = os.path.join(PATH_LIBRAS, id + ".mp4")
    ffmpeg = subprocess.Popen(
        [
            "ffmpeg",
            "-y",
            "-loglevel", "quiet",
            "-video_size", "800x600",
            "-r", "30",
            "-f", "x11grab",
            "-draw_mouse", "0",
            "-i", str(display.cmd_param[-1]) + ".0+nomouse",
            "-vcodec", "libx264",
            "-pix_fmt", "yuv420p",
            "-an",
            libras_video
        ],
        shell=False
    )

def open_socket_connection():
    """
    Create a new socket TCP connection with video creator server.

    Returns
    -------
    socket object
        Connection with video creator server.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    logger.info("Opening connection with video creator")
    while True:
        try:
            s.connect((TCP_IP, TCP_PORT))
            break
        except:
            sleep(2)
    return s

def send_to_video_creator(id):
    # Stablishes connection with video creator server.
    socket = open_socket_connection()
    # Sort buffer to restore the original sequence.
    sorted_buffer = sorted(gloss_buffer, key=itemgetter("index"))
    logger.info("Sending gloss to video creator")
    for content in sorted_buffer:
        try:
            # Send gloss to video creator.
            socket.send(content["gloss"].encode("utf-8")+"#"+str(content["pts"]))
        except KeyError:
            logger.info("Sending control message to video creator")
            socket.send(content["control-message"].encode("utf-8")+"#"+str(content["pts"]))
            # Start ffmpeg to capture the video creator display.
            logger.info("Rendering video")
            start_ffmpeg(id)
        # sleep for 500 milliseconds
        sleep(.500)
    socket.close()
    del gloss_buffer[:]

def run(ch, method, properties, body):
    """
    Execute the worker.

    Parameters
    ----------
    ch : object
        Channel of communication.
    method : function
        Callback method.
    properties : object
        Message containing a set of 14 properties.
    body : string
        Json string containing the necessary arguments for workers.
    """
    global worker_available, correlation_id
    body = json.loads(body)
    # Check if worker is available to process a new request.
    if worker_available:
        logger.info("Processing request " + properties.correlation_id.encode("utf-8"))
        # Accept only messages with index equals to 1.
        try:
            if body["index"] == 1:
                # Change the status of renderer to occupied.
                worker_available = False
                # Stores the id of request in process.
                correlation_id = properties.correlation_id.encode("utf-8")
                # Stores the first gloss in the buffer.
                gloss_buffer.append(body)
            else:
                ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
        except KeyError:
            ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
    # Else the worker is alread processing a request.
    else:
        # Check if the id of message match with the id of request being processed.
        if properties.correlation_id.encode("utf-8") == correlation_id:
            # Check if the body contains the control-message.
            try:
                if body["control-message"] == "FINALIZE":
                    # Get the total number of gloss of the current request.
                    total = body["index"] # Index of "FINALIZE" is the total number of gloss.
                    # Check if the buffer contains the correct number of gloss.
                    if len(gloss_buffer) == total - 1:
                        gloss_buffer.append(body)
                        logger.info("Preparing to generate the video")
                        start_new_thread(send_to_video_creator, (correlation_id,))
                        start_video_creator(correlation_id)
                        #Temporary
                        clean(correlation_id)
                        # Add path of libras video on body.
                        body["libras-video"] = os.path.join(PATH_LIBRAS, correlation_id + ".mp4")
                        worker_available = True
                        correlation_id = None
                        logger.info("Sending libras video to the libras queue")
                        manager.send_to_queue("libras", body, properties)
                        print ("OK")
                    else:
                        ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
            except KeyError:
                # Control message doesn't exist, continues to store gloss.
                gloss_buffer.append(body)
        else:
            ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)

#Temporary
def clean(id):
	logger.info("Cleaning screens files")
	path = os.path.join(PATH_SCREENS, id)
	rmtree(path, ignore_errors=True)

def keep_alive(conn_send, conn_receive):
	"""
    Keep the connection alive.

    Parameters
    ----------
    conn_send : object
        Connection of writer.
    conn_receive : object
        Connection of receiver.
    """
	while  True:
		sleep(30)
		try:
			conn_send.process_data_events()
			conn_receive.process_data_events()
		except:
			continue

start_new_thread(keep_alive, (manager.get_conn_send(), manager.get_conn_receive()))

print("Renderer listening...")
while True:
	try:
		manager.receive_from_queue("translations", run)
	except KeyboardInterrupt:
		manager.close_connections()
		os._exit(0)