Commit 7517012921544ef91796c8cd147f79c872cceef7

Authored by Wesnydy Ribeiro
1 parent 558bba05

Refactory of workers

core/PikaManager.py 0 → 100644
... ... @@ -0,0 +1,179 @@
  1 +
  2 +"""
  3 +Author: Caio Marcelo Campoy Guedes
  4 +E-Mail: caiomcg@gmail.com
  5 +
  6 +Author: Erickson Silva
  7 +E-Mail: erickson.silva@lavid.ufpb.br
  8 +
  9 +Author: Jorismar Barbosa
  10 +E-Mail: jorismar.barbosa@lavid.ufpb.br
  11 +
  12 +Author: Wesnydy Lima Ribeiro
  13 +E-Mail: wesnydy@lavid.ufpb.br
  14 +"""
  15 +
  16 +import pika
  17 +import json
  18 +
  19 +class PikaManager:
  20 +
  21 + def __init__(self, ip):
  22 + """
  23 + Initialize the class without credentials.
  24 +
  25 + Parameters
  26 + ----------
  27 + ip : string
  28 + The server IP.
  29 + """
  30 + self.server_ip = ip
  31 + self.MAX_ERR_ATTEMPT = 3
  32 +
  33 + def __init__(self, ip, username, password):
  34 + """
  35 + Initialize the class with credentials.
  36 +
  37 + Parameters
  38 + ----------
  39 + ip : string
  40 + The server IP.
  41 + username : string
  42 + The user login.
  43 + password : string
  44 + The user password.
  45 + """
  46 + self.server_ip = ip
  47 + self.MAX_ERR_ATTEMPT = 3
  48 + self.add_credentials(username, password)
  49 + self.add_blockConnection()
  50 +
  51 + def add_credentials(self, username, password):
  52 + """
  53 + Add user credentials.
  54 +
  55 + Parameters
  56 + ----------
  57 + username : string
  58 + The user login.
  59 + password : string
  60 + The user password.
  61 + """
  62 + self.credentials = pika.PlainCredentials(username, password)
  63 +
  64 + def add_blockConnection(self):
  65 + """
  66 + Create the blocking connection object. Credentials are used.
  67 + """
  68 + self.conn_send = pika.BlockingConnection(pika.ConnectionParameters(host = self.server_ip, credentials = self.credentials, heartbeat_interval = 0))
  69 + self.conn_receive = pika.BlockingConnection(pika.ConnectionParameters(host = self.server_ip, credentials = self.credentials, heartbeat_interval = 0))
  70 +
  71 + def _reload_connection(self, connection):
  72 + """
  73 + Reload a specific connection.
  74 +
  75 + Parameters
  76 + ----------
  77 + connection : Object
  78 + The connection to be reloaded.
  79 + """
  80 + try:
  81 + connection.close()
  82 + except:
  83 + pass
  84 + connection = pika.BlockingConnection(pika.ConnectionParameters(host = self.server_ip, heartbeat_interval = 0)) # ?
  85 +
  86 + def _setup_channel(self, connection):
  87 + """
  88 + Atempt to create a connection.
  89 +
  90 + Parameters
  91 + ----------
  92 + connection : Object
  93 + Connection to setup.
  94 + Returns
  95 + -------
  96 + Object
  97 + None if cailed to connect. The object if successfuly connected.
  98 + """
  99 + attempts = 0
  100 +
  101 + while attempts < self.MAX_ERR_ATTEMPT:
  102 + try:
  103 + channel = connection.channel()
  104 + return channel
  105 + except:
  106 + self._reload_connection(connection)
  107 + attempts += 1
  108 + print("Send Error: Attempt(" + str(attempts) + ")")
  109 +
  110 + if attempts == self.MAX_ERR_ATTEMPT:
  111 + return None
  112 +
  113 + def send_to_queue(self, queue_name, body, props):
  114 + """
  115 + Send a message to the queue.
  116 +
  117 + Parameters
  118 + ----------
  119 + queue_name : string
  120 + Queue that receives the message.
  121 + body : string
  122 + The message to be sent.
  123 + props : Object
  124 + Object containing a set of 14 properties.
  125 + """
  126 + channel = self._setup_channel(self.conn_send)
  127 + if channel != None:
  128 + channel.queue_declare(queue = queue_name)
  129 + channel.basic_publish(exchange = '', routing_key = queue_name, properties = pika.BasicProperties(correlation_id = props.correlation_id), body = json.dumps(body))
  130 + channel.close()
  131 +
  132 + def receive_from_queue(self, queue_name, callback):
  133 + """
  134 + Receive a message from the queue.
  135 +
  136 + Parameters
  137 + ----------
  138 + queue_name : string
  139 + Queue where the message will be received.
  140 + callback : function
  141 + Function that process the message.
  142 + """
  143 + channel = self._setup_channel(self.conn_receive)
  144 + if channel != None:
  145 + channel.queue_declare(queue = queue_name)
  146 + channel.basic_qos(prefetch_count = 1)
  147 + channel.basic_consume(callback, queue = queue_name, no_ack = True)
  148 + channel.start_consuming()
  149 + channel.close()
  150 +
  151 + def get_conn_send(self):
  152 + """
  153 + Get the send connection.
  154 +
  155 + Returns
  156 + -------
  157 + The send connection.
  158 + """
  159 + return self.conn_send
  160 +
  161 + def get_conn_receive(self):
  162 + """
  163 + Get the receive connection.
  164 +
  165 + Returns
  166 + -------
  167 + The receive connection.
  168 + """
  169 + return self.conn_receive
  170 +
  171 + def close_connections(self):
  172 + """
  173 + Close all connections.
  174 + """
  175 + try:
  176 + self.conn_receive.close()
  177 + self.conn_send.close()
  178 + except:
  179 + pass
