extractor.py
3.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Author: Caio Marcelo Campoy Guedes
E-Mail: caiomcg@gmail.com
Author: Erickson Silva
E-Mail: erickson.silva@lavid.ufpb.br
Author: Jorismar Barbosa
E-Mail: jorismar.barbosa@lavid.ufpb.br
Author: Wesnydy Lima Ribeiro
E-Mail: wesnydy@lavid.ufpb.br
"""
import json
import logging
import os
import pika
import PikaManager
import pysrt
from thread import start_new_thread
from time import sleep
from urllib import urlretrieve
# Logging configuration: the log file receives DEBUG and above, the console
# only INFO and above; both share a single record format.
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")

# File output.
fh = logging.FileHandler("/home/vlibras/log/extractor.log")
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)

# Console output.
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
ch.setFormatter(formatter)

logger = logging.getLogger("extractor")
logger.setLevel(logging.DEBUG)
logger.addHandler(fh)
logger.addHandler(ch)

# Manager of queues connections.
#manager = PikaManager.PikaManager("150.165.205.10", "test", "test")
manager = PikaManager.PikaManager("rabbit")
def run(ch, method, properties, body):
    """
    Execute the worker.

    Fetches the subtitle file referenced by the request, publishes one
    message per subtitle entry to the "extractions" queue and finally
    publishes a control message (the request body extended with
    "control-message", "pts" and "index") marking the end of the stream.
    The fetched file is removed even when parsing or publishing fails.

    Parameters
    ----------
    ch : object
        Channel of communication (unused here).
    method : object
        Delivery argument supplied by the queue consumer (unused here).
    properties : object
        Message properties. ``correlation_id`` is logged and the whole
        object is forwarded with every outgoing message.
    body : string
        Json string containing the necessary arguments for workers. It must
        provide the "subtitle" key holding the URL of the subtitle file.
    """
    logger.info("processing request " + properties.correlation_id.encode("utf-8"))
    body = json.loads(body)

    try:
        logger.info("Downloading subtitle")
        filename = urlretrieve(body["subtitle"].encode("utf-8"))[0]
    except IOError:
        # Nothing can be extracted without the file; record the cause and
        # give up on this request.
        logger.exception("Download of subtitle fail")
        return

    # Number of subtitle entries published so far (stays 0 for an empty file).
    sent = 0
    try:
        try:
            # Tries to open file with utf-8 encoding.
            subtitle = pysrt.open(filename)
        except UnicodeDecodeError:
            # Tries to open file with iso-8859-1 encoding if utf-8 encoding fails.
            subtitle = pysrt.open(filename, encoding="iso-8859-1")

        print ("Extracting...")
        logger.info("Extracting subtitles from file")
        # Entries are numbered from 1 in the order they appear in the file.
        for sent, sub in enumerate(subtitle, 1):
            message = {
                "text": sub.text.encode("utf-8"),
                "pts": calculate_ms(str(sub.start)),
                "index": sent,
            }
            manager.send_to_queue("extractions", message, properties)
        logger.info(str(sent) + " Subtitles extracted successfully")
    finally:
        # Runs on failure too, so the fetched file is never left behind.
        logger.info("Cleaning temp files")
        os.remove(filename)

    # Control message indicating the end of subtitles; its index is one past
    # the last subtitle entry.
    body["control-message"] = "FINALIZE"
    body["pts"] = -1
    body["index"] = sent + 1
    logger.info("Sending control message to the queue")
    manager.send_to_queue("extractions", body, properties)
    print ("Ok")
def calculate_ms(time_in):
    """
    Convert a timestamp string into milliseconds.

    Parameters
    ----------
    time_in : string
        Timestamp written as "hours:minutes:seconds,milliseconds".

    Returns
    -------
    number
        The timestamp in milliseconds.
    """
    fields = time_in.split(':')
    # The third field carries both seconds and milliseconds, comma separated.
    seconds_and_ms = fields[2].split(',')

    hours = int(fields[0])
    minutes = int(fields[1])
    seconds = int(seconds_and_ms[0])
    millis = int(seconds_and_ms[1])

    # Fold everything down to seconds first, then scale to milliseconds.
    return ((hours * 60 + minutes) * 60 + seconds) * 1000 + millis
def keep_alive(conn_send, conn_receive):
    """
    Keep the connection alive.

    Loops forever: every 30 seconds it calls ``process_data_events()`` on
    both connections. Each connection is serviced independently, so a
    failure on one does not skip the other, and failures are logged instead
    of being silently discarded. This function never returns.

    Parameters
    ----------
    conn_send : object
        Connection of writer.
    conn_receive : object
        Connection of receiver.
    """
    # NOTE(review): presumably these are pika connections that are also used
    # by the main thread -- confirm that servicing them from this thread is
    # safe for the connection type in use.
    while True:
        sleep(30)
        for label, conn in (("send", conn_send), ("receive", conn_receive)):
            try:
                conn.process_data_events()
            except Exception:
                # Best effort: keep looping so the connection is retried on
                # the next cycle, but leave a trace of what went wrong.
                logger.warning("keep-alive failed on %s connection", label,
                               exc_info=True)
# Service both queue connections from a background thread while the main
# thread consumes requests.
keep_alive_args = (manager.get_conn_send(), manager.get_conn_receive())
start_new_thread(keep_alive, keep_alive_args)

print("Extractor listening...")

# Consume the "requests" queue until interrupted; every message is handed to
# run(). On Ctrl+C the connections are closed and the process exits at once.
while True:
    try:
        manager.receive_from_queue("requests", run)
    except KeyboardInterrupt:
        manager.close_connections()
        os._exit(0)