Commit 8cae4b85a70896768145f35239d352349273390e

Authored by Wesnydy Ribeiro
1 parent 7854e8d0
Exists in release

Test version

core/extractor.py
... ... @@ -15,33 +15,35 @@ Author: Wesnydy Lima Ribeiro
15 15 E-Mail: wesnydy@lavid.ufpb.br
16 16 """
17 17  
  18 +import json
  19 +import logging
18 20 import os
19 21 import pika
20 22 import PikaManager
21 23 import pysrt
22   -import json
23   -import logging #Logging
24 24  
25 25 from thread import start_new_thread
26 26 from time import sleep
27 27 from urllib import urlretrieve
28 28  
29   -logger = logging.getLogger('extractor')
  29 +# Logging configuration.
  30 +logger = logging.getLogger("extractor")
30 31 logger.setLevel(logging.DEBUG)
31 32  
32   -fh = logging.FileHandler('../log/extractor.log')
  33 +fh = logging.FileHandler("../log/extractor.log")
33 34 fh.setLevel(logging.DEBUG)
34 35  
35 36 ch = logging.StreamHandler()
36 37 ch.setLevel(logging.ERROR)
37 38  
38   -formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  39 +formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
39 40 fh.setFormatter(formatter)
40 41 ch.setFormatter(formatter)
41 42  
42 43 logger.addHandler(fh)
43 44 logger.addHandler(ch)
44 45  
  46 +# Manager of queues connections.
45 47 manager = PikaManager.PikaManager("150.165.205.10", "test", "test")
46 48  
47 49 def run(ch, method, properties, body):
... ... @@ -56,46 +58,45 @@ def run(ch, method, properties, body):
56 58 Callback method.
57 59 properties : object
58 60 Message containing a set of 14 properties.
59   - body : json object
60   - Informations received from queue.
  61 + body : string
  62 + Json string containing the necessary arguments for workers.
61 63 """
62   - logger.info("Processando a requisição " + properties.correlation_id.encode("utf-8"))
63   - # body it's a json that contains subtitle file
  64 + logger.info("processing request " + properties.correlation_id.encode("utf-8"))
64 65 body = json.loads(body)
65 66 try:
66   - # Try to download the subtitle
67   - logger.info("Obtendo o arquivo de legendas")
  67 + logger.info("Downloading subtitle")
68 68 filename = urlretrieve(body["subtitle"].encode("utf-8"))[0]
69 69 except IOError, e:
70   - logger.error("Falha ao obter o arquivo de legendas")
71   - # Returns if can't download the subtitle file
  70 + logger.error("Download of subtitle fail")
72 71 return
  72 +
73 73 try:
  74 + # Tries to open file with utf-8 encoding.
74 75 subtitle = pysrt.open(filename)
75 76 except UnicodeDecodeError:
76   - subtitle = pysrt.open(filename, encoding='iso-8859-1')
  77 + # Tries to open file with iso-8859-1 encoding if utf-8 encoding fails.
  78 + subtitle = pysrt.open(filename, encoding="iso-8859-1")
  79 +
77 80 index = 1
78   - # Initialize the extraction of subtitles
79 81 print ("Extracting...")
80   - logger.info("Extraindo legendas do arquivo")
  82 + logger.info("Extracting subtitles from file")
81 83 for sub in subtitle:
82 84 pts = calculate_ms(str(sub.start))
83   - message = {'text': sub.text.encode("utf-8"), 'pts': pts, 'index': index}
  85 + message = {"text": sub.text.encode("utf-8"), "pts": pts, "index": index}
84 86 manager.send_to_queue("extractions", message, properties)
85 87 index += 1
86   - # Flag indicating the end of extraction
87   - body['control-message'] = "FINALIZE"
88   - body['pts'] = -1
89   - body['index'] = index
90   - #Number of subtitle extracted
91   - logger.info(str(index-1) + " Legendas extraídas com sucesso")
92   - # Clean temporary file
93   - logger.info("Removendo arquivo temporário")
  88 + # Control message indicating the end of subtitles.
  89 + body["control-message"] = "FINALIZE"
  90 + body["pts"] = -1
  91 + body["index"] = index
  92 + logger.info(str(index-1) + " Subtitles extracted successfully")
  93 +
  94 + logger.info("Cleaning temp files")