... ...
core/logger.py 0 → 100644
... ... @@ -0,0 +1,66 @@
  1 +#!/usr/bin/env python
  2 +# -*- coding: utf-8 -*-
  3 +
  4 +"""
  5 +Author: Erickson Silva
  6 +E-Mail: erickson.silva@lavid.ufpb.br
  7 +
  8 +Author: Jonathan Lincoln Brilhante
  9 +E-Mail: jonathan.lincoln.brilhante@gmail.com
  10 +
  11 +Author: Wesnydy Lima Ribeiro
  12 +E-Mail: wesnydy@lavid.ufpb.br
  13 +"""
  14 +
  15 +import graypy
  16 +import logging
  17 +import pika
  18 +import PikaManager
  19 +import sys
  20 +
  21 +from time import sleep
  22 +
  23 +SERVER_URL = sys.argv[1]
  24 +
  25 +# Manager of queues connections.
  26 +manager = PikaManager.PikaManager("150.165.205.10", "test", "test")
  27 +
  28 +# Logging configuration.
  29 +logger = logging.getLogger('text_container')
  30 +logger.setLevel(logging.DEBUG)
  31 +
  32 +handler = graypy.GELFHandler(SERVER_URL, 12201)
  33 +logger.addHandler(handler)
  34 +
  35 +def run(ch, method, properties, body):
  36 + print ("Writing log...")
  37 + logger.debug(" [L] LOGGER %r" % body)
  38 +
  39 +def keep_alive(conn_send, conn_receive):
  40 + """
  41 + Keep the connection alive.
  42 +
  43 + Parameters
  44 + ----------
  45 + conn_send : object
  46 + Connection of writer.
  47 + conn_receive : object
  48 + Connection of receiver.
  49 + """
  50 + while True:
  51 + sleep(30)
  52 + try:
  53 + conn_send.process_data_events()
  54 + conn_receive.process_data_events()
  55 + except:
  56 + continue
  57 +
  58 +# start_new_thread(keep_alive, (manager.get_conn_send(), manager.get_conn_receive()))
  59 +
  60 +print ("Logger listening...")
  61 +while True:
  62 + try:
  63 + manager.receive_from_queue("logs", run)
  64 + except KeyboardInterrupt:
  65 + manager.close_connections()
  66 + os._exit(0)
