#!/usr/bin/python
# -*- coding: utf-8 -*-

# Author: Erickson Silva
# Email:
# LAViD - Laboratório de Aplicações de Vídeo Digital
# (Digital Video Applications Laboratory)

# Worker script (Python 2): consumes requests from the "requests" queue,
# downloads the referenced SRT subtitle file, and publishes one message per
# subtitle entry to the "extractions" queue, followed by a FINALIZE message.

import json
import os
import sys
from thread import start_new_thread
from time import sleep
from urllib import urlretrieve

import pika
import pysrt

# Disabled alternative: connect to a remote broker with explicit credentials.
# credentials = pika.PlainCredentials('test', 'test')
# conn_send = pika.BlockingConnection(pika.ConnectionParameters(host='150.165.205.10', credentials=credentials))
# conn_receive = pika.BlockingConnection(pika.ConnectionParameters(host='150.165.205.10', credentials=credentials))


def _connect():
    """Open and return a new blocking connection to the broker on localhost."""
    return pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost', heartbeat_interval=0))


# Separate connections are used for publishing and for consuming.
conn_send = _connect()
conn_receive = _connect()


def run(ch, method, properties, body):
    """Consumer callback: extract the subtitles referenced by one request.

    ``body`` is a JSON document whose "subtitle" entry is the URL of an SRT
    file. The file is downloaded, parsed with pysrt (retrying as iso-8859-1
    when the default decoding fails), and each entry is published through
    ``send_to_queue`` as ``{'text', 'pts', 'index'}`` with a 1-based index.
    Finally the request body itself is re-published with
    ``control-message = "FINALIZE"``, ``pts = -1`` and ``index`` set to one
    past the last entry.

    A download failure (IOError) is printed and the request is dropped
    without publishing anything. ``ch`` and ``method`` are unused.
    """
    body = json.loads(body)
    try:
        filename = urlretrieve(body["subtitle"].encode("utf-8"))[0]
    except IOError as e:
        print(str(e))
        return

    # NOTE(review): for local-file URLs urlretrieve may return the source path
    # itself rather than a temporary copy, in which case the removal below
    # deletes that file -- confirm that only remote URLs are expected.
    count = 0
    try:
        try:
            subtitle = pysrt.open(filename)
        except UnicodeDecodeError:
            subtitle = pysrt.open(filename, encoding='iso-8859-1')

        # No newline here: "OK" is appended on the same line on success.
        sys.stdout.write("Extracting... ")
        for count, sub in enumerate(subtitle, 1):
            message = {
                'text': sub.text.encode("utf-8"),
                'pts': calculate_ms(str(sub.start)),
                'index': count,
            }
            send_to_queue(message, properties)
    finally:
        # Always remove the downloaded file, even when parsing or publishing
        # fails part-way through.
        os.remove(filename)

    body['control-message'] = "FINALIZE"
    body['pts'] = -1
    body['index'] = count + 1
    send_to_queue(body, properties)
    print("OK")


def calculate_ms(time_in):
    """Convert a ``"HH:MM:SS,mmm"`` timestamp string to integer milliseconds.

    Raises IndexError or ValueError when the string does not have that shape.
    """
    fields = time_in.split(":")
    # The third field holds "SS,mmm"; split it into seconds and milliseconds.
    fields = fields[:2] + fields[2].split(",")
    hours = int(fields[0]) * 3600000
    minutes = int(fields[1]) * 60000
    seconds = int(fields[2]) * 1000
    return int(fields[3]) + seconds + minutes + hours


def send_to_queue(body, props):
    """Publish ``body`` as JSON to the "extractions" queue.

    The message carries the ``correlation_id`` taken from ``props``. A fresh
    channel is opened for every message; if opening it raises KeyError the
    send connection is re-created once and the open is retried.
    """
    try:
        channel = conn_send.channel()
    except KeyError:
        reload_connection_send()
        channel = conn_send.channel()

    queue = "extractions"
    try:
        channel.queue_declare(queue=queue)
        channel.basic_publish(
            exchange='',
            routing_key=queue,
            properties=pika.BasicProperties(
                correlation_id=props.correlation_id),
            body=json.dumps(body))
    finally:
        # Close the channel even when declaring or publishing fails, so
        # channels do not accumulate on the connection.
        channel.close()


def receive_from_queue():
    """Consume the "requests" queue, dispatching every message to ``run``.

    Blocks inside ``start_consuming``; the channel is closed once that call
    returns. If opening the channel raises KeyError the receive connection is
    re-created once and the open is retried.
    """
    try:
        channel = conn_receive.channel()
    except KeyError:
        reload_connection_receive()
        channel = conn_receive.channel()

    queue = "requests"
    channel.queue_declare(queue=queue)
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(run, queue=queue, no_ack=True)
    channel.start_consuming()
    channel.close()


def _replace_connection(old_connection):
    """Close ``old_connection`` on a best-effort basis and return a new one."""
    try:
        old_connection.close()
    except Exception:
        # The old connection is being discarded; a failure to close it
        # (for example because it is already closed) is not actionable.
        pass
    return _connect()


def reload_connection_send():
    """Replace the module-level send connection with a fresh one."""
    global conn_send
    conn_send = _replace_connection(conn_send)


def reload_connection_receive():
    """Replace the module-level receive connection with a fresh one."""
    global conn_receive
    conn_receive = _replace_connection(conn_receive)


def _live_connections():
    """Return the current module-level ``(conn_send, conn_receive)`` pair."""
    return conn_send, conn_receive


def keep_alive(conn_send=None, conn_receive=None):
    """Call ``process_data_events`` on both connections every 30 seconds.

    Runs forever; intended to be started in a background thread. When an
    argument is omitted (None), the current module-level connection is looked
    up on every cycle, so a connection replaced by ``reload_connection_send``
    or ``reload_connection_receive`` is picked up instead of the stale one.
    Explicitly passed connections are used as given.

    Each connection is handled independently: an error on one does not skip
    the other.
    """
    while True:
        sleep(30)
        live_send, live_receive = _live_connections()
        targets = (
            conn_send if conn_send is not None else live_send,
            conn_receive if conn_receive is not None else live_receive,
        )
        for connection in targets:
            try:
                connection.process_data_events()
            except Exception:
                # Best effort: try again on the next cycle.
                continue


# No arguments: the keep-alive thread follows the live module-level
# connections, including ones re-created after a reload.
start_new_thread(keep_alive, ())

while True:
    try:
        receive_from_queue()
    except KeyboardInterrupt:
        conn_send.close()
        conn_receive.close()
        # Hard exit so the background keep-alive thread cannot keep the
        # process running.
        os._exit(0)