Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

proposal: sdp resend feature #667

Merged
merged 14 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions lib/model/sdp/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def close(self):
self._close()
self._is_connected = False

def send(self, data_dict):
def send(self, data_dict, **kwargs):
"""
Send data, possibly return response

Expand Down Expand Up @@ -174,7 +174,7 @@ def send(self, data_dict):
self._send_lock.acquire()

if self._send_init_on_send():
response = self._send(data_dict)
response = self._send(data_dict, **kwargs)
except Exception:
raise
finally:
Expand Down Expand Up @@ -209,6 +209,12 @@ def on_disconnect(self, by=None):
if self._params[PLUGIN_ATTR_CB_ON_DISCONNECT]:
self._params[PLUGIN_ATTR_CB_ON_DISCONNECT](by)

def check_command(self, command, value):
onkelandy marked this conversation as resolved.
Show resolved Hide resolved
"""
checking commands, e.g. for resend feature
"""
return self._check_command(command, value)

#
#
# overwriting needed for at least some of the following methods...
Expand All @@ -231,14 +237,21 @@ def _close(self):
"""
self.logger.debug(f'simulating closing connection as {__name__} with params {self._params}')

def _send(self, data_dict):
def _send(self, data_dict, **kwargs):
"""
overwrite with sending of data and - possibly - returning response data
Return None if no response is received or expected.
"""
self.logger.debug(f'simulating to send data {data_dict}...')
return self.dummy

def _check_command(self, command, value):
onkelandy marked this conversation as resolved.
Show resolved Hide resolved
"""
overwrite with checking of data
Return False by default
"""
return False

def _send_init_on_open(self):
"""
This class can be overwritten if anything special is needed to make the
Expand Down Expand Up @@ -427,7 +440,7 @@ def _open(self):
def _close(self):
self.logger.debug(f'{self.__class__.__name__} closing connection as {__name__} with params {self._params}')

def _send(self, data_dict):
def _send(self, data_dict, **kwargs):
url = data_dict.get('payload', None)
if not url:
self.logger.error(f'can not send without url parameter from data_dict {data_dict}, aborting')
Expand All @@ -438,7 +451,7 @@ def _send(self, data_dict):

# check for additional data
par = {}
for arg in (REQUEST_DICT_ARGS):
for arg in REQUEST_DICT_ARGS:
par[arg] = data_dict.get(arg, {})

if request_method == 'get':
Expand Down Expand Up @@ -526,7 +539,7 @@ def _close(self):
self.logger.debug(f'{self.__class__.__name__} closing connection')
self._tcp.close()

def _send(self, data_dict):
def _send(self, data_dict, **kwargs):
self._tcp.send(data_dict['payload'])

# we receive only via callback, so we return "no reply".
Expand All @@ -538,6 +551,7 @@ def _on_abort(self):
else:
self.logger.warning('suspend callback wanted, but not set by plugin. Check plugin code...')


class UDPServer(socket.socket):
"""
This class sets up a UDP unicast socket listener on local_port
Expand Down Expand Up @@ -736,7 +750,7 @@ def _close(self):
if self._params[PLUGIN_ATTR_CB_ON_DISCONNECT]:
self._params[PLUGIN_ATTR_CB_ON_DISCONNECT](self)

