From 5ddca534b66a44709261acc6832f4c5894d8f248 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 8 Feb 2017 18:34:01 +0000 Subject: [PATCH] Atualização das informações de log. Atualização dos diretórios. Atualização das variaveis de ambiente. Adição do docker-compose. --- Dockerfile | 17 +++++++++-------- core/PikaManager.py | 257 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------------------------------------------------------------------------------- core/extractor.py | 7 ++++--- core/mixer.py | 14 +++++++++++--- core/processManager.py | 10 +++++----- core/renderer.py | 20 +++++++++++++++----- core/translator.py | 8 +++++--- 7 files changed, 214 insertions(+), 119 deletions(-) diff --git a/Dockerfile b/Dockerfile index 9748846..0e087d0 100755 --- a/Dockerfile +++ b/Dockerfile @@ -76,6 +76,7 @@ COPY ./core/renderer.py /root/renderer.py COPY ./core/translator.py /root/translator.py COPY ./core/PikaManager.py /root/PikaManager.py COPY ./core/processManager.py /root/processManager.py +COPY ./log/ /root/log/ COPY ./unityVideo/ /root/unityVideo/ COPY ./vlibras-translate/ /root/vlibras-translate/ COPY ./vlibras-libs/ /root/vlibras-libs/ @@ -83,8 +84,9 @@ COPY ./vlibras-api/ /root/vlibras-api/ #Workers environment variables ENV VLIBRAS_VIDEO_CREATOR="/root/unityVideo/videoCreator.x86_64" -ENV VLIBRAS_VIDEO_LIBRAS="/root/storage/libras" -ENV VLIBRAS_VIDEO_MIXED="/root/storage/videos" +ENV VLIBRAS_VIDEO_LIBRAS="/storage/libras" +ENV VLIBRAS_VIDEO_MIXED="/storage/videos" +ENV VLIBRAS_VIDEO_SCREENS="/storage/frames" #Translator environment variables ENV HUNPOS_TAGGER="/root/vlibras-libs/aelius/bin/hunpos-tag" @@ -93,15 +95,14 @@ ENV TRANSLATE_DATA="/root/vlibras-translate/data" ENV NLTK_DATA="/root/vlibras-libs/aelius/nltk_data" ENV PYTHONPATH=":/root/vlibras-libs/aelius:/root/vlibras-translate/src:/root/vlibras-libs/aelius:/root/vlibras-translate/src" + #ENV NODE_PATH=/root/vlibras-api/node_modules/;%NODE_PATH% #Portas de comunicacao do container -EXPOSE 27017 -EXPOSE 5672 -EXPOSE 80:80 -#EXPOSE 15672 WORKDIR /root/vlibras-api/ -VOLUME ["/root/volume/", "/storage/"] + +VOLUME ["/storage/"] + RUN npm cache clean RUN rm -rf node_modules RUN npm i @@ -112,4 +113,4 @@ RUN node --version RUN /usr/bin/mongod & #Comando de entrada quando inicializado -ENTRYPOINT (/usr/bin/mongod & redis-server & rabbitmq-server start & sleep 3m) ; (node server.js & sleep 10) ; python /root/extractor.py & python /root/translator.py & python /root/renderer.py & python /root/mixer.py +ENTRYPOINT (/usr/bin/mongod & redis-server & rabbitmq-server start & sleep 3m) ; (node /root/vlibras-api/server.js & sleep 10) ; python /root/processManager.py diff --git a/core/PikaManager.py b/core/PikaManager.py index 17415ee..182bf98 100644 --- a/core/PikaManager.py +++ b/core/PikaManager.py @@ -1,13 +1,15 @@ - """ Author: Caio Marcelo Campoy Guedes E-Mail: caiomcg@gmail.com -Author: Caio Marcelo Campoy Guedes -E-Mail: caiomcg@gmail.com +Author: Erickson Silva +E-Mail: erickson.silva@lavid.ufpb.br -Author: Caio Marcelo Campoy Guedes -E-Mail: caiomcg@gmail.com +Author: Jorismar Barbosa +E-Mail: jorismar.barbosa@lavid.ufpb.br + +Author: Wesnydy Lima Ribeiro +E-Mail: wesnydy@lavid.ufpb.br """ import pika @@ -15,90 +17,161 @@ import json class PikaManager: - def __init__(self, ip): - """ - Initialize the class without - - Parameters - ---------- - ip : string - The server IP. - """ - self.server_ip = ip - self.MAX_ERR_ATTEMPT = 3 - - def __init__(self, ip, username, password): - self.server_ip = ip - self.MAX_ERR_ATTEMPT = 3 - self.add_credentials(username, password) - self.add_blockConnection() - - def add_credentials(self, username, password): - self.credentials = pika.PlainCredentials(username, password) - - def add_blockConnection(self): - self.conn_send = pika.BlockingConnection(pika.ConnectionParameters(host = self.server_ip, credentials = self.credentials, heartbeat_interval = 0)) - self.conn_receive = pika.BlockingConnection(pika.ConnectionParameters(host = self.server_ip, credentials = self.credentials, heartbeat_interval = 0)) - - def _reload_connection(self, connection): - try: - connection.close() - except: - pass #HANDLE - connection = pika.BlockingConnection(pika.ConnectionParameters(host = self.server_ip, heartbeat_interval = 0)) # ? - - def _setup_channel(self, connection): - """ - Atempt to create a connection. - - Parameters - ---------- - connection : Object - Connection to setup. - Returns - ------- - Object - None if cailed to connect. The object if successfuly connected. - """ - attempts = 0 - - while attempts < self.MAX_ERR_ATTEMPT: - try: - channel = connection.channel() - return channel - except: - self._reload_connection(connection) - attempts += 1 - print("Send Error: Attempt(" + str(attempts) + ")") - - if attempts == self.MAX_ERR_ATTEMPT: - return None - - def send_to_queue(self, queue_name, body, props): - channel = self._setup_channel(self.conn_send) - if channel != None: - channel.queue_declare(queue = queue_name) - channel.basic_publish(exchange = '', routing_key = queue_name, properties = pika.BasicProperties(correlation_id = props.correlation_id), body = json.dumps(body)) - channel.close() - - def receive_from_queue(self, queue_name, callback): - channel = self._setup_channel(self.conn_receive) - if channel != None: - channel.queue_declare(queue = queue_name) - channel.basic_qos(prefetch_count = 1) - channel.basic_consume(callback, queue = queue_name, no_ack = True) - channel.start_consuming() - channel.close() - - def get_conn_send(self): - return self.conn_send - - def get_conn_receive(self): - return self.conn_receive - - def close_connections(self): - try: - self.conn_receive.close() - self.conn_send.close() - except: - pass + def __init__(self, ip, username=None, password=None): + """ + Initialize the class with credentials. + + Parameters + ---------- + ip : string + The server IP. + username : string + The user login. + password : string + The user password. + """ + self.server_ip = ip + self.MAX_ERR_ATTEMPT = 3 + + if not all((username, password)): + self.add_localBlockConnection() + else: + self.add_credentials(username, password) + self.add_blockConnection() + + def add_credentials(self, username, password): + """ + Add user credentials. + + Parameters + ---------- + username : string + The user login. + password : string + The user password. + """ + self.credentials = pika.PlainCredentials(username, password) + + def add_localBlockConnection(self): + """ + Create the blocking connection object. + """ + self.conn_send = pika.BlockingConnection(pika.ConnectionParameters(host = self.server_ip, heartbeat_interval = 0)) + self.conn_receive = pika.BlockingConnection(pika.ConnectionParameters(host = self.server_ip, heartbeat_interval = 0)) + + def add_blockConnection(self): + """ + Create the blocking connection object. Credentials are used. + """ + self.conn_send = pika.BlockingConnection(pika.ConnectionParameters(host = self.server_ip, credentials = self.credentials, heartbeat_interval = 0)) + self.conn_receive = pika.BlockingConnection(pika.ConnectionParameters(host = self.server_ip, credentials = self.credentials, heartbeat_interval = 0)) + + def _reload_connection(self, connection): + """ + Reload a specific connection. + + Parameters + ---------- + connection : Object + The connection to be reloaded. + """ + try: + connection.close() + except: + pass + connection = pika.BlockingConnection(pika.ConnectionParameters(host = self.server_ip, heartbeat_interval = 0)) # ? + + def _setup_channel(self, connection): + """ + Atempt to create a connection. + + Parameters + ---------- + connection : Object + Connection to setup. + Returns + ------- + Object + None if cailed to connect. The object if successfuly connected. + """ + attempts = 0 + + while attempts < self.MAX_ERR_ATTEMPT: + try: + channel = connection.channel() + return channel + except: + self._reload_connection(connection) + attempts += 1 + print("Send Error: Attempt(" + str(attempts) + ")") + + if attempts == self.MAX_ERR_ATTEMPT: + return None + + def send_to_queue(self, queue_name, body, props): + """ + Send a message to the queue. + + Parameters + ---------- + queue_name : string + Queue that receives the message. + body : string + The message to be sent. + props : Object + Object containing a set of 14 properties. + """ + channel = self._setup_channel(self.conn_send) + if channel != None: + channel.queue_declare(queue = queue_name) + channel.basic_publish(exchange = '', routing_key = queue_name, properties = pika.BasicProperties(correlation_id = props.correlation_id), body = json.dumps(body)) + channel.close() + + def receive_from_queue(self, queue_name, callback): + """ + Receive a message from the queue. + + Parameters + ---------- + queue_name : string + Queue where the message will be received. + callback : function + Function that process the message. + """ + channel = self._setup_channel(self.conn_receive) + if channel != None: + channel.queue_declare(queue = queue_name) + channel.basic_qos(prefetch_count = 1) + channel.basic_consume(callback, queue = queue_name, no_ack = True) + channel.start_consuming() + channel.close() + + def get_conn_send(self): + """ + Get the send connection. + + Returns + ------- + The send connection. + """ + return self.conn_send + + def get_conn_receive(self): + """ + Get the receive connection. + + Returns + ------- + The receive connection. + """ + return self.conn_receive + + def close_connections(self): + """ + Close all connections. + """ + try: + self.conn_receive.close() + self.conn_send.close() + except: + pass diff --git a/core/extractor.py b/core/extractor.py index 9a7201b..d4854b6 100755 --- a/core/extractor.py +++ b/core/extractor.py @@ -30,11 +30,11 @@ from urllib import urlretrieve logger = logging.getLogger("extractor") logger.setLevel(logging.DEBUG) -fh = logging.FileHandler("../log/extractor.log") +fh = logging.FileHandler("/root/log/extractor.log") fh.setLevel(logging.DEBUG) ch = logging.StreamHandler() -ch.setLevel(logging.ERROR) +ch.setLevel(logging.INFO) formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") fh.setFormatter(formatter) @@ -44,7 +44,8 @@ logger.addHandler(fh) logger.addHandler(ch) # Manager of queues connections. -manager = PikaManager.PikaManager("150.165.205.10", "test", "test") +#manager = PikaManager.PikaManager("150.165.205.10", "test", "test") +manager = PikaManager.PikaManager("localhost") def run(ch, method, properties, body): """ diff --git a/core/mixer.py b/core/mixer.py index 361271a..05deecd 100755 --- a/core/mixer.py +++ b/core/mixer.py @@ -26,15 +26,19 @@ from thread import start_new_thread from time import sleep from urllib import urlretrieve +def make_dir_if_exists(path): + if not os.path.exists(path): + os.makedirs(path) + # Logging configuration. logger = logging.getLogger('mixer') logger.setLevel(logging.DEBUG) -fh = logging.FileHandler('../log/mixer.log') +fh = logging.FileHandler('/root/log/mixer.log') fh.setLevel(logging.DEBUG) ch = logging.StreamHandler() -ch.setLevel(logging.ERROR) +ch.setLevel(logging.INFO) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') fh.setFormatter(formatter) @@ -44,9 +48,13 @@ logger.addHandler(fh) logger.addHandler(ch) # Manager of queues connections. -manager = PikaManager.PikaManager("150.165.205.10", "test", "test") +#manager = PikaManager.PikaManager("150.165.205.10", "test", "test") + +manager = PikaManager.PikaManager("localhost") PATH_MIXED_VIDEO = os.getenv("VLIBRAS_VIDEO_MIXED") +#Create Path if Needed +make_dir_if_exists(PATH_MIXED_VIDEO) def main_video_height(main_video): """ diff --git a/core/processManager.py b/core/processManager.py index ab05c1a..c3baec1 100755 --- a/core/processManager.py +++ b/core/processManager.py @@ -15,7 +15,7 @@ def signalHandler(signal, frame): def spawnRenderer(): while KEEP_RUNNING: - proc = subprocess.Popen(["./renderer.py"], shell=True, stdout=subprocess.PIPE) + proc = subprocess.Popen(["/root/renderer.py"], shell=True, stdout=subprocess.PIPE) print("Process RENDERER PID: " + str(proc.pid)) try: stdoutdata, stderrdata = proc.communicate() @@ -25,7 +25,7 @@ def spawnRenderer(): def spawnMixer(): while KEEP_RUNNING: - proc = subprocess.Popen(["./mixer.py"], shell=True, stdout=subprocess.PIPE) + proc = subprocess.Popen(["/root/mixer.py"], shell=True, stdout=subprocess.PIPE) print("Process MIXER PID: " + str(proc.pid)) try: stdoutdata, stderrdata = proc.communicate() @@ -35,7 +35,7 @@ def spawnMixer(): def spawnTranslator(): while KEEP_RUNNING: - proc = subprocess.Popen(["./translator.py"], shell=True, stdout=subprocess.PIPE) + proc = subprocess.Popen(["/root/translator.py"], shell=True, stdout=subprocess.PIPE) print("Process TRANSLATOR PID: " + str(proc.pid)) try: stdoutdata, stderrdata = proc.communicate() @@ -45,7 +45,7 @@ def spawnTranslator(): def spawnExtractor(): while KEEP_RUNNING: - proc = subprocess.Popen(["./extractor.py"], shell=True, stdout=subprocess.PIPE) + proc = subprocess.Popen(["/root/extractor.py"], shell=True, stdout=subprocess.PIPE) print("Process EXTRACTOR PID: " + str(proc.pid)) try: stdoutdata, stderrdata = proc.communicate() @@ -67,4 +67,4 @@ if __name__ == "__main__": t4 = Thread(target=spawnMixer, args=()) t4.start() - #thread.start_new_thread(spawnWorker, (worker,)) \ No newline at end of file + #thread.start_new_thread(spawnWorker, (worker,)) diff --git a/core/renderer.py b/core/renderer.py index df16bc7..650d6ca 100755 --- a/core/renderer.py +++ b/core/renderer.py @@ -18,15 +18,20 @@ from time import sleep #Temporary from shutil import rmtree +#Temporary +def make_dir_if_exists(path): + if not os.path.exists(path): + os.makedirs(path) + # Logging configuration. logger = logging.getLogger('renderer') logger.setLevel(logging.DEBUG) -fh = logging.FileHandler('../log/renderer.log') +fh = logging.FileHandler('/root/log/renderer.log') fh.setLevel(logging.DEBUG) ch = logging.StreamHandler() -ch.setLevel(logging.ERROR) +ch.setLevel(logging.INFO) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') fh.setFormatter(formatter) @@ -36,16 +41,21 @@ logger.addHandler(fh) logger.addHandler(ch) # Manager of queues connections. -manager = PikaManager.PikaManager("150.165.205.10", "test", "test") +#manager = PikaManager.PikaManager("150.165.205.10", "test", "test") + +manager = PikaManager.PikaManager("localhost") TCP_IP = '0.0.0.0' TCP_PORT = 5555 PATH_LIBRAS = os.getenv("VLIBRAS_VIDEO_LIBRAS") VIDEO_CREATOR = os.getenv("VLIBRAS_VIDEO_CREATOR") +PATH_SCREENS = os.getenv("VLIBRAS_VIDEO_SCREENS") -#Temporary -PATH_SCREENS = "/storage/frames/" +#Create Paths if needed +make_dir_if_exists(PATH_LIBRAS) +make_dir_if_exists(VIDEO_CREATOR) +make_dir_if_exists(PATH_SCREENS) # Status of renderer to process new requests. Answer one request at a time. worker_available = True diff --git a/core/translator.py b/core/translator.py index 9064e84..115e76a 100755 --- a/core/translator.py +++ b/core/translator.py @@ -29,11 +29,11 @@ from time import sleep logger = logging.getLogger("translator") logger.setLevel(logging.DEBUG) -fh = logging.FileHandler("../log/translator.log") +fh = logging.FileHandler("/root/log/translator.log") fh.setLevel(logging.DEBUG) ch = logging.StreamHandler() -ch.setLevel(logging.ERROR) +ch.setLevel(logging.INFO) formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") fh.setFormatter(formatter) @@ -43,7 +43,9 @@ logger.addHandler(fh) logger.addHandler(ch) # Manager of queues connections. -manager = PikaManager.PikaManager("150.165.205.10", "test", "test") +#manager = PikaManager.PikaManager("150.165.205.10", "test", "test") + +manager = PikaManager.PikaManager("localhost") def run(ch, method, properties, body): """ -- libgit2 0.21.2