PikaManager.py_old
2.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
"""
Author: Caio Marcelo Campoy Guedes
E-Mail: caiomcg@gmail.com
Author: Caio Marcelo Campoy Guedes
E-Mail: caiomcg@gmail.com
Author: Caio Marcelo Campoy Guedes
E-Mail: caiomcg@gmail.com
"""
import pika
import json
class PikaManager:
def __init__(self, ip):
"""
Initialize the class without
Parameters
----------
ip : string
The server IP.
"""
self.server_ip = ip
self.MAX_ERR_ATTEMPT = 3
self.add_localBlockConnection()
def __init__(self, ip, username, password):
self.server_ip = ip
self.MAX_ERR_ATTEMPT = 3
self.add_credentials(username, password)
self.add_blockConnection()
def add_credentials(self, username, password):
self.credentials = pika.PlainCredentials(username, password)
def add_blockConnection(self):
self.conn_send = pika.BlockingConnection(pika.ConnectionParameters(host = self.server_ip, credentials = self.credentials, heartbeat_interval = 0))
self.conn_receive = pika.BlockingConnection(pika.ConnectionParameters(host = self.server_ip, credentials = self.credentials, heartbeat_interval = 0))
def add_localBlockConnection(self):
self.conn_send = pika.BlockingConnection(pika.ConnectionParameters(host = self.server_ip, heartbeat_interval = 0))
self.conn_receive = pika.BlockingConnection(pika.ConnectionParameters(host = self.server_ip, heartbeat_interval = 0))
def _reload_connection(self, connection):
try:
connection.close()
except:
pass #HANDLE
connection = pika.BlockingConnection(pika.ConnectionParameters(host = self.server_ip, heartbeat_interval = 0)) # ?
def _setup_channel(self, connection):
"""
Atempt to create a connection.
Parameters
----------
connection : Object
Connection to setup.
Returns
-------
Object
None if cailed to connect. The object if successfuly connected.
"""
attempts = 0
while attempts < self.MAX_ERR_ATTEMPT:
try:
channel = connection.channel()
return channel
except:
self._reload_connection(connection)
attempts += 1
print("Send Error: Attempt(" + str(attempts) + ")")
if attempts == self.MAX_ERR_ATTEMPT:
return None
def send_to_queue(self, queue_name, body, props):
channel = self._setup_channel(self.conn_send)
if channel != None:
channel.queue_declare(queue = queue_name)
channel.basic_publish(exchange = '', routing_key = queue_name, properties = pika.BasicProperties(correlation_id = props.correlation_id), body = json.dumps(body))
channel.close()
def receive_from_queue(self, queue_name, callback):
channel = self._setup_channel(self.conn_receive)
if channel != None:
channel.queue_declare(queue = queue_name)
channel.basic_qos(prefetch_count = 1)
channel.basic_consume(callback, queue = queue_name, no_ack = True)
channel.start_consuming()
channel.close()
def get_conn_send(self):
return self.conn_send
def get_conn_receive(self):
return self.conn_receive
def close_connections(self):
try:
self.conn_receive.close()
self.conn_send.close()
except:
pass