diff --git a/core/PikaManager.py b/core/PikaManager.py new file mode 100644 index 0000000..0cd442a --- /dev/null +++ b/core/PikaManager.py @@ -0,0 +1,179 @@ + +""" +Author: Caio Marcelo Campoy Guedes +E-Mail: caiomcg@gmail.com + +Author: Erickson Silva +E-Mail: erickson.silva@lavid.ufpb.br + +Author: Jorismar Barbosa +E-Mail: jorismar.barbosa@lavid.ufpb.br + +Author: Wesnydy Lima Ribeiro +E-Mail: wesnydy@lavid.ufpb.br +""" + +import pika +import json + +class PikaManager: + + def __init__(self, ip): + """ + Initialize the class without credentials. + + Parameters + ---------- + ip : string + The server IP. + """ + self.server_ip = ip + self.MAX_ERR_ATTEMPT = 3 + + def __init__(self, ip, username, password): + """ + Initialize the class with credentials. + + Parameters + ---------- + ip : string + The server IP. + username : string + The user login. + password : string + The user password. + """ + self.server_ip = ip + self.MAX_ERR_ATTEMPT = 3 + self.add_credentials(username, password) + self.add_blockConnection() + + def add_credentials(self, username, password): + """ + Add user credentials. + + Parameters + ---------- + username : string + The user login. + password : string + The user password. + """ + self.credentials = pika.PlainCredentials(username, password) + + def add_blockConnection(self): + """ + Create the blocking connection object. Credentials are used. + """ + self.conn_send = pika.BlockingConnection(pika.ConnectionParameters(host = self.server_ip, credentials = self.credentials, heartbeat_interval = 0)) + self.conn_receive = pika.BlockingConnection(pika.ConnectionParameters(host = self.server_ip, credentials = self.credentials, heartbeat_interval = 0)) + + def _reload_connection(self, connection): + """ + Reload a specific connection. + + Parameters + ---------- + connection : Object + The connection to be reloaded. + """ + try: + connection.close() + except: + pass + connection = pika.BlockingConnection(pika.ConnectionParameters(host = self.server_ip, heartbeat_interval = 0)) # ? + + def _setup_channel(self, connection): + """ + Atempt to create a connection. + + Parameters + ---------- + connection : Object + Connection to setup. + Returns + ------- + Object + None if cailed to connect. The object if successfuly connected. + """ + attempts = 0 + + while attempts < self.MAX_ERR_ATTEMPT: + try: + channel = connection.channel() + return channel + except: + self._reload_connection(connection) + attempts += 1 + print("Send Error: Attempt(" + str(attempts) + ")") + + if attempts == self.MAX_ERR_ATTEMPT: + return None + + def send_to_queue(self, queue_name, body, props): + """ + Send a message to the queue. + + Parameters + ---------- + queue_name : string + Queue that receives the message. + body : string + The message to be sent. + props : Object + Object containing a set of 14 properties. + """ + channel = self._setup_channel(self.conn_send) + if channel != None: + channel.queue_declare(queue = queue_name) + channel.basic_publish(exchange = '', routing_key = queue_name, properties = pika.BasicProperties(correlation_id = props.correlation_id), body = json.dumps(body)) + channel.close() + + def receive_from_queue(self, queue_name, callback): + """ + Receive a message from the queue. + + Parameters + ---------- + queue_name : string + Queue where the message will be received. + callback : function + Function that process the message. + """ + channel = self._setup_channel(self.conn_receive) + if channel != None: + channel.queue_declare(queue = queue_name) + channel.basic_qos(prefetch_count = 1) + channel.basic_consume(callback, queue = queue_name, no_ack = True) + channel.start_consuming() + channel.close() + + def get_conn_send(self): + """ + Get the send connection. + + Returns + ------- + The send connection. + """ + return self.conn_send + + def get_conn_receive(self): + """ + Get the receive connection. + + Returns + ------- + The receive connection. + """ + return self.conn_receive + + def close_connections(self): + """ + Close all connections. + """ + try: + self.conn_receive.close() + self.conn_send.close() + except: + pass diff --git a/core/logger.py b/core/logger.py new file mode 100644 index 0000000..b178a26 --- /dev/null +++ b/core/logger.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +Author: Erickson Silva +E-Mail: erickson.silva@lavid.ufpb.br + +Author: Jonathan Lincoln Brilhante +E-Mail: jonathan.lincoln.brilhante@gmail.com + +Author: Wesnydy Lima Ribeiro +E-Mail: wesnydy@lavid.ufpb.br +""" + +import graypy +import logging +import pika +import PikaManager +import sys + +from time import sleep + +SERVER_URL = sys.argv[1] + +# Manager of queues connections. +manager = PikaManager.PikaManager("150.165.205.10", "test", "test") + +# Logging configuration. +logger = logging.getLogger('text_container') +logger.setLevel(logging.DEBUG) + +handler = graypy.GELFHandler(SERVER_URL, 12201) +logger.addHandler(handler) + +def run(ch, method, properties, body): + print ("Writing log...") + logger.debug(" [L] LOGGER %r" % body) + +def keep_alive(conn_send, conn_receive): + """ + Keep the connection alive. + + Parameters + ---------- + conn_send : object + Connection of writer. + conn_receive : object + Connection of receiver. + """ + while True: + sleep(30) + try: + conn_send.process_data_events() + conn_receive.process_data_events() + except: + continue + +# start_new_thread(keep_alive, (manager.get_conn_send(), manager.get_conn_receive())) + +print ("Logger listening...") +while True: + try: + manager.receive_from_queue("logs", run) + except KeyboardInterrupt: + manager.close_connections() + os._exit(0) diff --git a/core/processManager.py b/core/processManager.py new file mode 100755 index 0000000..602f8cf --- /dev/null +++ b/core/processManager.py @@ -0,0 +1,31 @@ +#!/usr/bin/env python + +import subprocess +from threading import Thread +import signal +import sys +import os + +KEEP_RUNNING = True + +def signalHandler(signal, frame): + global KEEP_RUNNING + KEEP_RUNNING = False + print("Closing") + sys.exit(0) + +def spawnTranslator(): + while KEEP_RUNNING: + proc = subprocess.Popen(["./translator.py"], shell=True, stdout=subprocess.PIPE) + print("Process TRANSLATOR PID: " + str(proc.pid)) + try: + stdoutdata, stderrdata = proc.communicate() + except: + print "An error occured, running again..." + return None + +if __name__ == "__main__": + signal.signal(signal.SIGINT, signalHandler) + + t = Thread(target=spawnTranslator, args=()) + t.start() diff --git a/core/translator.py b/core/translator.py new file mode 100755 index 0000000..58827d3 --- /dev/null +++ b/core/translator.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +Author: Erickson Silva +E-Mail: erickson.silva@lavid.ufpb.br + +Author: Jonathan Lincoln Brilhante +E-Mail: jonathan.lincoln.brilhante@gmail.com + +Author: Wesnydy Lima Ribeiro +E-Mail: wesnydy@lavid.ufpb.br +""" + +import os +import pika +import PikaManager + +from PortGlosa import traduzir +from time import sleep + +# Manager of queues connections. +manager = PikaManager.PikaManager("150.165.205.10", "test", "test") + +def run(ch, method, properties, body): + """ + Execute the worker. + + Parameters + ---------- + ch : object + Channel of communication. + method : function + Callback method. + properties : object + Message containing a set of 14 properties. + body : string + Json string containing the necessary arguments for workers. + """ + print ("Translating...") + gloss = traduzir(body) + manager.send_to_queue("glosses", gloss, properties) + print ("Ok") + +def keep_alive(conn_send, conn_receive): + """ + Keep the connection alive. + + Parameters + ---------- + conn_send : object + Connection of writer. + conn_receive : object + Connection of receiver. + """ + while True: + sleep(30) + try: + conn_send.process_data_events() + conn_receive.process_data_events() + except: + continue + +# start_new_thread(keep_alive, (manager.get_conn_send(), manager.get_conn_receive())) + +print ("Translator listening...") +while True: + try: + manager.receive_from_queue("texts", run) + except KeyboardInterrupt: + manager.close_connections() + os._exit(0) -- libgit2 0.21.2