Skip to content

Commit

Permalink
New feature : Aggregated packet sub-topic selection
Browse files Browse the repository at this point in the history
This new feature allows the user to select the aggregated
sub-topics names for archive and loop packets if needed, this allows to
subscribe to two different topics when both archive and loop bindings
are enabled.
The defaults are to still continue to post on the "loop" topic for both
types in order to not break existing users applications.

Added a bit of documentation for paho-mqtt installation and for the two
new options. Version bumped to 0.24.
  • Loading branch information
moonpyk committed Aug 23, 2020
1 parent 00e23e8 commit 18f68da
Showing 1 changed file with 43 additions and 8 deletions.
51 changes: 43 additions & 8 deletions bin/user/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
Upload data to MQTT server
This service requires the python bindings for mqtt:
pip install paho-mqtt
apt-get install python3-paho-mqtt # on Debian/Ubuntu based systems if using python3
apt-get install python-paho-mqtt # on Debian/Ubuntu based systems if using python2
pip install paho-mqtt # on any system with pip installed
Minimal configuration:
Expand All @@ -30,12 +31,12 @@
...
aggregation = individual, aggregate # individual, aggregate, or both
Bind to loop packets or archive records:
Bind to loop packets or/and archive records:
[StdRestful]
[[MQTT]]
...
binding = loop # options are loop or archive
binding = loop # options are loop, archive or both
Use the inputs map to customize name, format, or units for any observation:
Expand Down Expand Up @@ -117,7 +118,7 @@
import weewx.units
from weeutil.weeutil import to_int, to_bool, accumulateLeaves

VERSION = "0.23"
VERSION = "0.24"

if weewx.__version__ < "3":
raise weewx.UnsupportedFeature("weewx 3 is required, found %s" %
Expand Down Expand Up @@ -238,6 +239,14 @@ def __init__(self, engine, config_dict):
tls: dictionary of TLS parameters used by the Paho client to establish
a secure connection with the broker.
Default is None
agg_topic_loop: Allows to set the destination sub-topic for aggregated
loop packets (only used if aggregation contains aggregate)
Default is loop
agg_topic_archive: Allows to set the destination sub-topic for aggregated
archive packets (only used if aggregation contains aggregate)
Default is loop
"""
super(MQTT, self).__init__(engine, config_dict)
loginf("service version is %s" % VERSION)
Expand All @@ -261,6 +270,9 @@ def __init__(self, engine, config_dict):
site_dict.setdefault('qos', 0)
site_dict.setdefault('aggregation', 'individual,aggregate')

site_dict.setdefault('agg_topic_loop', 'loop')
site_dict.setdefault('agg_topic_archive', 'loop')

usn = site_dict.get('unit_system', None)
if usn is not None:
site_dict['unit_system'] = weewx.units.unit_constants[usn]
Expand Down Expand Up @@ -308,10 +320,16 @@ def __init__(self, engine, config_dict):
loginf("network encryption/authentication will be attempted")

def new_archive_record(self, event):
self.archive_queue.put(event.record)
# We copy the record and augment it with a type
record = dict(event.record)
record['rcrd_type'] = 'archive'
self.archive_queue.put(record)

def new_loop_packet(self, event):
self.archive_queue.put(event.packet)
# We copy the packet and augment it with a type
record = dict(event.packet)
record['rcrd_type'] = 'loop'
self.archive_queue.put(record)


class TLSDefaults(object):
Expand Down Expand Up @@ -378,6 +396,7 @@ def __init__(self, queue, server_url,
manager_dict=None, tls=None, qos=0,
post_interval=None, stale=None,
log_success=True, log_failure=True,
agg_topic_loop='loop', agg_topic_archive='loop',
timeout=60, max_tries=3, retry_wait=5,
max_backlog=sys.maxsize):
super(MQTTThread, self).__init__(queue,
Expand All @@ -394,6 +413,8 @@ def __init__(self, queue, server_url,
self.server_url = server_url
self.client_id = client_id
self.topic = topic
self.agg_topic_loop = agg_topic_loop
self.agg_topic_archive = agg_topic_archive
self.upload_all = True if obs_to_upload.lower() == 'all' else False
self.append_units_label = append_units_label
self.tls_dict = {}
Expand Down Expand Up @@ -468,8 +489,20 @@ def filter_data(self, record):

def process_record(self, record, dbm):
import socket

# Process the record type, aggregation_topic then delete the added key
if 'rcrd_type' in record:
record_type = record['rcrd_type']
del record['rcrd_type']
else:
record_type = 'loop'

aggregation_topic = self.agg_topic_archive \
if record_type == 'archive' else self.agg_topic_loop

if self.augment_record and dbm is not None:
record = self.get_record(record, dbm)

if self.unit_system is not None:
record = weewx.units.to_std_system(record, self.unit_system)
data = self.filter_data(record)
Expand All @@ -479,6 +512,7 @@ def process_record(self, record, dbm):
loginf("skipping upload")
return
url = urlparse(self.server_url)

for _count in range(self.max_tries):
try:
client_id = self.client_id
Expand All @@ -494,7 +528,8 @@ def process_record(self, record, dbm):
mc.connect(url.hostname, url.port)
mc.loop_start()
if self.aggregation.find('aggregate') >= 0:
tpc = self.topic + '/loop'
tpc = self.topic + '/' + aggregation_topic

(res, mid) = mc.publish(tpc, json.dumps(data),
retain=self.retain, qos=self.qos)
if res != mqtt.MQTT_ERR_SUCCESS:
Expand Down

0 comments on commit 18f68da

Please sign in to comment.