""" Author: Caio Marcelo Campoy Guedes E-Mail: caiomcg@gmail.com Author: Caio Marcelo Campoy Guedes E-Mail: caiomcg@gmail.com Author: Caio Marcelo Campoy Guedes E-Mail: caiomcg@gmail.com """ import pika import json class PikaManager: def __init__(self, ip): """ Initialize the class without Parameters ---------- ip : string The server IP. """ self.server_ip = ip self.MAX_ERR_ATTEMPT = 3 def __init__(self, ip, username, password): self.server_ip = ip self.MAX_ERR_ATTEMPT = 3 self.add_credentials(username, password) self.add_blockConnection() def add_credentials(self, username, password): self.credentials = pika.PlainCredentials(username, password) def add_blockConnection(self): self.conn_send = pika.BlockingConnection(pika.ConnectionParameters(host = self.server_ip, credentials = self.credentials, heartbeat_interval = 0)) self.conn_receive = pika.BlockingConnection(pika.ConnectionParameters(host = self.server_ip, credentials = self.credentials, heartbeat_interval = 0)) def add_localBlockConnection(self): def _reload_connection(self, connection): try: connection.close() except: pass #HANDLE connection = pika.BlockingConnection(pika.ConnectionParameters(host = self.server_ip, heartbeat_interval = 0)) # ? def _setup_channel(self, connection): """ Atempt to create a connection. Parameters ---------- connection : Object Connection to setup. Returns ------- Object None if cailed to connect. The object if successfuly connected. """ attempts = 0 while attempts < self.MAX_ERR_ATTEMPT: try: channel = connection.channel() return channel except: self._reload_connection(connection) attempts += 1 print("Send Error: Attempt(" + str(attempts) + ")") if attempts == self.MAX_ERR_ATTEMPT: return None def send_to_queue(self, queue_name, body, props): channel = self._setup_channel(self.conn_send) if channel != None: channel.queue_declare(queue = queue_name) channel.basic_publish(exchange = '', routing_key = queue_name, properties = pika.BasicProperties(correlation_id = props.correlation_id), body = json.dumps(body)) channel.close() def receive_from_queue(self, queue_name, callback): channel = self._setup_channel(self.conn_receive) if channel != None: channel.queue_declare(queue = queue_name) channel.basic_qos(prefetch_count = 1) channel.basic_consume(callback, queue = queue_name, no_ack = True) channel.start_consuming() channel.close() def get_conn_send(self): return self.conn_send def get_conn_receive(self): return self.conn_receive def close_connections(self): try: self.conn_receive.close() self.conn_send.close() except: pass