diff --git a/bin/user/mqtt.py b/bin/user/mqtt.py index 00d0448..8e23a04 100644 --- a/bin/user/mqtt.py +++ b/bin/user/mqtt.py @@ -4,8 +4,9 @@ Upload data to MQTT server This service requires the python bindings for mqtt: - - pip install paho-mqtt + apt-get install python3-paho-mqtt # on Debian/Ubuntu based systems if using python3 + apt-get install python-paho-mqtt # on Debian/Ubuntu based systems if using python2 + pip install paho-mqtt # on any system with pip installed Minimal configuration: @@ -30,12 +31,12 @@ ... aggregation = individual, aggregate # individual, aggregate, or both -Bind to loop packets or archive records: +Bind to loop packets or/and archive records: [StdRestful] [[MQTT]] ... - binding = loop # options are loop or archive + binding = loop # options are loop, archive or both Use the inputs map to customize name, format, or units for any observation: @@ -238,6 +239,14 @@ def __init__(self, engine, config_dict): tls: dictionary of TLS parameters used by the Paho client to establish a secure connection with the broker. Default is None + + agg_topic_loop: Allows to set the destination sub-topic for aggregated + loop packets (only used if aggregation contains aggregate) + Default is loop + + agg_topic_archive: Allows to set the destination sub-topic for aggregated + archive packets (only used if aggregation contains aggregate) + Default is loop """ super(MQTT, self).__init__(engine, config_dict) loginf("service version is %s" % VERSION) @@ -261,6 +270,9 @@ def __init__(self, engine, config_dict): site_dict.setdefault('qos', 0) site_dict.setdefault('aggregation', 'individual,aggregate') + site_dict.setdefault('agg_topic_loop', 'loop') + site_dict.setdefault('agg_topic_archive', 'loop') + usn = site_dict.get('unit_system', None) if usn is not None: site_dict['unit_system'] = weewx.units.unit_constants[usn] @@ -378,6 +390,7 @@ def __init__(self, queue, server_url, manager_dict=None, tls=None, qos=0, post_interval=None, stale=None, log_success=True, log_failure=True, + agg_topic_loop='loop', agg_topic_archive='loop', timeout=60, max_tries=3, retry_wait=5, max_backlog=sys.maxsize): super(MQTTThread, self).__init__(queue, @@ -394,6 +407,8 @@ def __init__(self, queue, server_url, self.server_url = server_url self.client_id = client_id self.topic = topic + self.agg_topic_loop = agg_topic_loop + self.agg_topic_archive = agg_topic_archive self.upload_all = True if obs_to_upload.lower() == 'all' else False self.append_units_label = append_units_label self.tls_dict = {} @@ -468,8 +483,15 @@ def filter_data(self, record): def process_record(self, record, dbm): import socket + + # Archive packets are easily discriminated by the fact they have an 'interval' property + # LOOP packets don't have it + aggregation_topic = self.agg_topic_archive \ + if 'interval' in record else self.agg_topic_loop + if self.augment_record and dbm is not None: record = self.get_record(record, dbm) + if self.unit_system is not None: record = weewx.units.to_std_system(record, self.unit_system) data = self.filter_data(record) @@ -479,6 +501,7 @@ def process_record(self, record, dbm): loginf("skipping upload") return url = urlparse(self.server_url) + for _count in range(self.max_tries): try: client_id = self.client_id @@ -494,7 +517,8 @@ def process_record(self, record, dbm): mc.connect(url.hostname, url.port) mc.loop_start() if self.aggregation.find('aggregate') >= 0: - tpc = self.topic + '/loop' + tpc = self.topic + '/' + aggregation_topic + (res, mid) = mc.publish(tpc, json.dumps(data), retain=self.retain, qos=self.qos) if res != mqtt.MQTT_ERR_SUCCESS: