Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@

from action_msgs.msg import GoalStatus
from rclpy.action import ActionServer
<<<<<<< HEAD
from rclpy.action.server import CancelResponse, ServerGoalHandle
=======
from rclpy.action.server import CancelResponse, GoalResponse
>>>>>>> cfd0646 (feat: Improve action unadvertising (#1248))
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.task import Future

Expand All @@ -58,24 +62,34 @@
class AdvertisedActionHandler(Generic[ROSActionGoalT, ROSActionResultT, ROSActionFeedbackT]):
id_counter = 1

<<<<<<< HEAD
def __init__(
self, action_name: str, action_type: str, protocol: Protocol, sleep_time: float = 0.001
) -> None:
self.goal_futures: dict[str, Future] = {}
self.goal_handles: dict[str, ServerGoalHandle] = {}
=======
def __init__(self, action_name: str, action_type: str, protocol: Protocol) -> None:
self.goal_futures: dict[str, Future[ROSActionResultT]] = {}
self.goal_handles: dict[
str,
ServerGoalHandle[ROSActionGoalT, ROSActionResultT, ROSActionFeedbackT, ROSActionImplT],
] = {}
>>>>>>> cfd0646 (feat: Improve action unadvertising (#1248))
self.goal_statuses: dict[str, int] = {}

self.action_name = action_name
self.action_type = action_type
self.protocol = protocol
self.sleep_time = sleep_time
self._shutting_down = False
# setup the action
self.action_server = ActionServer(
protocol.node_handle,
get_action_class(action_type),
action_name,
self.execute_callback, # type: ignore[arg-type] # rclpy type hint does not support coroutines
cancel_callback=self.cancel_callback, # type: ignore[arg-type] # rclpy type hint is incorrect
self.execute_callback,
goal_callback=self.goal_callback,
cancel_callback=self.cancel_callback,
callback_group=ReentrantCallbackGroup(), # https://github.com/ros2/rclpy/issues/834#issuecomment-961331870
)

Expand All @@ -84,7 +98,31 @@ def next_id(self) -> int:
self.id_counter += 1
return next_id_value

<<<<<<< HEAD
async def execute_callback(self, goal: ServerGoalHandle) -> ROSActionResultT:
=======
def goal_callback(self, _goal_request: ROSActionGoalT) -> GoalResponse:
"""
Handle new action goal request.

ActionServer callback for receiving a new action goal request.
"""
if self._shutting_down:
self.protocol.log(
"warning",
f"Received new goal request for action {self.action_name} while shutting down, rejecting.",
)
return GoalResponse.REJECT

return GoalResponse.ACCEPT

async def execute_callback(
self,
goal: ServerGoalHandle[
ROSActionGoalT, ROSActionResultT, ROSActionFeedbackT, ROSActionImplT
],
) -> ROSActionResultT:
>>>>>>> cfd0646 (feat: Improve action unadvertising (#1248))
"""
Execute action goal.

Expand All @@ -93,8 +131,13 @@ async def execute_callback(self, goal: ServerGoalHandle) -> ROSActionResultT:
# generate a unique ID
goal_id = f"action_goal:{self.action_name}:{self.next_id()}"

<<<<<<< HEAD
def done_callback(fut: Future) -> None:
if fut.cancelled():
=======
def done_callback(fut: Future[ROSActionResultT]) -> None:
if fut.cancelled() or fut.exception() is not None:
>>>>>>> cfd0646 (feat: Improve action unadvertising (#1248))
goal.abort()
self.protocol.log("info", f"Aborted goal {goal_id}")
else:
Expand Down Expand Up @@ -132,11 +175,32 @@ def done_callback(fut: Future) -> None:
# Return empty result when cancelled/aborted
return cast("ROSActionResultT", get_action_class(self.action_type).Result())
return result
except Exception as e:
self.protocol.log(
"error", f"Error while waiting for result of action goal with id {goal_id}: {e}"
)
# Goal should be aborted by the done_callback when the future exception is set
# Return empty result
return cast("ROSActionResultT", get_action_class(self.action_type).Result())
finally:
del self.goal_futures[goal_id]
del self.goal_handles[goal_id]

<<<<<<< HEAD
def cancel_callback(self, goal: ServerGoalHandle) -> CancelResponse:
=======
if self._shutting_down and not self.goal_futures:
# Action is shutting down and no more goal futures are pending,
# schedule destruction of the action server
self._schedule_action_server_destruction()

def cancel_callback(
self,
goal: ServerGoalHandle[
ROSActionGoalT, ROSActionResultT, ROSActionFeedbackT, ROSActionImplT
],
) -> CancelResponse:
>>>>>>> cfd0646 (feat: Improve action unadvertising (#1248))
"""
Cancel action goal.

Expand Down Expand Up @@ -191,6 +255,8 @@ def handle_abort(self, goal_id: str) -> None:

def graceful_shutdown(self) -> None:
"""Signal the AdvertisedActionHandler to shutdown."""
self._shutting_down = True

if self.goal_futures:
incomplete_ids = ", ".join(self.goal_futures.keys())
self.protocol.log(
Expand All @@ -201,10 +267,23 @@ def graceful_shutdown(self) -> None:
for future_id in self.goal_futures:
future = self.goal_futures[future_id]
future.set_exception(RuntimeError(f"Action {self.action_name} was unadvertised"))
else:
self._schedule_action_server_destruction()

def _schedule_action_server_destruction(self) -> None:
executor = self.action_server._node.executor
assert executor is not None

async def destroy_action_server() -> None:
# Sleep briefly to allow any in-flight callbacks to complete before destroying the action server
future = executor.create_future()
timer = self.protocol.node_handle.create_timer(1.0, lambda: future.set_result(None))
await future
timer.destroy()

self.action_server.destroy()

# Uncommenting this, you may get a segfault.
# See https://github.com/ros2/rclcpp/issues/2163#issuecomment-1850925883
# self.action_server.destroy()
executor.create_task(destroy_action_server)


class AdvertiseAction(Capability):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,11 @@ def _success(
self.protocol.send(outgoing_message)

def _failure(self, cid: str | None, action: str, exc: Exception) -> None:
self.protocol.log("error", f"send_action_goal {type(exc).__name__}: {cid}")
self.protocol.log(
"error",
f"send_action_goal to '{action}' failed: {exc}"
+ (f" [id: {cid}]" if cid is not None else ""),
)
# send response with result: false
outgoing_message = {
"op": "action_result",
Expand Down
65 changes: 64 additions & 1 deletion rosbridge_library/test/capabilities/test_action_capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,70 @@ def test_unadvertise_action(self) -> None:
while self.received_message is None:
time.sleep(0.1)
if time.monotonic() - start_time > 1.0:
self.fail("Timed out waiting for unadvertise action message.")
self.fail("Timed out waiting for action result.")

# After unadvertising the action, the goal should be aborted and an action result with
# status ABORTED should be sent.
self.assertIsNotNone(self.received_message)
self.assertEqual(self.received_message["op"], "action_result")
self.assertEqual(self.received_message["status"], GoalStatus.STATUS_ABORTED)

# Sleep briefly to allow the action server to be fully destroyed before proceeding with the test
time.sleep(1.0)

# Now try sending another goal after unadvertising — it should be rejected
self.received_message = None
goal_msg_after = loads(
dumps(
{
"op": "send_action_goal",
"id": "foo4_after_unadvertise",
"action": action_path,
"action_type": "example_interfaces/Fibonacci",
"args": {"order": 3},
}
)
)
Thread(target=self.send_goal.send_action_goal, args=(goal_msg_after,)).start()

start_time = time.monotonic()
while self.received_message is None:
time.sleep(0.1)
if time.monotonic() - start_time > 2.0:
self.fail("Timed out waiting for rejected action goal response.")

self.assertIsNotNone(self.received_message)
self.assertEqual(self.received_message["op"], "action_result")
self.assertFalse(self.received_message["result"])

# Now advertise the same action again, and verify we can send goals to it successfully
self.received_message = None
self.advertise.advertise_action(advertise_msg)
time.sleep(0.1)

goal_msg_after_readvertise = loads(
dumps(
{
"op": "send_action_goal",
"id": "foo4_after_readvertise",
"action": action_path,
"action_type": "example_interfaces/Fibonacci",
"args": {"order": 4},
}
)
)

Thread(target=self.send_goal.send_action_goal, args=(goal_msg_after_readvertise,)).start()

start_time = time.monotonic()
while self.received_message is None:
time.sleep(0.1)
if time.monotonic() - start_time > 2.0:
self.fail("Timed out waiting for action goal message after readvertise.")

self.assertIsNotNone(self.received_message)
self.assertEqual(self.received_message["op"], "send_action_goal")
self.assertTrue("id" in self.received_message)


if __name__ == "__main__":
Expand Down
Loading