""" Author: Caio Marcelo Campoy Guedes E-Mail: caiomcg@gmail.com Author: Erickson Silva E-Mail: erickson.silva@lavid.ufpb.br Author: Jorismar Barbosa E-Mail: jorismar.barbosa@lavid.ufpb.br Author: Wesnydy Lima Ribeiro E-Mail: wesnydy@lavid.ufpb.br """ import pika import json class PikaManager: def __init__(self, ip, username=None, password=None): """ Initialize the class with credentials. Parameters ---------- ip : string The server IP. username : string The user login. password : string The user password. """ self.server_ip = ip self.MAX_ERR_ATTEMPT = 3 if not all((username, password)): self.add_localBlockConnection() else: self.add_credentials(username, password) self.add_blockConnection() def add_credentials(self, username, password): """ Add user credentials. Parameters ---------- username : string The user login. password : string The user password. """ self.credentials = pika.PlainCredentials(username, password) def add_localBlockConnection(self): """ Create the blocking connection object. """ self.conn_send = pika.BlockingConnection(pika.ConnectionParameters(host = self.server_ip, heartbeat_interval = 0)) self.conn_receive = pika.BlockingConnection(pika.ConnectionParameters(host = self.server_ip, heartbeat_interval = 0)) def add_blockConnection(self): """ Create the blocking connection object. Credentials are used. """ self.conn_send = pika.BlockingConnection(pika.ConnectionParameters(host = self.server_ip, credentials = self.credentials, heartbeat_interval = 0)) self.conn_receive = pika.BlockingConnection(pika.ConnectionParameters(host = self.server_ip, credentials = self.credentials, heartbeat_interval = 0)) def _reload_connection(self, connection): """ Reload a specific connection. Parameters ---------- connection : Object The connection to be reloaded. """ try: connection.close() except: pass connection = pika.BlockingConnection(pika.ConnectionParameters(host = self.server_ip, heartbeat_interval = 0)) # ? def _setup_channel(self, connection): """ Atempt to create a connection. Parameters ---------- connection : Object Connection to setup. Returns ------- Object None if cailed to connect. The object if successfuly connected. """ attempts = 0 while attempts < self.MAX_ERR_ATTEMPT: try: channel = connection.channel() return channel except: self._reload_connection(connection) attempts += 1 print("Send Error: Attempt(" + str(attempts) + ")") if attempts == self.MAX_ERR_ATTEMPT: return None def send_to_queue(self, queue_name, body, props): """ Send a message to the queue. Parameters ---------- queue_name : string Queue that receives the message. body : string The message to be sent. props : Object Object containing a set of 14 properties. """ channel = self._setup_channel(self.conn_send) if channel != None: channel.queue_declare(queue = queue_name) channel.basic_publish(exchange = '', routing_key = queue_name, properties = pika.BasicProperties(correlation_id = props.correlation_id), body = json.dumps(body)) channel.close() def receive_from_queue(self, queue_name, callback): """ Receive a message from the queue. Parameters ---------- queue_name : string Queue where the message will be received. callback : function Function that process the message. """ channel = self._setup_channel(self.conn_receive) if channel != None: channel.queue_declare(queue = queue_name) channel.basic_qos(prefetch_count = 1) channel.basic_consume(callback, queue = queue_name, no_ack = True) channel.start_consuming() channel.close() def get_conn_send(self): """ Get the send connection. Returns ------- The send connection. """ return self.conn_send def get_conn_receive(self): """ Get the receive connection. Returns ------- The receive connection. """ return self.conn_receive def close_connections(self): """ Close all connections. """ try: self.conn_receive.close() self.conn_send.close() except: pass