Skip to content

Commit

Permalink
New feature : Aggregated packet sub-topic selection
Browse files Browse the repository at this point in the history
This new feature allows the user to select the aggregated
sub-topics names for archive and loop packets if needed, this allows to
subscribe to two different topics when both archive and loop bindings
are enabled.
The defaults are to still continue to post on the "loop" topic for both
types in order to not break existing users applications.

Added a bit of documentation for paho-mqtt installation and for the two
new options.
  • Loading branch information
moonpyk committed Aug 31, 2020
1 parent 00e23e8 commit 31e4073
Showing 1 changed file with 29 additions and 5 deletions.
34 changes: 29 additions & 5 deletions bin/user/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
Upload data to MQTT server
This service requires the python bindings for mqtt:
pip install paho-mqtt
apt-get install python3-paho-mqtt # on Debian/Ubuntu based systems if using python3
apt-get install python-paho-mqtt # on Debian/Ubuntu based systems if using python2
pip install paho-mqtt # on any system with pip installed
Minimal configuration:
Expand All @@ -30,12 +31,12 @@
...
aggregation = individual, aggregate # individual, aggregate, or both
Bind to loop packets or archive records:
Bind to loop packets or/and archive records:
[StdRestful]
[[MQTT]]
...
binding = loop # options are loop or archive
binding = loop # options are loop, archive or both
Use the inputs map to customize name, format, or units for any observation:
Expand Down Expand Up @@ -238,6 +239,14 @@ def __init__(self, engine, config_dict):
tls: dictionary of TLS parameters used by the Paho client to establish
a secure connection with the broker.
Default is None
agg_topic_loop: Allows to set the destination sub-topic for aggregated
loop packets (only used if aggregation contains aggregate)
Default is loop
agg_topic_archive: Allows to set the destination sub-topic for aggregated
archive packets (only used if aggregation contains aggregate)
Default is loop
"""
super(MQTT, self).__init__(engine, config_dict)
loginf("service version is %s" % VERSION)
Expand All @@ -261,6 +270,9 @@ def __init__(self, engine, config_dict):
site_dict.setdefault('qos', 0)
site_dict.setdefault('aggregation', 'individual,aggregate')

site_dict.setdefault('agg_topic_loop', 'loop')
site_dict.setdefault('agg_topic_archive', 'loop')

usn = site_dict.get('unit_system', None)
if usn is not None:
site_dict['unit_system'] = weewx.units.unit_constants[usn]
Expand Down Expand Up @@ -378,6 +390,7 @@ def __init__(self, queue, server_url,
manager_dict=None, tls=None, qos=0,
post_interval=None, stale=None,
log_success=True, log_failure=True,
agg_topic_loop='loop', agg_topic_archive='loop',
timeout=60, max_tries=3, retry_wait=5,
max_backlog=sys.maxsize):
super(MQTTThread, self).__init__(queue,
Expand All @@ -394,6 +407,8 @@ def __init__(self, queue, server_url,
self.server_url = server_url
self.client_id = client_id
self.topic = topic
self.agg_topic_loop = agg_topic_loop
self.agg_topic_archive = agg_topic_archive
self.upload_all = True if obs_to_upload.lower() == 'all' else False
self.append_units_label = append_units_label
self.tls_dict = {}
Expand Down Expand Up @@ -468,8 +483,15 @@ def filter_data(self, record):

def process_record(self, record, dbm):
import socket

# Archive packets are easily discriminated by the fact they have an 'interval' property
# LOOP packets don't have it
aggregation_topic = self.agg_topic_archive \
if 'interval' in record else self.agg_topic_loop

if self.augment_record and dbm is not None:
record = self.get_record(record, dbm)

if self.unit_system is not None:
record = weewx.units.to_std_system(record, self.unit_system)
data = self.filter_data(record)
Expand All @@ -479,6 +501,7 @@ def process_record(self, record, dbm):
loginf("skipping upload")
return
url = urlparse(self.server_url)

for _count in range(self.max_tries):
try:
client_id = self.client_id
Expand All @@ -494,7 +517,8 @@ def process_record(self, record, dbm):
mc.connect(url.hostname, url.port)
mc.loop_start()
if self.aggregation.find('aggregate') >= 0:
tpc = self.topic + '/loop'
tpc = self.topic + '/' + aggregation_topic

(res, mid) = mc.publish(tpc, json.dumps(data),
retain=self.retain, qos=self.qos)
if res != mqtt.MQTT_ERR_SUCCESS:
Expand Down

0 comments on commit 31e4073

Please sign in to comment.