#!/usr/bin/python # -*- coding: utf-8 -*- #Author: Wesnydy L. Ribeiro #Author: Erickson Silva #LAViD - Laboratório de Aplicações de Vídeo Digital import os import pika import json from subprocess import call, Popen, PIPE from thread import start_new_thread from time import sleep from urllib import urlretrieve # credentials = pika.PlainCredentials('test', 'test') # conn_send = pika.BlockingConnection(pika.ConnectionParameters(host='150.165.205.10', credentials=credentials)) # conn_receive = pika.BlockingConnection(pika.ConnectionParameters(host='150.165.205.10', credentials=credentials)) conn_send = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',heartbeat_interval=0)) conn_receive = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',heartbeat_interval=0)) PATH_VIDEO="/storage/videos/" def main_video_height(main_video): # Get video height using ffprobe try: pipe = Popen( ['ffprobe', '-v', 'error', '-select_streams', 'v:0', '-print_format', 'json', '-show_entries', 'stream=height', main_video], stdout=PIPE, shell=False) video_height = json.loads(pipe.communicate()[0]) return video_height['streams'][0]['height'] except OSError as ex: print ex.errno return None def secondary_video_heigth(main_height, window_size): # Default height if main_height is None: main_height = 324 # Size = 1, the video window is small if window_size == 'small': return int(0.3 * int(main_height)) # Size = 3, the video window is large elif window_size == 'large': return int(0.5 * int(main_height)) # Size = 2 or Default, the video window is medium else: return int(0.4 * int(main_height)) def secondary_video_position(window_position): # Position = 1, overlap the window at top Left on main video if window_position == 'top_left': return "10:10" # Position = 2, overlap the window at top right on main video elif window_position == 'top_right': return "main_w-overlay_w-10:10" # Position = 3, overlap the window at bottom right on main video elif window_position == 'bottom_right': return "main_w-overlay_w-10:main_h-overlay_h-10" # Position = 4, overlap the window at bottom left on main video elif window_position == 'bottom_left': return "10:main_h-overlay_h-10" # Default, overlap the window at bottom right on main video else: return "main_w-overlay_w-10:main_h-overlay_h-10" def run(ch, method, properties, body): #main_video, libras_video, window_size, window_position body = json.loads(body) # Download original video try: orginal_video = urlretrieve(body["video"].encode("utf-8"))[0] except IOError, e: print str(e) return # Get the original video height main_height = main_video_height(orginal_video) # Calculates the window height based on the original video height window_heigth = secondary_video_heigth(main_height, body["window_size"].encode("utf-8")) # The width is proportionally to height window_width = '-1' # Get the window position regarding to the main video window_pos = secondary_video_position(body["window_position"].encode("utf-8")) # Window movie movie = 'movie=' + body["libras_video"].encode("utf-8") # Scale of window movie scale = 'scale=' + str(window_width) + ':' + str(window_heigth) # Overlay position overlay = '[movie] overlay=' + window_pos + ' [out]' # -Vf param filter_graph = ','.join([movie, scale, 'setpts=PTS-STARTPTS', overlay]) # Create file path mixed_video = os.path.join(PATH_VIDEO, properties.correlation_id.encode("utf-8")+".mp4") # Mix videos using ffmpeg print "Mixing videos...", try: call( ['ffmpeg', '-v', 'error', '-i', orginal_video, '-y', '-vf', filter_graph, '-qscale', '0', '-strict', 'experimental', '-vcodec', 'libx264', '-preset', 'fast', '-r', '30', '-threads', '4', mixed_video], shell=False) print 'OK' except OSError as ex: print ex.errno body["mixed_video"] = mixed_video os.remove(orginal_video) os.remove(body["libras_video"]) send_to_queue(body, properties) def send_to_queue(body, props): try: channel = conn_send.channel() except: reload_connection_send() channel = conn_send.channel() queue = "videos" channel.queue_declare(queue=queue) channel.basic_publish(exchange='', routing_key=queue, properties=pika.BasicProperties(correlation_id = props.correlation_id), body=json.dumps(body)) channel.close() def receive_from_queue(): try: channel = conn_receive.channel() except: reload_connection_receive() channel = conn_receive.channel() queue = "libras" channel.queue_declare(queue=queue) channel.basic_qos(prefetch_count=1) channel.basic_consume(run, queue=queue, no_ack=True) channel.start_consuming() channel.close() def reload_connection_send(): global conn_send try: conn_send.close() except: pass conn_send = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',heartbeat_interval=0)) def reload_connection_receive(): global conn_receive try: conn_receive.close() except: pass conn_receive = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',heartbeat_interval=0)) def keep_alive(conn_send, conn_receive): while True: sleep(30) try: conn_send.process_data_events() conn_receive.process_data_events() except: continue #start_new_thread(keep_alive, (conn_send, conn_receive)) while True: try: receive_from_queue() except KeyboardInterrupt: conn_send.close() conn_receive.close() os._exit(0)