Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

from action_msgs.msg import GoalStatus
from rclpy.action import ActionServer
from rclpy.action.server import CancelResponse
from rclpy.action.server import CancelResponse, GoalResponse
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.task import Future

Expand All @@ -63,9 +63,7 @@ class AdvertisedActionHandler(
):
id_counter = 1

def __init__(
self, action_name: str, action_type: str, protocol: Protocol, sleep_time: float = 0.001
) -> None:
def __init__(self, action_name: str, action_type: str, protocol: Protocol) -> None:
self.goal_futures: dict[str, Future[ROSActionResultT]] = {}
self.goal_handles: dict[
str,
Expand All @@ -76,16 +74,17 @@ def __init__(
self.action_name = action_name
self.action_type = action_type
self.protocol = protocol
self.sleep_time = sleep_time
self._shutting_down = False
# setup the action
self.action_server = ActionServer[
ROSActionGoalT, ROSActionResultT, ROSActionFeedbackT, ROSActionImplT
](
protocol.node_handle,
get_action_class(action_type),
action_name,
self.execute_callback, # type: ignore[arg-type] # rclpy type hint does not support coroutines
cancel_callback=self.cancel_callback, # type: ignore[arg-type] # rclpy type hint is incorrect
self.execute_callback,
goal_callback=self.goal_callback,
cancel_callback=self.cancel_callback,
callback_group=ReentrantCallbackGroup(), # https://github.com/ros2/rclpy/issues/834#issuecomment-961331870
)

Expand All @@ -94,6 +93,21 @@ def next_id(self) -> int:
self.id_counter += 1
return next_id_value

def goal_callback(self, _goal_request: ROSActionGoalT) -> GoalResponse:
"""
Handle new action goal request.

ActionServer callback for receiving a new action goal request.
"""
if self._shutting_down:
self.protocol.log(
"warning",
f"Received new goal request for action {self.action_name} while shutting down, rejecting.",
)
return GoalResponse.REJECT

return GoalResponse.ACCEPT

async def execute_callback(
self,
goal: ServerGoalHandle[
Expand All @@ -109,7 +123,7 @@ async def execute_callback(
goal_id = f"action_goal:{self.action_name}:{self.next_id()}"

def done_callback(fut: Future[ROSActionResultT]) -> None:
if fut.cancelled():
if fut.cancelled() or fut.exception() is not None:
goal.abort()
self.protocol.log("info", f"Aborted goal {goal_id}")
else:
Expand Down Expand Up @@ -147,10 +161,22 @@ def done_callback(fut: Future[ROSActionResultT]) -> None:
# Return empty result when cancelled/aborted
return cast("ROSActionResultT", get_action_class(self.action_type).Result())
return result
except Exception as e:
self.protocol.log(
"error", f"Error while waiting for result of action goal with id {goal_id}: {e}"
)
# Goal should be aborted by the done_callback when the future exception is set
# Return empty result
return cast("ROSActionResultT", get_action_class(self.action_type).Result())
finally:
del self.goal_futures[goal_id]
del self.goal_handles[goal_id]

if self._shutting_down and not self.goal_futures:
# Action is shutting down and no more goal futures are pending,
# schedule destruction of the action server
self._schedule_action_server_destruction()

def cancel_callback(
self,
goal: ServerGoalHandle[
Expand Down Expand Up @@ -211,6 +237,8 @@ def handle_abort(self, goal_id: str) -> None:

def graceful_shutdown(self) -> None:
"""Signal the AdvertisedActionHandler to shutdown."""
self._shutting_down = True

if self.goal_futures:
incomplete_ids = ", ".join(self.goal_futures.keys())
self.protocol.log(
Expand All @@ -221,10 +249,23 @@ def graceful_shutdown(self) -> None:
for future_id in self.goal_futures:
future = self.goal_futures[future_id]
future.set_exception(RuntimeError(f"Action {self.action_name} was unadvertised"))
else:
self._schedule_action_server_destruction()

def _schedule_action_server_destruction(self) -> None:
executor = self.action_server._node.executor
assert executor is not None

async def destroy_action_server() -> None:
# Sleep briefly to allow any in-flight callbacks to complete before destroying the action server
future = executor.create_future()
timer = self.protocol.node_handle.create_timer(1.0, lambda: future.set_result(None))
await future
timer.destroy()

self.action_server.destroy()

# Uncommenting this, you may get a segfault.
# See https://github.com/ros2/rclcpp/issues/2163#issuecomment-1850925883
# self.action_server.destroy()
executor.create_task(destroy_action_server)


class AdvertiseAction(Capability):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,11 @@ def _success(
self.protocol.send(outgoing_message)

def _failure(self, cid: str | None, action: str, exc: Exception) -> None:
self.protocol.log("error", f"send_action_goal {type(exc).__name__}: {cid}")
self.protocol.log(
"error",
f"send_action_goal to '{action}' failed: {exc}"
+ (f" [id: {cid}]" if cid is not None else ""),
)
# send response with result: false
outgoing_message = {
"op": "action_result",
Expand Down
65 changes: 64 additions & 1 deletion rosbridge_library/test/capabilities/test_action_capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,70 @@ def test_unadvertise_action(self) -> None:
while self.received_message is None:
time.sleep(0.1)
if time.monotonic() - start_time > 1.0:
self.fail("Timed out waiting for unadvertise action message.")
self.fail("Timed out waiting for action result.")

# After unadvertising the action, the goal should be aborted and an action result with
# status ABORTED should be sent.
self.assertIsNotNone(self.received_message)
self.assertEqual(self.received_message["op"], "action_result")
self.assertEqual(self.received_message["status"], GoalStatus.STATUS_ABORTED)

# Sleep briefly to allow the action server to be fully destroyed before proceeding with the test
time.sleep(1.0)

# Now try sending another goal after unadvertising — it should be rejected
self.received_message = None
goal_msg_after = loads(
dumps(
{
"op": "send_action_goal",
"id": "foo4_after_unadvertise",
"action": action_path,
"action_type": "example_interfaces/Fibonacci",
"args": {"order": 3},
}
)
)
Thread(target=self.send_goal.send_action_goal, args=(goal_msg_after,)).start()

start_time = time.monotonic()
while self.received_message is None:
time.sleep(0.1)
if time.monotonic() - start_time > 2.0:
self.fail("Timed out waiting for rejected action goal response.")

self.assertIsNotNone(self.received_message)
self.assertEqual(self.received_message["op"], "action_result")
self.assertFalse(self.received_message["result"])

# Now advertise the same action again, and verify we can send goals to it successfully
self.received_message = None
self.advertise.advertise_action(advertise_msg)
time.sleep(0.1)

goal_msg_after_readvertise = loads(
dumps(
{
"op": "send_action_goal",
"id": "foo4_after_readvertise",
"action": action_path,
"action_type": "example_interfaces/Fibonacci",
"args": {"order": 4},
}
)
)

Thread(target=self.send_goal.send_action_goal, args=(goal_msg_after_readvertise,)).start()

start_time = time.monotonic()
while self.received_message is None:
time.sleep(0.1)
if time.monotonic() - start_time > 2.0:
self.fail("Timed out waiting for action goal message after readvertise.")

self.assertIsNotNone(self.received_message)
self.assertEqual(self.received_message["op"], "send_action_goal")
self.assertTrue("id" in self.received_message)


if __name__ == "__main__":
Expand Down
Loading