diff --git a/rosbridge_library/src/rosbridge_library/capabilities/advertise_action.py b/rosbridge_library/src/rosbridge_library/capabilities/advertise_action.py index 30ea88309..85ef53563 100644 --- a/rosbridge_library/src/rosbridge_library/capabilities/advertise_action.py +++ b/rosbridge_library/src/rosbridge_library/capabilities/advertise_action.py @@ -43,6 +43,7 @@ from rosbridge_library.capability import Capability from rosbridge_library.internal import message_conversion +from rosbridge_library.internal.executor_helpers import run_on_executor from rosbridge_library.internal.ros_loader import get_action_class from rosbridge_library.internal.type_support import ( ROSActionFeedbackT, @@ -75,17 +76,22 @@ def __init__(self, action_name: str, action_type: str, protocol: Protocol) -> No self.action_type = action_type self.protocol = protocol self._shutting_down = False - # setup the action - self.action_server = ActionServer[ - ROSActionGoalT, ROSActionResultT, ROSActionFeedbackT, ROSActionImplT - ]( + # Create the ActionServer on the executor thread; concurrent entity + # registration from a worker thread races with the executor's wait-set + # rebuild and can SIGSEGV inside rclpy/action/server.py:__init__. + self.action_server = run_on_executor( protocol.node_handle, - get_action_class(action_type), - action_name, - self.execute_callback, - goal_callback=self.goal_callback, - cancel_callback=self.cancel_callback, - callback_group=ReentrantCallbackGroup(), # https://github.com/ros2/rclpy/issues/834#issuecomment-961331870 + lambda: ActionServer[ + ROSActionGoalT, ROSActionResultT, ROSActionFeedbackT, ROSActionImplT + ]( + protocol.node_handle, + get_action_class(action_type), + action_name, + self.execute_callback, + goal_callback=self.goal_callback, + cancel_callback=self.cancel_callback, + callback_group=ReentrantCallbackGroup(), # https://github.com/ros2/rclpy/issues/834#issuecomment-961331870 + ), ) def next_id(self) -> int: diff --git a/rosbridge_library/src/rosbridge_library/internal/executor_helpers.py b/rosbridge_library/src/rosbridge_library/internal/executor_helpers.py new file mode 100644 index 000000000..d479bed87 --- /dev/null +++ b/rosbridge_library/src/rosbridge_library/internal/executor_helpers.py @@ -0,0 +1,109 @@ +# Software License Agreement (BSD License) +# +# Copyright (c) 2026, PickNik Robotics, Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# * Neither the name of PickNik Robotics, Inc. nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +""" +Helpers for serializing rclpy entity lifecycle with the node's executor. + +Creating or destroying entities (clients, action servers, subscriptions, +publishers, ...) on a node from a worker thread is not safe against a +``rclpy`` executor that is concurrently spinning the same node: the executor's +wait-set rebuild can race with the worker thread's entity registration or +teardown and leave the executor holding a handle whose underlying PyCapsule +has been freed. The symptom is either a SIGSEGV inside ``rclpy`` or, less +loudly, a later ``TypeError: Object of type 'NoneType' is not an instance of +'capsule'`` from the next binding call on the affected node. + +The remedy used throughout this package — and consistent with the +``IncomingQueue`` fix in #1183 — is to route the lifecycle call through +``executor.create_task(...)`` so it is serialized with the executor's own +work. This module centralizes that pattern. +""" + +from __future__ import annotations + +from threading import Event +from typing import TYPE_CHECKING, Any, TypeVar + +if TYPE_CHECKING: + from collections.abc import Callable + + from rclpy.node import Node + + +_T = TypeVar("_T") + + +def run_on_executor(node_handle: Node, fn: Callable[[], _T]) -> _T: + """ + Run ``fn()`` on the node's executor thread synchronously and return its result. + + Use this whenever a worker thread needs to create or destroy an rclpy + entity (``Client``, ``ActionServer``, ``Subscription``, ``Publisher``, + ...) on a node that an executor is concurrently spinning. + + If the node has no attached executor (rare; mostly unit tests), the + callable is invoked inline. Exceptions raised by ``fn`` are re-raised in + the calling thread. + """ + executor = node_handle.executor + if executor is None: + return fn() + + done = Event() + box: dict[str, Any] = {} + + def _wrapper() -> None: + try: + box["result"] = fn() + except BaseException as exc: + box["exc"] = exc + finally: + done.set() + + executor.create_task(_wrapper) + done.wait() + if "exc" in box: + raise box["exc"] + return box["result"] + + +def schedule_on_executor(node_handle: Node, fn: Callable[[], object]) -> None: + """ + Schedule ``fn()`` for execution on the node's executor thread. + + Fire-and-forget: the caller does not wait for completion. Falls back to + invoking ``fn`` inline if the node has no attached executor. + """ + executor = node_handle.executor + if executor is not None: + executor.create_task(fn) + else: + fn() diff --git a/rosbridge_library/src/rosbridge_library/internal/services.py b/rosbridge_library/src/rosbridge_library/internal/services.py index 4bb1f8dff..aa756d42a 100644 --- a/rosbridge_library/src/rosbridge_library/internal/services.py +++ b/rosbridge_library/src/rosbridge_library/internal/services.py @@ -36,6 +36,7 @@ from rclpy.callback_groups import ReentrantCallbackGroup +from rosbridge_library.internal.executor_helpers import run_on_executor, schedule_on_executor from rosbridge_library.internal.message_conversion import ( extract_values, populate_instance, @@ -128,6 +129,16 @@ def args_to_service_request_instance(inst: ROSMessage, args: list | dict[str, An populate_instance(msg, inst) +def _destroy_client_async(node_handle: Node, client: Client) -> None: + """ + Schedule destruction of ``client`` on the node's executor thread. + + Fire-and-forget; see :mod:`rosbridge_library.internal.executor_helpers` + for the underlying rationale. + """ + schedule_on_executor(node_handle, lambda: node_handle.destroy_client(client)) + + def call_service( node_handle: Node, service: str, @@ -155,12 +166,18 @@ def call_service( # Populate the instance with the provided args args_to_service_request_instance(inst, args) - client: Client = node_handle.create_client( - service_class, service, callback_group=ReentrantCallbackGroup() + # Create the client on the executor thread; concurrent client creation / + # destruction from worker threads races with the executor's wait-set + # rebuild and can leave handles in an inconsistent state. + client: Client = run_on_executor( + node_handle, + lambda: node_handle.create_client( + service_class, service, callback_group=ReentrantCallbackGroup() + ), ) if not client.wait_for_service(server_ready_timeout): - node_handle.destroy_client(client) + _destroy_client_async(node_handle, client) raise InvalidServiceException(service) future = client.call_async(inst) @@ -173,11 +190,11 @@ def future_done_callback() -> None: if not event.wait(timeout=(server_response_timeout if server_response_timeout > 0 else None)): future.cancel() - node_handle.destroy_client(client) + _destroy_client_async(node_handle, client) msg = "Timeout exceeded while waiting for service response" raise Exception(msg) - node_handle.destroy_client(client) + _destroy_client_async(node_handle, client) result = future.result() diff --git a/rosbridge_library/test/capabilities/test_action_capabilities.py b/rosbridge_library/test/capabilities/test_action_capabilities.py index e35d7739d..97f0a479d 100755 --- a/rosbridge_library/test/capabilities/test_action_capabilities.py +++ b/rosbridge_library/test/capabilities/test_action_capabilities.py @@ -57,6 +57,11 @@ def setUp(self) -> None: def tearDown(self) -> None: self.executor.remove_node(self.node) self.executor.shutdown() + # Join the spin thread so it does not outlive the test and block the + # process from exiting at the end of the suite (the action server + # SIGSEGV used to kill the process before this mattered). + self.exec_thread.join() + self.node.destroy_node() rclpy.shutdown() def local_send_cb( diff --git a/rosbridge_library/test/internal/actions/test_actions.py b/rosbridge_library/test/internal/actions/test_actions.py index 2cd5beedc..b40337360 100755 --- a/rosbridge_library/test/internal/actions/test_actions.py +++ b/rosbridge_library/test/internal/actions/test_actions.py @@ -13,6 +13,7 @@ from rclpy.executors import SingleThreadedExecutor from rclpy.node import Node from rosbridge_library.internal import actions, message_conversion, ros_loader +from rosbridge_library.internal.executor_helpers import run_on_executor from rosbridge_library.internal.message_conversion import FieldTypeMismatchException if TYPE_CHECKING: @@ -33,8 +34,14 @@ def __init__(self, executor: Executor) -> None: self.executor = executor self.node = Node("action_tester") self.executor.add_node(self.node) - self.action_server = ActionServer( - self.node, Fibonacci, "get_fibonacci_sequence", self.execute_callback + # Run the ActionServer constructor on the executor thread to avoid the + # rclpy wait-set / entity-registration race that SIGSEGVs inside + # rclpy/action/server.py:__init__ when called from a worker thread. + self.action_server = run_on_executor( + self.node, + lambda: ActionServer( + self.node, Fibonacci, "get_fibonacci_sequence", self.execute_callback + ), ) def __del__(self) -> None: diff --git a/rosbridge_library/test/internal/services/test_stress_service_clients.py b/rosbridge_library/test/internal/services/test_stress_service_clients.py new file mode 100755 index 000000000..6af6349ac --- /dev/null +++ b/rosbridge_library/test/internal/services/test_stress_service_clients.py @@ -0,0 +1,192 @@ +#!/usr/bin/env python3 +""" +Stress tests for concurrent service-call client lifecycle. + +Each ``services.call_service`` invocation creates and destroys a ``rclpy.Client`` +on a node that the executor is also spinning. If those lifecycle operations are +not serialized against the executor, the wait-set rebuild can race with the +client teardown and leave a handle in an inconsistent state. The symptom is a +subsequent rclpy binding call raising +``TypeError: Object of type 'NoneType' is not an instance of 'capsule'`` on the +same node, which poisons further service calls for the lifetime of the process. + +These tests run many concurrent ``ServiceCaller`` threads against a real +service and then probe the executor to make sure it is still healthy. +""" + +from __future__ import annotations + +import threading +import time +import unittest +from typing import Any + +import rclpy +from rclpy.executors import SingleThreadedExecutor +from rclpy.node import Node +from rosbridge_library.internal import services + + +class TestStressServiceClients(unittest.TestCase): + """Hammer the service-call lifecycle from many worker threads.""" + + def setUp(self) -> None: + rclpy.init() + self.executor = SingleThreadedExecutor() + self.node = Node("test_stress_service_clients") + self.node.declare_parameter("test_parameter", 1.0) + self.executor.add_node(self.node) + + self.executor_errors: list[BaseException] = [] + self._stop_spinning = threading.Event() + self.exec_thread = threading.Thread(target=self._spin_executor, daemon=True) + self.exec_thread.start() + + # Service the calls below will target. + self.service_name = self.node.get_name() + "/list_parameters" + + def _spin_executor(self) -> None: + while not self._stop_spinning.is_set(): + try: + self.executor.spin_once(timeout_sec=0.05) + except Exception as exc: # noqa: PERF203 - keep spinning + self.executor_errors.append(exc) + + def tearDown(self) -> None: + self._stop_spinning.set() + self.exec_thread.join(timeout=10) + self.executor.remove_node(self.node) + self.node.destroy_node() + self.executor.shutdown() + rclpy.shutdown() + + def _assert_executor_healthy(self) -> None: + self.assertTrue(self.exec_thread.is_alive(), "Executor thread died") + self.assertEqual( + self.executor_errors, + [], + f"Executor raised {len(self.executor_errors)} error(s): {self.executor_errors[:5]}", + ) + + def _assert_executor_functional(self, msg: str = "Executor cannot process tasks") -> None: + done = threading.Event() + self.executor.create_task(done.set) + self.assertTrue(done.wait(timeout=5), msg) + + def _call_collecting_errors( + self, + errors: list[BaseException], + successes: list[dict[str, Any]], + ) -> None: + def success(json_: dict[str, Any]) -> None: + successes.append(json_) + + def error(exc: Exception) -> None: + errors.append(exc) + + caller = services.ServiceCaller( + self.service_name, + None, + 5.0, + success, + error, + self.node, + ) + caller.start() + caller.join(timeout=10) + + def test_many_parallel_service_calls(self) -> None: + """40 ServiceCallers run concurrently against the same service.""" + num_callers = 40 + errors: list[BaseException] = [] + successes: list[dict[str, Any]] = [] + + threads = [ + threading.Thread( + target=self._call_collecting_errors, args=(errors, successes), daemon=True + ) + for _ in range(num_callers) + ] + for t in threads: + t.start() + for t in threads: + t.join(timeout=15) + + self.assertEqual(errors, [], f"Service callers raised: {errors[:5]}") + self.assertEqual( + len(successes), + num_callers, + f"Expected {num_callers} successes, got {len(successes)}", + ) + self._assert_executor_healthy() + self._assert_executor_functional("Executor died after parallel service calls") + + def test_barrier_synchronized_service_calls(self) -> None: + """20 ServiceCallers start at the same instant via a barrier.""" + num_callers = 20 + barrier = threading.Barrier(num_callers) + errors: list[BaseException] = [] + successes: list[dict[str, Any]] = [] + + def worker() -> None: + barrier.wait(timeout=10) + self._call_collecting_errors(errors, successes) + + threads = [threading.Thread(target=worker, daemon=True) for _ in range(num_callers)] + for t in threads: + t.start() + for t in threads: + t.join(timeout=15) + + self.assertEqual(errors, [], f"Service callers raised: {errors[:5]}") + self.assertEqual(len(successes), num_callers) + self._assert_executor_healthy() + self._assert_executor_functional("Executor died after barrier-synced calls") + + def test_repeated_rapid_service_calls(self) -> None: + """100 sequential calls with no settle time exercise rapid create/destroy.""" + for iteration in range(100): + errors: list[BaseException] = [] + successes: list[dict[str, Any]] = [] + self._call_collecting_errors(errors, successes) + self.assertEqual(errors, [], f"Iteration {iteration} raised: {errors}") + self.assertEqual(len(successes), 1, f"Iteration {iteration} produced no response") + self._assert_executor_functional(f"Executor died at iteration {iteration}") + self._assert_executor_healthy() + + def test_interleaved_waves_of_service_calls(self) -> None: + """Each wave starts before the previous one has finished tearing down clients.""" + wave_size = 10 + waves = 10 + errors: list[BaseException] = [] + successes: list[dict[str, Any]] = [] + + prev_threads: list[threading.Thread] = [] + for _ in range(waves): + new_threads = [ + threading.Thread( + target=self._call_collecting_errors, + args=(errors, successes), + daemon=True, + ) + for _ in range(wave_size) + ] + for t in new_threads: + t.start() + # Wait briefly so the previous wave is mid-teardown when this one starts. + time.sleep(0.05) + for t in prev_threads: + t.join(timeout=10) + prev_threads = new_threads + + for t in prev_threads: + t.join(timeout=10) + + self.assertEqual(errors, [], f"Service callers raised: {errors[:5]}") + self.assertEqual(len(successes), wave_size * waves) + self._assert_executor_healthy() + self._assert_executor_functional("Executor died after interleaved waves") + + +if __name__ == "__main__": + unittest.main()