Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

proposal: sdp resend feature #667

Merged
merged 14 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions lib/model/sdp/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def close(self):
self._close()
self._is_connected = False

def send(self, data_dict):
def send(self, data_dict, **kwargs):
"""
Send data, possibly return response

Expand Down Expand Up @@ -174,7 +174,7 @@ def send(self, data_dict):
self._send_lock.acquire()

if self._send_init_on_send():
response = self._send(data_dict)
response = self._send(data_dict, **kwargs)
except Exception:
raise
finally:
Expand Down Expand Up @@ -209,6 +209,12 @@ def on_disconnect(self, by=None):
if self._params[PLUGIN_ATTR_CB_ON_DISCONNECT]:
self._params[PLUGIN_ATTR_CB_ON_DISCONNECT](by)

def check_reply(self, command, value):
"""
checking reply, e.g. for resend feature
"""
return self._check_reply(command, value)

#
#
# overwriting needed for at least some of the following methods...
Expand All @@ -231,14 +237,21 @@ def _close(self):
"""
self.logger.debug(f'simulating closing connection as {__name__} with params {self._params}')

def _send(self, data_dict):
def _send(self, data_dict, **kwargs):
"""
overwrite with sending of data and - possibly - returning response data
Return None if no response is received or expected.
"""
self.logger.debug(f'simulating to send data {data_dict}...')
return self.dummy

def _check_reply(self, command, value):
"""
overwrite with checking of data
Return False by default
"""
return False

def _send_init_on_open(self):
"""
This class can be overwritten if anything special is needed to make the
Expand Down Expand Up @@ -427,7 +440,7 @@ def _open(self):
def _close(self):
self.logger.debug(f'{self.__class__.__name__} closing connection as {__name__} with params {self._params}')

def _send(self, data_dict):
def _send(self, data_dict, **kwargs):
url = data_dict.get('payload', None)
if not url:
self.logger.error(f'can not send without url parameter from data_dict {data_dict}, aborting')
Expand All @@ -438,7 +451,7 @@ def _send(self, data_dict):

# check for additional data
par = {}
for arg in (REQUEST_DICT_ARGS):
for arg in REQUEST_DICT_ARGS:
par[arg] = data_dict.get(arg, {})

if request_method == 'get':
Expand Down Expand Up @@ -526,7 +539,7 @@ def _close(self):
self.logger.debug(f'{self.__class__.__name__} closing connection')
self._tcp.close()

def _send(self, data_dict):
def _send(self, data_dict, **kwargs):
self._tcp.send(data_dict['payload'])

# we receive only via callback, so we return "no reply".
Expand All @@ -538,6 +551,7 @@ def _on_abort(self):
else:
self.logger.warning('suspend callback wanted, but not set by plugin. Check plugin code...')


class UDPServer(socket.socket):
"""
This class sets up a UDP unicast socket listener on local_port
Expand Down Expand Up @@ -736,7 +750,7 @@ def _close(self):
if self._params[PLUGIN_ATTR_CB_ON_DISCONNECT]:
self._params[PLUGIN_ATTR_CB_ON_DISCONNECT](self)