94 95 os.remove(filename)
95   - # Send the body to the queue
96   - logger.info("Enviando para a fila de extrações")
  96 +
  97 + logger.info("Sending control message to the queue")
97 98 manager.send_to_queue("extractions", body, properties)
98   - print ("Sucess")
  99 + print ("Ok")
99 100  
100 101 def calculate_ms(time_in):
101 102 """
... ... @@ -111,8 +112,8 @@ def calculate_ms(time_in):
111 112 number
112 113 The timestamp in milliseconds.
113 114 """
114   - time = time_in.split(":")
115   - time = time[:2] + time[2].split(",")
  115 + time = time_in.split(':')
  116 + time = time[:2] + time[2].split(',')
116 117 hour = int(time[0]) * 3600000
117 118 minute = int(time[1]) * 60000
118 119 second = int(time[2]) * 1000
... ...
core/mixer.py
... ... @@ -15,19 +15,18 @@ Author: Wesnydy Lima Ribeiro
15 15 E-Mail: wesnydy@lavid.ufpb.br
16 16 """
17 17  
  18 +import json
  19 +import logging
18 20 import os
19 21 import pika
20   -import json
21 22 import PikaManager
22   -import logging #Logging
  23 +import subprocess
23 24  
24   -from subprocess import call, Popen, PIPE
25 25 from thread import start_new_thread
26 26 from time import sleep
27 27 from urllib import urlretrieve
28 28  
29   -PATH_MIXED_VIDEO = os.getenv("VLIBRAS_VIDEO_MIXED")
30   -
  29 +# Logging configuration.
31 30 logger = logging.getLogger('mixer')
32 31 logger.setLevel(logging.DEBUG)
33 32  
... ... @@ -44,8 +43,11 @@ ch.setFormatter(formatter)
44 43 logger.addHandler(fh)
45 44 logger.addHandler(ch)
46 45  
  46 +# Manager of queues connections.
47 47 manager = PikaManager.PikaManager("150.165.205.10", "test", "test")
48 48  
  49 +PATH_MIXED_VIDEO = os.getenv("VLIBRAS_VIDEO_MIXED")
  50 +
49 51 def main_video_height(main_video):
50 52 """
51 53 Extract height information of video.
... ... @@ -60,20 +62,27 @@ def main_video_height(main_video):
60 62 string
61 63 None if failed to extract info. The height if extraction has been successfuly.
62 64 """
63   - logger.info("Extraindo informações de resolução do vídeo original")
  65 + logger.info("Extracting resolution of main video")
64 66 try:
65 67 # Obtains the main video height using ffprobe
66   - pipe = Popen(
67   - ['ffprobe', '-v', 'error', '-select_streams', 'v:0', '-print_format',
68   - 'json', '-show_entries', 'stream=height', main_video],
69   - stdout=PIPE, shell=False)
  68 + ffprobe = subprocess.Popen(
  69 + [
  70 + "ffprobe",
  71 + "-loglevel", "error",
  72 + "-select_streams", "v:0",
  73 + "-print_format", "json",
  74 + "-show_entries", 'stream=height',
  75 + main_video
  76 + ],
  77 + stdout=subprocess.PIPE,
  78 + shell=False
  79 + )
70 80 # The results comes in a json
71   - video_height = json.loads(pipe.communicate()[0])
  81 + video_height = json.loads(ffprobe.communicate()[0])
72 82 # Returns the height obtained
73 83 return video_height['streams'][0]['height']
74 84 except OSError as ex:
75   - logger.error("Impossível extrair informações, resolução padrão será utilizada")
76   - # If an error occurs, return empty
  85 + logger.error("Error when extracting resolution, default will be used")
77 86 return None
78 87  
79 88 def secondary_video_heigth(main_height, window_size):
... ... @@ -92,7 +101,7 @@ def secondary_video_heigth(main_height, window_size):
92 101 number
93 102 The height of window.
94 103 """
95   - logger.info("Calculando a resolução da janela de libras")
  104 + logger.info("Calculating the resolution of the libras window")
