From 18f68da97ba65d06d9886b066d0110a0addc639d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Bourgeois?= Date: Sun, 23 Aug 2020 10:47:03 +0000 Subject: [PATCH] New feature : Aggregated packet sub-topic selection This new feature allows the user to select the aggregated sub-topics names for archive and loop packets if needed, this allows to subscribe to two different topics when both archive and loop bindings are enabled. The defaults are to still continue to post on the "loop" topic for both types in order to not break existing users applications. Added a bit of documentation for paho-mqtt installation and for the two new options. Version bumped to 0.24. --- bin/user/mqtt.py | 51 ++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 43 insertions(+), 8 deletions(-) diff --git a/bin/user/mqtt.py b/bin/user/mqtt.py index 00d0448..3e0768b 100644 --- a/bin/user/mqtt.py +++ b/bin/user/mqtt.py @@ -4,8 +4,9 @@ Upload data to MQTT server This service requires the python bindings for mqtt: - - pip install paho-mqtt + apt-get install python3-paho-mqtt # on Debian/Ubuntu based systems if using python3 + apt-get install python-paho-mqtt # on Debian/Ubuntu based systems if using python2 + pip install paho-mqtt # on any system with pip installed Minimal configuration: @@ -30,12 +31,12 @@ ... aggregation = individual, aggregate # individual, aggregate, or both -Bind to loop packets or archive records: +Bind to loop packets or/and archive records: [StdRestful] [[MQTT]] ... - binding = loop # options are loop or archive + binding = loop # options are loop, archive or both Use the inputs map to customize name, format, or units for any observation: @@ -117,7 +118,7 @@ import weewx.units from weeutil.weeutil import to_int, to_bool, accumulateLeaves -VERSION = "0.23" +VERSION = "0.24" if weewx.__version__ < "3": raise weewx.UnsupportedFeature("weewx 3 is required, found %s" % @@ -238,6 +239,14 @@ def __init__(self, engine, config_dict): tls: dictionary of TLS parameters used by the Paho client to establish a secure connection with the broker. Default is None + + agg_topic_loop: Allows to set the destination sub-topic for aggregated + loop packets (only used if aggregation contains aggregate) + Default is loop + + agg_topic_archive: Allows to set the destination sub-topic for aggregated + archive packets (only used if aggregation contains aggregate) + Default is loop """ super(MQTT, self).__init__(engine, config_dict) loginf("service version is %s" % VERSION) @@ -261,6 +270,9 @@ def __init__(self, engine, config_dict): site_dict.setdefault('qos', 0) site_dict.setdefault('aggregation', 'individual,aggregate') + site_dict.setdefault('agg_topic_loop', 'loop') + site_dict.setdefault('agg_topic_archive', 'loop') + usn = site_dict.get('unit_system', None) if usn is not None: site_dict['unit_system'] = weewx.units.unit_constants[usn] @@ -308,10 +320,16 @@ def __init__(self, engine, config_dict): loginf("network encryption/authentication will be attempted") def new_archive_record(self, event): - self.archive_queue.put(event.record) + # We copy the record and augment it with a type + record = dict(event.record) + record['rcrd_type'] = 'archive' + self.archive_queue.put(record) def new_loop_packet(self, event): - self.archive_queue.put(event.packet) + # We copy the packet and augment it with a type + record = dict(event.packet) + record['rcrd_type'] = 'loop' + self.archive_queue.put(record) class TLSDefaults(object): @@ -378,6 +396,7 @@ def __init__(self, queue, server_url, manager_dict=None, tls=None, qos=0, post_interval=None, stale=None, log_success=True, log_failure=True, + agg_topic_loop='loop', agg_topic_archive='loop', timeout=60, max_tries=3, retry_wait=5, max_backlog=sys.maxsize): super(MQTTThread, self).__init__(queue, @@ -394,6 +413,8 @@ def __init__(self, queue, server_url, self.server_url = server_url self.client_id = client_id self.topic = topic + self.agg_topic_loop = agg_topic_loop + self.agg_topic_archive = agg_topic_archive self.upload_all = True if obs_to_upload.lower() == 'all' else False self.append_units_label = append_units_label self.tls_dict = {} @@ -468,8 +489,20 @@ def filter_data(self, record): def process_record(self, record, dbm): import socket + + # Process the record type, aggregation_topic then delete the added key + if 'rcrd_type' in record: + record_type = record['rcrd_type'] + del record['rcrd_type'] + else: + record_type = 'loop' + + aggregation_topic = self.agg_topic_archive \ + if record_type == 'archive' else self.agg_topic_loop + if self.augment_record and dbm is not None: record = self.get_record(record, dbm) + if self.unit_system is not None: record = weewx.units.to_std_system(record, self.unit_system) data = self.filter_data(record) @@ -479,6 +512,7 @@ def process_record(self, record, dbm): loginf("skipping upload") return url = urlparse(self.server_url) + for _count in range(self.max_tries): try: client_id = self.client_id @@ -494,7 +528,8 @@ def process_record(self, record, dbm): mc.connect(url.hostname, url.port) mc.loop_start() if self.aggregation.find('aggregate') >= 0: - tpc = self.topic + '/loop' + tpc = self.topic + '/' + aggregation_topic + (res, mid) = mc.publish(tpc, json.dumps(data), retain=self.retain, qos=self.qos) if res != mqtt.MQTT_ERR_SUCCESS: