renderer.py 6.37 KB
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
Author: Erickson Silva
E-Mail: erickson.silva@lavid.ufpb.br

Author: Jonathan Lincoln Brilhante
E-Mail: jonathan.lincoln.brilhante@gmail.com

Author: Wesnydy Lima Ribeiro
E-Mail: wesnydy@lavid.ufpb.br
"""

import ffmpy
import json
import os
import pika
import PikaManager
import signal
import socket
import subprocess

from pyvirtualdisplay import Display
from thread import start_new_thread
from time import sleep

#Temporary
from shutil import rmtree

# Manager of queues connections.
manager = PikaManager.PikaManager("localhost")

TCP_IP = '0.0.0.0'
TCP_PORT = 5555

CONTROL_MESSAGE = "FINALIZE"
DEFAULT_PTS = "#-1"

PATH_LIBRAS = os.getenv("VLIBRAS_VIDEO_LIBRAS")
VIDEO_CREATOR = os.getenv("VLIBRAS_VIDEO_CREATOR")

#Temporary
PATH_FRAMES = os.getenv("VLIBRAS_VIDEO_SCREENS")

# Status of renderer to process new requests. Answer one request at a time.
worker_available = True
# Path of libras video.
libras_video = None
# pyvirtualdisplay instance
display = None
# ffmpeg process instance
ffmpeg = None

def start_video_creator(id):
    """
    Start video creator server.

    Parameters
    ----------
    id : string
        Identification of request.
    """
    global display, ffmpeg
    # logger.info("Starting video creator server")
    display = Display(visible=0, size=(800,600))
    display.start()
    subprocess.call(
        [
            VIDEO_CREATOR,
            id,
            "1",
            "30",
            "20",
            "25",
            "-screen-fullscreen", "1",
            "-screen-quality", "Fantastic",
            "-force-opengl"
        ],
        shell=False
    )
    ffmpeg.send_signal(signal.SIGQUIT)
    ffmpeg.communicate()
    display.stop()

def start_ffmpeg(id):
    """
    Start FFmpeg to capture the video creator display.

    Parameters
    ----------
    id : string
        Identification of request.
    """
    global ffmpeg, display, libras_video
    # 'vl.mp4' sufix to distinguish from video with watermark
    libras_video = os.path.join(PATH_LIBRAS, id + "vl.mp4")
    ffmpeg = subprocess.Popen(
        [
            "ffmpeg",
            "-y",
            "-loglevel", "quiet",
            "-video_size", "800x600",
            "-r", "30",
            "-f", "x11grab",
            "-draw_mouse", "0",
            "-i", str(display.cmd_param[-1]) + ".0+nomouse",
            "-vcodec", "libx264",
            "-pix_fmt", "yuv420p",
            "-an",
            libras_video
        ],
        shell=False
    )

def open_socket_connection():
    """
    Create a new socket TCP connection with video creator server.

    Returns
    -------
    socket object
        Connection with video creator server.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # logger.info("Opening connection with video creator")
    while True:
        try:
            s.connect((TCP_IP, TCP_PORT))
            break
        except:
            sleep(2)
    return s

def send_to_video_creator(id, message):
    # Stablishes connection with video creator server.
    socket = open_socket_connection()
    # Send gloss to video creator.
    socket.send(str(message)+DEFAULT_PTS)
    # Receive a response from the video creator
    socket.recv(3)
    # Send the control message to video creator
    socket.send(CONTROL_MESSAGE+DEFAULT_PTS)
    # Start ffmpeg to capture the video creator display.
    start_ffmpeg(id)
    # Close the connection with video creator server.
    socket.close()

def run(ch, method, properties, body):
    """
    Execute the worker.

    Parameters
    ----------
    ch : object
        Channel of communication.
    method : function
        Callback method.
    properties : object
        Message containing a set of 14 properties.
    body : string
        Json string containing the necessary arguments for workers.
    """
    global worker_available

    print ("Rendering...")
    if worker_available:
        worker_available = False
        start_new_thread(send_to_video_creator, (properties.correlation_id, body))
        start_video_creator(properties.correlation_id)

        final_video = os.path.join(PATH_LIBRAS, properties.correlation_id+".mp4")
        watermark(libras_video, final_video, "Vídeo gerado automaticamente pelo usuário")
        try:
            filesize = os.path.getsize(libras_video)
        except:
            filesize = 0

        body = {"file": properties.correlation_id+".mp4", "size": filesize, "status": "success"}
        json.dumps(body)

        manager.send_to_queue("videos", body, properties)

        clean(properties.correlation_id)
        worker_available = True
        print ("Ok")
    else:
        ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)

# Post processor functions
def watermark(input_video, output_video, text):
    fontfile = os.getenv("TEXT_FONTS")

    filter_graph = ','.join(
        [
            "mpdecimate",
            "setpts=N/FRAME_RATE/TB",
            "drawbox=y=ih-208:color=black@0.4:width=iw:height=48:t=max",
            "drawtext=fontfile="+fontfile+":text="+text+":fontcolor=white@0.8:fontsize=36:y=h-200:x=w/10*mod(t\,10)"
        ]
    )

    ffmpeg = ffmpy.FFmpeg(
        inputs = { input_video: None },
        outputs = {
            output_video: [
                "-loglevel", "error",
                "-vf", filter_graph,
                "-ss", "0.03",
                "-c:v", "libx264",
                "-movflags", "+faststart",
                "-y"
            ]
        }
    )
    try:
        ffmpeg.run()
    except:
        print "FFmpeg retuned with a non-zero exit code. Watermark error."

#Temporary
def clean(id):
    # logger.info("Cleaning screens files")
    os.remove(libras_video)
    path = os.path.join(PATH_FRAMES, id)
    rmtree(path, ignore_errors=True)

def keep_alive(conn_send, conn_receive):
    """
    Keep the connection alive.

    Parameters
    ----------
    conn_send : object
        Connection of writer.
    conn_receive : object
        Connection of receiver.
    """
    while  True:
        sleep(30)
        try:
            conn_send.process_data_events()
            conn_receive.process_data_events()
        except:
            continue

# start_new_thread(keep_alive, (manager.get_conn_send(), manager.get_conn_receive()))

print("Renderer listening...")
while True:
    try:
        manager.receive_from_queue("glosses", run)
    except KeyboardInterrupt:
        manager.close_connections()
        os._exit(0)