Skip to content

Commit

Permalink
Merge pull request #667 from onkelandy/sdp_resend
Browse files Browse the repository at this point in the history
proposal: sdp resend feature
  • Loading branch information
Morg42 authored Aug 6, 2024
2 parents 1038d60 + ada53c3 commit dd4ec55
Show file tree
Hide file tree
Showing 4 changed files with 214 additions and 22 deletions.
28 changes: 21 additions & 7 deletions lib/model/sdp/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def close(self):
self._close()
self._is_connected = False

def send(self, data_dict):
def send(self, data_dict, **kwargs):
"""
Send data, possibly return response
Expand Down Expand Up @@ -175,7 +175,7 @@ def send(self, data_dict):
self._send_lock.acquire()

if self._send_init_on_send():
response = self._send(data_dict)
response = self._send(data_dict, **kwargs)
except Exception:
raise
finally:
Expand Down Expand Up @@ -210,6 +210,12 @@ def on_disconnect(self, by=None):
if self._params[PLUGIN_ATTR_CB_ON_DISCONNECT]:
self._params[PLUGIN_ATTR_CB_ON_DISCONNECT](by)

def check_reply(self, command, value):
"""
checking reply, e.g. for resend feature
"""
return self._check_reply(command, value)

#
#
# overwriting needed for at least some of the following methods...
Expand All @@ -232,14 +238,21 @@ def _close(self):
"""
self.logger.debug(f'simulating closing connection as {__name__} with params {self._params}')

def _send(self, data_dict):
def _send(self, data_dict, **kwargs):
"""
overwrite with sending of data and - possibly - returning response data
Return None if no response is received or expected.
"""
self.logger.debug(f'simulating to send data {data_dict}...')
return self.dummy

def _check_reply(self, command, value):
"""
overwrite with checking of data
Return False by default
"""
return False

def _send_init_on_open(self):
"""
This class can be overwritten if anything special is needed to make the
Expand Down Expand Up @@ -428,7 +441,7 @@ def _open(self):
def _close(self):
self.logger.debug(f'{self.__class__.__name__} closing connection as {__name__} with params {self._params}')

def _send(self, data_dict):
def _send(self, data_dict, **kwargs):
url = data_dict.get('payload', None)
if not url:
self.logger.error(f'can not send without url parameter from data_dict {data_dict}, aborting')
Expand All @@ -439,7 +452,7 @@ def _send(self, data_dict):

# check for additional data
par = {}
for arg in (REQUEST_DICT_ARGS):
for arg in REQUEST_DICT_ARGS:
par[arg] = data_dict.get(arg, {})

if request_method == 'get':
Expand Down Expand Up @@ -527,7 +540,7 @@ def _close(self):
self.logger.debug(f'{self.__class__.__name__} closing connection')
self._tcp.close()

def _send(self, data_dict):
def _send(self, data_dict, **kwargs):
self._tcp.send(data_dict['payload'])

# we receive only via callback, so we return "no reply".
Expand All @@ -539,6 +552,7 @@ def _on_abort(self):
else:
self.logger.warning('suspend callback wanted, but not set by plugin. Check plugin code...')


class UDPServer(socket.socket):
"""
This class sets up a UDP unicast socket listener on local_port
Expand Down Expand Up @@ -737,7 +751,7 @@ def _close(self):
if self._params[PLUGIN_ATTR_CB_ON_DISCONNECT]:
self._params[PLUGIN_ATTR_CB_ON_DISCONNECT](self)

