-
Notifications
You must be signed in to change notification settings - Fork 35
/
psmqtt-publish.py
127 lines (106 loc) · 3.82 KB
/
psmqtt-publish.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#!/usr/bin/env python
#
# Publish the requested data to the MQTT broker and exit.
# All parameters are specified on the command line.
#
import json
import argparse
import logging
import os
import socket
import sys
from typing import Any, Dict, List, Union
def run_tasks(tasks: List[str]) -> None:
    """Run every task definition given on the command line.

    Each entry of ``tasks`` is handled in one of two ways:

    * a string starting with ``{`` is parsed as a JSON object and
      ``run_task(key, value)`` is called for every item of that object;
    * any other string is passed through as ``run_task(task, task)``.

    The meaning of the two ``run_task`` arguments is defined in ``src.task``
    and is not visible from this file.

    Raises:
        json.JSONDecodeError: if an entry starts with ``{`` but is not valid
            JSON.
        TypeError: if a parsed JSON entry is not an object.
    """
    # delay import to enable PYTHONPATH adjustment
    from src.task import run_task

    def parse_task(spec: str) -> Union[str, Dict[str, Any]]:
        """Return ``spec`` unchanged, or the dict it encodes as JSON."""
        logging.debug("Parsing: %s", spec)
        if not spec.startswith('{'):
            return spec
        try:
            parsed = json.loads(spec)
        except json.JSONDecodeError:
            # The decoder's message does not say which argument was bad, so
            # log it here and let the original exception propagate unchanged.
            logging.error("Bad JSON task definition: %s", spec)
            raise
        # Explicit check instead of `assert`: asserts are removed under -O.
        if not isinstance(parsed, dict):
            raise TypeError(f'Task definition is not a JSON object: {spec!r}')
        return parsed

    for spec in tasks:
        task = parse_task(spec)
        if isinstance(task, dict):
            for key, value in task.items():
                run_task(key, value)
        else:
            run_task(task, task)
def run() -> None:
    """Parse the command line, connect to the MQTT broker and run the tasks.

    Usage: ``[options] broker task [task ...]`` where ``broker`` is either
    ``host`` or ``host:port`` (the port defaults to 1883).

    Raises:
        ValueError: if the broker argument is not a valid ``host[:port]``
            specification.
    """
    default_port = 1883

    def validate_args(args: argparse.Namespace) -> argparse.Namespace:
        """Configure logging and split ``args.broker`` into host and port.

        Sets ``args.port`` and, when a port was given, rewrites
        ``args.broker`` to the bare host name. Returns the same namespace.
        """
        def loggable() -> argparse.Namespace:
            # Never write the MQTT password to the log output.
            shown = dict(vars(args))
            if shown.get('password'):
                shown['password'] = '***'
            return argparse.Namespace(**shown)

        #
        # establish logging level
        #
        # NOTE(review): a single "-v" leaves the level at WARNING; "-vv"
        # selects INFO and "-vvv" DEBUG. Confirm this is the intended mapping.
        if args.verbose > 2:
            level = logging.DEBUG
        elif args.verbose > 1:
            level = logging.INFO
        else:
            level = logging.WARNING
        logging.basicConfig(level=level)
        logging.debug('validate_args(%s)', loggable())

        parts = args.broker.split(':')
        if len(parts) > 2:
            raise ValueError(f'Bad broker-port spec: {args.broker!r}')
        host = parts[0]
        if not host:
            raise ValueError(
                f'Bad broker-port spec (empty host): {args.broker!r}')
        if len(parts) == 1:
            port = default_port
        else:
            try:
                port = int(parts[1])
            except ValueError:
                raise ValueError(
                    f'Bad broker-port spec (port is not a number): '
                    f'{args.broker!r}') from None
            if not 0 < port < 65536:
                raise ValueError(
                    f'Bad broker-port spec (port out of range): '
                    f'{args.broker!r}')
        args.broker = host
        args.port = port

        logging.debug('validate_args() => %s', loggable())
        return args

    parser = argparse.ArgumentParser(
        prog=os.path.basename(__file__),
        description='Report data to the MQTT broker and exit',
        epilog='See documentation for task definition syntax and examples')
    parser.add_argument('-v', '--verbose', action='count', default=0,
        help='verbosity level, defaults to 0. Increase it like this: "-vv"')
    # MQTT defines exactly three QoS levels; reject anything else up front.
    parser.add_argument('--qos', type=int, default=0, choices=(0, 1, 2),
        help='QOS, defaults to 0')
    parser.add_argument('--retain', action='store_true',
        help='Retain flag, defaults to False')
    parser.add_argument('--username', default='',
        help='MQTT username, defaults to ""')
    # BAD IDEA: anything passed on the command line is visible to other
    # users of the machine through the process list.
    parser.add_argument('--password', default='',
        help='MQTT password, defaults to ""')
    parser.add_argument('--pythonpath', default='',
        help='Add this directory to PYTHONPATH, defaults to ""')
    parser.add_argument('broker',
        help='Broker host and optional port, e.g. "mqtt:1883"')
    parser.add_argument('task', nargs='+',
        help='Task, e.g. "cpu_percent", "virtual_memory/percent"')
    args = validate_args(parser.parse_args())

    #
    # start processing args
    #
    if args.pythonpath:
        logging.debug("Adding to PYTHONPATH: '%s'", args.pythonpath)
        sys.path.append(args.pythonpath)

    topic_prefix = f'psmqtt/{socket.gethostname()}/'
    request_topic = 'request'
    if request_topic:
        request_topic = f'{topic_prefix}{request_topic}/'

    # delayed import to enable PYTHONPATH adjustment
    from src.task import MqttClient

    # NOTE(review): the meaning of the positional `False` argument is defined
    # by MqttClient in src.task and is not visible here.
    mqttc = MqttClient(f'psmqtt-once-{os.getpid()}', False, topic_prefix,
        request_topic, args.qos, args.retain)

    #
    # connect MqttClient to the specified broker
    #
    mqttc.connect(args.broker, args.port, args.username, args.password)
    run_tasks(args.task)

    # wait for ACK from the broker...
    # NOTE(review): this loop has no timeout, so it spins for as long as
    # `mqttc.connected` stays false.
    while not mqttc.connected:
        mqttc.mqttc.loop()
if __name__ == '__main__':
    # Script entry point. Every failure is logged and turned into a non-zero
    # exit status so that callers (shell scripts, cron, service managers) can
    # tell that the data was not published.
    try:
        run()
    except socket.error as err:
        logging.error("socket.error caught: %s", err)
        sys.exit(1)
    except KeyboardInterrupt:
        logging.error("KeyboardInterrupt caught, exiting")
        # 128 + SIGINT(2): the conventional shell status for Ctrl-C.
        sys.exit(130)
    except Exception as ex:
        logging.error("Caught: %s", ex)
        # Keep the normal output short; the traceback is shown only when
        # debug logging is enabled.
        logging.debug("Traceback:", exc_info=True)
        sys.exit(1)