96 105 # Set the default height of main video if a height is not given
97 106 if main_height is None:
98 107 main_height = 324
... ... @@ -120,7 +129,7 @@ def secondary_video_position(window_position):
120 129 string
121 130 The configurations of position of window.
122 131 """
123   - logger.info("Definindo a posição da janela de libras")
  132 + logger.info("Defining the position of the libras window")
124 133 # Overlap the window at top Left on main video
125 134 if window_position == 'top_left':
126 135 return "10:10"
... ... @@ -146,19 +155,16 @@ def run(ch, method, properties, body):
146 155 Callback method.
147 156 properties : object
148 157 Message containing a set of 14 properties.
149   - body : json object
150   - Informations received from queue.
  158 + body : string
  159 + Json string containing the necessary arguments for workers.
151 160 """
152   - logger.info("Processando a requisição " + properties.correlation_id.encode("utf-8"))
153   - # body it's a json that contains main video, libras video, window size and window position
  161 + logger.info("Processing request " + properties.correlation_id.encode("utf-8"))
154 162 body = json.loads(body)
155 163 try:
156   - # Try to download the main video
157   - logger.info("Obtendo o vídeo original")
  164 + logger.info("Downloading main video")
158 165 main_video = urlretrieve(body["video"].encode("utf-8"))[0]
159 166 except IOError as ex:
160   - logger.error("Falha ao obter o vídeo original")
161   - # Returns if can't download the video
  167 + logger.error("Download of video fail")
162 168 return
163 169 # Get the main video height
164 170 main_height = main_video_height(main_video)
... ... @@ -169,7 +175,7 @@ def run(ch, method, properties, body):
169 175 # Get the window position regarding to the main video
170 176 window_pos = secondary_video_position(body["window_position"].encode("utf-8"))
171 177 # Defines the window movie
172   - movie = 'movie=' + body["libras_video"].encode("utf-8")
  178 + movie = 'movie=' + body["libras-video"].encode("utf-8")
173 179 # Defines the scale of window movie
174 180 scale = 'scale=' + str(window_width) + ':' + str(window_heigth)
175 181 # Defines the overlay position
... ... @@ -180,26 +186,39 @@ def run(ch, method, properties, body):
180 186 mixed_video = os.path.join(PATH_MIXED_VIDEO, properties.correlation_id.encode("utf-8")+".mp4")
181 187 # Mix videos using ffmpeg
182 188 print ("Mixing videos...")
183   - logger.info("Mixando o vídeo original com a janela de libras")
  189 + logger.info("Mixing videos")
184 190 try:
185   - call(
186   - ['ffmpeg', '-v', 'error', '-i', main_video, '-y', '-vf', filter_graph,
187   - '-qscale', '0', '-strict', 'experimental', '-vcodec', 'libx264',
188   - '-preset', 'fast', '-r', '30', '-threads', '8', mixed_video], shell=False)
189   - logger.info("Mixagem bem sucedida")
  191 + subprocess.call(
  192 + [
  193 + "ffmpeg",
  194 + "-loglevel", "error",
  195 + "-i", main_video,
  196 + "-y",
  197 + "-vf", filter_graph,
  198 + "-qscale", "0",
  199 + "-strict", "experimental",
  200 + "-vcodec", "libx264",
  201 + "-preset", "fast",
  202 + "-r", "30",
  203 + "-threads", "4",
  204 + mixed_video
  205 + ],
  206 + shell=False
  207 + )
  208 + logger.info("Mixing successfuly")
190 209 except OSError as ex:
191   - logger.error("Mixagem mal sucedida")
  210 + logger.error("Mixing fail")
192 211 print ("Error")
193   - # Inserts the mixed video into the body
  212 + # Add mixed video to the body
194 213 body["mixed_video"] = mixed_video
195   - # Clean temporary files
196   - logger.info("Removendo arquivos temporários")
  214 +
  215 + logger.info("Cleaning temp files")