def _send(self, data_dict):
def _send(self, data_dict, **kwargs):
"""
send data. data_dict needs to contain the following information:

Expand Down
9 changes: 7 additions & 2 deletions lib/model/sdp/globals.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@
PLUGIN_ATTR_CONN_RETRY_CYCLE = 'retry_cycle' # if autoreconnect: how many seconds to wait between retry rounds
PLUGIN_ATTR_CONN_RETRY_SUSPD = 'retry_suspend' # after this number of failed connect cycles, activate suspend mode (if enabled)

PLUGIN_ATTR_SEND_RETRIES = 'send_retries' # how often should a command be resent (when not receiving expected answer)
PLUGIN_ATTR_SEND_RETRIES_CYCLE= 'send_retries_cycle' # if using resend protocol: how many seconds to wait between resend rounds

# network attributes
PLUGIN_ATTR_NET_HOST = 'host' # hostname / IP for network connection
PLUGIN_ATTR_NET_PORT = 'port' # port for network connection
Expand Down Expand Up @@ -88,7 +91,8 @@
PLUGIN_ATTR_CONN_RETRY_CYCLE, PLUGIN_ATTR_CONN_RETRY_SUSPD, PLUGIN_ATTR_NET_HOST, PLUGIN_ATTR_NET_PORT,
PLUGIN_ATTR_SERIAL_PORT, PLUGIN_ATTR_SERIAL_BAUD, PLUGIN_ATTR_SERIAL_BSIZE, PLUGIN_ATTR_SERIAL_PARITY,
PLUGIN_ATTR_SERIAL_STOP, PLUGIN_ATTR_PROTOCOL, PLUGIN_ATTR_MSG_TIMEOUT, PLUGIN_ATTR_MSG_REPEAT,
PLUGIN_ATTR_CB_ON_CONNECT, PLUGIN_ATTR_CB_ON_DISCONNECT, PLUGIN_ATTR_CB_SUSPEND)
PLUGIN_ATTR_CB_ON_CONNECT, PLUGIN_ATTR_CB_ON_DISCONNECT, PLUGIN_ATTR_CB_SUSPEND,
PLUGIN_ATTR_SEND_RETRIES, PLUGIN_ATTR_SEND_RETRIES_CYCLE)

# connection types for PLUGIN_ATTR_CONNECTION
CONN_NULL = '' # use base connection class without real connection functionality, for testing
Expand All @@ -105,8 +109,9 @@
PROTO_NULL = '' # use base protocol class without added functionality (why??)
PROTO_JSONRPC = 'jsonrpc' # JSON-RPC 2.0 support with send queue, msgid and resend of unanswered commands
PROTO_VIESSMANN = 'viessmann' # Viessmann P300 / KW
PROTO_RESEND = 'resend'

PROTOCOL_TYPES = (PROTO_NULL, PROTO_JSONRPC, PROTO_VIESSMANN)
PROTOCOL_TYPES = (PROTO_NULL, PROTO_JSONRPC, PROTO_VIESSMANN, PROTO_RESEND)

# item attribute suffixes (as defined with individual prefix in plugin.yaml)
ITEM_ATTR_COMMAND = '_command' # command to issue/read for the item
Expand Down
156 changes: 151 additions & 5 deletions lib/model/sdp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
PLUGIN_ATTR_CB_ON_DISCONNECT, PLUGIN_ATTR_CONNECTION,
PLUGIN_ATTR_CONN_AUTO_CONN, PLUGIN_ATTR_CONN_CYCLE, PLUGIN_ATTR_CONN_RETRIES,
PLUGIN_ATTR_CONN_TIMEOUT, PLUGIN_ATTR_MSG_REPEAT, PLUGIN_ATTR_MSG_TIMEOUT,
PLUGIN_ATTR_NET_HOST, PLUGIN_ATTR_NET_PORT)
PLUGIN_ATTR_NET_HOST, PLUGIN_ATTR_NET_PORT, PLUGIN_ATTR_SEND_RETRIES, PLUGIN_ATTR_SEND_RETRIES_CYCLE)
from lib.model.sdp.connection import SDPConnection

from collections import OrderedDict
Expand Down Expand Up @@ -97,9 +97,12 @@ def _close(self):
self._connection.close()
self._is_connected = False

def _send(self, data_dict):
def _send(self, data_dict, **kwargs):
self.logger.debug(f'{self.__class__.__name__} _send called with {data_dict}')
return self._connection.send(data_dict)
return self._connection.send(data_dict, **kwargs)

def _check_reply(self, command, value):
return False

def _get_connection(self, use_callbacks=False, name=None):
conn_params = self._params.copy()
Expand Down Expand Up @@ -192,7 +195,7 @@ def on_data_received(self, connection, response):
"""
Handle received data

Data is handed over as byte/bytearray and needs to be converted to
Data is handed over as byte/bytearray and needs to be converted to
utf8 strings. As packets can be fragmented, all data is written into
a buffer and then checked for complete json expressions. Those are
separated, converted to dict and processed with respect to saved
Expand Down Expand Up @@ -335,7 +338,7 @@ def check_chunk(data):
else:
self.logger.debug(f'Skipping stale check {time() - self._last_stale_check} seconds after last check')

def _send(self, data_dict):
def _send(self, data_dict, **kwargs):
"""
wrapper to prepare json rpc message to send. extracts command, id, repeat and
params (data) from data_dict and call send_rpc_message(command, params, id, repeat)
Expand Down Expand Up @@ -425,3 +428,146 @@ def _send_rpc_message(self, command, ddict=None, message_id=None, repeat=0):
response = self._connection.send(ddict)
if response:
self.on_data_received('request', response)


class SDPProtocolResend(SDPProtocol):
""" Protocol supporting resend of command and checking reply_pattern

This class implements a protocol to resend commands if reply does not align with reply_pattern

