Skip to content

Commit

Permalink
Updated handling status of rpcs
Browse files Browse the repository at this point in the history
  • Loading branch information
imbeacon committed Dec 2, 2024
1 parent ca9d6ea commit 9c6f7a2
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 14 deletions.
20 changes: 14 additions & 6 deletions thingsboard_gateway/connectors/mqtt/mqtt_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -919,19 +919,20 @@ def __process_rpc_request(self, content, rpc_config):

try:
self.__log.info("Publishing to: %s with data %s", request_topic, data_to_send)
result = None
try:
self._publish(request_topic, data_to_send, rpc_config.get('retain', False))
result = self._publish(request_topic, data_to_send, rpc_config.get('retain', False))
except Exception as e:
self.__log.exception("Error during publishing to target broker: %r", e)
self.__gateway.send_rpc_reply(device=content["device"],
self.__gateway.send_rpc_reply(device=content.get("device"),
req_id=content["data"]["id"],
content={"error": str.format("Error during publishing to target broker: %r",str(e))},
success_sent=False)
success_sent=False, to_connector_rpc=True if content.get('device') is None else False)
return
if not expects_response or not defines_timeout:
self.__log.info("One-way RPC: sending ack to ThingsBoard immediately")
self.__gateway.send_rpc_reply(device=content.get('device', ''), req_id=content["data"]["id"],
success_sent=True)
self.__gateway.send_rpc_reply(device=content.get('device'), req_id=content["data"]["id"],
success_sent= result is not None, to_connector_rpc=True if content.get('device') is None else False)

# Everything went out smoothly: RPC is served
return
Expand Down Expand Up @@ -988,7 +989,14 @@ def server_side_rpc_handler(self, content):

@CustomCollectStatistics(start_stat_type='allBytesSentToDevices')
def _publish(self, request_topic, data_to_send, retain):
self._client.publish(request_topic, data_to_send, retain).wait_for_publish()
result = False
try:
if self._connected and self._client.is_connected():
self._client.publish(request_topic, data_to_send, retain).wait_for_publish()
result = True
except Exception as e:
self.__log.error("Error during publishing to target broker: %r", e)
return result

def rpc_cancel_processing(self, topic):
self.__log.info("RPC canceled or terminated. Unsubscribing from %s", topic)
Expand Down
2 changes: 1 addition & 1 deletion thingsboard_gateway/connectors/mqtt/mqtt_decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ def inner(*args, **kwargs):
except ValueError:
pass

func(*args, **kwargs)
return func(*args, **kwargs)

return inner
15 changes: 8 additions & 7 deletions thingsboard_gateway/gateway/tb_gateway_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -1673,8 +1673,9 @@ def rpc_with_reply_processing(self, topic, content):
@CollectRPCReplyStatistics(start_stat_type='allBytesSentToTB')
@CountMessage('msgsSentToPlatform')
def send_rpc_reply(self, device=None, req_id=None, content=None, success_sent=None, wait_for_publish=None,
quality_of_service=0):
self.__rpc_processing_queue.put((device, req_id, content, success_sent, wait_for_publish, quality_of_service))
quality_of_service=0, to_connector_rpc=False):
self.__rpc_processing_queue.put((device, req_id, content, success_sent,
wait_for_publish, quality_of_service, to_connector_rpc))

def __send_rpc_reply_processing(self):
while not self.stopped:
Expand All @@ -1685,22 +1686,22 @@ def __send_rpc_reply_processing(self):
pass

def __send_rpc_reply(self, device=None, req_id=None, content=None, success_sent=None, wait_for_publish=None,
quality_of_service=0):
quality_of_service=0, to_connector_rpc=False):
try:
self.__rpc_reply_sent = True
rpc_response = {"success": False}
if success_sent is not None:
if success_sent:
rpc_response["success"] = True
if device is not None and success_sent is not None:
if device is not None and success_sent is not None and not to_connector_rpc:
self.tb_client.client.gw_send_rpc_reply(device, req_id, dumps(rpc_response),
quality_of_service=quality_of_service)
elif device is not None and req_id is not None and content is not None:
elif device is not None and req_id is not None and content is not None and not to_connector_rpc:
self.tb_client.client.gw_send_rpc_reply(device, req_id, content, quality_of_service=quality_of_service)
elif device is None and success_sent is not None:
elif (device is None or to_connector_rpc) and success_sent is not None:
self.tb_client.client.send_rpc_reply(req_id, dumps(rpc_response), quality_of_service=quality_of_service,
wait_for_publish=wait_for_publish)
elif device is None and content is not None:
elif (device is None and content is not None) or to_connector_rpc:
self.tb_client.client.send_rpc_reply(req_id, content, quality_of_service=quality_of_service,
wait_for_publish=wait_for_publish)
self.__rpc_reply_sent = False
Expand Down

0 comments on commit 9c6f7a2

Please sign in to comment.