197 216 os.remove(main_video)
198   - os.remove(body["libras_video"])
199   - # Send the body to the queue
200   - logger.info("Enviando para a fila de videos")
  217 + os.remove(body["libras-video"])
  218 +
  219 + logger.info("Sending mixed video to the videos queue")
201 220 manager.send_to_queue("videos", body, properties)
202   - print ("Success")
  221 + print ("Ok")
203 222  
204 223 def keep_alive(conn_send, conn_receive):
205 224 while True:
... ...
core/renderer.py
1   -#!/usr/bin/python
  1 +#!/usr/bin/env python
2 2 # -*- coding: utf-8 -*-
3 3  
4   -"""
5   -Author: Caio Marcelo Campoy Guedes
6   -E-Mail: caiomcg@gmail.com
7   -
8   -Author: Erickson Silva
9   -E-Mail: erickson.silva@lavid.ufpb.br
10   -
11   -Author: Jorismar Barbosa
12   -E-Mail: jorismar.barbosa@lavid.ufpb.br
13   -
14   -Author: Wesnydy Lima Ribeiro
15   -E-Mail: wesnydy@lavid.ufpb.br
16   -"""
17   -
  4 +import json
  5 +import logging
18 6 import os
19   -import sys
20 7 import pika
21   -import pysrt
  8 +import PikaManager
  9 +import signal
22 10 import socket
23   -import json
24 11 import subprocess
25   -import threading
26   -import signal
27   -from time import sleep
28   -from thread import start_new_thread
29   -from pyvirtualdisplay import Display
  12 +
30 13 from operator import itemgetter
31   -from shutil import rmtree, move
32   -import logging #Logging
  14 +from pyvirtualdisplay import Display
  15 +from thread import start_new_thread
  16 +from time import sleep
33 17  
  18 +# Logging configuration.
34 19 logger = logging.getLogger('renderer')
35 20 logger.setLevel(logging.DEBUG)
36 21  
37   -fh = logging.FileHandler('renderer.log')
  22 +fh = logging.FileHandler('../log/renderer.log')
38 23 fh.setLevel(logging.DEBUG)
39 24  
40 25 ch = logging.StreamHandler()
... ... @@ -47,206 +32,202 @@ ch.setFormatter(formatter)
47 32 logger.addHandler(fh)
48 33 logger.addHandler(ch)
49 34  
50   -credentials = pika.PlainCredentials('test', 'test')
51   -conn_send = pika.BlockingConnection(pika.ConnectionParameters(host='150.165.205.10', credentials=credentials))
52   -conn_receive = pika.BlockingConnection(pika.ConnectionParameters(host='150.165.205.10', credentials=credentials))
53   -
54   -#conn_send = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',heartbeat_interval=0))
55   -#conn_receive = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',heartbeat_interval=0))
56   -
57   -running = False
58   -gl_id = None
59   -contents = []
60   -correlation_id = None
61   -ffmpeg = None
62   -display = None
  35 +# Manager of queues connections.
  36 +manager = PikaManager.PikaManager("150.165.205.10", "test", "test")
