mixer.py 5.99 KB
#!/usr/bin/python
# -*- coding: utf-8 -*-

#Author: Wesnydy L. Ribeiro <wesnydy@lavid.ufpb.br>
#Author: Erickson Silva <erickson.silva@lavid.ufpb.br>

#LAViD - Laboratório de Aplicações de Vídeo Digital

import os
import pika
import json
from subprocess import call, Popen, PIPE
from thread import start_new_thread
from time import sleep
from urllib import urlretrieve

# credentials = pika.PlainCredentials('test', 'test')
# conn_send = pika.BlockingConnection(pika.ConnectionParameters(host='150.165.205.10', credentials=credentials))
# conn_receive = pika.BlockingConnection(pika.ConnectionParameters(host='150.165.205.10', credentials=credentials))

conn_send = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',heartbeat_interval=0))
conn_receive = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',heartbeat_interval=0))

PATH_VIDEO="/storage/videos/"

def main_video_height(main_video):
    # Get video height using ffprobe
    try:
        pipe = Popen(
                ['ffprobe', '-v', 'error', '-select_streams', 'v:0', '-print_format',
                'json', '-show_entries', 'stream=height', main_video],
                stdout=PIPE, shell=False)
        video_height = json.loads(pipe.communicate()[0])
        return video_height['streams'][0]['height']
    except OSError as ex:
        print ex.errno
        return None

def secondary_video_heigth(main_height, window_size):
    # Default height
    if main_height is None:
        main_height = 324
    # Size = 1, the video window is small
    if window_size == 'small':
        return int(0.3 * int(main_height))
    # Size = 3, the video window is large
    elif window_size == 'large':
        return int(0.5 * int(main_height))
    # Size = 2 or Default, the video window is medium
    else:
        return int(0.4 * int(main_height))

def secondary_video_position(window_position):
    # Position = 1, overlap the window at top Left on main video
    if window_position == 'top_left':
        return "10:10"
    # Position = 2, overlap the window at top right on main video
    elif window_position == 'top_right':
        return "main_w-overlay_w-10:10"
    # Position = 3, overlap the window at bottom right on main video
    elif window_position == 'bottom_right':
        return "main_w-overlay_w-10:main_h-overlay_h-10"
    # Position = 4, overlap the window at bottom left on main video
    elif window_position == 'bottom_left':
        return "10:main_h-overlay_h-10"
    # Default, overlap the window at bottom right on main video
    else:
        return "main_w-overlay_w-10:main_h-overlay_h-10"

def run(ch, method, properties, body):
    #main_video, libras_video, window_size, window_position
    body = json.loads(body)
    # Download original video
    try:
        orginal_video = urlretrieve(body["video"].encode("utf-8"))[0]
    except IOError, e:
        print str(e)
        return
    # Get the original video height
    main_height = main_video_height(orginal_video)
    # Calculates the window height based on the original video height
    window_heigth = secondary_video_heigth(main_height, body["window_size"].encode("utf-8"))
    # The width is proportionally to height
    window_width = '-1'
    # Get the window position regarding to the main video
    window_pos = secondary_video_position(body["window_position"].encode("utf-8"))

    # Window movie
    movie = 'movie=' + body["libras_video"].encode("utf-8")
    # Scale of window movie
    scale = 'scale=' + str(window_width) + ':' + str(window_heigth)
    # Overlay position
    overlay = '[movie] overlay=' + window_pos + ' [out]'
    # -Vf param
    filter_graph = ','.join([movie, scale, 'setpts=PTS-STARTPTS', overlay])
    # Create file path
    mixed_video = os.path.join(PATH_VIDEO, properties.correlation_id.encode("utf-8")+".mp4")
    # Mix videos using ffmpeg
    print "Mixing videos...",
    try:
        call(
            ['ffmpeg', '-v', 'error', '-i', orginal_video, '-y', '-vf', filter_graph,
            '-qscale', '0', '-strict', 'experimental', '-vcodec', 'libx264',
            '-preset', 'fast', '-r', '30', '-threads', '4', mixed_video], shell=False)
        print 'OK'
    except OSError as ex:
        print ex.errno
    body["mixed_video"] = mixed_video
    os.remove(orginal_video)
    os.remove(body["libras_video"])
    send_to_queue(body, properties)

def send_to_queue(body, props):
    try:
        channel = conn_send.channel()
    except:
        reload_connection_send()
        channel = conn_send.channel()
    queue = "videos"
    channel.queue_declare(queue=queue)
    channel.basic_publish(exchange='',
                      routing_key=queue,
                      properties=pika.BasicProperties(correlation_id = props.correlation_id),
                      body=json.dumps(body))
    channel.close()
    
def receive_from_queue():
    try:
        channel = conn_receive.channel()
    except:
        reload_connection_receive()
        channel = conn_receive.channel()
    queue = "libras"
    channel.queue_declare(queue=queue)
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(run,
                  queue=queue,
                  no_ack=True)
    channel.start_consuming()
    channel.close()

def reload_connection_send():
    global conn_send
    try:
        conn_send.close()
    except:
        pass
    conn_send = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',heartbeat_interval=0))

def reload_connection_receive():
    global conn_receive
    try:
        conn_receive.close()
    except:
        pass
    conn_receive = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',heartbeat_interval=0))

def keep_alive(conn_send, conn_receive):
    while True:
        sleep(30)
        try:
            conn_send.process_data_events()
            conn_receive.process_data_events()
        except:
            continue

#start_new_thread(keep_alive, (conn_send, conn_receive))
while True:
    try:
        receive_from_queue()
    except KeyboardInterrupt:
        conn_send.close()
        conn_receive.close()
        os._exit(0)