... ...
core/processManager.py 0 → 100755
... ... @@ -0,0 +1,31 @@
  1 +#!/usr/bin/env python
  2 +
  3 +import subprocess
  4 +from threading import Thread
  5 +import signal
  6 +import sys
  7 +import os
  8 +
  9 +KEEP_RUNNING = True
  10 +
  11 +def signalHandler(signal, frame):
  12 + global KEEP_RUNNING
  13 + KEEP_RUNNING = False
  14 + print("Closing")
  15 + sys.exit(0)
  16 +
  17 +def spawnTranslator():
  18 + while KEEP_RUNNING:
  19 + proc = subprocess.Popen(["./translator.py"], shell=True, stdout=subprocess.PIPE)
  20 + print("Process TRANSLATOR PID: " + str(proc.pid))
  21 + try:
  22 + stdoutdata, stderrdata = proc.communicate()
  23 + except:
  24 + print "An error occured, running again..."
  25 + return None
  26 +
  27 +if __name__ == "__main__":
  28 + signal.signal(signal.SIGINT, signalHandler)
  29 +
  30 + t = Thread(target=spawnTranslator, args=())
  31 + t.start()
... ...
core/translator.py 0 → 100755
... ... @@ -0,0 +1,72 @@
  1 +#!/usr/bin/env python
  2 +# -*- coding: utf-8 -*-
  3 +
  4 +"""
  5 +Author: Erickson Silva
  6 +E-Mail: erickson.silva@lavid.ufpb.br
  7 +
  8 +Author: Jonathan Lincoln Brilhante
  9 +E-Mail: jonathan.lincoln.brilhante@gmail.com
  10 +
  11 +Author: Wesnydy Lima Ribeiro
  12 +E-Mail: wesnydy@lavid.ufpb.br
  13 +"""
  14 +
  15 +import os
  16 +import pika
  17 +import PikaManager
  18 +
  19 +from PortGlosa import traduzir
  20 +from time import sleep
  21 +
  22 +# Manager of queues connections.
  23 +manager = PikaManager.PikaManager("150.165.205.10", "test", "test")
  24 +
  25 +def run(ch, method, properties, body):
  26 + """
  27 + Execute the worker.
  28 +
  29 + Parameters
  30 + ----------
  31 + ch : object
  32 + Channel of communication.
  33 + method : function
  34 + Callback method.
  35 + properties : object
  36 + Message containing a set of 14 properties.
  37 + body : string
  38 + Json string containing the necessary arguments for workers.
  39 + """
  40 + print ("Translating...")
  41 + gloss = traduzir(body)
  42 + manager.send_to_queue("glosses", gloss, properties)
  43 + print ("Ok")
  44 +
  45 +def keep_alive(conn_send, conn_receive):
  46 + """
  47 + Keep the connection alive.
  48 +
  49 + Parameters
  50 + ----------
  51 + conn_send : object
  52 + Connection of writer.
  53 + conn_receive : object
  54 + Connection of receiver.
  55 + """
  56 + while True:
  57 + sleep(30)
  58 + try:
  59 + conn_send.process_data_events()
  60 + conn_receive.process_data_events()
  61 + except:
  62 + continue
  63 +
  64 +# start_new_thread(keep_alive, (manager.get_conn_send(), manager.get_conn_receive()))
  65 +
  66 +print ("Translator listening...")
  67 +while True:
  68 + try:
  69 + manager.receive_from_queue("texts", run)
  70 + except KeyboardInterrupt:
  71 + manager.close_connections()
  72 + os._exit(0)
... ...