diff --git a/lib/model/sdp/connection.py b/lib/model/sdp/connection.py index 9fa52abcf..78e401f59 100644 --- a/lib/model/sdp/connection.py +++ b/lib/model/sdp/connection.py @@ -147,7 +147,7 @@ def close(self): self._close() self._is_connected = False - def send(self, data_dict): + def send(self, data_dict, **kwargs): """ Send data, possibly return response @@ -175,7 +175,7 @@ def send(self, data_dict): self._send_lock.acquire() if self._send_init_on_send(): - response = self._send(data_dict) + response = self._send(data_dict, **kwargs) except Exception: raise finally: @@ -210,6 +210,12 @@ def on_disconnect(self, by=None): if self._params[PLUGIN_ATTR_CB_ON_DISCONNECT]: self._params[PLUGIN_ATTR_CB_ON_DISCONNECT](by) + def check_reply(self, command, value): + """ + checking reply, e.g. for resend feature + """ + return self._check_reply(command, value) + # # # overwriting needed for at least some of the following methods... @@ -232,7 +238,7 @@ def _close(self): """ self.logger.debug(f'simulating closing connection as {__name__} with params {self._params}') - def _send(self, data_dict): + def _send(self, data_dict, **kwargs): """ overwrite with sending of data and - possibly - returning response data Return None if no response is received or expected. @@ -240,6 +246,13 @@ def _send(self, data_dict): self.logger.debug(f'simulating to send data {data_dict}...') return self.dummy + def _check_reply(self, command, value): + """ + overwrite with checking of data + Return False by default + """ + return False + def _send_init_on_open(self): """ This class can be overwritten if anything special is needed to make the @@ -428,7 +441,7 @@ def _open(self): def _close(self): self.logger.debug(f'{self.__class__.__name__} closing connection as {__name__} with params {self._params}') - def _send(self, data_dict): + def _send(self, data_dict, **kwargs): url = data_dict.get('payload', None) if not url: self.logger.error(f'can not send without url parameter from data_dict {data_dict}, aborting') @@ -439,7 +452,7 @@ def _send(self, data_dict): # check for additional data par = {} - for arg in (REQUEST_DICT_ARGS): + for arg in REQUEST_DICT_ARGS: par[arg] = data_dict.get(arg, {}) if request_method == 'get': @@ -527,7 +540,7 @@ def _close(self): self.logger.debug(f'{self.__class__.__name__} closing connection') self._tcp.close() - def _send(self, data_dict): + def _send(self, data_dict, **kwargs): self._tcp.send(data_dict['payload']) # we receive only via callback, so we return "no reply". @@ -539,6 +552,7 @@ def _on_abort(self): else: self.logger.warning('suspend callback wanted, but not set by plugin. Check plugin code...') + class UDPServer(socket.socket): """ This class sets up a UDP unicast socket listener on local_port @@ -737,7 +751,7 @@ def _close(self): if self._params[PLUGIN_ATTR_CB_ON_DISCONNECT]: self._params[PLUGIN_ATTR_CB_ON_DISCONNECT](self) - def _send(self, data_dict): + def _send(self, data_dict, **kwargs): """ send data. data_dict needs to contain the following information: diff --git a/lib/model/sdp/globals.py b/lib/model/sdp/globals.py index bbb9bb52e..b24053549 100644 --- a/lib/model/sdp/globals.py +++ b/lib/model/sdp/globals.py @@ -62,6 +62,9 @@ PLUGIN_ATTR_CONN_RETRY_CYCLE = 'retry_cycle' # if autoreconnect: how many seconds to wait between retry rounds PLUGIN_ATTR_CONN_RETRY_SUSPD = 'retry_suspend' # after this number of failed connect cycles, activate suspend mode (if enabled) +PLUGIN_ATTR_SEND_RETRIES = 'send_retries' # how often should a command be resent (when not receiving expected answer) +PLUGIN_ATTR_SEND_RETRIES_CYCLE= 'send_retries_cycle' # if using resend protocol: how many seconds to wait between resend rounds + # network attributes PLUGIN_ATTR_NET_HOST = 'host' # hostname / IP for network connection PLUGIN_ATTR_NET_PORT = 'port' # port for network connection @@ -90,7 +93,8 @@ PLUGIN_ATTR_CONN_RETRY_CYCLE, PLUGIN_ATTR_CONN_RETRY_SUSPD, PLUGIN_ATTR_NET_HOST, PLUGIN_ATTR_NET_PORT, PLUGIN_ATTR_SERIAL_PORT, PLUGIN_ATTR_SERIAL_BAUD, PLUGIN_ATTR_SERIAL_BSIZE, PLUGIN_ATTR_SERIAL_PARITY, PLUGIN_ATTR_SERIAL_STOP, PLUGIN_ATTR_PROTOCOL, PLUGIN_ATTR_MSG_TIMEOUT, PLUGIN_ATTR_MSG_REPEAT, - PLUGIN_ATTR_CB_ON_CONNECT, PLUGIN_ATTR_CB_ON_DISCONNECT, PLUGIN_ATTR_CB_SUSPEND) + PLUGIN_ATTR_CB_ON_CONNECT, PLUGIN_ATTR_CB_ON_DISCONNECT, PLUGIN_ATTR_CB_SUSPEND, + PLUGIN_ATTR_SEND_RETRIES, PLUGIN_ATTR_SEND_RETRIES_CYCLE) # connection types for PLUGIN_ATTR_CONNECTION CONN_NULL = '' # use base connection class without real connection functionality, for testing @@ -107,8 +111,9 @@ PROTO_NULL = '' # use base protocol class without added functionality (why??) PROTO_JSONRPC = 'jsonrpc' # JSON-RPC 2.0 support with send queue, msgid and resend of unanswered commands PROTO_VIESSMANN = 'viessmann' # Viessmann P300 / KW +PROTO_RESEND = 'resend' -PROTOCOL_TYPES = (PROTO_NULL, PROTO_JSONRPC, PROTO_VIESSMANN) +PROTOCOL_TYPES = (PROTO_NULL, PROTO_JSONRPC, PROTO_VIESSMANN, PROTO_RESEND) # item attribute suffixes (as defined with individual prefix in plugin.yaml) ITEM_ATTR_COMMAND = '_command' # command to issue/read for the item diff --git a/lib/model/sdp/protocol.py b/lib/model/sdp/protocol.py index 7e6d7d33b..325275fb9 100644 --- a/lib/model/sdp/protocol.py +++ b/lib/model/sdp/protocol.py @@ -31,7 +31,7 @@ PLUGIN_ATTR_CB_ON_DISCONNECT, PLUGIN_ATTR_CONNECTION, PLUGIN_ATTR_CONN_AUTO_CONN, PLUGIN_ATTR_CONN_CYCLE, PLUGIN_ATTR_CONN_RETRIES, PLUGIN_ATTR_CONN_TIMEOUT, PLUGIN_ATTR_MSG_REPEAT, PLUGIN_ATTR_MSG_TIMEOUT, - PLUGIN_ATTR_NET_HOST, PLUGIN_ATTR_NET_PORT) + PLUGIN_ATTR_NET_HOST, PLUGIN_ATTR_NET_PORT, PLUGIN_ATTR_SEND_RETRIES, PLUGIN_ATTR_SEND_RETRIES_CYCLE) from lib.model.sdp.connection import SDPConnection from collections import OrderedDict @@ -97,9 +97,12 @@ def _close(self): self._connection.close() self._is_connected = False - def _send(self, data_dict): + def _send(self, data_dict, **kwargs): self.logger.debug(f'{self.__class__.__name__} _send called with {data_dict}') - return self._connection.send(data_dict) + return self._connection.send(data_dict, **kwargs) + + def _check_reply(self, command, value): + return False def _get_connection(self, use_callbacks=False, name=None): conn_params = self._params.copy() @@ -192,7 +195,7 @@ def on_data_received(self, connection, response): """ Handle received data - Data is handed over as byte/bytearray and needs to be converted to + Data is handed over as byte/bytearray and needs to be converted to utf8 strings. As packets can be fragmented, all data is written into a buffer and then checked for complete json expressions. Those are separated, converted to dict and processed with respect to saved @@ -335,7 +338,7 @@ def check_chunk(data): else: self.logger.debug(f'Skipping stale check {time() - self._last_stale_check} seconds after last check') - def _send(self, data_dict): + def _send(self, data_dict, **kwargs): """ wrapper to prepare json rpc message to send. extracts command, id, repeat and params (data) from data_dict and call send_rpc_message(command, params, id, repeat) @@ -425,3 +428,146 @@ def _send_rpc_message(self, command, ddict=None, message_id=None, repeat=0): response = self._connection.send(ddict) if response: self.on_data_received('request', response) + + +class SDPProtocolResend(SDPProtocol): + """ Protocol supporting resend of command and checking reply_pattern + + This class implements a protocol to resend commands if reply does not align with reply_pattern + + """ + + def __init__(self, data_received_callback, name=None, **kwargs): + + # init super, get logger + super().__init__(data_received_callback, name, **kwargs) + # get relevant plugin parameters + self._send_retries = int(self._params.get(PLUGIN_ATTR_SEND_RETRIES) or 0) + self._send_retries_cycle = int(self._params.get(PLUGIN_ATTR_SEND_RETRIES_CYCLE) or 1) + self._sending = {} + self._sending_retries = {} + self._sending_lock = threading.Lock() + + # tell someone about our actual class + self.logger.debug(f'protocol initialized from {self.__class__.__name__}') + + def on_connect(self, by=None): + """ + When connecting, remove resend scheduler first. If send_retries is set > 0, add new scheduler with given cycle + """ + super().on_connect(by) + self.logger.info(f'connect called, resending queue is {self._sending}') + if self._plugin.scheduler_get('resend'): + self._plugin.scheduler_remove('resend') + self._sending = {} + if self._send_retries >= 1: + self._plugin.scheduler_add('resend', self.resend, cycle=self._send_retries_cycle) + self.logger.dbghigh( + f"Adding resend scheduler with cycle {self._send_retries_cycle}.") + + def on_disconnect(self, by=None): + """ + Remove resend scheduler on disconnect + """ + if self._plugin.scheduler_get('resend'): + self._plugin.scheduler_remove('resend') + self._sending = {} + self.logger.info(f'disconnect called.') + super().on_disconnect(by) + + def _send(self, data_dict, **kwargs): + """ + Send data, possibly return response + + :param data_dict: dict with raw data and possible additional parameters to send + :type data_dict: dict + :param kwargs: additional information needed for checking the reply_pattern + :return: raw response data if applicable, None otherwise. + """ + self._store_commands(kwargs.get('resend_info'), data_dict) + self.logger.debug(f'Sending {data_dict}, kwargs {kwargs}') + return self._connection.send(data_dict, **kwargs) + + def _store_commands(self, resend_info, data_dict): + """ + Store the command in _sending dict and the number of retries is _sending_retries dict + + :param resend_info: dict with command, returnvalue and read_command + :type resend_info: dict + :param data_dict: dict with raw data and possible additional parameters to send + :type data_dict: dict + :param kwargs: additional information needed for checking the reply_pattern + :return: False by default, True if returnvalue is given in resend_info + :rtype: bool + """ + if resend_info is None: + resend_info = {} + else: + resend_info['data_dict'] = data_dict + if resend_info.get('returnvalue') is not None: + self._sending.update({resend_info.get('command'): resend_info}) + if resend_info.get('command') not in self._sending_retries: + self._sending_retries.update({resend_info.get('command'): 0}) + self.logger.debug(f'Saving {resend_info}, resending queue is {self._sending}') + return True + return False + + def _check_reply(self, command, value): + """ + Check if the command is in _sending dict and if response is same as expected or not + + :param command: name of command + :type command: str + :param value: value the command (item) should be set to + :type value: str + :return: False by default, True if received expected response + :rtype: bool + """ + returnvalue = False + if command in self._sending: + with self._sending_lock: + # getting current retries for current command + retry = self._sending_retries.get(command) + # compare the expected returnvalue with the received value after aligning the type of both values + compare = self._sending[command].get('returnvalue') + if type(compare)(value) == compare: + # if received value equals expexted value, remove command from _sending dict + self._sending.pop(command) + self._sending_retries.pop(command) + self.logger.debug(f'Got correct response for {command}, ' + f'removing from send. Resending queue is {self._sending}') + returnvalue = True + elif retry is not None and retry <= self._send_retries: + # return False and log info if response is not the same as the expected response + self.logger.debug(f'Should send again {self._sending}...') + return returnvalue + + def resend(self): + """ + Resend function that is scheduled with a given cycle. + Send command again if response is not as expected and retries are < given retry parameter + If expected response is not received after given retries, give up sending and query value by sending read_command + """ + if self._sending: + self.logger.debug(f"Resending queue is {self._sending}, retries {self._sending_retries}") + with self._sending_lock: + remove_commands = [] + # Iterate through resend queue + for command in list(self._sending.keys()): + retry = self._sending_retries.get(command, 0) + sent = True + if retry < self._send_retries: + self.logger.debug(f'Resending {command}, retries {retry}.') + sent = self._send(self._sending[command].get("data_dict")) + self._sending_retries[command] = retry + 1 + elif retry >= self._send_retries: + sent = False + if sent is False: + remove_commands.append(command) + self.logger.info(f"Giving up re-sending {command} after {retry} retries.") + if self._sending[command].get("read_cmd") is not None: + self.logger.info(f"Querying current value.") + self._send(self._sending[command].get("read_cmd")) + for command in remove_commands: + self._sending.pop(command) + self._sending_retries.pop(command) diff --git a/lib/model/smartdeviceplugin.py b/lib/model/smartdeviceplugin.py index ffa930b1b..edf8885e3 100644 --- a/lib/model/smartdeviceplugin.py +++ b/lib/model/smartdeviceplugin.py @@ -56,7 +56,8 @@ PLUGIN_ATTR_CMD_CLASS, PLUGIN_ATTR_CONNECTION, PLUGIN_ATTR_SUSPEND_ITEM, PLUGIN_ATTR_CONN_AUTO_RECONN, PLUGIN_ATTR_CONN_AUTO_CONN, PLUGIN_ATTR_REREAD_INITIAL, PLUGIN_ATTR_PROTOCOL, PLUGIN_ATTR_RECURSIVE, PLUGIN_PATH, PLUGIN_ATTR_CYCLE, - PLUGIN_ATTR_CB_SUSPEND, CMD_IATTR_CYCLIC, ITEM_ATTR_READAFTERWRITE, ITEM_ATTR_CYCLIC) + PLUGIN_ATTR_CB_SUSPEND, CMD_IATTR_CYCLIC, ITEM_ATTR_READAFTERWRITE, ITEM_ATTR_CYCLIC, + PROTO_RESEND,PLUGIN_ATTR_SEND_RETRIES, PLUGIN_ATTR_SEND_RETRIES_CYCLE) from lib.model.sdp.commands import SDPCommands from lib.model.sdp.command import SDPCommand @@ -191,6 +192,20 @@ def __init__(self, sh, logger=None, **kwargs): self._shtime = Shtime.get_instance() + #resend + self._parameters[PLUGIN_ATTR_SEND_RETRIES] = self.get_parameter_value(PLUGIN_ATTR_SEND_RETRIES) + self._parameters[PLUGIN_ATTR_SEND_RETRIES_CYCLE] = self.get_parameter_value(PLUGIN_ATTR_SEND_RETRIES_CYCLE) + + resend = self._parameters.get(PLUGIN_ATTR_SEND_RETRIES, 0) or 0 + protocol = self._parameters.get(PLUGIN_ATTR_PROTOCOL) + if resend > 0: + # Set protocol to resend if send_retries is > 0 and protocol is not defined + if not protocol: + self._parameters[PLUGIN_ATTR_PROTOCOL] = 'resend' + # if send_retries is set and protocl is not set to resend, log info that protocol is overruling the parameter + elif protocol != 'resend': + self.logger.info(f'send_retries is set to {resend}, however, protocol is overruled to {protocol}') + # init parameters in standalone mode if SDP_standalone: self._parameters = kwargs @@ -766,14 +781,25 @@ def send_command(self, command, value=None, return_result=False, **kwargs): data_dict = self._transform_send_data(data_dict, **kwargs) self.logger.debug(f'command {command} with value {value} yielded send data_dict {data_dict}') - # if an error occurs on sending, an exception is thrown "below" + # creating resend info, necessary for resend protocol result = None + reply_pattern = self._commands.get_commandlist(command).get('reply_pattern') + read_cmd = self._transform_send_data(self._commands.get_send_data(command, None)) + # if no reply_pattern given, no response is expected + if reply_pattern is None: + resend_info = {'command': command, 'returnvalue': None, 'read_cmd': read_cmd} + # if no reply_pattern has lookup or capture group, put it in resend_info + elif '(' not in reply_pattern and '{' not in reply_pattern: + resend_info = {'command': command, 'returnvalue': reply_pattern, 'read_cmd': read_cmd} + # if reply pattern does not expect a specific value, use value as expected reply + else: + resend_info = {'command': command, 'returnvalue': value, 'read_cmd': read_cmd} + # if an error occurs on sending, an exception is thrown "below" try: - result = self._send(data_dict) + result = self._send(data_dict, resend_info=resend_info) except (RuntimeError, OSError) as e: # Exception as e: self.logger.debug(f'error on sending command {command}, error was {e}') return False - if result: by = kwargs.get('by') self.logger.debug(f'command {command} received result {result} by {by}') @@ -836,6 +862,7 @@ def on_data_received(self, by, data, command=None): else: if custom: command = command + CUSTOM_SEP + custom + self._connection.check_reply(command, value) # needed for resend protocol self._dispatch_callback(command, value, by) self._process_additional_data(command, data, value, custom, by) @@ -997,7 +1024,7 @@ def _do_before_send(self, command, value, kwargs): return (True, True) # return (False, True) - def _send(self, data_dict): + def _send(self, data_dict, **kwargs): """ This method acts as a overwritable intermediate between the handling logic of send_command() and the connection layer. @@ -1007,8 +1034,8 @@ def _send(self, data_dict): By default, this just forwards the data_dict to the connection instance and return the result. """ - self.logger.debug(f'sending {data_dict}') - return self._connection.send(data_dict) + self.logger.debug(f'sending {data_dict}, kwargs {kwargs}') + return self._connection.send(data_dict, **kwargs) def on_connect(self, by=None): """ callback if connection is made. """ @@ -1730,7 +1757,7 @@ def create_struct_yaml(self): self.yaml['item_structs'] = OrderedDict() - # this means the commands dict has 'ALL' and model names at the top level + # this means the commands dict has 'ALL' and model names at the top level # otherwise, the top level nodes are commands or sections cmds_has_models = INDEX_GENERIC in top_level_entries