63 37  
64 38 TCP_IP = '0.0.0.0'
65 39 TCP_PORT = 5555
66 40  
67   -PATH_SCREENS="/home/caiomcg/.config/unity3d/LAViD/VLibrasVideoMaker/"
68   -PATH_LIBRAS="/home/caiomcg/libras/"
69   -PATH_VIDEO="/home/caiomcg/videos/"
70   -VIDEO_CREATOR="/home/caiomcg/unityVideo/videoCreator.x86_64"
71   -
72   -def run(ch, method, properties, body):
73   - logger.info("Processando a requisição " + properties.correlation_id.encode("utf-8"))
74   - global running, correlation_id
75   - body = json.loads(body)
76   - print body
77   - if running:
78   - print "Running..."
79   - if properties.correlation_id.encode("utf-8") == correlation_id:
80   - try:
81   - if body["control-message".decode("utf-8")] == "FINALIZE".decode("utf-8"):
82   - size = body["index"]
83   -
84   - if len(contents) == size - 1:
85   - contents.append(body)
86   - ch.basic_ack(delivery_tag = method.delivery_tag)
87   - make_video(correlation_id)
88   - body['libras_video'] = os.path.join(PATH_LIBRAS, correlation_id + ".mp4")
89   - running = False
90   - correlation_id = ""
91   - send_to_queue(body, properties)
92   - else:
93   - ch.basic_reject(delivery_tag=method.delivery_tag)
94   - except KeyError:
95   - contents.append(body)
96   - ch.basic_ack(delivery_tag = method.delivery_tag)
97   - else:
98   - ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
99   - else:
100   - print body["index"]
101   - if "index" in body and body["index"] == 1:
102   - print "index"
103   - running = True
104   - correlation_id = properties.correlation_id.encode("utf-8")
105   - contents.append(body)
106   - ch.basic_ack(delivery_tag = method.delivery_tag)
107   - elif "type" in body and body["type"] == "text":
108   - print "Body=Text..."
109   - body['pts'] = -1
110   - body['index'] = 0
111   - contents.append(body)
112   - message = {'control-message': "FINALIZE", 'pts': -1, 'index': 1}
113   - contents.append(message)
114   - ch.basic_ack(delivery_tag = method.delivery_tag)
115   - make_video(properties.correlation_id.encode("utf-8"))
116   - path_libras = os.path.join(PATH_LIBRAS, properties.correlation_id.encode("utf-8")+".mp4")
117   - path_video = os.path.join(PATH_VIDEO, properties.correlation_id.encode("utf-8")+".mp4")
118   - move(path_libras, path_video)
119   - body['libras_video'] = path_video
120   - send_to_queue(body, properties)
121   - else:
122   - print "basic"
123   - ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
124   -
125   -def make_video(id):
126   - logger.info("Gerando o vídeo de libras")
127   - print "Making video..."
128   - global gl_id
129   - gl_id = id
130   - start_new_thread(send_to_player, ())
131   - #threading.Thread(target=send_to_player, args=(id,)).start()
132   - capture(id)
133   - #render(id)
134   - clean(id)
135   -
136   -def send_to_player():
137   - logger.info("Enviando glosa para o player")
138   - global gl_id
139   - socket = open_socket()
140   - contents_sorted = sorted(contents, key=itemgetter('index'))
141   - for message in contents_sorted:
142   - try:
143   - socket.send(message["gloss"].encode('utf-8')+"#"+str(message["pts"]))
144   - except KeyError:
145   - logger.error("Impossível enviar glosa. Mensagem de controle será enviada para o servidor")
146   - socket.send(message["control-message"].encode('utf-8')+"#"+str(message["pts"]))
147   - render(gl_id)
148   - sleep(1)
149   - socket.close()
150   - del contents[:]
151   -
152   -def open_socket():
153   - logger.info("Abrindo conexão via socket")
154   - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
155   - while True:
156   - try:
157   - s.connect((TCP_IP, TCP_PORT))
158   - print "Connected..."
159   - break
160   - except:
161   - sleep(2)
162   - return s
163   -
164   -def render(id):
165   - global ffmpeg, display
166   -
167   - print "Rendering..."
168   - logger.info("Renderizando o vídeo de libras")
169   - ffmpeg = subprocess.Popen(
170   - [
171   - "ffmpeg",
172   - "-y",
173   - "-loglevel", "quiet",
174   - "-video_size", "800x600",
175   - "-r", "30",
176   - "-f", "x11grab",
177   - "-draw_mouse", "0",
178   - "-i", str(display.cmd_param[-1]) + ".0+nomouse",
179   - "-vcodec", "libx264",
180   - "-pix_fmt", "yuv420p",
181   - "-an",
182   - PATH_LIBRAS + id + ".mp4"
183   - ],
184   - shell=False
185   - )
186   -
187   - print "OK"
188   -
189   -def capture(id):
190   - print "Capture..."
191   - logger.info("Capturando informações do display")
192   - global ffmpeg, display
193   - display = Display(visible=0, size=(800, 600))
194   - display.start()
195   - subprocess.call([VIDEO_CREATOR, id, "0", "30", "20", "25", "-screen-fullscreen", "1", "-screen-quality", "Fantastic", "-force-opengl"], shell=False)
196   - ffmpeg.send_signal(signal.SIGQUIT)
197   - ffmpeg.communicate()
198   - display.stop()
199   - print "OK"
  41 +PATH_LIBRAS = os.getenv("VLIBRAS_VIDEO_LIBRAS")
  42 +VIDEO_CREATOR = os.getenv("VLIBRAS_VIDEO_CREATOR")
200 43  
201   -def clean(id):
202   - logger.info("Removendo arquivos temporários")
203   - path = os.path.join(PATH_SCREENS, id)
204   - rmtree(path, ignore_errors=True)
205   -
206   -def send_to_queue(body, props):
207   - try:
208   - channel = conn_send.channel()
209   - except KeyError:
210   - reload_connection_send()
211   - channel = conn_send.channel()
212   - queue = "libras" if body["type"].encode("UTF-8") == "video" else "videos"
213   - channel.basic_publish(exchange='',
214   - routing_key=queue,
215   - properties=pika.BasicProperties(correlation_id = props.correlation_id),
216   - body=json.dumps(body))
217   - channel.close()
218   -
219   -def receive_from_queue():
220   - try:
221   - channel = conn_receive.channel()
222   - except KeyError:
223   - reload_connection_receive()
224   - channel = conn_receive.channel()
225   - queue = "translations"
226   - channel.basic_qos(prefetch_count=1)
227   - channel.basic_consume(run,
228   - queue=queue)
229   - channel.start_consuming()
230   - channel.close()
  44 +# Status of renderer to process new requests. Answer one request at a time.
  45 +worker_available = True
  46 +# Identification to indicate the request being processed.
  47 +correlation_id = None
  48 +# Array that stores gloss and pts in json format to be sent to videoCreator.
  49 +gloss_buffer = []
  50 +# pyvirtualdisplay instance
  51 +display = None
  52 +# ffmpeg process instance
  53 +ffmpeg = None
231 54  
232   -def reload_connection_send():
233   - global conn_send
234   - try:
235   - conn_send.close()
236   - except:
237   - pass
238   - conn_send = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',heartbeat_interval=0))
  55 +def start_video_creator(id):
  56 + """
  57 + Start video creator server.
  58 +
  59 + Parameters
  60 + ----------
  61 + id : string
  62 + Identification of request.
  63 + """
  64 + global display, ffmpeg
  65 + logger.info("Starting video creator server")
  66 + display = Display(visible=0, size=(800,600))
  67 + display.start()
  68 + subprocess.call(
  69 + [
  70 + VIDEO_CREATOR,
  71 + id,
  72 + "0",
  73 + "30",
  74 + "20",
  75 + "25",
  76 + "-screen-fullscreen", "1",
  77 + "-screen-quality", "Fantastic",
  78 + "-force-opengl"
  79 + ],
  80 + shell=False
  81 + )
  82 + ffmpeg.send_signal(signal.SIGQUIT)
  83 + ffmpeg.communicate()
  84 + display.stop()
  85 +
  86 +def start_ffmpeg(id):
  87 + """
  88 + Start FFmpeg to capture the video creator display.
  89 +
  90 + Parameters
  91 + ----------
  92 + id : string
  93 + Identification of request.
  94 + """
  95 + global ffmpeg, display
  96 + logger.info("Starting ffmpeg")
  97 + libras_video = os.path.join(PATH_LIBRAS, id + ".mp4")
  98 + ffmpeg = subprocess.Popen(
  99 + [
  100 + "ffmpeg",
  101 + "-y",
  102 + "-loglevel", "quiet",
  103 + "-video_size", "800x600",
  104 + "-r", "30",
  105 + "-f", "x11grab",
  106 + "-draw_mouse", "0",
  107 + "-i", str(display.cmd_param[-1]) + ".0+nomouse",
  108 + "-vcodec", "libx264",
  109 + "-pix_fmt", "yuv420p",
  110 + "-an",
  111 + libras_video
  112 + ],
  113 + shell=False
  114 + )
  115 +
  116 +def open_socket_connection():
  117 + """
  118 + Create a new socket TCP connection with video creator server.
  119 +
  120 + Returns
  121 + -------
  122 + socket object
  123 + Connection with video creator server.
  124 + """
  125 + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  126 + logger.info("Opening connection with video creator")
  127 + while True:
  128 + try:
  129 + s.connect((TCP_IP, TCP_PORT))
  130 + break
  131 + except:
  132 + sleep(2)
  133 + return s
  134 +
  135 +def send_to_video_creator(id):
  136 + # Stablishes connection with video creator server.
  137 + socket = open_socket_connection()
  138 + # Sort buffer to restore the original sequence.
  139 + sorted_buffer = sorted(gloss_buffer, key=itemgetter("index"))
  140 + logger.info("Sending gloss to video creator")
  141 + for content in sorted_buffer:
  142 + try:
  143 + # Send gloss to video creator.
  144 + socket.send(content["gloss"].encode("utf-8")+"#"+str(content["pts"]))
  145 + except KeyError:
  146 + logger.info("Sending control message to video creator")
  147 + socket.send(content["control-message"].encode("utf-8")+"#"+str(content["pts"]))
  148 + # Start ffmpeg to capture the video creator display.
  149 + logger.info("Rendering video")
  150 + start_ffmpeg(id)
  151 + # sleep for 500 milliseconds
  152 + sleep(.500)
  153 + socket.close()
  154 + del gloss_buffer[:]
239 155  
240   -def reload_connection_receive():
241   - global conn_receive
242   - try:
243   - conn_receive.close()
244   - except:
245   - pass
246   - conn_receive = pika.BlockingConnection(pika.ConnectionParameters(host='localhost',heartbeat_interval=0))
  156 +def run(ch, method, properties, body):
  157 + """
  158 + Execute the worker.
  159 +
  160 + Parameters
  161 + ----------
  162 + ch : object
  163 + Channel of communication.
  164 + method : function
  165 + Callback method.
  166 + properties : object
  167 + Message containing a set of 14 properties.
  168 + body : string
  169 + Json string containing the necessary arguments for workers.
  170 + """
  171 + global worker_available, correlation_id
  172 + body = json.loads(body)
  173 + # Check if worker is available to process a new request.
  174 + if worker_available:
  175 + logger.info("Processing request " + properties.correlation_id.encode("utf-8"))
  176 + # Accept only messages with index equals to 1.
  177 + try:
  178 + if body["index"] == 1:
  179 + # Change the status of renderer to occupied.
  180 + worker_available = False
  181 + # Stores the id of request in process.
  182 + correlation_id = properties.correlation_id.encode("utf-8")
  183 + # Stores the first gloss in the buffer.
  184 + gloss_buffer.append(body)
  185 + else:
  186 + ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
  187 + except KeyError:
  188 + ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
  189 + # Else the worker is alread processing a request.
  190 + else:
  191 + # Check if the id of message match with the id of request being processed.
  192 + if properties.correlation_id.encode("utf-8") == correlation_id:
  193 + # Check if the body contains the control-message.
  194 + try:
  195 + if body["control-message"] == "FINALIZE":
  196 + # Get the total number of gloss of the current request.
  197 + total = body["index"] # Index of "FINALIZE" is the total number of gloss.
  198 + # Check if the buffer contains the correct number of gloss.
  199 + if len(gloss_buffer) == total - 1:
  200 + gloss_buffer.append(body)
  201 + logger.info("Preparing to generate the video")
  202 + start_new_thread(send_to_video_creator, (correlation_id,))
  203 + start_video_creator(correlation_id)
  204 + # Add path of libras video on body.
  205 + body["libras-video"] = os.path.join(PATH_LIBRAS, correlation_id + ".mp4")
  206 + worker_available = True
  207 + correlation_id = None
  208 + logger.info("Sending libras video to the translations queue")
  209 + manager.send_to_queue("libras", body, properties)
  210 + print ("OK")
  211 + else:
  212 + ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
  213 + except KeyError:
  214 + # Control message doesn't exist, continues to store gloss.
  215 + gloss_buffer.append(body)
  216 + else:
  217 + ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
