PikaManager.py 4.97 KB

"""
Author: Caio Marcelo Campoy Guedes
E-Mail: caiomcg@gmail.com

Author: Erickson Silva
E-Mail: erickson.silva@lavid.ufpb.br

Author: Jorismar Barbosa
E-Mail: jorismar.barbosa@lavid.ufpb.br

Author: Wesnydy Lima Ribeiro
E-Mail: wesnydy@lavid.ufpb.br
"""

import pika
import json

class PikaManager:

    def __init__(self, ip, username=None, password=None):
        """
        Initialize the class with credentials.

        Parameters
        ----------
        ip : string
            The server IP.
        username : string
            The user login.
        password : string
            The user password.
        """
        self.server_ip = ip
        self.MAX_ERR_ATTEMPT = 3

        if not all((username, password)):
            self.add_localBlockConnection()
        else:
            self.add_credentials(username, password)
            self.add_blockConnection()

    def add_credentials(self, username, password):
        """
        Add user credentials.

        Parameters
        ----------
        username : string
            The user login.
        password : string
            The user password.
        """
        self.credentials = pika.PlainCredentials(username, password)

    def add_localBlockConnection(self):
        """
        Create the blocking connection object.
        """
        self.conn_send = pika.BlockingConnection(pika.ConnectionParameters(host = self.server_ip, heartbeat_interval = 0))
        self.conn_receive = pika.BlockingConnection(pika.ConnectionParameters(host = self.server_ip, heartbeat_interval = 0))

    def add_blockConnection(self):
        """
        Create the blocking connection object. Credentials are used.
        """
        self.conn_send = pika.BlockingConnection(pika.ConnectionParameters(host = self.server_ip, credentials = self.credentials, heartbeat_interval = 0))
        self.conn_receive = pika.BlockingConnection(pika.ConnectionParameters(host = self.server_ip, credentials = self.credentials, heartbeat_interval = 0))

    def _reload_connection(self, connection):
        """
        Reload a specific connection.

        Parameters
        ----------
        connection : Object
            The connection to be reloaded.
        """
        try:
            connection.close()
        except:
            pass
        connection = pika.BlockingConnection(pika.ConnectionParameters(host = self.server_ip, heartbeat_interval = 0)) # ?

    def _setup_channel(self, connection):
        """
        Atempt to create a connection.

        Parameters
        ----------
        connection : Object
            Connection to setup.
        Returns
        -------
        Object
            None if cailed to connect. The object if successfuly connected.
        """
        attempts = 0

        while attempts < self.MAX_ERR_ATTEMPT:
            try:
                channel = connection.channel()
                return channel
            except:
                self._reload_connection(connection)
                attempts += 1
                print("Send Error: Attempt(" + str(attempts) + ")")

        if attempts == self.MAX_ERR_ATTEMPT:
            return None

    def send_to_queue(self, queue_name, body, props):
        """
        Send a message to the queue.

        Parameters
        ----------
        queue_name : string
            Queue that receives the message.
        body : string
            The message to be sent.
        props : Object
            Object containing a set of 14 properties.
        """
        channel = self._setup_channel(self.conn_send)
        if channel != None:
            channel.queue_declare(queue = queue_name)
            channel.basic_publish(exchange = '', routing_key = queue_name, properties = pika.BasicProperties(correlation_id = props.correlation_id), body = body)
            channel.close()

    def receive_from_queue(self, queue_name, callback):
        """
        Receive a message from the queue.

        Parameters
        ----------
        queue_name : string
            Queue where the message will be received.
        callback : function
            Function that process the message.
        """
        channel = self._setup_channel(self.conn_receive)
        if channel != None:
            channel.queue_declare(queue = queue_name)
            channel.basic_qos(prefetch_count = 1)
            channel.basic_consume(callback, queue = queue_name, no_ack = True)
            channel.start_consuming()
            channel.close()

    def get_conn_send(self):
        """
        Get the send connection.

        Returns
        -------
            The send connection.
        """
        return self.conn_send

    def get_conn_receive(self):
        """
        Get the receive connection.

        Returns
        -------
            The receive connection.
        """
        return self.conn_receive

    def close_connections(self):
        """
        Close all connections.
        """
        try:
            self.conn_receive.close()
            self.conn_send.close()
        except:
            pass