"""

def __init__(self, data_received_callback, name=None, **kwargs):

# init super, get logger
super().__init__(data_received_callback, name, **kwargs)
# get relevant plugin parameters
self._send_retries = int(self._params.get(PLUGIN_ATTR_SEND_RETRIES) or 0)
self._send_retries_cycle = int(self._params.get(PLUGIN_ATTR_SEND_RETRIES_CYCLE) or 1)
self._sending = {}
self._sending_retries = {}
self._sending_lock = threading.Lock()

# tell someone about our actual class
self.logger.debug(f'protocol initialized from {self.__class__.__name__}')

def on_connect(self, by=None):
"""
When connecting, remove resend scheduler first. If send_retries is set > 0, add new scheduler with given cycle
"""
super().on_connect(by)
self.logger.info(f'connect called, resending queue is {self._sending}')
if self._plugin.scheduler_get('resend'):
self._plugin.scheduler_remove('resend')
self._sending = {}
if self._send_retries >= 1:
self._plugin.scheduler_add('resend', self.resend, cycle=self._send_retries_cycle)
self.logger.dbghigh(
f"Adding resend scheduler with cycle {self._send_retries_cycle}.")

def on_disconnect(self, by=None):
"""
Remove resend scheduler on disconnect
"""
if self._plugin.scheduler_get('resend'):
self._plugin.scheduler_remove('resend')
self._sending = {}
self.logger.info(f'disconnect called.')
super().on_disconnect(by)

def _send(self, data_dict, **kwargs):
"""
Send data, possibly return response

:param data_dict: dict with raw data and possible additional parameters to send
:type data_dict: dict
:param kwargs: additional information needed for checking the reply_pattern
:return: raw response data if applicable, None otherwise.
"""
self._store_commands(kwargs.get('resend_info'), data_dict)
self.logger.debug(f'Sending {data_dict}, kwargs {kwargs}')
return self._connection.send(data_dict, **kwargs)

def _store_commands(self, resend_info, data_dict):
"""
Store the command in _sending dict and the number of retries is _sending_retries dict

:param resend_info: dict with command, returnvalue and read_command
:type resend_info: dict
:param data_dict: dict with raw data and possible additional parameters to send
:type data_dict: dict
:param kwargs: additional information needed for checking the reply_pattern
:return: False by default, True if returnvalue is given in resend_info
:rtype: bool
"""
if resend_info is None:
resend_info = {}
else:
resend_info['data_dict'] = data_dict
if resend_info.get('returnvalue') is not None:
self._sending.update({resend_info.get('command'): resend_info})
if resend_info.get('command') not in self._sending_retries:
self._sending_retries.update({resend_info.get('command'): 0})
self.logger.debug(f'Saving {resend_info}, resending queue is {self._sending}')
return True
return False

def _check_reply(self, command, value):
"""
Check if the command is in _sending dict and if response is same as expected or not

:param command: name of command
:type command: str
:param value: value the command (item) should be set to
:type value: str
:return: False by default, True if received expected response
:rtype: bool
"""
returnvalue = False
if command in self._sending:
with self._sending_lock:
# getting current retries for current command
retry = self._sending_retries.get(command)
# compare the expected returnvalue with the received value after aligning the type of both values
compare = self._sending[command].get('returnvalue')
if type(compare)(value) == compare:
# if received value equals expexted value, remove command from _sending dict
self._sending.pop(command)
self._sending_retries.pop(command)
self.logger.debug(f'Got correct response for {command}, '
f'removing from send. Resending queue is {self._sending}')
returnvalue = True
elif retry is not None and retry <= self._send_retries:
# return False and log info if response is not the same as the expected response
self.logger.debug(f'Should send again {self._sending}...')
return returnvalue

def resend(self):
"""
Resend function that is scheduled with a given cycle.
Send command again if response is not as expected and retries are < given retry parameter
If expected response is not received after given retries, give up sending and query value by sending read_command
"""
if self._sending:
self.logger.debug(f"Resending queue is {self._sending}, retries {self._sending_retries}")
with self._sending_lock:
remove_commands = []
# Iterate through resend queue
for command in list(self._sending.keys()):
retry = self._sending_retries.get(command, 0)
sent = True
if retry < self._send_retries:
self.logger.debug(f'Resending {command}, retries {retry}.')
sent = self._send(self._sending[command].get("data_dict"))
self._sending_retries[command] = retry + 1
elif retry >= self._send_retries:
sent = False
if sent is False:
remove_commands.append(command)
self.logger.info(f"Giving up re-sending {command} after {retry} retries.")
if self._sending[command].get("read_cmd") is not None:
self.logger.info(f"Querying current value.")
self._send(self._sending[command].get("read_cmd"))
for command in remove_commands:
self._sending.pop(command)
self._sending_retries.pop(command)
Loading
Loading