# PikaManager.py_old 2.94 KB

"""
Helper class for sending to and receiving from AMQP queues through pika.

Author: Caio Marcelo Campoy Guedes
E-Mail: caiomcg@gmail.com
"""

import pika
import json

class PikaManager:
	"""
	Hold two pika blocking connections to the same server, one used for
	publishing (conn_send) and one used for consuming (conn_receive).
	"""

	# Number of times _setup_channel tries to open a channel before giving up.
	MAX_ERR_ATTEMPT = 3

	def __init__(self, ip, username = None, password = None):
		"""
		Store the server address and open both connections.

		Parameters
		----------
		ip : string
			The server IP.
		username : string, optional
			User name for the server. When neither username nor password
			is given the connections are opened without explicit credentials.
		password : string, optional
			Password for the server.
		"""
		self.server_ip = ip
		self.credentials = None

		if username is None and password is None:
			self.add_localBlockConnection()
		else:
			self.add_credentials(username, password)
			self.add_blockConnection()

	def add_credentials(self, username, password):
		"""
		Build and store the plain credentials used by add_blockConnection.

		Parameters
		----------
		username : string
			User name for the server.
		password : string
			Password for the server.
		"""
		self.credentials = pika.PlainCredentials(username, password)

	def _connection_parameters(self, with_credentials = True):
		"""
		Build the connection parameters for the configured server.

		Parameters
		----------
		with_credentials : bool
			When True and credentials were stored, include them.
		Returns
		-------
		Object
			The pika.ConnectionParameters instance.
		"""
		credentials = getattr(self, 'credentials', None) if with_credentials else None
		if credentials is None:
			return pika.ConnectionParameters(host = self.server_ip, heartbeat_interval = 0)
		return pika.ConnectionParameters(host = self.server_ip, credentials = credentials, heartbeat_interval = 0)

	def _open_connection(self, with_credentials = True):
		"""
		Open one new blocking connection to the configured server.
		"""
		return pika.BlockingConnection(self._connection_parameters(with_credentials))

	def add_blockConnection(self):
		"""
		Open the send and receive connections using the stored credentials.
		"""
		self.conn_send = self._open_connection()
		self.conn_receive = self._open_connection()

	def add_localBlockConnection(self):
		"""
		Open the send and receive connections without explicit credentials.
		"""
		self.conn_send = self._open_connection(with_credentials = False)
		self.conn_receive = self._open_connection(with_credentials = False)

	def _reload_connection(self, connection):
		"""
		Close a connection and replace it with a newly opened one.

		If the given connection is the stored send or receive connection,
		the stored attribute is updated to point at the replacement.

		Parameters
		----------
		connection : Object
			Connection to replace.
		Returns
		-------
		Object
			The newly opened connection.
		"""
		try:
			connection.close()
		except Exception:
			# Best effort: the connection is being discarded anyway and may
			# already be closed or broken.
			pass

		fresh = self._open_connection()

		# Rebinding the parameter alone would lose the new connection, so
		# store it on whichever attribute held the old one.
		for attr in ('conn_send', 'conn_receive'):
			if getattr(self, attr, None) is connection:
				setattr(self, attr, fresh)
		return fresh

	def _setup_channel(self, connection):
		"""
		Attempt to open a channel, reconnecting between failed attempts.

		Parameters
		----------
		connection : Object
			Connection to open the channel on.
		Returns
		-------
		Object
			None if it failed to open a channel after MAX_ERR_ATTEMPT
			attempts. The channel if successfully opened.
		"""
		for attempt in range(1, self.MAX_ERR_ATTEMPT + 1):
			try:
				return connection.channel()
			except Exception:
				print("Send Error: Attempt(" + str(attempt) + ")")

			try:
				# Retry on a fresh connection, not on the one that just failed.
				connection = self._reload_connection(connection)
			except Exception as err:
				# A failed reconnect counts as a failed attempt instead of
				# escaping the retry loop.
				print("Reconnect Error: " + str(err))

		return None

	def send_to_queue(self, queue_name, body, props):
		"""
		Publish a JSON-encoded message on the default exchange.

		Does nothing if no channel could be opened.

		Parameters
		----------
		queue_name : string
			Queue to declare and to use as routing key.
		body : Object
			Payload, serialized with json.dumps before publishing.
		props : Object
			Object whose correlation_id is copied to the message properties.
		"""
		channel = self._setup_channel(self.conn_send)
		if channel is None:
			return

		try:
			channel.queue_declare(queue = queue_name)
			channel.basic_publish(exchange = '', routing_key = queue_name, properties = pika.BasicProperties(correlation_id = props.correlation_id), body = json.dumps(body))
		finally:
			# Release the channel even when declaring or publishing fails.
			channel.close()

	def receive_from_queue(self, queue_name, callback):
		"""
		Consume messages from a queue, blocking until consuming stops.

		Does nothing if no channel could be opened.

		Parameters
		----------
		queue_name : string
			Queue to declare and consume from.
		callback : callable
			Passed to basic_consume as the message handler.
		"""
		channel = self._setup_channel(self.conn_receive)
		if channel is None:
			return

		try:
			channel.queue_declare(queue = queue_name)
			channel.basic_qos(prefetch_count = 1)
			channel.basic_consume(callback, queue = queue_name, no_ack = True)
			channel.start_consuming()
		finally:
			# Release the channel even when consuming is interrupted.
			channel.close()

	def get_conn_send(self):
		"""
		Return the connection used for publishing.
		"""
		return self.conn_send

	def get_conn_receive(self):
		"""
		Return the connection used for consuming.
		"""
		return self.conn_receive

	def close_connections(self):
		"""
		Close both connections, ignoring errors raised while closing.

		Each connection is closed independently, so a failure on one does
		not prevent the other from being closed.
		"""
		for connection in (self.conn_receive, self.conn_send):
			try:
				connection.close()
			except Exception:
				# Best effort: a connection that is already closed or broken
				# needs no further handling here.
				pass