diff --git a/lib/model/sdp/protocol.py b/lib/model/sdp/protocol.py index d53e98a96..42c63bff5 100644 --- a/lib/model/sdp/protocol.py +++ b/lib/model/sdp/protocol.py @@ -130,6 +130,10 @@ class SDPProtocolJsonrpc(SDPProtocol): As JSONRPC includes message-ids, replies can be associated to their respective queries and reply tracing and command repeat functions are implemented. + Fragmented packets need to be collected and assembled; + multiple received json packets neet to be split; + processed packets will then be returned as received data. + Data received is dispatched via callback, thus the send()-method does not return any response data. @@ -158,6 +162,8 @@ def __init__(self, data_received_callback, name=None, **kwargs): self._send_queue = queue.Queue() self._stale_lock = threading.Lock() + self._receive_buffer = '' + # make sure we have a basic set of parameters for the TCP connection self._params = {PLUGIN_ATTR_NET_HOST: '', PLUGIN_ATTR_NET_PORT: 9090, @@ -207,30 +213,56 @@ def on_disconnect(self, by=None): self._shutdown_active = False def on_data_received(self, connection, response): - self.logger.dbglow(f'data received before encode: {response}') + """ + Handle received data + + Data is handed over as byte/bytearray and needs to be converted to + utf8 strings. As packets can be fragmented, all data is written into + a buffer and then checked for complete json expressions. Those are + separated, converted to dict and processed with respect to saved + message-ids. Processed data packets are dispatched one by one via + callback. + """ + self.logger.debug(f'data received before encode: {response}') if isinstance(response, (bytes, bytearray)): response = str(response, 'utf-8').strip() - # split multi-response data into list items - self.logger.dbglow(f'data received before split: {response}') + self.logger.debug(f'adding response to buffer: {response}') + self._receive_buffer += response - try: - datalist = response.replace('}{', '}-#-{').split('-#-') - datalist = list(OrderedDict((x, True) for x in datalist).keys()) - except Exception: - datalist = [response] + datalist = [] + if '}{' in self._receive_buffer: - self.logger.dbglow(f'data received after split: {response}') + # split multi-response data into list items + try: + self.logger.debug(f'attempting to split buffer') + tmplist = self._receive_buffer.replace('}{', '}-#-{').split('-#-') + datalist = list(OrderedDict((x, True) for x in tmplist).keys()) + self._receive_buffer = '' + except Exception: + pass + elif self._receive_buffer[0] == '{' and self._receive_buffer[-1] == '}': + datalist = [self._receive_buffer] + self._receive_buffer = '' + elif self._receive_buffer: + self.logger.debug(f'Buffer with incomplete response: {self._receive_buffer}') + + if datalist: + self.logger.debug(f'received {len(datalist)} data items') # process all response items for data in datalist: - self.logger.debug(f'Processing received data item #{datalist.index(data)} ({data})') + self.logger.debug(f'Processing received data item #{datalist.index(data)}: {data}') try: jdata = json.loads(data) except Exception as err: - self.logger.warning(f'Could not json.load data item {data} with error {err}') + if data == datalist[-1] and data[-1] != '}': + self.logger.debug(f'returning incomplete data to buffer: {data}') + self._receive_buffer = data + else: + self.logger.warning(f'Could not json.load data item {data} with error {err}') continue command = None