From 9c6f7a22da9c4e1e2c1ac3efc7b918849b26044e Mon Sep 17 00:00:00 2001 From: imbeacon Date: Mon, 2 Dec 2024 12:21:18 +0200 Subject: [PATCH] Updated handling status of rpcs --- .../connectors/mqtt/mqtt_connector.py | 20 +++++++++++++------ .../connectors/mqtt/mqtt_decorators.py | 2 +- .../gateway/tb_gateway_service.py | 15 +++++++------- 3 files changed, 23 insertions(+), 14 deletions(-) diff --git a/thingsboard_gateway/connectors/mqtt/mqtt_connector.py b/thingsboard_gateway/connectors/mqtt/mqtt_connector.py index 5d84b7e0e..c52555090 100644 --- a/thingsboard_gateway/connectors/mqtt/mqtt_connector.py +++ b/thingsboard_gateway/connectors/mqtt/mqtt_connector.py @@ -919,19 +919,20 @@ def __process_rpc_request(self, content, rpc_config): try: self.__log.info("Publishing to: %s with data %s", request_topic, data_to_send) + result = None try: - self._publish(request_topic, data_to_send, rpc_config.get('retain', False)) + result = self._publish(request_topic, data_to_send, rpc_config.get('retain', False)) except Exception as e: self.__log.exception("Error during publishing to target broker: %r", e) - self.__gateway.send_rpc_reply(device=content["device"], + self.__gateway.send_rpc_reply(device=content.get("device"), req_id=content["data"]["id"], content={"error": str.format("Error during publishing to target broker: %r",str(e))}, - success_sent=False) + success_sent=False, to_connector_rpc=True if content.get('device') is None else False) return if not expects_response or not defines_timeout: self.__log.info("One-way RPC: sending ack to ThingsBoard immediately") - self.__gateway.send_rpc_reply(device=content.get('device', ''), req_id=content["data"]["id"], - success_sent=True) + self.__gateway.send_rpc_reply(device=content.get('device'), req_id=content["data"]["id"], + success_sent= result is not None, to_connector_rpc=True if content.get('device') is None else False) # Everything went out smoothly: RPC is served return @@ -988,7 +989,14 @@ def server_side_rpc_handler(self, content): @CustomCollectStatistics(start_stat_type='allBytesSentToDevices') def _publish(self, request_topic, data_to_send, retain): - self._client.publish(request_topic, data_to_send, retain).wait_for_publish() + result = False + try: + if self._connected and self._client.is_connected(): + self._client.publish(request_topic, data_to_send, retain).wait_for_publish() + result = True + except Exception as e: + self.__log.error("Error during publishing to target broker: %r", e) + return result def rpc_cancel_processing(self, topic): self.__log.info("RPC canceled or terminated. Unsubscribing from %s", topic) diff --git a/thingsboard_gateway/connectors/mqtt/mqtt_decorators.py b/thingsboard_gateway/connectors/mqtt/mqtt_decorators.py index 48221cae1..095f479f6 100644 --- a/thingsboard_gateway/connectors/mqtt/mqtt_decorators.py +++ b/thingsboard_gateway/connectors/mqtt/mqtt_decorators.py @@ -10,6 +10,6 @@ def inner(*args, **kwargs): except ValueError: pass - func(*args, **kwargs) + return func(*args, **kwargs) return inner diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index f92b1bc73..29dfbb591 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -1673,8 +1673,9 @@ def rpc_with_reply_processing(self, topic, content): @CollectRPCReplyStatistics(start_stat_type='allBytesSentToTB') @CountMessage('msgsSentToPlatform') def send_rpc_reply(self, device=None, req_id=None, content=None, success_sent=None, wait_for_publish=None, - quality_of_service=0): - self.__rpc_processing_queue.put((device, req_id, content, success_sent, wait_for_publish, quality_of_service)) + quality_of_service=0, to_connector_rpc=False): + self.__rpc_processing_queue.put((device, req_id, content, success_sent, + wait_for_publish, quality_of_service, to_connector_rpc)) def __send_rpc_reply_processing(self): while not self.stopped: @@ -1685,22 +1686,22 @@ def __send_rpc_reply_processing(self): pass def __send_rpc_reply(self, device=None, req_id=None, content=None, success_sent=None, wait_for_publish=None, - quality_of_service=0): + quality_of_service=0, to_connector_rpc=False): try: self.__rpc_reply_sent = True rpc_response = {"success": False} if success_sent is not None: if success_sent: rpc_response["success"] = True - if device is not None and success_sent is not None: + if device is not None and success_sent is not None and not to_connector_rpc: self.tb_client.client.gw_send_rpc_reply(device, req_id, dumps(rpc_response), quality_of_service=quality_of_service) - elif device is not None and req_id is not None and content is not None: + elif device is not None and req_id is not None and content is not None and not to_connector_rpc: self.tb_client.client.gw_send_rpc_reply(device, req_id, content, quality_of_service=quality_of_service) - elif device is None and success_sent is not None: + elif (device is None or to_connector_rpc) and success_sent is not None: self.tb_client.client.send_rpc_reply(req_id, dumps(rpc_response), quality_of_service=quality_of_service, wait_for_publish=wait_for_publish) - elif device is None and content is not None: + elif (device is None and content is not None) or to_connector_rpc: self.tb_client.client.send_rpc_reply(req_id, content, quality_of_service=quality_of_service, wait_for_publish=wait_for_publish) self.__rpc_reply_sent = False