Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

from rosbridge_library.capability import Capability
from rosbridge_library.internal import message_conversion
from rosbridge_library.internal.executor_helpers import run_on_executor
from rosbridge_library.internal.ros_loader import get_action_class
from rosbridge_library.internal.type_support import (
ROSActionFeedbackT,
Expand Down Expand Up @@ -75,17 +76,22 @@ def __init__(self, action_name: str, action_type: str, protocol: Protocol) -> No
self.action_type = action_type
self.protocol = protocol
self._shutting_down = False
# setup the action
self.action_server = ActionServer[
ROSActionGoalT, ROSActionResultT, ROSActionFeedbackT, ROSActionImplT
](
# Create the ActionServer on the executor thread; concurrent entity
# registration from a worker thread races with the executor's wait-set
# rebuild and can SIGSEGV inside rclpy/action/server.py:__init__.
self.action_server = run_on_executor(
protocol.node_handle,
get_action_class(action_type),
action_name,
self.execute_callback,
goal_callback=self.goal_callback,
cancel_callback=self.cancel_callback,
callback_group=ReentrantCallbackGroup(), # https://github.com/ros2/rclpy/issues/834#issuecomment-961331870
lambda: ActionServer[
ROSActionGoalT, ROSActionResultT, ROSActionFeedbackT, ROSActionImplT
](
protocol.node_handle,
get_action_class(action_type),
action_name,
self.execute_callback,
goal_callback=self.goal_callback,
cancel_callback=self.cancel_callback,
callback_group=ReentrantCallbackGroup(), # https://github.com/ros2/rclpy/issues/834#issuecomment-961331870
),
)

def next_id(self) -> int:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
from ujson import dumps as encode_json # type: ignore[import-untyped]
except ImportError:
try:
from simplejson import dumps as encode_json # type: ignore[import-untyped]
from simplejson import dumps as encode_json # type: ignore[import-untyped, no-redef]
except ImportError:
from json import dumps as encode_json # type: ignore[assignment]

Expand Down
109 changes: 109 additions & 0 deletions rosbridge_library/src/rosbridge_library/internal/executor_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# Software License Agreement (BSD License)
#
# Copyright (c) 2026, PickNik Robotics, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of PickNik Robotics, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
Helpers for serializing rclpy entity lifecycle with the node's executor.

Creating or destroying entities (clients, action servers, subscriptions,
publishers, ...) on a node from a worker thread is not safe against a
``rclpy`` executor that is concurrently spinning the same node: the executor's
wait-set rebuild can race with the worker thread's entity registration or
teardown and leave the executor holding a handle whose underlying PyCapsule
has been freed. The symptom is either a SIGSEGV inside ``rclpy`` or, less
loudly, a later ``TypeError: Object of type 'NoneType' is not an instance of
'capsule'`` from the next binding call on the affected node.

The remedy used throughout this package — and consistent with the
``IncomingQueue`` fix in #1183 — is to route the lifecycle call through
``executor.create_task(...)`` so it is serialized with the executor's own
work. This module centralizes that pattern.
"""

from __future__ import annotations

from threading import Event
from typing import TYPE_CHECKING, Any, TypeVar

if TYPE_CHECKING:
from collections.abc import Callable

from rclpy.node import Node


_T = TypeVar("_T")


def run_on_executor(node_handle: Node, fn: Callable[[], _T]) -> _T:
"""
Run ``fn()`` on the node's executor thread synchronously and return its result.

Use this whenever a worker thread needs to create or destroy an rclpy
entity (``Client``, ``ActionServer``, ``Subscription``, ``Publisher``,
...) on a node that an executor is concurrently spinning.

If the node has no attached executor (rare; mostly unit tests), the
callable is invoked inline. Exceptions raised by ``fn`` are re-raised in
the calling thread.
"""
executor = node_handle.executor
if executor is None:
return fn()

done = Event()
box: dict[str, Any] = {}

def _wrapper() -> None:
try:
box["result"] = fn()
except BaseException as exc:
box["exc"] = exc
finally:
done.set()

executor.create_task(_wrapper)
done.wait()
if "exc" in box:
raise box["exc"]
return box["result"]


def schedule_on_executor(node_handle: Node, fn: Callable[[], object]) -> None:
"""
Schedule ``fn()`` for execution on the node's executor thread.

Fire-and-forget: the caller does not wait for completion. Falls back to
invoking ``fn`` inline if the node has no attached executor.
"""
executor = node_handle.executor
if executor is not None:
executor.create_task(fn)
else:
fn()
27 changes: 22 additions & 5 deletions rosbridge_library/src/rosbridge_library/internal/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

from rclpy.callback_groups import ReentrantCallbackGroup

from rosbridge_library.internal.executor_helpers import run_on_executor, schedule_on_executor
from rosbridge_library.internal.message_conversion import (
extract_values,
populate_instance,
Expand Down Expand Up @@ -128,6 +129,16 @@ def args_to_service_request_instance(inst: ROSMessage, args: list | dict[str, An
populate_instance(msg, inst)


def _destroy_client_async(node_handle: Node, client: Client) -> None:
"""
Schedule destruction of ``client`` on the node's executor thread.

Fire-and-forget; see :mod:`rosbridge_library.internal.executor_helpers`
for the underlying rationale.
"""
schedule_on_executor(node_handle, lambda: node_handle.destroy_client(client))


def call_service(
node_handle: Node,
service: str,
Expand Down Expand Up @@ -155,12 +166,18 @@ def call_service(
# Populate the instance with the provided args
args_to_service_request_instance(inst, args)

client: Client = node_handle.create_client(
service_class, service, callback_group=ReentrantCallbackGroup()
# Create the client on the executor thread; concurrent client creation /
# destruction from worker threads races with the executor's wait-set
# rebuild and can leave handles in an inconsistent state.
client: Client = run_on_executor(
node_handle,
lambda: node_handle.create_client(
service_class, service, callback_group=ReentrantCallbackGroup()
),
)

if not client.wait_for_service(server_ready_timeout):
node_handle.destroy_client(client)
_destroy_client_async(node_handle, client)
raise InvalidServiceException(service)

future = client.call_async(inst)
Expand All @@ -173,11 +190,11 @@ def future_done_callback() -> None:

if not event.wait(timeout=(server_response_timeout if server_response_timeout > 0 else None)):
future.cancel()
node_handle.destroy_client(client)
_destroy_client_async(node_handle, client)
msg = "Timeout exceeded while waiting for service response"
raise Exception(msg)

node_handle.destroy_client(client)
_destroy_client_async(node_handle, client)

result = future.result()

Expand Down
48 changes: 38 additions & 10 deletions rosbridge_library/src/rosbridge_library/util/ros.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,51 @@

from __future__ import annotations

from threading import Event
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from rclpy.executors import Executor
from rclpy.node import Node


def is_topic_published(node: Node, topic_name: str) -> bool:
"""Check if a topic is published on a node."""
published_topic_data = node.get_publisher_names_and_types_by_node(
node.get_name(), node.get_namespace()
)
return any(topic[0] == topic_name for topic in published_topic_data)
"""
Check if a node has at least one publisher on the given topic.

Reads ``node.publishers`` (the local entity list) rather than the rmw graph
cache. The local list is populated synchronously inside ``create_publisher``
and cleared inside ``destroy_publisher``, so the result is deterministic
with respect to the calling thread — no DDS-discovery round-trip is
involved.
"""
return any(pub.topic_name == topic_name for pub in node.publishers)


def is_topic_subscribed(node: Node, topic_name: str) -> bool:
"""Check if a topic is subscribed to by a node."""
subscribed_topic_data = node.get_subscriber_names_and_types_by_node(
node.get_name(), node.get_namespace()
)
return any(topic[0] == topic_name for topic in subscribed_topic_data)
"""
Check if a node has at least one subscription on the given topic.

Reads ``node.subscriptions`` (the local entity list) rather than the rmw
graph cache; see ``is_topic_published`` for why this matters.
"""
return any(sub.topic_name == topic_name for sub in node.subscriptions)


def wait_for_executor_idle(executor: Executor, timeout: float = 5.0) -> None:
"""
Block until all tasks already queued on ``executor`` have been processed.

Used by tests that schedule work on the executor and then need to assert
on the post-condition from a different thread. Submits a no-op task and
waits for it to run: tasks are FIFO on ``SingleThreadedExecutor``, so when
the no-op completes every task enqueued before it has also completed.

Raises ``TimeoutError`` if the executor does not drain within ``timeout``
seconds, which usually means it is not being spun.
"""
done = Event()
executor.create_task(done.set)
if not done.wait(timeout=timeout):
msg = f"Executor did not become idle within {timeout}s"
raise TimeoutError(msg)
19 changes: 14 additions & 5 deletions rosbridge_library/test/capabilities/test_action_capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ def setUp(self) -> None:
def tearDown(self) -> None:
self.executor.remove_node(self.node)
self.executor.shutdown()
# Join the spin thread so it does not outlive the test and block the
# process from exiting at the end of the suite (the action server
# SIGSEGV used to kill the process before this mattered).
self.exec_thread.join()
self.node.destroy_node()
rclpy.shutdown()

def local_send_cb(
Expand Down Expand Up @@ -135,7 +140,7 @@ def test_execute_advertised_action(self) -> None:
}
)
)
Thread(target=self.send_goal.send_action_goal, args=(goal_msg,)).start()
Thread(target=self.send_goal.send_action_goal, args=(goal_msg,), daemon=True).start()

start_time = time.monotonic()
while self.received_message is None:
Expand Down Expand Up @@ -244,7 +249,7 @@ def test_cancel_advertised_action(self) -> None:
}
)
)
Thread(target=self.send_goal.send_action_goal, args=(goal_msg,)).start()
Thread(target=self.send_goal.send_action_goal, args=(goal_msg,), daemon=True).start()

start_time = time.monotonic()
while self.received_message is None:
Expand Down Expand Up @@ -335,7 +340,7 @@ def test_unadvertise_action(self) -> None:
}
)
)
Thread(target=self.send_goal.send_action_goal, args=(goal_msg,)).start()
Thread(target=self.send_goal.send_action_goal, args=(goal_msg,), daemon=True).start()

start_time = time.monotonic()
while self.received_message is None:
Expand Down Expand Up @@ -381,7 +386,7 @@ def test_unadvertise_action(self) -> None:
}
)
)
Thread(target=self.send_goal.send_action_goal, args=(goal_msg_after,)).start()
Thread(target=self.send_goal.send_action_goal, args=(goal_msg_after,), daemon=True).start()

start_time = time.monotonic()
while self.received_message is None:
Expand Down Expand Up @@ -410,7 +415,11 @@ def test_unadvertise_action(self) -> None:
)
)

Thread(target=self.send_goal.send_action_goal, args=(goal_msg_after_readvertise,)).start()
Thread(
target=self.send_goal.send_action_goal,
args=(goal_msg_after_readvertise,),
daemon=True,
).start()

start_time = time.monotonic()
while self.received_message is None:
Expand Down
14 changes: 11 additions & 3 deletions rosbridge_library/test/capabilities/test_qos.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import time
import unittest
from threading import Thread
from threading import Event, Thread
from typing import Any

import rclpy
Expand Down Expand Up @@ -170,14 +170,22 @@ def test_backward_compatibility_latch(self) -> None:
self.assertEqual(qos_profile.durability, DurabilityPolicy.TRANSIENT_LOCAL)
self.assertEqual(qos_profile.depth, 1)

# Late-joining subscriber should receive the latched message
# Late-joining subscriber should receive the latched message. Block on
# a threading.Event set in the callback instead of sleeping, so the
# test does not flake when DDS discovery + late-joining delivery
# overrun a short sleep on slower middleware setups.
received: dict[str, Any] = {"msg": None}
msg_received = Event()

def cb(msg: String) -> None:
received["msg"] = msg
msg_received.set()

self.node.create_subscription(String, topic, cb, qos_profile)
time.sleep(0.1)
self.assertTrue(
msg_received.wait(timeout=5.0),
"Late-joining subscriber did not receive latched message within 5 s",
)
self.assertEqual(received["msg"].data, msg["data"])

def test_publish_qos_works(self) -> None:
Expand Down
Loading
Loading