def _send(self, data_dict):
def _send(self, data_dict, **kwargs):
"""
send data. data_dict needs to contain the following information:

Expand Down
9 changes: 7 additions & 2 deletions lib/model/sdp/globals.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@
PLUGIN_ATTR_CONN_RETRY_CYCLE = 'retry_cycle' # if autoreconnect: how many seconds to wait between retry rounds
PLUGIN_ATTR_CONN_RETRY_SUSPD = 'retry_suspend' # after this number of failed connect cycles, activate suspend mode (if enabled)

PLUGIN_ATTR_SEND_RETRIES = 'send_retries' # how often should a command be resent (when not receiving expected answer)
PLUGIN_ATTR_SEND_RETRIES_CYCLE= 'send_retries_cycle' # if using resend protocol: how many seconds to wait between resend rounds

# network attributes
PLUGIN_ATTR_NET_HOST = 'host' # hostname / IP for network connection
PLUGIN_ATTR_NET_PORT = 'port' # port for network connection
Expand Down Expand Up @@ -88,7 +91,8 @@
PLUGIN_ATTR_CONN_RETRY_CYCLE, PLUGIN_ATTR_CONN_RETRY_SUSPD, PLUGIN_ATTR_NET_HOST, PLUGIN_ATTR_NET_PORT,
PLUGIN_ATTR_SERIAL_PORT, PLUGIN_ATTR_SERIAL_BAUD, PLUGIN_ATTR_SERIAL_BSIZE, PLUGIN_ATTR_SERIAL_PARITY,
PLUGIN_ATTR_SERIAL_STOP, PLUGIN_ATTR_PROTOCOL, PLUGIN_ATTR_MSG_TIMEOUT, PLUGIN_ATTR_MSG_REPEAT,
PLUGIN_ATTR_CB_ON_CONNECT, PLUGIN_ATTR_CB_ON_DISCONNECT, PLUGIN_ATTR_CB_SUSPEND)
PLUGIN_ATTR_CB_ON_CONNECT, PLUGIN_ATTR_CB_ON_DISCONNECT, PLUGIN_ATTR_CB_SUSPEND,
PLUGIN_ATTR_SEND_RETRIES, PLUGIN_ATTR_SEND_RETRIES_CYCLE)

# connection types for PLUGIN_ATTR_CONNECTION
CONN_NULL = '' # use base connection class without real connection functionality, for testing
Expand All @@ -105,8 +109,9 @@
PROTO_NULL = '' # use base protocol class without added functionality (why??)
PROTO_JSONRPC = 'jsonrpc' # JSON-RPC 2.0 support with send queue, msgid and resend of unanswered commands
PROTO_VIESSMANN = 'viessmann' # Viessmann P300 / KW
PROTO_RESEND = 'resend'

PROTOCOL_TYPES = (PROTO_NULL, PROTO_JSONRPC, PROTO_VIESSMANN)
PROTOCOL_TYPES = (PROTO_NULL, PROTO_JSONRPC, PROTO_VIESSMANN, PROTO_RESEND)

# item attribute suffixes (as defined with individual prefix in plugin.yaml)
ITEM_ATTR_COMMAND = '_command' # command to issue/read for the item
Expand Down
121 changes: 117 additions & 4 deletions lib/model/sdp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
PLUGIN_ATTR_CB_ON_DISCONNECT, PLUGIN_ATTR_CONNECTION,
PLUGIN_ATTR_CONN_AUTO_CONN, PLUGIN_ATTR_CONN_CYCLE, PLUGIN_ATTR_CONN_RETRIES,
PLUGIN_ATTR_CONN_TIMEOUT, PLUGIN_ATTR_MSG_REPEAT, PLUGIN_ATTR_MSG_TIMEOUT,
PLUGIN_ATTR_NET_HOST, PLUGIN_ATTR_NET_PORT)
PLUGIN_ATTR_NET_HOST, PLUGIN_ATTR_NET_PORT, PLUGIN_ATTR_SEND_RETRIES, PLUGIN_ATTR_SEND_RETRIES_CYCLE)
from lib.model.sdp.connection import SDPConnection

from collections import OrderedDict
Expand Down Expand Up @@ -97,9 +97,12 @@ def _close(self):
self._connection.close()
self._is_connected = False

def _send(self, data_dict):
def _send(self, data_dict, **kwargs):
self.logger.debug(f'{self.__class__.__name__} _send called with {data_dict}')
return self._connection.send(data_dict)
return self._connection.send(data_dict, **kwargs)

def _check_command(self, command, value):
onkelandy marked this conversation as resolved.
Show resolved Hide resolved
return False

def _get_connection(self, use_callbacks=False, name=None):
conn_params = self._params.copy()
Expand Down Expand Up @@ -335,7 +338,7 @@ def check_chunk(data):
else:
self.logger.debug(f'Skipping stale check {time() - self._last_stale_check} seconds after last check')

def _send(self, data_dict):
def _send(self, data_dict, **kwargs):
"""
wrapper to prepare json rpc message to send. extracts command, id, repeat and
params (data) from data_dict and call send_rpc_message(command, params, id, repeat)
Expand Down Expand Up @@ -425,3 +428,113 @@ def _send_rpc_message(self, command, ddict=None, message_id=None, repeat=0):
response = self._connection.send(ddict)
if response:
self.on_data_received('request', response)


