diff --git a/rosbridge_library/src/rosbridge_library/capabilities/advertise_action.py b/rosbridge_library/src/rosbridge_library/capabilities/advertise_action.py index 30ea88309..85ef53563 100644 --- a/rosbridge_library/src/rosbridge_library/capabilities/advertise_action.py +++ b/rosbridge_library/src/rosbridge_library/capabilities/advertise_action.py @@ -43,6 +43,7 @@ from rosbridge_library.capability import Capability from rosbridge_library.internal import message_conversion +from rosbridge_library.internal.executor_helpers import run_on_executor from rosbridge_library.internal.ros_loader import get_action_class from rosbridge_library.internal.type_support import ( ROSActionFeedbackT, @@ -75,17 +76,22 @@ def __init__(self, action_name: str, action_type: str, protocol: Protocol) -> No self.action_type = action_type self.protocol = protocol self._shutting_down = False - # setup the action - self.action_server = ActionServer[ - ROSActionGoalT, ROSActionResultT, ROSActionFeedbackT, ROSActionImplT - ]( + # Create the ActionServer on the executor thread; concurrent entity + # registration from a worker thread races with the executor's wait-set + # rebuild and can SIGSEGV inside rclpy/action/server.py:__init__. + self.action_server = run_on_executor( protocol.node_handle, - get_action_class(action_type), - action_name, - self.execute_callback, - goal_callback=self.goal_callback, - cancel_callback=self.cancel_callback, - callback_group=ReentrantCallbackGroup(), # https://github.com/ros2/rclpy/issues/834#issuecomment-961331870 + lambda: ActionServer[ + ROSActionGoalT, ROSActionResultT, ROSActionFeedbackT, ROSActionImplT + ]( + protocol.node_handle, + get_action_class(action_type), + action_name, + self.execute_callback, + goal_callback=self.goal_callback, + cancel_callback=self.cancel_callback, + callback_group=ReentrantCallbackGroup(), # https://github.com/ros2/rclpy/issues/834#issuecomment-961331870 + ), ) def next_id(self) -> int: diff --git a/rosbridge_library/src/rosbridge_library/capabilities/subscribe.py b/rosbridge_library/src/rosbridge_library/capabilities/subscribe.py index 3f9711e45..20a6d2a15 100644 --- a/rosbridge_library/src/rosbridge_library/capabilities/subscribe.py +++ b/rosbridge_library/src/rosbridge_library/capabilities/subscribe.py @@ -58,7 +58,7 @@ from ujson import dumps as encode_json # type: ignore[import-untyped] except ImportError: try: - from simplejson import dumps as encode_json # type: ignore[import-untyped] + from simplejson import dumps as encode_json # type: ignore[import-untyped, no-redef] except ImportError: from json import dumps as encode_json # type: ignore[assignment] diff --git a/rosbridge_library/src/rosbridge_library/internal/executor_helpers.py b/rosbridge_library/src/rosbridge_library/internal/executor_helpers.py new file mode 100644 index 000000000..d479bed87 --- /dev/null +++ b/rosbridge_library/src/rosbridge_library/internal/executor_helpers.py @@ -0,0 +1,109 @@ +# Software License Agreement (BSD License) +# +# Copyright (c) 2026, PickNik Robotics, Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# * Neither the name of PickNik Robotics, Inc. nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +""" +Helpers for serializing rclpy entity lifecycle with the node's executor. + +Creating or destroying entities (clients, action servers, subscriptions, +publishers, ...) on a node from a worker thread is not safe against a +``rclpy`` executor that is concurrently spinning the same node: the executor's +wait-set rebuild can race with the worker thread's entity registration or +teardown and leave the executor holding a handle whose underlying PyCapsule +has been freed. The symptom is either a SIGSEGV inside ``rclpy`` or, less +loudly, a later ``TypeError: Object of type 'NoneType' is not an instance of +'capsule'`` from the next binding call on the affected node. + +The remedy used throughout this package — and consistent with the +``IncomingQueue`` fix in #1183 — is to route the lifecycle call through +``executor.create_task(...)`` so it is serialized with the executor's own +work. This module centralizes that pattern. +""" + +from __future__ import annotations + +from threading import Event +from typing import TYPE_CHECKING, Any, TypeVar + +if TYPE_CHECKING: + from collections.abc import Callable + + from rclpy.node import Node + + +_T = TypeVar("_T") + + +def run_on_executor(node_handle: Node, fn: Callable[[], _T]) -> _T: + """ + Run ``fn()`` on the node's executor thread synchronously and return its result. + + Use this whenever a worker thread needs to create or destroy an rclpy + entity (``Client``, ``ActionServer``, ``Subscription``, ``Publisher``, + ...) on a node that an executor is concurrently spinning. + + If the node has no attached executor (rare; mostly unit tests), the + callable is invoked inline. Exceptions raised by ``fn`` are re-raised in + the calling thread. + """ + executor = node_handle.executor + if executor is None: + return fn() + + done = Event() + box: dict[str, Any] = {} + + def _wrapper() -> None: + try: + box["result"] = fn() + except BaseException as exc: + box["exc"] = exc + finally: + done.set() + + executor.create_task(_wrapper) + done.wait() + if "exc" in box: + raise box["exc"] + return box["result"] + + +def schedule_on_executor(node_handle: Node, fn: Callable[[], object]) -> None: + """ + Schedule ``fn()`` for execution on the node's executor thread. + + Fire-and-forget: the caller does not wait for completion. Falls back to + invoking ``fn`` inline if the node has no attached executor. + """ + executor = node_handle.executor + if executor is not None: + executor.create_task(fn) + else: + fn() diff --git a/rosbridge_library/src/rosbridge_library/internal/services.py b/rosbridge_library/src/rosbridge_library/internal/services.py index 4bb1f8dff..aa756d42a 100644 --- a/rosbridge_library/src/rosbridge_library/internal/services.py +++ b/rosbridge_library/src/rosbridge_library/internal/services.py @@ -36,6 +36,7 @@ from rclpy.callback_groups import ReentrantCallbackGroup +from rosbridge_library.internal.executor_helpers import run_on_executor, schedule_on_executor from rosbridge_library.internal.message_conversion import ( extract_values, populate_instance, @@ -128,6 +129,16 @@ def args_to_service_request_instance(inst: ROSMessage, args: list | dict[str, An populate_instance(msg, inst) +def _destroy_client_async(node_handle: Node, client: Client) -> None: + """ + Schedule destruction of ``client`` on the node's executor thread. + + Fire-and-forget; see :mod:`rosbridge_library.internal.executor_helpers` + for the underlying rationale. + """ + schedule_on_executor(node_handle, lambda: node_handle.destroy_client(client)) + + def call_service( node_handle: Node, service: str, @@ -155,12 +166,18 @@ def call_service( # Populate the instance with the provided args args_to_service_request_instance(inst, args) - client: Client = node_handle.create_client( - service_class, service, callback_group=ReentrantCallbackGroup() + # Create the client on the executor thread; concurrent client creation / + # destruction from worker threads races with the executor's wait-set + # rebuild and can leave handles in an inconsistent state. + client: Client = run_on_executor( + node_handle, + lambda: node_handle.create_client( + service_class, service, callback_group=ReentrantCallbackGroup() + ), ) if not client.wait_for_service(server_ready_timeout): - node_handle.destroy_client(client) + _destroy_client_async(node_handle, client) raise InvalidServiceException(service) future = client.call_async(inst) @@ -173,11 +190,11 @@ def future_done_callback() -> None: if not event.wait(timeout=(server_response_timeout if server_response_timeout > 0 else None)): future.cancel() - node_handle.destroy_client(client) + _destroy_client_async(node_handle, client) msg = "Timeout exceeded while waiting for service response" raise Exception(msg) - node_handle.destroy_client(client) + _destroy_client_async(node_handle, client) result = future.result() diff --git a/rosbridge_library/src/rosbridge_library/util/ros.py b/rosbridge_library/src/rosbridge_library/util/ros.py index b82c66af5..bc1b9e3d7 100644 --- a/rosbridge_library/src/rosbridge_library/util/ros.py +++ b/rosbridge_library/src/rosbridge_library/util/ros.py @@ -32,23 +32,51 @@ from __future__ import annotations +from threading import Event from typing import TYPE_CHECKING if TYPE_CHECKING: + from rclpy.executors import Executor from rclpy.node import Node def is_topic_published(node: Node, topic_name: str) -> bool: - """Check if a topic is published on a node.""" - published_topic_data = node.get_publisher_names_and_types_by_node( - node.get_name(), node.get_namespace() - ) - return any(topic[0] == topic_name for topic in published_topic_data) + """ + Check if a node has at least one publisher on the given topic. + + Reads ``node.publishers`` (the local entity list) rather than the rmw graph + cache. The local list is populated synchronously inside ``create_publisher`` + and cleared inside ``destroy_publisher``, so the result is deterministic + with respect to the calling thread — no DDS-discovery round-trip is + involved. + """ + return any(pub.topic_name == topic_name for pub in node.publishers) def is_topic_subscribed(node: Node, topic_name: str) -> bool: - """Check if a topic is subscribed to by a node.""" - subscribed_topic_data = node.get_subscriber_names_and_types_by_node( - node.get_name(), node.get_namespace() - ) - return any(topic[0] == topic_name for topic in subscribed_topic_data) + """ + Check if a node has at least one subscription on the given topic. + + Reads ``node.subscriptions`` (the local entity list) rather than the rmw + graph cache; see ``is_topic_published`` for why this matters. + """ + return any(sub.topic_name == topic_name for sub in node.subscriptions) + + +def wait_for_executor_idle(executor: Executor, timeout: float = 5.0) -> None: + """ + Block until all tasks already queued on ``executor`` have been processed. + + Used by tests that schedule work on the executor and then need to assert + on the post-condition from a different thread. Submits a no-op task and + waits for it to run: tasks are FIFO on ``SingleThreadedExecutor``, so when + the no-op completes every task enqueued before it has also completed. + + Raises ``TimeoutError`` if the executor does not drain within ``timeout`` + seconds, which usually means it is not being spun. + """ + done = Event() + executor.create_task(done.set) + if not done.wait(timeout=timeout): + msg = f"Executor did not become idle within {timeout}s" + raise TimeoutError(msg) diff --git a/rosbridge_library/test/capabilities/test_action_capabilities.py b/rosbridge_library/test/capabilities/test_action_capabilities.py index e35d7739d..042434840 100755 --- a/rosbridge_library/test/capabilities/test_action_capabilities.py +++ b/rosbridge_library/test/capabilities/test_action_capabilities.py @@ -57,6 +57,11 @@ def setUp(self) -> None: def tearDown(self) -> None: self.executor.remove_node(self.node) self.executor.shutdown() + # Join the spin thread so it does not outlive the test and block the + # process from exiting at the end of the suite (the action server + # SIGSEGV used to kill the process before this mattered). + self.exec_thread.join() + self.node.destroy_node() rclpy.shutdown() def local_send_cb( @@ -135,7 +140,7 @@ def test_execute_advertised_action(self) -> None: } ) ) - Thread(target=self.send_goal.send_action_goal, args=(goal_msg,)).start() + Thread(target=self.send_goal.send_action_goal, args=(goal_msg,), daemon=True).start() start_time = time.monotonic() while self.received_message is None: @@ -244,7 +249,7 @@ def test_cancel_advertised_action(self) -> None: } ) ) - Thread(target=self.send_goal.send_action_goal, args=(goal_msg,)).start() + Thread(target=self.send_goal.send_action_goal, args=(goal_msg,), daemon=True).start() start_time = time.monotonic() while self.received_message is None: @@ -335,7 +340,7 @@ def test_unadvertise_action(self) -> None: } ) ) - Thread(target=self.send_goal.send_action_goal, args=(goal_msg,)).start() + Thread(target=self.send_goal.send_action_goal, args=(goal_msg,), daemon=True).start() start_time = time.monotonic() while self.received_message is None: @@ -381,7 +386,7 @@ def test_unadvertise_action(self) -> None: } ) ) - Thread(target=self.send_goal.send_action_goal, args=(goal_msg_after,)).start() + Thread(target=self.send_goal.send_action_goal, args=(goal_msg_after,), daemon=True).start() start_time = time.monotonic() while self.received_message is None: @@ -410,7 +415,11 @@ def test_unadvertise_action(self) -> None: ) ) - Thread(target=self.send_goal.send_action_goal, args=(goal_msg_after_readvertise,)).start() + Thread( + target=self.send_goal.send_action_goal, + args=(goal_msg_after_readvertise,), + daemon=True, + ).start() start_time = time.monotonic() while self.received_message is None: diff --git a/rosbridge_library/test/capabilities/test_qos.py b/rosbridge_library/test/capabilities/test_qos.py index 78884418d..54a632e0d 100755 --- a/rosbridge_library/test/capabilities/test_qos.py +++ b/rosbridge_library/test/capabilities/test_qos.py @@ -3,7 +3,7 @@ import time import unittest -from threading import Thread +from threading import Event, Thread from typing import Any import rclpy @@ -170,14 +170,22 @@ def test_backward_compatibility_latch(self) -> None: self.assertEqual(qos_profile.durability, DurabilityPolicy.TRANSIENT_LOCAL) self.assertEqual(qos_profile.depth, 1) - # Late-joining subscriber should receive the latched message + # Late-joining subscriber should receive the latched message. Block on + # a threading.Event set in the callback instead of sleeping, so the + # test does not flake when DDS discovery + late-joining delivery + # overrun a short sleep on slower middleware setups. received: dict[str, Any] = {"msg": None} + msg_received = Event() def cb(msg: String) -> None: received["msg"] = msg + msg_received.set() self.node.create_subscription(String, topic, cb, qos_profile) - time.sleep(0.1) + self.assertTrue( + msg_received.wait(timeout=5.0), + "Late-joining subscriber did not receive latched message within 5 s", + ) self.assertEqual(received["msg"].data, msg["data"]) def test_publish_qos_works(self) -> None: diff --git a/rosbridge_library/test/capabilities/test_service_capabilities.py b/rosbridge_library/test/capabilities/test_service_capabilities.py index d19cf1942..df5192622 100755 --- a/rosbridge_library/test/capabilities/test_service_capabilities.py +++ b/rosbridge_library/test/capabilities/test_service_capabilities.py @@ -122,7 +122,7 @@ def test_call_advertised_service(self) -> None: } ) ) - Thread(target=self.call_service.call_service, args=(call_msg,)).start() + Thread(target=self.call_service.call_service, args=(call_msg,), daemon=True).start() start_time = time.monotonic() while self.received_message is None: @@ -188,7 +188,7 @@ def test_call_advertised_service_with_timeout(self) -> None: } ) ) - Thread(target=self.call_service.call_service, args=(call_msg,)).start() + Thread(target=self.call_service.call_service, args=(call_msg,), daemon=True).start() start_time = time.monotonic() while self.received_message is None: @@ -243,7 +243,7 @@ def test_unadvertise_with_live_request(self) -> None: ) ) self.received_message = None - Thread(target=self.call_service.call_service, args=(call_msg,)).start() + Thread(target=self.call_service.call_service, args=(call_msg,), daemon=True).start() start_time = time.monotonic() while self.received_message is None: @@ -295,7 +295,7 @@ def test_unadvertise_with_live_request(self) -> None: } ) ) - Thread(target=self.call_service.call_service, args=(call_msg2,)).start() + Thread(target=self.call_service.call_service, args=(call_msg2,), daemon=True).start() start_time = time.monotonic() while self.received_message is None: diff --git a/rosbridge_library/test/internal/actions/test_actions.py b/rosbridge_library/test/internal/actions/test_actions.py index 2cd5beedc..b40337360 100755 --- a/rosbridge_library/test/internal/actions/test_actions.py +++ b/rosbridge_library/test/internal/actions/test_actions.py @@ -13,6 +13,7 @@ from rclpy.executors import SingleThreadedExecutor from rclpy.node import Node from rosbridge_library.internal import actions, message_conversion, ros_loader +from rosbridge_library.internal.executor_helpers import run_on_executor from rosbridge_library.internal.message_conversion import FieldTypeMismatchException if TYPE_CHECKING: @@ -33,8 +34,14 @@ def __init__(self, executor: Executor) -> None: self.executor = executor self.node = Node("action_tester") self.executor.add_node(self.node) - self.action_server = ActionServer( - self.node, Fibonacci, "get_fibonacci_sequence", self.execute_callback + # Run the ActionServer constructor on the executor thread to avoid the + # rclpy wait-set / entity-registration race that SIGSEGVs inside + # rclpy/action/server.py:__init__ when called from a worker thread. + self.action_server = run_on_executor( + self.node, + lambda: ActionServer( + self.node, Fibonacci, "get_fibonacci_sequence", self.execute_callback + ), ) def __del__(self) -> None: diff --git a/rosbridge_library/test/internal/publishers/test_publisher_manager.py b/rosbridge_library/test/internal/publishers/test_publisher_manager.py index 6bbcef500..90d49c359 100755 --- a/rosbridge_library/test/internal/publishers/test_publisher_manager.py +++ b/rosbridge_library/test/internal/publishers/test_publisher_manager.py @@ -170,7 +170,6 @@ def test_register_infer_topictype(self) -> None: durability=DurabilityPolicy.TRANSIENT_LOCAL, ) self.node.create_publisher(String, topic, publisher_qos) - time.sleep(0.1) self.assertTrue(is_topic_published(self.node, topic)) self.assertFalse(topic in manager._publishers) diff --git a/rosbridge_library/test/internal/services/test_stress_service_clients.py b/rosbridge_library/test/internal/services/test_stress_service_clients.py new file mode 100755 index 000000000..6af6349ac --- /dev/null +++ b/rosbridge_library/test/internal/services/test_stress_service_clients.py @@ -0,0 +1,192 @@ +#!/usr/bin/env python3 +""" +Stress tests for concurrent service-call client lifecycle. + +Each ``services.call_service`` invocation creates and destroys a ``rclpy.Client`` +on a node that the executor is also spinning. If those lifecycle operations are +not serialized against the executor, the wait-set rebuild can race with the +client teardown and leave a handle in an inconsistent state. The symptom is a +subsequent rclpy binding call raising +``TypeError: Object of type 'NoneType' is not an instance of 'capsule'`` on the +same node, which poisons further service calls for the lifetime of the process. + +These tests run many concurrent ``ServiceCaller`` threads against a real +service and then probe the executor to make sure it is still healthy. +""" + +from __future__ import annotations + +import threading +import time +import unittest +from typing import Any + +import rclpy +from rclpy.executors import SingleThreadedExecutor +from rclpy.node import Node +from rosbridge_library.internal import services + + +class TestStressServiceClients(unittest.TestCase): + """Hammer the service-call lifecycle from many worker threads.""" + + def setUp(self) -> None: + rclpy.init() + self.executor = SingleThreadedExecutor() + self.node = Node("test_stress_service_clients") + self.node.declare_parameter("test_parameter", 1.0) + self.executor.add_node(self.node) + + self.executor_errors: list[BaseException] = [] + self._stop_spinning = threading.Event() + self.exec_thread = threading.Thread(target=self._spin_executor, daemon=True) + self.exec_thread.start() + + # Service the calls below will target. + self.service_name = self.node.get_name() + "/list_parameters" + + def _spin_executor(self) -> None: + while not self._stop_spinning.is_set(): + try: + self.executor.spin_once(timeout_sec=0.05) + except Exception as exc: # noqa: PERF203 - keep spinning + self.executor_errors.append(exc) + + def tearDown(self) -> None: + self._stop_spinning.set() + self.exec_thread.join(timeout=10) + self.executor.remove_node(self.node) + self.node.destroy_node() + self.executor.shutdown() + rclpy.shutdown() + + def _assert_executor_healthy(self) -> None: + self.assertTrue(self.exec_thread.is_alive(), "Executor thread died") + self.assertEqual( + self.executor_errors, + [], + f"Executor raised {len(self.executor_errors)} error(s): {self.executor_errors[:5]}", + ) + + def _assert_executor_functional(self, msg: str = "Executor cannot process tasks") -> None: + done = threading.Event() + self.executor.create_task(done.set) + self.assertTrue(done.wait(timeout=5), msg) + + def _call_collecting_errors( + self, + errors: list[BaseException], + successes: list[dict[str, Any]], + ) -> None: + def success(json_: dict[str, Any]) -> None: + successes.append(json_) + + def error(exc: Exception) -> None: + errors.append(exc) + + caller = services.ServiceCaller( + self.service_name, + None, + 5.0, + success, + error, + self.node, + ) + caller.start() + caller.join(timeout=10) + + def test_many_parallel_service_calls(self) -> None: + """40 ServiceCallers run concurrently against the same service.""" + num_callers = 40 + errors: list[BaseException] = [] + successes: list[dict[str, Any]] = [] + + threads = [ + threading.Thread( + target=self._call_collecting_errors, args=(errors, successes), daemon=True + ) + for _ in range(num_callers) + ] + for t in threads: + t.start() + for t in threads: + t.join(timeout=15) + + self.assertEqual(errors, [], f"Service callers raised: {errors[:5]}") + self.assertEqual( + len(successes), + num_callers, + f"Expected {num_callers} successes, got {len(successes)}", + ) + self._assert_executor_healthy() + self._assert_executor_functional("Executor died after parallel service calls") + + def test_barrier_synchronized_service_calls(self) -> None: + """20 ServiceCallers start at the same instant via a barrier.""" + num_callers = 20 + barrier = threading.Barrier(num_callers) + errors: list[BaseException] = [] + successes: list[dict[str, Any]] = [] + + def worker() -> None: + barrier.wait(timeout=10) + self._call_collecting_errors(errors, successes) + + threads = [threading.Thread(target=worker, daemon=True) for _ in range(num_callers)] + for t in threads: + t.start() + for t in threads: + t.join(timeout=15) + + self.assertEqual(errors, [], f"Service callers raised: {errors[:5]}") + self.assertEqual(len(successes), num_callers) + self._assert_executor_healthy() + self._assert_executor_functional("Executor died after barrier-synced calls") + + def test_repeated_rapid_service_calls(self) -> None: + """100 sequential calls with no settle time exercise rapid create/destroy.""" + for iteration in range(100): + errors: list[BaseException] = [] + successes: list[dict[str, Any]] = [] + self._call_collecting_errors(errors, successes) + self.assertEqual(errors, [], f"Iteration {iteration} raised: {errors}") + self.assertEqual(len(successes), 1, f"Iteration {iteration} produced no response") + self._assert_executor_functional(f"Executor died at iteration {iteration}") + self._assert_executor_healthy() + + def test_interleaved_waves_of_service_calls(self) -> None: + """Each wave starts before the previous one has finished tearing down clients.""" + wave_size = 10 + waves = 10 + errors: list[BaseException] = [] + successes: list[dict[str, Any]] = [] + + prev_threads: list[threading.Thread] = [] + for _ in range(waves): + new_threads = [ + threading.Thread( + target=self._call_collecting_errors, + args=(errors, successes), + daemon=True, + ) + for _ in range(wave_size) + ] + for t in new_threads: + t.start() + # Wait briefly so the previous wave is mid-teardown when this one starts. + time.sleep(0.05) + for t in prev_threads: + t.join(timeout=10) + prev_threads = new_threads + + for t in prev_threads: + t.join(timeout=10) + + self.assertEqual(errors, [], f"Service callers raised: {errors[:5]}") + self.assertEqual(len(successes), wave_size * waves) + self._assert_executor_healthy() + self._assert_executor_functional("Executor died after interleaved waves") + + +if __name__ == "__main__": + unittest.main() diff --git a/rosbridge_library/test/internal/subscribers/test_multi_subscriber.py b/rosbridge_library/test/internal/subscribers/test_multi_subscriber.py index 332583101..24c40c1c8 100755 --- a/rosbridge_library/test/internal/subscribers/test_multi_subscriber.py +++ b/rosbridge_library/test/internal/subscribers/test_multi_subscriber.py @@ -12,7 +12,7 @@ from rclpy.qos import DurabilityPolicy, QoSProfile from rosbridge_library.internal.subscribers import MultiSubscriber from rosbridge_library.internal.topics import TypeConflictException -from rosbridge_library.util.ros import is_topic_subscribed +from rosbridge_library.util.ros import is_topic_subscribed, wait_for_executor_idle from std_msgs.msg import Int32, String if TYPE_CHECKING: @@ -37,41 +37,30 @@ def tearDown(self) -> None: self.executor.shutdown() rclpy.shutdown() - def assert_topic_subscribed(self, topic: str, timeout: float = 1.0) -> None: - start_time = time.monotonic() - while not is_topic_subscribed(self.node, topic): - time.sleep(0.05) - if time.monotonic() - start_time > timeout: - self.fail(f"Timed out waiting for topic '{topic}' to be subscribed.") - - def assert_topic_not_subscribed(self, topic: str, timeout: float = 1.0) -> None: - start_time = time.monotonic() - while is_topic_subscribed(self.node, topic): - time.sleep(0.05) - if time.monotonic() - start_time > timeout: - self.fail(f"Timed out waiting for topic '{topic}' to be unsubscribed.") - def test_register_multisubscriber(self) -> None: """Register a subscriber on a clean topic with a good msg type.""" topic = "/test_register_multisubscriber" msg_type = "std_msgs/String" - self.assert_topic_not_subscribed(topic) + self.assertFalse(is_topic_subscribed(self.node, topic)) MultiSubscriber(topic, self.client_id, lambda *_args: None, self.node, msg_type=msg_type) - self.assert_topic_subscribed(topic) + self.assertTrue(is_topic_subscribed(self.node, topic)) def test_unregister_multisubscriber(self) -> None: """Register and unregister a subscriber on a clean topic with a good msg type.""" topic = "/test_unregister_multisubscriber" msg_type = "std_msgs/String" - self.assert_topic_not_subscribed(topic) + self.assertFalse(is_topic_subscribed(self.node, topic)) multi: MultiSubscriber[String] = MultiSubscriber( topic, self.client_id, lambda *_args: None, self.node, msg_type=msg_type ) - self.assert_topic_subscribed(topic) + self.assertTrue(is_topic_subscribed(self.node, topic)) multi.unregister() - self.assert_topic_not_subscribed(topic) + # MultiSubscriber.unregister schedules destroy_subscription on the executor; + # drain the executor before asserting the entity is gone. + wait_for_executor_idle(self.executor) + self.assertFalse(is_topic_subscribed(self.node, topic)) def test_verify_type(self) -> None: topic = "/test_verify_type" @@ -101,11 +90,11 @@ def test_subscribe_unsubscribe(self) -> None: topic = "/test_subscribe_unsubscribe" msg_type = "std_msgs/String" - self.assert_topic_not_subscribed(topic) + self.assertFalse(is_topic_subscribed(self.node, topic)) multi: MultiSubscriber[String] = MultiSubscriber( topic, self.client_id, lambda *_args: None, self.node, msg_type=msg_type ) - self.assert_topic_subscribed(topic) + self.assertTrue(is_topic_subscribed(self.node, topic)) self.assertEqual(len(multi.new_subscriptions), 0) multi.subscribe(self.client_id, lambda _: None) @@ -115,7 +104,8 @@ def test_subscribe_unsubscribe(self) -> None: self.assertEqual(len(multi.new_subscriptions), 0) multi.unregister() - self.assert_topic_not_subscribed(topic) + wait_for_executor_idle(self.executor) + self.assertFalse(is_topic_subscribed(self.node, topic)) def test_subscribe_receive_json(self) -> None: topic = "/test_subscribe_receive_json" diff --git a/rosbridge_library/test/internal/subscribers/test_subscriber_manager.py b/rosbridge_library/test/internal/subscribers/test_subscriber_manager.py index 4f623efd8..a8364b596 100755 --- a/rosbridge_library/test/internal/subscribers/test_subscriber_manager.py +++ b/rosbridge_library/test/internal/subscribers/test_subscriber_manager.py @@ -15,7 +15,7 @@ TopicNotEstablishedException, TypeConflictException, ) -from rosbridge_library.util.ros import is_topic_subscribed +from rosbridge_library.util.ros import is_topic_subscribed, wait_for_executor_idle from std_msgs.msg import String if TYPE_CHECKING: @@ -38,20 +38,6 @@ def tearDown(self) -> None: self.executor.shutdown() rclpy.shutdown() - def assert_topic_subscribed(self, topic: str, timeout: float = 1.0) -> None: - start_time = time.monotonic() - while not is_topic_subscribed(self.node, topic): - time.sleep(0.05) - if time.monotonic() - start_time > timeout: - self.fail(f"Timed out waiting for topic '{topic}' to be subscribed.") - - def assert_topic_not_subscribed(self, topic: str, timeout: float = 1.0) -> None: - start_time = time.monotonic() - while is_topic_subscribed(self.node, topic): - time.sleep(0.05) - if time.monotonic() - start_time > timeout: - self.fail(f"Timed out waiting for topic '{topic}' to be unsubscribed.") - def test_subscribe(self) -> None: """Register a publisher on a clean topic with a good msg type.""" topic = "/test_subscribe" @@ -59,14 +45,17 @@ def test_subscribe(self) -> None: client = "client_test_subscribe" self.assertFalse(topic in manager._subscribers) - self.assert_topic_not_subscribed(topic) + self.assertFalse(is_topic_subscribed(self.node, topic)) manager.subscribe(client, topic, lambda _: None, self.node, msg_type) self.assertTrue(topic in manager._subscribers) - self.assert_topic_subscribed(topic) + self.assertTrue(is_topic_subscribed(self.node, topic)) manager.unsubscribe(client, topic) + # manager.unsubscribe schedules destroy_subscription on the executor; + # drain the executor before asserting the entity is gone. + wait_for_executor_idle(self.executor) self.assertFalse(topic in manager._subscribers) - self.assert_topic_not_subscribed(topic) + self.assertFalse(is_topic_subscribed(self.node, topic)) def test_register_subscriber_multiclient(self) -> None: topic = "/test_register_subscriber_multiclient" @@ -75,22 +64,23 @@ def test_register_subscriber_multiclient(self) -> None: client2 = "client_test_register_subscriber_multiclient_2" self.assertFalse(topic in manager._subscribers) - self.assert_topic_not_subscribed(topic) + self.assertFalse(is_topic_subscribed(self.node, topic)) manager.subscribe(client1, topic, lambda _: None, self.node, msg_type) self.assertTrue(topic in manager._subscribers) - self.assert_topic_subscribed(topic) + self.assertTrue(is_topic_subscribed(self.node, topic)) manager.subscribe(client2, topic, lambda _: None, self.node, msg_type) self.assertTrue(topic in manager._subscribers) - self.assert_topic_subscribed(topic) + self.assertTrue(is_topic_subscribed(self.node, topic)) manager.unsubscribe(client1, topic) self.assertTrue(topic in manager._subscribers) - self.assert_topic_subscribed(topic) + self.assertTrue(is_topic_subscribed(self.node, topic)) manager.unsubscribe(client2, topic) + wait_for_executor_idle(self.executor) self.assertFalse(topic in manager._subscribers) - self.assert_topic_not_subscribed(topic) + self.assertFalse(is_topic_subscribed(self.node, topic)) def test_register_publisher_conflicting_types(self) -> None: topic = "/test_register_publisher_conflicting_types" @@ -99,10 +89,10 @@ def test_register_publisher_conflicting_types(self) -> None: client = "client_test_register_publisher_conflicting_types" self.assertFalse(topic in manager._subscribers) - self.assert_topic_not_subscribed(topic) + self.assertFalse(is_topic_subscribed(self.node, topic)) manager.subscribe(client, topic, lambda _: None, self.node, msg_type) self.assertTrue(topic in manager._subscribers) - self.assert_topic_subscribed(topic) + self.assertTrue(is_topic_subscribed(self.node, topic)) self.assertRaises( TypeConflictException, @@ -122,39 +112,41 @@ def test_register_multiple_publishers(self) -> None: self.assertFalse(topic1 in manager._subscribers) self.assertFalse(topic2 in manager._subscribers) - self.assert_topic_not_subscribed(topic1) - self.assert_topic_not_subscribed(topic2) + self.assertFalse(is_topic_subscribed(self.node, topic1)) + self.assertFalse(is_topic_subscribed(self.node, topic2)) manager.subscribe(client, topic1, lambda _: None, self.node, msg_type) self.assertTrue(topic1 in manager._subscribers) - self.assert_topic_subscribed(topic1) + self.assertTrue(is_topic_subscribed(self.node, topic1)) self.assertFalse(topic2 in manager._subscribers) - self.assert_topic_not_subscribed(topic2) + self.assertFalse(is_topic_subscribed(self.node, topic2)) manager.subscribe(client, topic2, lambda _: None, self.node, msg_type) self.assertTrue(topic1 in manager._subscribers) - self.assert_topic_subscribed(topic1) + self.assertTrue(is_topic_subscribed(self.node, topic1)) self.assertTrue(topic2 in manager._subscribers) - self.assert_topic_subscribed(topic2) + self.assertTrue(is_topic_subscribed(self.node, topic2)) manager.unsubscribe(client, topic1) + wait_for_executor_idle(self.executor) self.assertFalse(topic1 in manager._subscribers) - self.assert_topic_not_subscribed(topic1) + self.assertFalse(is_topic_subscribed(self.node, topic1)) self.assertTrue(topic2 in manager._subscribers) - self.assert_topic_subscribed(topic2) + self.assertTrue(is_topic_subscribed(self.node, topic2)) manager.unsubscribe(client, topic2) + wait_for_executor_idle(self.executor) self.assertFalse(topic1 in manager._subscribers) - self.assert_topic_not_subscribed(topic1) + self.assertFalse(is_topic_subscribed(self.node, topic1)) self.assertFalse(topic2 in manager._subscribers) - self.assert_topic_not_subscribed(topic2) + self.assertFalse(is_topic_subscribed(self.node, topic2)) def test_register_no_msgtype(self) -> None: topic = "/test_register_no_msgtype" client = "client_test_register_no_msgtype" self.assertFalse(topic in manager._subscribers) - self.assert_topic_not_subscribed(topic) + self.assertFalse(is_topic_subscribed(self.node, topic)) self.assertRaises( TopicNotEstablishedException, manager.subscribe, client, topic, None, self.node ) @@ -163,7 +155,7 @@ def test_register_infer_topictype(self) -> None: topic = "/test_register_infer_topictype" client = "client_test_register_infer_topictype" - self.assert_topic_not_subscribed(topic) + self.assertFalse(is_topic_subscribed(self.node, topic)) subscriber_qos = QoSProfile( depth=10, @@ -171,16 +163,18 @@ def test_register_infer_topictype(self) -> None: ) self.node.create_subscription(String, topic, lambda *_args: None, subscriber_qos) - self.assert_topic_subscribed(topic) + self.assertTrue(is_topic_subscribed(self.node, topic)) self.assertFalse(topic in manager._subscribers) manager.subscribe(client, topic, lambda _: None, self.node) self.assertTrue(topic in manager._subscribers) - self.assert_topic_subscribed(topic) + self.assertTrue(is_topic_subscribed(self.node, topic)) manager.unsubscribe(client, topic) + wait_for_executor_idle(self.executor) self.assertFalse(topic in manager._subscribers) - self.assert_topic_subscribed(topic) + # The direct create_subscription above still holds the topic. + self.assertTrue(is_topic_subscribed(self.node, topic)) def test_register_multiple_notopictype(self) -> None: topic = "/test_register_multiple_notopictype" @@ -189,30 +183,31 @@ def test_register_multiple_notopictype(self) -> None: client2 = "client_test_register_multiple_notopictype_2" self.assertFalse(topic in manager._subscribers) - self.assert_topic_not_subscribed(topic) + self.assertFalse(is_topic_subscribed(self.node, topic)) manager.subscribe(client1, topic, lambda _: None, self.node, msg_type) self.assertTrue(topic in manager._subscribers) - self.assert_topic_subscribed(topic) + self.assertTrue(is_topic_subscribed(self.node, topic)) manager.subscribe(client2, topic, lambda _: None, self.node) self.assertTrue(topic in manager._subscribers) - self.assert_topic_subscribed(topic) + self.assertTrue(is_topic_subscribed(self.node, topic)) manager.unsubscribe(client1, topic) self.assertTrue(topic in manager._subscribers) - self.assert_topic_subscribed(topic) + self.assertTrue(is_topic_subscribed(self.node, topic)) manager.unsubscribe(client2, topic) + wait_for_executor_idle(self.executor) self.assertFalse(topic in manager._subscribers) - self.assert_topic_not_subscribed(topic) + self.assertFalse(is_topic_subscribed(self.node, topic)) def test_subscribe_not_registered(self) -> None: topic = "/test_subscribe_not_registered" client = "client_test_subscribe_not_registered" self.assertFalse(topic in manager._subscribers) - self.assert_topic_not_subscribed(topic) + self.assertFalse(is_topic_subscribed(self.node, topic)) self.assertRaises( TopicNotEstablishedException, manager.subscribe, client, topic, None, self.node ) diff --git a/rosbridge_server/src/rosbridge_server/websocket_handler.py b/rosbridge_server/src/rosbridge_server/websocket_handler.py index 31fd59604..03e0245e3 100644 --- a/rosbridge_server/src/rosbridge_server/websocket_handler.py +++ b/rosbridge_server/src/rosbridge_server/websocket_handler.py @@ -40,7 +40,7 @@ from asyncio.events import AbstractEventLoop from collections import deque from functools import wraps -from typing import TYPE_CHECKING, ClassVar, ParamSpec, TypeVar +from typing import TYPE_CHECKING, Any, ClassVar, ParamSpec, TypeVar from rclpy.node import Node from rosbridge_library.rosbridge_protocol import RosbridgeProtocol @@ -56,7 +56,9 @@ def _log_exception() -> None: """Log the most recent exception to ROS.""" exc = traceback.format_exception(*sys.exc_info()) - RosbridgeWebSocket.node_handle.get_logger().error("".join(exc)) + node_handle = RosbridgeWebSocket.node_handle + assert isinstance(node_handle, Node), "Node handle was not set" + node_handle.get_logger().error("".join(exc)) P = ParamSpec("P") @@ -140,7 +142,7 @@ class RosbridgeWebSocket(WebSocketHandler): node_handle: ClassVar[Node | None] = None # Parameters to pass to RosbridgeProtocol when opening a connection - protocol_parameters: ClassVar = {} + protocol_parameters: ClassVar[dict[str, Any]] = {} # Parameters for the WebSocket handler use_compression: ClassVar[bool] = False @@ -164,7 +166,7 @@ def open(self, *args: str, **kwargs: str) -> None: # noqa: ARG002 self.set_nodelay(True) cls.clients_connected += 1 if cls.client_manager: - cls.client_manager.add_client(self.client_id, self.request.remote_ip) + cls.client_manager.add_client(self.client_id, self.request.remote_ip or "") except Exception as exc: cls.node_handle.get_logger().error( f"Unable to accept incoming connection. Reason: {exc}" @@ -191,7 +193,7 @@ def on_close(self) -> None: cls.clients_connected -= 1 if cls.client_manager: - cls.client_manager.remove_client(self.client_id, self.request.remote_ip) + cls.client_manager.remove_client(self.client_id, self.request.remote_ip or "") cls.node_handle.get_logger().info( f"Client disconnected. {cls.clients_connected} clients total." )