Skip to content

Commit

Permalink
More attempts to replicate cpp implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
jmblixt3 committed Jun 29, 2024
1 parent 562f1d4 commit ac7f409
Showing 1 changed file with 90 additions and 84 deletions.
174 changes: 90 additions & 84 deletions rclpy/rclpy/action/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,10 @@ def __init__(
self._node.add_waitable(self)
self._logger = self._node.get_logger().get_child('action_client')

self._lock_goal_request = threading.Lock()
self._lock_cancel_request = threading.Lock()
self._lock_result_request = threading.Lock()
self._lock_feedback = threading.Lock()
self._lock_status = threading.Lock()
self._lock_goal_handles = threading.Lock()
self._lock_goal_requests = threading.Lock()
self._lock_cancel_requests = threading.Lock()
self._lock_result_requests = threading.Lock()

def _generate_random_uuid(self):
return UUID(uuid=list(uuid.uuid4().bytes))
Expand Down Expand Up @@ -216,21 +215,24 @@ def _remove_pending_request(self, future, pending_requests):
return None

def _remove_pending_goal_request(self, future):
seq = self._remove_pending_request(future, self._pending_goal_requests)
if seq in self._goal_sequence_number_to_goal_id:
del self._goal_sequence_number_to_goal_id[seq]
with self._lock_goal_requests:
seq = self._remove_pending_request(future, self._pending_goal_requests)
if seq in self._goal_sequence_number_to_goal_id:
del self._goal_sequence_number_to_goal_id[seq]

def _remove_pending_cancel_request(self, future):
self._remove_pending_request(future, self._pending_cancel_requests)
with self._lock_cancel_requests:
self._remove_pending_request(future, self._pending_cancel_requests)

def _remove_pending_result_request(self, future):
seq = self._remove_pending_request(future, self._pending_result_requests)
if seq in self._result_sequence_number_to_goal_id:
goal_uuid = bytes(self._result_sequence_number_to_goal_id[seq].uuid)
del self._result_sequence_number_to_goal_id[seq]
# remove feeback_callback if user is aware of result and it's been received
if goal_uuid in self._feedback_callbacks:
del self._feedback_callbacks[goal_uuid]
with self._lock_result_requests:
seq = self._remove_pending_request(future, self._pending_result_requests)
if seq in self._result_sequence_number_to_goal_id:
goal_uuid = bytes(self._result_sequence_number_to_goal_id[seq].uuid)
del self._result_sequence_number_to_goal_id[seq]
# remove feeback_callback if user is aware of result and it's been received
if goal_uuid in self._feedback_callbacks:
del self._feedback_callbacks[goal_uuid]

# Start Waitable API
def is_ready(self, wait_set):
Expand All @@ -247,39 +249,39 @@ def take_data(self):
"""Take stuff from lower level so the wait set doesn't immediately wake again."""
data = {}
if self._is_goal_response_ready:
with self._lock_goal_request:
with self._node.handle:
taken_data = self._client_handle.take_goal_response(
self._action_type.Impl.SendGoalService.Response)
# If take fails, then we get (None, None)
if all(taken_data):
data['goal'] = taken_data

if self._is_cancel_response_ready:
with self._lock_cancel_request:
with self._node.handle:
taken_data = self._client_handle.take_cancel_response(
self._action_type.Impl.CancelGoalService.Response)
# If take fails, then we get (None, None)
if all(taken_data):
data['cancel'] = taken_data

if self._is_result_response_ready:
with self._lock_result_request:
with self._node.handle:
taken_data = self._client_handle.take_result_response(
self._action_type.Impl.GetResultService.Response)
# If take fails, then we get (None, None)
if all(taken_data):
data['result'] = taken_data

if self._is_feedback_ready:
with self._lock_feedback:
with self._node.handle:
taken_data = self._client_handle.take_feedback(
self._action_type.Impl.FeedbackMessage)
# If take fails, then we get None
if taken_data is not None:
data['feedback'] = taken_data

if self._is_status_ready:
with self._lock_status:
with self._node.handle:
taken_data = self._client_handle.take_status(
self._action_type.Impl.GoalStatusMessage)
# If take fails, then we get None
Expand All @@ -296,46 +298,50 @@ async def execute(self, taken_data):
call any user-defined callbacks (e.g. feedback).
"""
if 'goal' in taken_data:
sequence_number, goal_response = taken_data['goal']
if sequence_number in self._goal_sequence_number_to_goal_id:
goal_handle = ClientGoalHandle(
self,
self._goal_sequence_number_to_goal_id[sequence_number],
goal_response)

if goal_handle.accepted:
goal_uuid = bytes(goal_handle.goal_id.uuid)
if goal_uuid in self._goal_handles:
raise RuntimeError(
'Two goals were accepted with the same ID ({})'.format(goal_handle))
self._goal_handles[goal_uuid] = weakref.ref(goal_handle)

self._pending_goal_requests[sequence_number].set_result(goal_handle)
else:
self._logger.warning(
'Ignoring unexpected goal response. There may be more than '
f"one action server for the action '{self._action_name}'"
)
with self._lock_goal_requests:
sequence_number, goal_response = taken_data['goal']
if sequence_number in self._goal_sequence_number_to_goal_id:
goal_handle = ClientGoalHandle(
self,
self._goal_sequence_number_to_goal_id[sequence_number],
goal_response)

if goal_handle.accepted:
goal_uuid = bytes(goal_handle.goal_id.uuid)
with self._lock_goal_handles:
if goal_uuid in self._goal_handles:
raise RuntimeError(
'Two goals were accepted with the same ID ({})'.format(goal_handle))
self._goal_handles[goal_uuid] = weakref.ref(goal_handle)

self._pending_goal_requests[sequence_number].set_result(goal_handle)
else:
self._logger.warning(
'Ignoring unexpected goal response. There may be more than '
f"one action server for the action '{self._action_name}'"
)

if 'cancel' in taken_data:
sequence_number, cancel_response = taken_data['cancel']
if sequence_number in self._pending_cancel_requests:
self._pending_cancel_requests[sequence_number].set_result(cancel_response)
else:
self._logger.warning(
'Ignoring unexpected cancel response. There may be more than '
f"one action server for the action '{self._action_name}'"
)
with self._lock_cancel_requests:
sequence_number, cancel_response = taken_data['cancel']
if sequence_number in self._pending_cancel_requests:
self._pending_cancel_requests[sequence_number].set_result(cancel_response)
else:
self._logger.warning(
'Ignoring unexpected cancel response. There may be more than '
f"one action server for the action '{self._action_name}'"
)

if 'result' in taken_data:
sequence_number, result_response = taken_data['result']
if sequence_number in self._pending_result_requests:
self._pending_result_requests[sequence_number].set_result(result_response)
else:
self._logger.warning(
'Ignoring unexpected result response. There may be more than '
f"one action server for the action '{self._action_name}'"
)
with self._lock_result_requests:
sequence_number, result_response = taken_data['result']
if sequence_number in self._pending_result_requests:
self._pending_result_requests[sequence_number].set_result(result_response)
else:
self._logger.warning(
'Ignoring unexpected result response. There may be more than '
f"one action server for the action '{self._action_name}'"
)

if 'feedback' in taken_data:
feedback_msg = taken_data['feedback']
Expand All @@ -349,19 +355,19 @@ async def execute(self, taken_data):
for status_msg in taken_data['status'].status_list:
goal_uuid = bytes(status_msg.goal_info.goal_id.uuid)
status = status_msg.status

if goal_uuid in self._goal_handles:
goal_handle = self._goal_handles[goal_uuid]()
if goal_handle is not None:
goal_handle._status = status
# Remove "done" goals from the list
if (GoalStatus.STATUS_SUCCEEDED == status or
GoalStatus.STATUS_CANCELED == status or
GoalStatus.STATUS_ABORTED == status):
with self._lock_goal_handles:
if goal_uuid in self._goal_handles:
goal_handle = self._goal_handles[goal_uuid]()
if goal_handle is not None:
goal_handle._status = status
# Remove "done" goals from the list
if (GoalStatus.STATUS_SUCCEEDED == status or
GoalStatus.STATUS_CANCELED == status or
GoalStatus.STATUS_ABORTED == status):
del self._goal_handles[goal_uuid]
else:
# Weak reference is None
del self._goal_handles[goal_uuid]
else:
# Weak reference is None
del self._goal_handles[goal_uuid]

def get_num_entities(self):
"""Return number of each type of entity used in the wait set."""
Expand All @@ -370,7 +376,8 @@ def get_num_entities(self):

def add_to_wait_set(self, wait_set):
"""Add entities to wait set."""
self._client_handle.add_to_waitset(wait_set)
with self._node.handle:
self._client_handle.add_to_waitset(wait_set)

def __enter__(self):
return self._client_handle.__enter__()
Expand Down Expand Up @@ -449,23 +456,22 @@ def send_goal_async(self, goal, feedback_callback=None, goal_uuid=None):
request.goal_id = self._generate_random_uuid() if goal_uuid is None else goal_uuid
request.goal = goal
future = Future()
with self._lock_goal_request:
with self._lock_goal_requests:
sequence_number = self._client_handle.send_goal_request(request)
if sequence_number in self._pending_goal_requests:
raise RuntimeError(
'Sequence ({}) conflicts with pending goal request'.format(sequence_number))
self._pending_goal_requests[sequence_number] = future
self._goal_sequence_number_to_goal_id[sequence_number] = request.goal_id
future.add_done_callback(self._remove_pending_goal_request)
# Add future so executor is aware
self.add_future(future)

if feedback_callback is not None:
# TODO(jacobperron): Move conversion function to a general-use package
goal_uuid = bytes(request.goal_id.uuid)
self._feedback_callbacks[goal_uuid] = feedback_callback

future.add_done_callback(self._remove_pending_goal_request)
# Add future so executor is aware
self.add_future(future)

return future

def _cancel_goal(self, goal_handle):
Expand Down Expand Up @@ -508,16 +514,15 @@ def _cancel_goal_async(self, goal_handle):
cancel_request = CancelGoal.Request()
cancel_request.goal_info.goal_id = goal_handle.goal_id
future = Future()
with self._lock_cancel_request:
with self._lock_cancel_requests:
sequence_number = self._client_handle.send_cancel_request(cancel_request)
if sequence_number in self._pending_cancel_requests:
raise RuntimeError(
'Sequence ({}) conflicts with pending cancel request'.format(sequence_number))
self._pending_cancel_requests[sequence_number] = future

future.add_done_callback(self._remove_pending_cancel_request)
# Add future so executor is aware
self.add_future(future)
future.add_done_callback(self._remove_pending_cancel_request)
# Add future so executor is aware
self.add_future(future)

return future

Expand Down Expand Up @@ -561,17 +566,18 @@ def _get_result_async(self, goal_handle):
result_request = self._action_type.Impl.GetResultService.Request()
result_request.goal_id = goal_handle.goal_id
future = Future()
with self._lock_result_request:
with self._lock_result_requests:
sequence_number = self._client_handle.send_result_request(result_request)
if sequence_number in self._pending_result_requests:
raise RuntimeError(
'Sequence ({}) conflicts with pending result request'.format(sequence_number))

self._pending_result_requests[sequence_number] = future
self._result_sequence_number_to_goal_id[sequence_number] = result_request.goal_id

future.add_done_callback(self._remove_pending_result_request)
# Add future so executor is aware
self.add_future(future)
future.add_done_callback(self._remove_pending_result_request)
# Add future so executor is aware
self.add_future(future)

return future

Expand Down

0 comments on commit ac7f409

Please sign in to comment.