diff --git a/rosbridge_library/src/rosbridge_library/capabilities/advertise_action.py b/rosbridge_library/src/rosbridge_library/capabilities/advertise_action.py index 3c41337c7..9ed16405c 100644 --- a/rosbridge_library/src/rosbridge_library/capabilities/advertise_action.py +++ b/rosbridge_library/src/rosbridge_library/capabilities/advertise_action.py @@ -37,7 +37,11 @@ from action_msgs.msg import GoalStatus from rclpy.action import ActionServer +<<<<<<< HEAD from rclpy.action.server import CancelResponse, ServerGoalHandle +======= +from rclpy.action.server import CancelResponse, GoalResponse +>>>>>>> cfd0646 (feat: Improve action unadvertising (#1248)) from rclpy.callback_groups import ReentrantCallbackGroup from rclpy.task import Future @@ -58,24 +62,34 @@ class AdvertisedActionHandler(Generic[ROSActionGoalT, ROSActionResultT, ROSActionFeedbackT]): id_counter = 1 +<<<<<<< HEAD def __init__( self, action_name: str, action_type: str, protocol: Protocol, sleep_time: float = 0.001 ) -> None: self.goal_futures: dict[str, Future] = {} self.goal_handles: dict[str, ServerGoalHandle] = {} +======= + def __init__(self, action_name: str, action_type: str, protocol: Protocol) -> None: + self.goal_futures: dict[str, Future[ROSActionResultT]] = {} + self.goal_handles: dict[ + str, + ServerGoalHandle[ROSActionGoalT, ROSActionResultT, ROSActionFeedbackT, ROSActionImplT], + ] = {} +>>>>>>> cfd0646 (feat: Improve action unadvertising (#1248)) self.goal_statuses: dict[str, int] = {} self.action_name = action_name self.action_type = action_type self.protocol = protocol - self.sleep_time = sleep_time + self._shutting_down = False # setup the action self.action_server = ActionServer( protocol.node_handle, get_action_class(action_type), action_name, - self.execute_callback, # type: ignore[arg-type] # rclpy type hint does not support coroutines - cancel_callback=self.cancel_callback, # type: ignore[arg-type] # rclpy type hint is incorrect + self.execute_callback, + goal_callback=self.goal_callback, + cancel_callback=self.cancel_callback, callback_group=ReentrantCallbackGroup(), # https://github.com/ros2/rclpy/issues/834#issuecomment-961331870 ) @@ -84,7 +98,31 @@ def next_id(self) -> int: self.id_counter += 1 return next_id_value +<<<<<<< HEAD async def execute_callback(self, goal: ServerGoalHandle) -> ROSActionResultT: +======= + def goal_callback(self, _goal_request: ROSActionGoalT) -> GoalResponse: + """ + Handle new action goal request. + + ActionServer callback for receiving a new action goal request. + """ + if self._shutting_down: + self.protocol.log( + "warning", + f"Received new goal request for action {self.action_name} while shutting down, rejecting.", + ) + return GoalResponse.REJECT + + return GoalResponse.ACCEPT + + async def execute_callback( + self, + goal: ServerGoalHandle[ + ROSActionGoalT, ROSActionResultT, ROSActionFeedbackT, ROSActionImplT + ], + ) -> ROSActionResultT: +>>>>>>> cfd0646 (feat: Improve action unadvertising (#1248)) """ Execute action goal. @@ -93,8 +131,13 @@ async def execute_callback(self, goal: ServerGoalHandle) -> ROSActionResultT: # generate a unique ID goal_id = f"action_goal:{self.action_name}:{self.next_id()}" +<<<<<<< HEAD def done_callback(fut: Future) -> None: if fut.cancelled(): +======= + def done_callback(fut: Future[ROSActionResultT]) -> None: + if fut.cancelled() or fut.exception() is not None: +>>>>>>> cfd0646 (feat: Improve action unadvertising (#1248)) goal.abort() self.protocol.log("info", f"Aborted goal {goal_id}") else: @@ -132,11 +175,32 @@ def done_callback(fut: Future) -> None: # Return empty result when cancelled/aborted return cast("ROSActionResultT", get_action_class(self.action_type).Result()) return result + except Exception as e: + self.protocol.log( + "error", f"Error while waiting for result of action goal with id {goal_id}: {e}" + ) + # Goal should be aborted by the done_callback when the future exception is set + # Return empty result + return cast("ROSActionResultT", get_action_class(self.action_type).Result()) finally: del self.goal_futures[goal_id] del self.goal_handles[goal_id] +<<<<<<< HEAD def cancel_callback(self, goal: ServerGoalHandle) -> CancelResponse: +======= + if self._shutting_down and not self.goal_futures: + # Action is shutting down and no more goal futures are pending, + # schedule destruction of the action server + self._schedule_action_server_destruction() + + def cancel_callback( + self, + goal: ServerGoalHandle[ + ROSActionGoalT, ROSActionResultT, ROSActionFeedbackT, ROSActionImplT + ], + ) -> CancelResponse: +>>>>>>> cfd0646 (feat: Improve action unadvertising (#1248)) """ Cancel action goal. @@ -191,6 +255,8 @@ def handle_abort(self, goal_id: str) -> None: def graceful_shutdown(self) -> None: """Signal the AdvertisedActionHandler to shutdown.""" + self._shutting_down = True + if self.goal_futures: incomplete_ids = ", ".join(self.goal_futures.keys()) self.protocol.log( @@ -201,10 +267,23 @@ def graceful_shutdown(self) -> None: for future_id in self.goal_futures: future = self.goal_futures[future_id] future.set_exception(RuntimeError(f"Action {self.action_name} was unadvertised")) + else: + self._schedule_action_server_destruction() + + def _schedule_action_server_destruction(self) -> None: + executor = self.action_server._node.executor + assert executor is not None + + async def destroy_action_server() -> None: + # Sleep briefly to allow any in-flight callbacks to complete before destroying the action server + future = executor.create_future() + timer = self.protocol.node_handle.create_timer(1.0, lambda: future.set_result(None)) + await future + timer.destroy() + + self.action_server.destroy() - # Uncommenting this, you may get a segfault. - # See https://github.com/ros2/rclcpp/issues/2163#issuecomment-1850925883 - # self.action_server.destroy() + executor.create_task(destroy_action_server) class AdvertiseAction(Capability): diff --git a/rosbridge_library/src/rosbridge_library/capabilities/send_action_goal.py b/rosbridge_library/src/rosbridge_library/capabilities/send_action_goal.py index 86bb9ca7e..e7d7a8ec7 100644 --- a/rosbridge_library/src/rosbridge_library/capabilities/send_action_goal.py +++ b/rosbridge_library/src/rosbridge_library/capabilities/send_action_goal.py @@ -191,7 +191,11 @@ def _success( self.protocol.send(outgoing_message) def _failure(self, cid: str | None, action: str, exc: Exception) -> None: - self.protocol.log("error", f"send_action_goal {type(exc).__name__}: {cid}") + self.protocol.log( + "error", + f"send_action_goal to '{action}' failed: {exc}" + + (f" [id: {cid}]" if cid is not None else ""), + ) # send response with result: false outgoing_message = { "op": "action_result", diff --git a/rosbridge_library/test/capabilities/test_action_capabilities.py b/rosbridge_library/test/capabilities/test_action_capabilities.py index f36462b50..e35d7739d 100755 --- a/rosbridge_library/test/capabilities/test_action_capabilities.py +++ b/rosbridge_library/test/capabilities/test_action_capabilities.py @@ -357,7 +357,70 @@ def test_unadvertise_action(self) -> None: while self.received_message is None: time.sleep(0.1) if time.monotonic() - start_time > 1.0: - self.fail("Timed out waiting for unadvertise action message.") + self.fail("Timed out waiting for action result.") + + # After unadvertising the action, the goal should be aborted and an action result with + # status ABORTED should be sent. + self.assertIsNotNone(self.received_message) + self.assertEqual(self.received_message["op"], "action_result") + self.assertEqual(self.received_message["status"], GoalStatus.STATUS_ABORTED) + + # Sleep briefly to allow the action server to be fully destroyed before proceeding with the test + time.sleep(1.0) + + # Now try sending another goal after unadvertising — it should be rejected + self.received_message = None + goal_msg_after = loads( + dumps( + { + "op": "send_action_goal", + "id": "foo4_after_unadvertise", + "action": action_path, + "action_type": "example_interfaces/Fibonacci", + "args": {"order": 3}, + } + ) + ) + Thread(target=self.send_goal.send_action_goal, args=(goal_msg_after,)).start() + + start_time = time.monotonic() + while self.received_message is None: + time.sleep(0.1) + if time.monotonic() - start_time > 2.0: + self.fail("Timed out waiting for rejected action goal response.") + + self.assertIsNotNone(self.received_message) + self.assertEqual(self.received_message["op"], "action_result") + self.assertFalse(self.received_message["result"]) + + # Now advertise the same action again, and verify we can send goals to it successfully + self.received_message = None + self.advertise.advertise_action(advertise_msg) + time.sleep(0.1) + + goal_msg_after_readvertise = loads( + dumps( + { + "op": "send_action_goal", + "id": "foo4_after_readvertise", + "action": action_path, + "action_type": "example_interfaces/Fibonacci", + "args": {"order": 4}, + } + ) + ) + + Thread(target=self.send_goal.send_action_goal, args=(goal_msg_after_readvertise,)).start() + + start_time = time.monotonic() + while self.received_message is None: + time.sleep(0.1) + if time.monotonic() - start_time > 2.0: + self.fail("Timed out waiting for action goal message after readvertise.") + + self.assertIsNotNone(self.received_message) + self.assertEqual(self.received_message["op"], "send_action_goal") + self.assertTrue("id" in self.received_message) if __name__ == "__main__":