def _send(self, data_dict):
def _send(self, data_dict, **kwargs):
"""
send data. data_dict needs to contain the following information:
Expand Down
9 changes: 7 additions & 2 deletions lib/model/sdp/globals.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@
PLUGIN_ATTR_CONN_RETRY_CYCLE = 'retry_cycle' # if autoreconnect: how many seconds to wait between retry rounds
PLUGIN_ATTR_CONN_RETRY_SUSPD = 'retry_suspend' # after this number of failed connect cycles, activate suspend mode (if enabled)

PLUGIN_ATTR_SEND_RETRIES = 'send_retries' # how often should a command be resent (when not receiving expected answer)
PLUGIN_ATTR_SEND_RETRIES_CYCLE= 'send_retries_cycle' # if using resend protocol: how many seconds to wait between resend rounds

# network attributes
PLUGIN_ATTR_NET_HOST = 'host' # hostname / IP for network connection
PLUGIN_ATTR_NET_PORT = 'port' # port for network connection
Expand Down Expand Up @@ -90,7 +93,8 @@
PLUGIN_ATTR_CONN_RETRY_CYCLE, PLUGIN_ATTR_CONN_RETRY_SUSPD, PLUGIN_ATTR_NET_HOST, PLUGIN_ATTR_NET_PORT,
PLUGIN_ATTR_SERIAL_PORT, PLUGIN_ATTR_SERIAL_BAUD, PLUGIN_ATTR_SERIAL_BSIZE, PLUGIN_ATTR_SERIAL_PARITY,
PLUGIN_ATTR_SERIAL_STOP, PLUGIN_ATTR_PROTOCOL, PLUGIN_ATTR_MSG_TIMEOUT, PLUGIN_ATTR_MSG_REPEAT,
PLUGIN_ATTR_CB_ON_CONNECT, PLUGIN_ATTR_CB_ON_DISCONNECT, PLUGIN_ATTR_CB_SUSPEND)
PLUGIN_ATTR_CB_ON_CONNECT, PLUGIN_ATTR_CB_ON_DISCONNECT, PLUGIN_ATTR_CB_SUSPEND,
PLUGIN_ATTR_SEND_RETRIES, PLUGIN_ATTR_SEND_RETRIES_CYCLE)

# connection types for PLUGIN_ATTR_CONNECTION
CONN_NULL = '' # use base connection class without real connection functionality, for testing
Expand All @@ -107,8 +111,9 @@
PROTO_NULL = '' # use base protocol class without added functionality (why??)
PROTO_JSONRPC = 'jsonrpc' # JSON-RPC 2.0 support with send queue, msgid and resend of unanswered commands
PROTO_VIESSMANN = 'viessmann' # Viessmann P300 / KW
PROTO_RESEND = 'resend'

PROTOCOL_TYPES = (PROTO_NULL, PROTO_JSONRPC, PROTO_VIESSMANN)
PROTOCOL_TYPES = (PROTO_NULL, PROTO_JSONRPC, PROTO_VIESSMANN, PROTO_RESEND)

# item attribute suffixes (as defined with individual prefix in plugin.yaml)
ITEM_ATTR_COMMAND = '_command' # command to issue/read for the item
Expand Down
156 changes: 151 additions & 5 deletions lib/model/sdp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
PLUGIN_ATTR_CB_ON_DISCONNECT, PLUGIN_ATTR_CONNECTION,
PLUGIN_ATTR_CONN_AUTO_CONN, PLUGIN_ATTR_CONN_CYCLE, PLUGIN_ATTR_CONN_RETRIES,
PLUGIN_ATTR_CONN_TIMEOUT, PLUGIN_ATTR_MSG_REPEAT, PLUGIN_ATTR_MSG_TIMEOUT,
PLUGIN_ATTR_NET_HOST, PLUGIN_ATTR_NET_PORT)
PLUGIN_ATTR_NET_HOST, PLUGIN_ATTR_NET_PORT, PLUGIN_ATTR_SEND_RETRIES, PLUGIN_ATTR_SEND_RETRIES_CYCLE)
from lib.model.sdp.connection import SDPConnection

from collections import OrderedDict
Expand Down Expand Up @@ -97,9 +97,12 @@ def _close(self):
self._connection.close()
self._is_connected = False

def _send(self, data_dict):
def _send(self, data_dict, **kwargs):
self.logger.debug(f'{self.__class__.__name__} _send called with {data_dict}')
return self._connection.send(data_dict)
return self._connection.send(data_dict, **kwargs)

def _check_reply(self, command, value):
return False

def _get_connection(self, use_callbacks=False, name=None):
conn_params = self._params.copy()
Expand Down Expand Up @@ -192,7 +195,7 @@ def on_data_received(self, connection, response):
"""
Handle received data
Data is handed over as byte/bytearray and needs to be converted to
Data is handed over as byte/bytearray and needs to be converted to
utf8 strings. As packets can be fragmented, all data is written into
a buffer and then checked for complete json expressions. Those are
separated, converted to dict and processed with respect to saved
Expand Down Expand Up @@ -335,7 +338,7 @@ def check_chunk(data):
else:
self.logger.debug(f'Skipping stale check {time() - self._last_stale_check} seconds after last check')

