diff --git a/bin/user/mqtt.py b/bin/user/mqtt.py index 00d0448..3e0768b 100644 --- a/bin/user/mqtt.py +++ b/bin/user/mqtt.py @@ -4,8 +4,9 @@ Upload data to MQTT server This service requires the python bindings for mqtt: - - pip install paho-mqtt + apt-get install python3-paho-mqtt # on Debian/Ubuntu based systems if using python3 + apt-get install python-paho-mqtt # on Debian/Ubuntu based systems if using python2 + pip install paho-mqtt # on any system with pip installed Minimal configuration: @@ -30,12 +31,12 @@ ... aggregation = individual, aggregate # individual, aggregate, or both -Bind to loop packets or archive records: +Bind to loop packets or/and archive records: [StdRestful] [[MQTT]] ... - binding = loop # options are loop or archive + binding = loop # options are loop, archive or both Use the inputs map to customize name, format, or units for any observation: @@ -117,7 +118,7 @@ import weewx.units from weeutil.weeutil import to_int, to_bool, accumulateLeaves -VERSION = "0.23" +VERSION = "0.24" if weewx.__version__ < "3": raise weewx.UnsupportedFeature("weewx 3 is required, found %s" % @@ -238,6 +239,14 @@ def __init__(self, engine, config_dict): tls: dictionary of TLS parameters used by the Paho client to establish a secure connection with the broker. Default is None + + agg_topic_loop: Allows to set the destination sub-topic for aggregated + loop packets (only used if aggregation contains aggregate) + Default is loop + + agg_topic_archive: Allows to set the destination sub-topic for aggregated + archive packets (only used if aggregation contains aggregate) + Default is loop """ super(MQTT, self).__init__(engine, config_dict) loginf("service version is %s" % VERSION) @@ -261,6 +270,9 @@ def __init__(self, engine, config_dict): site_dict.setdefault('qos', 0) site_dict.setdefault('aggregation', 'individual,aggregate') + site_dict.setdefault('agg_topic_loop', 'loop') + site_dict.setdefault('agg_topic_archive', 'loop') + usn = site_dict.get('unit_system', None) if usn is not None: site_dict['unit_system'] = weewx.units.unit_constants[usn] @@ -308,10 +320,16 @@ def __init__(self, engine, config_dict): loginf("network encryption/authentication will be attempted") def new_archive_record(self, event): - self.archive_queue.put(event.record) + # We copy the record and augment it with a type + record = dict(event.record) + record['rcrd_type'] = 'archive' + self.archive_queue.put(record) def new_loop_packet(self, event): - self.archive_queue.put(event.packet) + # We copy the packet and augment it with a type + record = dict(event.packet) + record['rcrd_type'] = 'loop' + self.archive_queue.put(record) class TLSDefaults(object): @@ -378,6 +396,7 @@ def __init__(self, queue, server_url, manager_dict=None, tls=None, qos=0, post_interval=None, stale=None, log_success=True, log_failure=True, + agg_topic_loop='loop', agg_topic_archive='loop', timeout=60, max_tries=3, retry_wait=5, max_backlog=sys.maxsize): super(MQTTThread, self).__init__(queue, @@ -394,6 +413,8 @@ def __init__(self, queue, server_url, self.server_url = server_url self.client_id = client_id self.topic = topic + self.agg_topic_loop = agg_topic_loop + self.agg_topic_archive = agg_topic_archive self.upload_all = True if obs_to_upload.lower() == 'all' else False self.append_units_label = append_units_label self.tls_dict = {} @@ -468,8 +489,20 @@ def filter_data(self, record): def process_record(self, record, dbm): import socket + + # Process the record type, aggregation_topic then delete the added key + if 'rcrd_type' in record: + record_type = record['rcrd_type'] + del record['rcrd_type'] + else: + record_type = 'loop' + + aggregation_topic = self.agg_topic_archive \ + if record_type == 'archive' else self.agg_topic_loop + if self.augment_record and dbm is not None: record = self.get_record(record, dbm) + if self.unit_system is not None: record = weewx.units.to_std_system(record, self.unit_system) data = self.filter_data(record) @@ -479,6 +512,7 @@ def process_record(self, record, dbm): loginf("skipping upload") return url = urlparse(self.server_url) + for _count in range(self.max_tries): try: client_id = self.client_id @@ -494,7 +528,8 @@ def process_record(self, record, dbm): mc.connect(url.hostname, url.port) mc.loop_start() if self.aggregation.find('aggregate') >= 0: - tpc = self.topic + '/loop' + tpc = self.topic + '/' + aggregation_topic + (res, mid) = mc.publish(tpc, json.dumps(data), retain=self.retain, qos=self.qos) if res != mqtt.MQTT_ERR_SUCCESS: