Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

from rosbridge_library.capability import Capability
from rosbridge_library.internal import message_conversion
from rosbridge_library.internal.executor_helpers import run_on_executor
from rosbridge_library.internal.ros_loader import get_action_class
from rosbridge_library.internal.type_support import (
ROSActionFeedbackT,
Expand Down Expand Up @@ -75,17 +76,22 @@ def __init__(self, action_name: str, action_type: str, protocol: Protocol) -> No
self.action_type = action_type
self.protocol = protocol
self._shutting_down = False
# setup the action
self.action_server = ActionServer[
ROSActionGoalT, ROSActionResultT, ROSActionFeedbackT, ROSActionImplT
](
# Create the ActionServer on the executor thread; concurrent entity
# registration from a worker thread races with the executor's wait-set
# rebuild and can SIGSEGV inside rclpy/action/server.py:__init__.
self.action_server = run_on_executor(
protocol.node_handle,
get_action_class(action_type),
action_name,
self.execute_callback,
goal_callback=self.goal_callback,
cancel_callback=self.cancel_callback,
callback_group=ReentrantCallbackGroup(), # https://github.com/ros2/rclpy/issues/834#issuecomment-961331870
lambda: ActionServer[
ROSActionGoalT, ROSActionResultT, ROSActionFeedbackT, ROSActionImplT
](
protocol.node_handle,
get_action_class(action_type),
action_name,
self.execute_callback,
goal_callback=self.goal_callback,
cancel_callback=self.cancel_callback,
callback_group=ReentrantCallbackGroup(), # https://github.com/ros2/rclpy/issues/834#issuecomment-961331870
),
)

def next_id(self) -> int:
Expand Down
109 changes: 109 additions & 0 deletions rosbridge_library/src/rosbridge_library/internal/executor_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# Software License Agreement (BSD License)
#
# Copyright (c) 2026, PickNik Robotics, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of PickNik Robotics, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
Helpers for serializing rclpy entity lifecycle with the node's executor.

Creating or destroying entities (clients, action servers, subscriptions,
publishers, ...) on a node from a worker thread is not safe against a
``rclpy`` executor that is concurrently spinning the same node: the executor's
wait-set rebuild can race with the worker thread's entity registration or
teardown and leave the executor holding a handle whose underlying PyCapsule
has been freed. The symptom is either a SIGSEGV inside ``rclpy`` or, less
loudly, a later ``TypeError: Object of type 'NoneType' is not an instance of
'capsule'`` from the next binding call on the affected node.

The remedy used throughout this package — and consistent with the
``IncomingQueue`` fix in #1183 — is to route the lifecycle call through
``executor.create_task(...)`` so it is serialized with the executor's own
work. This module centralizes that pattern.
"""

from __future__ import annotations

from threading import Event
from typing import TYPE_CHECKING, Any, TypeVar

if TYPE_CHECKING:
from collections.abc import Callable

from rclpy.node import Node


_T = TypeVar("_T")


def run_on_executor(node_handle: Node, fn: Callable[[], _T]) -> _T:
"""
Run ``fn()`` on the node's executor thread synchronously and return its result.

Use this whenever a worker thread needs to create or destroy an rclpy
entity (``Client``, ``ActionServer``, ``Subscription``, ``Publisher``,
...) on a node that an executor is concurrently spinning.

If the node has no attached executor (rare; mostly unit tests), the
callable is invoked inline. Exceptions raised by ``fn`` are re-raised in
the calling thread.
"""
executor = node_handle.executor
if executor is None:
return fn()

done = Event()
box: dict[str, Any] = {}

def _wrapper() -> None:
try:
box["result"] = fn()
except BaseException as exc:
box["exc"] = exc
finally:
done.set()

executor.create_task(_wrapper)
done.wait()
if "exc" in box:
raise box["exc"]
return box["result"]


def schedule_on_executor(node_handle: Node, fn: Callable[[], object]) -> None:
"""
Schedule ``fn()`` for execution on the node's executor thread.

Fire-and-forget: the caller does not wait for completion. Falls back to
invoking ``fn`` inline if the node has no attached executor.
"""
executor = node_handle.executor
if executor is not None:
executor.create_task(fn)
else:
fn()
27 changes: 22 additions & 5 deletions rosbridge_library/src/rosbridge_library/internal/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

from rclpy.callback_groups import ReentrantCallbackGroup

from rosbridge_library.internal.executor_helpers import run_on_executor, schedule_on_executor
from rosbridge_library.internal.message_conversion import (
extract_values,
populate_instance,
Expand Down Expand Up @@ -128,6 +129,16 @@ def args_to_service_request_instance(inst: ROSMessage, args: list | dict[str, An
populate_instance(msg, inst)


def _destroy_client_async(node_handle: Node, client: Client) -> None:
"""
Schedule destruction of ``client`` on the node's executor thread.

Fire-and-forget; see :mod:`rosbridge_library.internal.executor_helpers`
for the underlying rationale.
"""
schedule_on_executor(node_handle, lambda: node_handle.destroy_client(client))


def call_service(
node_handle: Node,
service: str,
Expand Down Expand Up @@ -155,12 +166,18 @@ def call_service(
# Populate the instance with the provided args
args_to_service_request_instance(inst, args)

client: Client = node_handle.create_client(
service_class, service, callback_group=ReentrantCallbackGroup()
# Create the client on the executor thread; concurrent client creation /
# destruction from worker threads races with the executor's wait-set
# rebuild and can leave handles in an inconsistent state.
client: Client = run_on_executor(
node_handle,
lambda: node_handle.create_client(
service_class, service, callback_group=ReentrantCallbackGroup()
),
)

if not client.wait_for_service(server_ready_timeout):
node_handle.destroy_client(client)
_destroy_client_async(node_handle, client)
raise InvalidServiceException(service)

future = client.call_async(inst)
Expand All @@ -173,11 +190,11 @@ def future_done_callback() -> None:

if not event.wait(timeout=(server_response_timeout if server_response_timeout > 0 else None)):
future.cancel()
node_handle.destroy_client(client)
_destroy_client_async(node_handle, client)
msg = "Timeout exceeded while waiting for service response"
raise Exception(msg)

node_handle.destroy_client(client)
_destroy_client_async(node_handle, client)

result = future.result()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ def setUp(self) -> None:
def tearDown(self) -> None:
self.executor.remove_node(self.node)
self.executor.shutdown()
# Join the spin thread so it does not outlive the test and block the
# process from exiting at the end of the suite (the action server
# SIGSEGV used to kill the process before this mattered).
self.exec_thread.join()
self.node.destroy_node()
rclpy.shutdown()

def local_send_cb(
Expand Down
11 changes: 9 additions & 2 deletions rosbridge_library/test/internal/actions/test_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from rclpy.executors import SingleThreadedExecutor
from rclpy.node import Node
from rosbridge_library.internal import actions, message_conversion, ros_loader
from rosbridge_library.internal.executor_helpers import run_on_executor
from rosbridge_library.internal.message_conversion import FieldTypeMismatchException

if TYPE_CHECKING:
Expand All @@ -33,8 +34,14 @@ def __init__(self, executor: Executor) -> None:
self.executor = executor
self.node = Node("action_tester")
self.executor.add_node(self.node)
self.action_server = ActionServer(
self.node, Fibonacci, "get_fibonacci_sequence", self.execute_callback
# Run the ActionServer constructor on the executor thread to avoid the
# rclpy wait-set / entity-registration race that SIGSEGVs inside
# rclpy/action/server.py:__init__ when called from a worker thread.
self.action_server = run_on_executor(
self.node,
lambda: ActionServer(
self.node, Fibonacci, "get_fibonacci_sequence", self.execute_callback
),
)

def __del__(self) -> None:
Expand Down
Loading
Loading