def _send(self, data_dict):
def _send(self, data_dict, **kwargs):
"""
wrapper to prepare json rpc message to send. extracts command, id, repeat and
params (data) from data_dict and call send_rpc_message(command, params, id, repeat)
Expand Down Expand Up @@ -425,3 +428,146 @@ def _send_rpc_message(self, command, ddict=None, message_id=None, repeat=0):
response = self._connection.send(ddict)
if response:
self.on_data_received('request', response)


class SDPProtocolResend(SDPProtocol):
""" Protocol supporting resend of command and checking reply_pattern
This class implements a protocol to resend commands if reply does not align with reply_pattern
"""

def __init__(self, data_received_callback, name=None, **kwargs):

# init super, get logger
super().__init__(data_received_callback, name, **kwargs)
# get relevant plugin parameters
self._send_retries = int(self._params.get(PLUGIN_ATTR_SEND_RETRIES) or 0)
self._send_retries_cycle = int(self._params.get(PLUGIN_ATTR_SEND_RETRIES_CYCLE) or 1)
self._sending = {}
self._sending_retries = {}
self._sending_lock = threading.Lock()

# tell someone about our actual class
self.logger.debug(f'protocol initialized from {self.__class__.__name__}')

def on_connect(self, by=None):
"""
When connecting, remove resend scheduler first. If send_retries is set > 0, add new scheduler with given cycle
"""
super().on_connect(by)
self.logger.info(f'connect called, resending queue is {self._sending}')
if self._plugin.scheduler_get('resend'):
self._plugin.scheduler_remove('resend')
self._sending = {}
if self._send_retries >= 1:
self._plugin.scheduler_add('resend', self.resend, cycle=self._send_retries_cycle)
self.logger.dbghigh(
f"Adding resend scheduler with cycle {self._send_retries_cycle}.")

def on_disconnect(self, by=None):
"""
Remove resend scheduler on disconnect
"""
if self._plugin.scheduler_get('resend'):
self._plugin.scheduler_remove('resend')
self._sending = {}
self.logger.info(f'disconnect called.')
super().on_disconnect(by)

def _send(self, data_dict, **kwargs):
"""
Send data, possibly return response
:param data_dict: dict with raw data and possible additional parameters to send
:type data_dict: dict
:param kwargs: additional information needed for checking the reply_pattern
:return: raw response data if applicable, None otherwise.
"""
self._store_commands(kwargs.get('resend_info'), data_dict)
self.logger.debug(f'Sending {data_dict}, kwargs {kwargs}')
return self._connection.send(data_dict, **kwargs)

def _store_commands(self, resend_info, data_dict):
"""
Store the command in _sending dict and the number of retries is _sending_retries dict
:param resend_info: dict with command, returnvalue and read_command
:type resend_info: dict
:param data_dict: dict with raw data and possible additional parameters to send
:type data_dict: dict
:param kwargs: additional information needed for checking the reply_pattern
:return: False by default, True if returnvalue is given in resend_info
:rtype: bool
"""
if resend_info is None:
resend_info = {}
else:
resend_info['data_dict'] = data_dict
if resend_info.get('returnvalue') is not None:
self._sending.update({resend_info.get('command'): resend_info})
if resend_info.get('command') not in self._sending_retries:
self._sending_retries.update({resend_info.get('command'): 0})
self.logger.debug(f'Saving {resend_info}, resending queue is {self._sending}')
return True
return False

def _check_reply(self, command, value):
"""
Check if the command is in _sending dict and if response is same as expected or not
:param command: name of command
:type command: str
:param value: value the command (item) should be set to
:type value: str
:return: False by default, True if received expected response
:rtype: bool
"""
returnvalue = False
if command in self._sending:
with self._sending_lock:
# getting current retries for current command
retry = self._sending_retries.get(command)
# compare the expected returnvalue with the received value after aligning the type of both values
compare = self._sending[command].get('returnvalue')
if type(compare)(value) == compare:
# if received value equals expexted value, remove command from _sending dict
self._sending.pop(command)
self._sending_retries.pop(command)
self.logger.debug(f'Got correct response for {command}, '
f'removing from send. Resending queue is {self._sending}')
returnvalue = True
elif retry is not None and retry <= self._send_retries:
# return False and log info if response is not the same as the expected response
self.logger.debug(f'Should send again {self._sending}...')
return returnvalue

def resend(self):
"""
Resend function that is scheduled with a given cycle.
Send command again if response is not as expected and retries are < given retry parameter
If expected response is not received after given retries, give up sending and query value by sending read_command
"""
if self._sending:
self.logger.debug(f"Resending queue is {self._sending}, retries {self._sending_retries}")
with self._sending_lock:
remove_commands = []
# Iterate through resend queue
for command in list(self._sending.keys()):
retry = self._sending_retries.get(command, 0)
sent = True
if retry < self._send_retries:
self.logger.debug(f'Resending {command}, retries {retry}.')
sent = self._send(self._sending[command].get("data_dict"))
self._sending_retries[command] = retry + 1
elif retry >= self._send_retries:
sent = False
if sent is False:
remove_commands.append(command)
self.logger.info(f"Giving up re-sending {command} after {retry} retries.")
if self._sending[command].get("read_cmd") is not None:
self.logger.info(f"Querying current value.")
self._send(self._sending[command].get("read_cmd"))
for command in remove_commands:
self._sending.pop(command)
self._sending_retries.pop(command)
Loading

0 comments on commit dd4ec55

Please sign in to comment.