-
Notifications
You must be signed in to change notification settings - Fork 4
/
check_rabbitmq_unroutable_msg
executable file
·146 lines (127 loc) · 4.67 KB
/
check_rabbitmq_unroutable_msg
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#!/usr/bin/env python3
import argparse
import configparser
import json
import logging
import re
import requests
import sys
from pathlib import Path
from pprint import pprint
from requests.auth import HTTPBasicAuth
from urllib.parse import quote
parser = argparse.ArgumentParser(description='check openstack rabbitmq for unroutable msgs.')
parser.add_argument('config', type=str, help='ini-based config-file to read settings from')
args = parser.parse_args()
# Read Config
config = configparser.ConfigParser()
config.read(args.config)
base = config['check_unroutable']['base']
vhost = config['check_unroutable']['vhost']
username = config['check_unroutable']['username']
password = config['check_unroutable']['password']
if 'logging' in config:
logging.basicConfig(level=config['logging']['level'])
basic_auth = HTTPBasicAuth(username, password)
ret_code = 0
json_data_file = str(Path.home()) + "/.check_rabbitmq_unroutable_msg.json"
unroutable_queue = "unroutable"
dropmsg_queue = "dropmsg"
dropmsg_data = [
{'exchange': 'q-agent-notifier-security_group-update_fanout'},
{'exchange': 'q-agent-notifier-port-update_fanout'},
{'exchange': 'q-agent-notifier-port-delete_fanout'},
{'exchange': 'q-agent-notifier-network-delete_fanout'},
]
#
# Go..
#
try:
with open(json_data_file, "r") as fp:
drop_unroutable = json.load(fp)
except FileNotFoundError:
drop_unroutable = {}
#
# create dropmsg_queue
path = "/api/queues/" + quote(vhost, safe='') + "/" + dropmsg_queue
response = requests.put(base+path, auth=basic_auth, data='{"auto_delete":false,"durable":false,"arguments":{"x-message-ttl": 10000}}')
logging.debug(response.text)
response.raise_for_status()
#
# create bindings from exchanges to dropmsg_queue
for dropmsg in dropmsg_data:
# fix bindings
path = "/api/exchanges/" + quote(vhost, safe='') + "/" + dropmsg['exchange']
logging.debug("Req %s" % path)
try:
response = requests.get(base+path, auth=basic_auth)
response.raise_for_status()
except requests.exceptions.HTTPError as e:
if response.status_code != 404:
logging.error(response.text)
raise(e)
logging.debug("Got %s" % response.text)
if response.status_code == 404:
continue
path = "/api/bindings/" + quote(vhost, safe='') + "/e/" + dropmsg['exchange'] + "/q/" + dropmsg_queue
logging.debug("POST-Req %s" % path)
response = requests.post(base+path, auth=basic_auth)
response.raise_for_status()
logging.debug(response.text)
logging.debug("OK")
#
# check if unroutable_counter increases
path = '/api/channels'
logging.debug("Req %s" % path)
response = requests.get(base+path, auth=basic_auth)
response.raise_for_status()
logging.debug("OK")
new_drop_unroutable = {}
if(response.ok):
data = response.json()
for entry in data:
if 'message_stats' in entry:
if 'drop_unroutable' in entry['message_stats']:
if entry['message_stats']['drop_unroutable'] > 0:
name = entry['name']
if name not in drop_unroutable or entry['message_stats']['drop_unroutable'] > drop_unroutable[name]:
logging.error("dropped msg in %s" % name)
pprint(entry['user'])
ret_code = 2
new_drop_unroutable[entry['name']] = entry['message_stats']['drop_unroutable']
logging.debug("---")
with open(json_data_file, "w") as fp:
json.dump(new_drop_unroutable, fp)
#
# create unroutable_queue
path = "/api/queues/" + quote(vhost, safe='') + "/" + unroutable_queue
response = requests.put(base+path, auth=basic_auth, data='{"auto_delete":false,"durable":false,"arguments":{"x-message-ttl": 300000}}')
logging.debug(response.text)
response.raise_for_status()
#
# get msg from unroutable queue
path = "/api/queues/" + quote(vhost, safe='') + "/" + unroutable_queue + "/get"
logging.debug("Req %s" % path)
try:
response = requests.post(base+path, auth=basic_auth, data='{"count": 1000, "ackmode": "ack_requeue_false", "encoding": "auto"}')
response.raise_for_status()
except requests.exceptions.HTTPError as e:
if response.status_code != 404:
logging.error(response.text)
raise(e)
else:
logging.debug("Queue not found")
if response.status_code != 404:
logging.debug("OK")
for entry in response.json():
exchange = entry['exchange']
try:
payload = json.loads(entry['payload'])
routing_key = entry['routing_key']
logging.error("Exchange: %s, RoutingKey: %s could not be routed" % (exchange, routing_key))
ret_code = 2
except Exception:
pprint(entry)
if (ret_code == 0):
print("OK")
sys.exit(ret_code)