247 218  
248 219 def keep_alive(conn_send, conn_receive):
249   - while True:
  220 + """
  221 + Keep the connection alive.
  222 +
  223 + Parameters
  224 + ----------
  225 + conn_send : object
  226 + Connection of writer.
  227 + conn_receive : object
  228 + Connection of receiver.
  229 + """
  230 + while True:
250 231 sleep(30)
251 232 try:
252 233 conn_send.process_data_events()
... ... @@ -254,13 +235,12 @@ def keep_alive(conn_send, conn_receive):
254 235 except:
255 236 continue
256 237  
257   -start_new_thread(keep_alive, (conn_send, conn_receive))
  238 +start_new_thread(keep_alive, (manager.get_conn_send(), manager.get_conn_receive()))
  239 +
  240 +print("Renderer listening...")
258 241 while True:
259 242 try:
260   - receive_from_queue()
  243 + manager.receive_from_queue("translations", run)
261 244 except KeyboardInterrupt:
262   - conn_send.close()
263   - conn_receive.close()
  245 + manager.close_connections()
264 246 os._exit(0)
265   - except:
266   - continue
... ...
core/translator.py
... ... @@ -15,32 +15,34 @@ Author: Wesnydy Lima Ribeiro
15 15 E-Mail: wesnydy@lavid.ufpb.br
16 16 """
17 17  
  18 +import json
  19 +import logging
18 20 import os
19 21 import pika
20   -import json
21 22 import PikaManager
22   -import logging #Logging
23 23  
24 24 from PortGlosa import traduzir
25 25 from thread import start_new_thread
26 26 from time import sleep
27 27  
28   -logger = logging.getLogger('translator')
  28 +# Logging configuration.
  29 +logger = logging.getLogger("translator")
29 30 logger.setLevel(logging.DEBUG)
30 31  
31   -fh = logging.FileHandler('../log/translator.log')
  32 +fh = logging.FileHandler("../log/translator.log")
32 33 fh.setLevel(logging.DEBUG)
33 34  
34 35 ch = logging.StreamHandler()
35 36 ch.setLevel(logging.ERROR)
36 37  
37   -formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  38 +formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
38 39 fh.setFormatter(formatter)
39 40 ch.setFormatter(formatter)
40 41  
41 42 logger.addHandler(fh)
42 43 logger.addHandler(ch)
43 44  
  45 +# Manager of queues connections.
44 46 manager = PikaManager.PikaManager("150.165.205.10", "test", "test")
45 47  
46 48 def run(ch, method, properties, body):
... ... @@ -55,25 +57,24 @@ def run(ch, method, properties, body):
55 57 Callback method.
56 58 properties : object
57 59 Message containing a set of 14 properties.
58   - body : json object
59   - Informations received from queue.
  60 + body : string
  61 + Json string containing the necessary arguments for workers.
60 62 """
61   - # body it's a json that contains text to be translating
62 63 body = json.loads(body)
63 64 print ("Translating...")
64   - # Initialize the translation of text
65 65 try:
66   - logger.info("Traduzindo: "+body["text"]+" id: "+properties.correlation_id.encode("utf-8"))
  66 + logger.info("Translating: "+body["text"]+" id: "+properties.correlation_id.encode("utf-8"))
67 67 gloss = traduzir(body["text"].encode("utf-8"))
  68 + # Add gloss key with glosa content on the body.
68 69 body["gloss"] = gloss
  70 + # Remove text translated.
69 71 del body["text"]
70 72 except KeyError:
71   - logger.info("Não existe texto para traduzir")
72 73 pass
73   - # Send the body to the queue
74   - logger.info("Enviando glosa para a fila de traduções")
  74 +
  75 + logger.info("Sending gloss to the translations queue")
75 76 manager.send_to_queue("translations", body, properties)
76   - print ("Success")
  77 + print ("Ok")
77 78  
78 79 def keep_alive(conn_send, conn_receive):
79 80 """
... ...