class SDPProtocolResend(SDPProtocol):
""" Protocol providing resend option


"""

def __init__(self, data_received_callback, name=None, **kwargs):

# init super, get logger
super().__init__(data_received_callback, name, **kwargs)

self._send_retries = int(self._params.get(PLUGIN_ATTR_SEND_RETRIES) or 0)
self._send_retries_cycle = int(self._params.get(PLUGIN_ATTR_SEND_RETRIES_CYCLE) or 1)
self._sending = {}
self._sending_retries = {}
self._sending_lock = threading.Lock()

# tell someone about our actual class
self.logger.debug(f'protocol initialized from {self.__class__.__name__}')

def on_connect(self, by=None):
super().on_connect(by)
self.logger.info(f'connect called, retry_sends {self._sending}')
if self._plugin.scheduler_get('resend'):
self._plugin.scheduler_remove('resend')
self._sending = {}
if self._send_retries >= 1:
self._plugin.scheduler_add('resend', self.resend, cycle=self._send_retries_cycle)
self.logger.dbghigh(
f"Adding resend scheduler with cycle {self._send_retries_cycle}.")

def on_disconnect(self, by=None):
if self._plugin.scheduler_get('resend'):
self._plugin.scheduler_remove('resend')
self._sending = {}
self.logger.info(f'disconnect called, retry_sends {self._sending}')
super().on_disconnect(by)

def _send(self, data_dict, **kwargs):
"""
This method acts as a overwritable intermediate between the handling
logic of send_command() and the connection layer.
If you need any special arrangements for or reaction to events on sending,
you can implement this method in your plugin class.

By default, this just forwards the data_dict to the connection instance
and return the result.
"""
self._store_commands(kwargs.get('resend_info'), data_dict)
self.logger.debug(f'sending {data_dict}, kwargs {kwargs}')
return self._connection.send(data_dict, **kwargs)

def _store_commands(self, resend_info, data_dict):
"""
overwrite with storing of data
Return None by default
"""
if resend_info is None:
resend_info = {}
else:
resend_info['data_dict'] = data_dict
if resend_info.get('returnvalue') is not None:
self._sending.update({resend_info.get('command'): resend_info})
if resend_info.get('command') not in self._sending_retries:
self._sending_retries.update({resend_info.get('command'): 0})
self.logger.debug(f'saving {resend_info}, self._sending {self._sending}')
return True
return False

def _check_command(self, command, value):
returnvalue = False
if command in self._sending:
with self._sending_lock:
retry = self._sending_retries.get(command)
compare = self._sending[command].get('returnvalue')
if type(compare)(value) == compare:
self._sending.pop(command)
self._sending_retries.pop(command)
self.logger.debug(f'Correct answer for {command}, removing from send. Sending {self._sending}')
returnvalue = True
elif retry is not None and retry <= self._send_retries:
self.logger.debug(f'Should send again {self._sending}...')

return returnvalue

def resend(self):
if self._sending:
self.logger.debug(f"resending queue is {self._sending} retries {self._sending_retries}")
with self._sending_lock:
remove_commands = []
for command in list(self._sending.keys()):
retry = self._sending_retries.get(command, 0)
sent = True
if retry < self._send_retries:
self.logger.debug(f'Re-sending {command}, retry {retry}.')
sent = self._send(self._sending[command].get("data_dict"))
self._sending_retries[command] = retry + 1
elif retry >= self._send_retries:
sent = False
if sent is False:
remove_commands.append(command)
self.logger.info(f"Giving up re-sending {command} after {retry} retries.")
if self._sending[command].get("read_cmd") is not None:
self.logger.info(f"Querying current value.")
self._send(self._sending[command].get("read_cmd"))
for command in remove_commands:
self._sending.pop(command)
self._sending_retries.pop(command)
28 changes: 22 additions & 6 deletions lib/model/smartdeviceplugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@
PLUGIN_ATTR_CMD_CLASS, PLUGIN_ATTR_CONNECTION, PLUGIN_ATTR_SUSPEND_ITEM,
PLUGIN_ATTR_CONN_AUTO_RECONN, PLUGIN_ATTR_CONN_AUTO_CONN,
PLUGIN_ATTR_PROTOCOL, PLUGIN_ATTR_RECURSIVE, PLUGIN_PATH, PLUGIN_ATTR_CYCLE,
PLUGIN_ATTR_CB_SUSPEND, CMD_IATTR_CYCLIC, ITEM_ATTR_READAFTERWRITE, ITEM_ATTR_CYCLIC)
PLUGIN_ATTR_CB_SUSPEND, CMD_IATTR_CYCLIC, ITEM_ATTR_READAFTERWRITE, ITEM_ATTR_CYCLIC,
PROTO_RESEND,PLUGIN_ATTR_SEND_RETRIES, PLUGIN_ATTR_SEND_RETRIES_CYCLE)

