-
Notifications
You must be signed in to change notification settings - Fork 4
/
rpc_floodd
executable file
·64 lines (54 loc) · 2.08 KB
/
rpc_floodd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#!/usr/bin/env python3
import json
import random
import time

import pika
# Name of the exchange this daemon declares and listens on; also used as the
# prefix of its control queue name.
rabbitmq_exchange = "rpc_flood"

# One connection-parameter object per broker node. The main loop shuffles
# this list in place and connects to whichever entry ends up first.
# NOTE(review): the credentials are embedded in these URLs.
all_endpoints = [
    pika.URLParameters(url)
    for url in (
        'amqp://user:password@node1/',
        'amqp://user:password@node2/',
        'amqp://user:password@node3/',
    )
]
node1, node2, node3 = all_endpoints
#
# MAIN
#
def recv_callback(ch, method, properties, body):
    """Answer one control request with a canned JSON reply.

    The request body is parsed as JSON and must contain a ``routing_key``
    entry. That value is used as both the exchange name and the routing key
    of the reply message.

    Args:
        ch: Channel the request was delivered on; the reply is published on
            this same channel.
        method: Delivery metadata passed by the consumer machinery (unused).
        properties: Message properties passed by the consumer machinery
            (unused).
        body: Raw request payload.

    Returns:
        None. Any error raised while parsing the request or publishing the
        reply is printed and not re-raised.
    """
    print ("Got Request:" + str(body))
    try:
        request = json.loads(body)
        reply_key = request['routing_key']
        data = {'msg': 'have a nice day'}
        json_string = json.dumps(data)
        # Publish on the channel that delivered the request instead of a
        # module-level global, so the callback does not depend on state that
        # is rebound on every reconnect.
        ch.basic_publish(exchange=reply_key,
                         routing_key=reply_key,
                         body=json_string)
        print(" [x] Sent " + json_string + " to exchange " + reply_key)
    except Exception as e:
        # Deliberately broad: this is the per-message boundary, and a
        # malformed request is reported rather than propagated.
        print("ERR - %s" % e)
# Pause between reconnect attempts so that an unreachable cluster is not
# hammered in a tight loop.
reconnect_delay_seconds = 1

# Connect to a randomly chosen node, consume control requests until the
# connection drops, then reconnect. CTRL+C ends the loop cleanly.
try:
    while True:
        connection = None
        try:
            # Shuffle so each (re)connect attempt picks a random node.
            random.shuffle(all_endpoints)
            # RabbitMQ Connection
            connection = pika.BlockingConnection(all_endpoints[0])
            channel = connection.channel()
            channel.exchange_declare(exchange=rabbitmq_exchange, exchange_type='topic')
            result = channel.queue_declare(rabbitmq_exchange + '_control')
            queue_name = result.method.queue
            print(queue_name)
            channel.queue_bind(exchange=rabbitmq_exchange, queue=queue_name, routing_key='control')
            channel.basic_consume(queue_name, recv_callback, auto_ack=True)
            print(' [*] Waiting for messages. To exit press CTRL+C')
            channel.start_consuming()
        # Do not recover on channel errors
        except pika.exceptions.AMQPChannelError as err:
            print("Caught a channel error: {}, stopping...".format(err))
            break
        # Recover on all other connection errors. Server-initiated closures
        # (pika.exceptions.ConnectionClosedByBroker) are connection errors
        # too, so they are retried as well; add a dedicated handler with
        # `break` above this one to stop on them instead.
        except pika.exceptions.AMQPConnectionError:
            print("Connection was closed, retrying...")
            time.sleep(reconnect_delay_seconds)
            continue
        finally:
            # Best-effort cleanup: never leave a connection open when the
            # loop stops, is interrupted, or moves on to the next attempt.
            if connection is not None and connection.is_open:
                try:
                    connection.close()
                except pika.exceptions.AMQPError as close_err:
                    print("ERR - closing connection failed: %s" % close_err)
except KeyboardInterrupt:
    # The banner advertises CTRL+C as the way out; exit without a traceback.
    print("Interrupted, stopping...")