from lib.model.sdp.commands import SDPCommands
from lib.model.sdp.command import SDPCommand
Expand Down Expand Up @@ -185,6 +186,12 @@ def __init__(self, sh, logger=None, **kwargs):

self._shtime = Shtime.get_instance()

#resend
self._parameters[PLUGIN_ATTR_SEND_RETRIES] = self.get_parameter_value(PLUGIN_ATTR_SEND_RETRIES)
self._parameters[PLUGIN_ATTR_SEND_RETRIES_CYCLE] = self.get_parameter_value(PLUGIN_ATTR_SEND_RETRIES_CYCLE)
# Set protocol to resend if send_retries is > 0
if self._parameters.get(PLUGIN_ATTR_SEND_RETRIES, 0) > 0:
onkelandy marked this conversation as resolved.
Show resolved Hide resolved
self._parameters[PLUGIN_ATTR_PROTOCOL] = 'resend'
# init parameters in standalone mode
if SDP_standalone:
self._parameters = kwargs
Expand Down Expand Up @@ -724,12 +731,20 @@ def send_command(self, command, value=None, return_result=False, **kwargs):

# if an error occurs on sending, an exception is thrown "below"
result = None
reply_pattern = self._commands.get_commandlist(command).get('reply_pattern')
read_cmd = self._transform_send_data(self._commands.get_send_data(command, None))
if reply_pattern is None:
resend_info = {'command': command, 'returnvalue': None, 'read_cmd': read_cmd}
elif not any(x in reply_pattern for x in ['(', '{']):
onkelandy marked this conversation as resolved.
Show resolved Hide resolved
resend_info = {'command': command, 'returnvalue': reply_pattern, 'read_cmd': read_cmd}
else:
resend_info = {'command': command, 'returnvalue': value, 'read_cmd': read_cmd}

try:
result = self._send(data_dict)
result = self._send(data_dict, resend_info=resend_info)
except (RuntimeError, OSError) as e: # Exception as e:
self.logger.debug(f'error on sending command {command}, error was {e}')
return False

if result:
by = kwargs.get('by')
self.logger.debug(f'command {command} received result {result} by {by}')
Expand Down Expand Up @@ -792,6 +807,7 @@ def on_data_received(self, by, data, command=None):
else:
if custom:
command = command + CUSTOM_SEP + custom
self._connection.check_command(command, value)
self._dispatch_callback(command, value, by)
self._process_additional_data(command, data, value, custom, by)

Expand Down Expand Up @@ -953,7 +969,7 @@ def _do_before_send(self, command, value, kwargs):
return (True, True)
# return (False, True)

def _send(self, data_dict):
def _send(self, data_dict, **kwargs):
"""
This method acts as a overwritable intermediate between the handling
logic of send_command() and the connection layer.
Expand All @@ -963,8 +979,8 @@ def _send(self, data_dict):
By default, this just forwards the data_dict to the connection instance
and return the result.
"""
self.logger.debug(f'sending {data_dict}')
return self._connection.send(data_dict)
self.logger.debug(f'sending {data_dict}, kwargs {kwargs}')
return self._connection.send(data_dict, **kwargs)

def on_connect(self, by=None):
""" callback if connection is made. """
Expand Down
Loading