From 3b4cb749d908788993ea3d73fac0b5a84957934b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82a=C5=BCej=20Sowa?= Date: Wed, 11 Feb 2026 14:54:30 +0000 Subject: [PATCH 1/7] Add error logging when service is unadvertised during in-flight request --- .../src/rosbridge_library/capabilities/advertise_service.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/rosbridge_library/src/rosbridge_library/capabilities/advertise_service.py b/rosbridge_library/src/rosbridge_library/capabilities/advertise_service.py index ee5a09a44..068d1744b 100644 --- a/rosbridge_library/src/rosbridge_library/capabilities/advertise_service.py +++ b/rosbridge_library/src/rosbridge_library/capabilities/advertise_service.py @@ -66,6 +66,12 @@ async def handle_request( result = await future assert result is not None, "Service response cannot be None" return result + except Exception as e: + self.protocol.log( + "error", + f"Error while waiting for response to service request with id {request_id}: {e}", + ) + raise e finally: del self.request_futures[request_id] From 261d24ea457b178091d25cc8f3b0219da82a1eca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82a=C5=BCej=20Sowa?= Date: Wed, 11 Feb 2026 14:55:22 +0000 Subject: [PATCH 2/7] Fix service destruction method in AdvertisedServiceHandler --- .../src/rosbridge_library/capabilities/advertise_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rosbridge_library/src/rosbridge_library/capabilities/advertise_service.py b/rosbridge_library/src/rosbridge_library/capabilities/advertise_service.py index 068d1744b..060095fc0 100644 --- a/rosbridge_library/src/rosbridge_library/capabilities/advertise_service.py +++ b/rosbridge_library/src/rosbridge_library/capabilities/advertise_service.py @@ -106,7 +106,7 @@ def graceful_shutdown(self) -> None: for future_id in self.request_futures: future = self.request_futures[future_id] future.set_exception(RuntimeError(f"Service {self.service_name} was unadvertised")) - self.service_handle.destroy() + self.protocol.node_handle.destroy_service(self.service_handle) class AdvertiseService(Capability): From e4783b632bdab792896b02b2f9f503227a35d6d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82a=C5=BCej=20Sowa?= Date: Wed, 11 Feb 2026 14:56:41 +0000 Subject: [PATCH 3/7] Fix unadvertise service test --- .../capabilities/test_service_capabilities.py | 60 +++++++++++++++---- 1 file changed, 50 insertions(+), 10 deletions(-) diff --git a/rosbridge_library/test/capabilities/test_service_capabilities.py b/rosbridge_library/test/capabilities/test_service_capabilities.py index c8bcb7695..d19cf1942 100755 --- a/rosbridge_library/test/capabilities/test_service_capabilities.py +++ b/rosbridge_library/test/capabilities/test_service_capabilities.py @@ -27,7 +27,7 @@ def setUp(self) -> None: protocol_parameters = { "call_services_in_new_thread": False, - "default_call_service_timeout": 5.0, + "default_call_service_timeout": 0.5, "send_action_goals_in_new_thread": False, } @@ -257,20 +257,60 @@ def test_unadvertise_with_live_request(self) -> None: self.assertTrue("id" in self.received_message) # Now unadvertise the service - # TODO: This raises an exception, likely because of the following rclpy issue: - # https://github.com/ros2/rclpy/issues/1098 unadvertise_msg = loads(dumps({"op": "unadvertise_service", "service": service_path})) self.received_message = None self.unadvertise.unadvertise_service(unadvertise_msg) - with self.assertRaises(RuntimeError) as context: - start_time = time.monotonic() - while self.received_message is None: - rclpy.spin_once(self.node, timeout_sec=0.1) - if time.monotonic() - start_time > 0.3: - self.fail("Timed out waiting for unadvertise service message.") + # The in-flight request should time out and return a service_response (result False) + # TODO: Ideally, the in-flight request should be cancelled immediately and return a + # response indicating the service was unadvertised. Right now, the service handler from + # advertise_service does not differentiate between internal and remote service calls and + # rclpy does not provide a way to cancel an in-flight service request, so we just let the + # request time out on the client side and return a timeout response. + self.received_message = None + start_time = time.monotonic() + while self.received_message is None: + rclpy.spin_once(self.node, timeout_sec=0.1) + if time.monotonic() - start_time > 1.0: + self.fail( + "Timed out waiting for service_response after unadvertise (expected timeout response)." + ) + + self.assertEqual(self.received_message["op"], "service_response") + self.assertFalse(self.received_message["result"]) + self.assertEqual( + self.received_message["values"], + "Timeout exceeded while waiting for service response", + ) - self.assertTrue(f"Service {service_path} was unadvertised" in context.exception) + # Subsequent calls should be rejected immediately + self.received_message = None + call_msg2 = loads( + dumps( + { + "op": "call_service", + "id": "bar", + "service": service_path, + "args": {"data": False}, + } + ) + ) + Thread(target=self.call_service.call_service, args=(call_msg2,)).start() + + start_time = time.monotonic() + while self.received_message is None: + rclpy.spin_once(self.node, timeout_sec=0.1) + if time.monotonic() - start_time > 0.5: + self.fail( + "Timed out waiting for immediate rejection of call to unadvertised service." + ) + + self.assertEqual(self.received_message["op"], "service_response") + self.assertFalse(self.received_message["result"]) + self.assertEqual( + self.received_message["values"], + f"Service {service_path} does not exist", + ) if __name__ == "__main__": From e08d0b63de661094a0cde4e1c87886376c75b8b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82a=C5=BCej=20Sowa?= Date: Wed, 11 Feb 2026 16:34:59 +0000 Subject: [PATCH 4/7] Try to fix failing topic subscription assertions by adding a timeout --- .../subscribers/test_multi_subscriber.py | 35 ++++--- .../subscribers/test_subscriber_manager.py | 91 +++++++++---------- 2 files changed, 67 insertions(+), 59 deletions(-) diff --git a/rosbridge_library/test/internal/subscribers/test_multi_subscriber.py b/rosbridge_library/test/internal/subscribers/test_multi_subscriber.py index 60ab0146c..1d91c09b1 100755 --- a/rosbridge_library/test/internal/subscribers/test_multi_subscriber.py +++ b/rosbridge_library/test/internal/subscribers/test_multi_subscriber.py @@ -37,30 +37,41 @@ def tearDown(self) -> None: self.executor.shutdown() rclpy.shutdown() + def assertTopicSubscribed(self, topic: str, timeout: float = 1.0) -> None: + start_time = time.monotonic() + while not is_topic_subscribed(self.node, topic): + time.sleep(0.05) + if time.monotonic() - start_time > timeout: + self.fail(f"Timed out waiting for topic '{topic}' to be subscribed.") + + def assertTopicNotSubscribed(self, topic: str, timeout: float = 1.0) -> None: + start_time = time.monotonic() + while is_topic_subscribed(self.node, topic): + time.sleep(0.05) + if time.monotonic() - start_time > timeout: + self.fail(f"Timed out waiting for topic '{topic}' to be unsubscribed.") + def test_register_multisubscriber(self) -> None: """Register a subscriber on a clean topic with a good msg type.""" topic = "/test_register_multisubscriber" msg_type = "std_msgs/String" - self.assertFalse(is_topic_subscribed(self.node, topic)) + self.assertTopicNotSubscribed(topic) MultiSubscriber(topic, self.client_id, lambda *_args: None, self.node, msg_type=msg_type) - time.sleep(0.05) - self.assertTrue(is_topic_subscribed(self.node, topic)) + self.assertTopicSubscribed(topic) def test_unregister_multisubscriber(self) -> None: """Register and unregister a subscriber on a clean topic with a good msg type.""" topic = "/test_unregister_multisubscriber" msg_type = "std_msgs/String" - self.assertFalse(is_topic_subscribed(self.node, topic)) + self.assertTopicNotSubscribed(topic) multi: MultiSubscriber[String] = MultiSubscriber( topic, self.client_id, lambda *_args: None, self.node, msg_type=msg_type ) - time.sleep(0.05) - self.assertTrue(is_topic_subscribed(self.node, topic)) + self.assertTopicSubscribed(topic) multi.unregister() - time.sleep(0.05) - self.assertFalse(is_topic_subscribed(self.node, topic)) + self.assertTopicNotSubscribed(topic) def test_verify_type(self) -> None: topic = "/test_verify_type" @@ -90,12 +101,11 @@ def test_subscribe_unsubscribe(self) -> None: topic = "/test_subscribe_unsubscribe" msg_type = "std_msgs/String" - self.assertFalse(is_topic_subscribed(self.node, topic)) + self.assertTopicNotSubscribed(topic) multi: MultiSubscriber[String] = MultiSubscriber( topic, self.client_id, lambda *_args: None, self.node, msg_type=msg_type ) - time.sleep(0.05) - self.assertTrue(is_topic_subscribed(self.node, topic)) + self.assertTopicSubscribed(topic) self.assertEqual(len(multi.new_subscriptions), 0) multi.subscribe(self.client_id, lambda _: None) @@ -105,8 +115,7 @@ def test_subscribe_unsubscribe(self) -> None: self.assertEqual(len(multi.new_subscriptions), 0) multi.unregister() - time.sleep(0.1) - self.assertFalse(is_topic_subscribed(self.node, topic)) + self.assertTopicNotSubscribed(topic) def test_subscribe_receive_json(self) -> None: topic = "/test_subscribe_receive_json" diff --git a/rosbridge_library/test/internal/subscribers/test_subscriber_manager.py b/rosbridge_library/test/internal/subscribers/test_subscriber_manager.py index 0932250aa..0cbc98921 100755 --- a/rosbridge_library/test/internal/subscribers/test_subscriber_manager.py +++ b/rosbridge_library/test/internal/subscribers/test_subscriber_manager.py @@ -38,6 +38,20 @@ def tearDown(self) -> None: self.executor.shutdown() rclpy.shutdown() + def assertTopicSubscribed(self, topic: str, timeout: float = 1.0) -> None: + start_time = time.monotonic() + while not is_topic_subscribed(self.node, topic): + time.sleep(0.05) + if time.monotonic() - start_time > timeout: + self.fail(f"Timed out waiting for topic '{topic}' to be subscribed.") + + def assertTopicNotSubscribed(self, topic: str, timeout: float = 1.0) -> None: + start_time = time.monotonic() + while is_topic_subscribed(self.node, topic): + time.sleep(0.05) + if time.monotonic() - start_time > timeout: + self.fail(f"Timed out waiting for topic '{topic}' to be unsubscribed.") + def test_subscribe(self) -> None: """Register a publisher on a clean topic with a good msg type.""" topic = "/test_subscribe" @@ -45,16 +59,16 @@ def test_subscribe(self) -> None: client = "client_test_subscribe" self.assertFalse(topic in manager._subscribers) - self.assertFalse(is_topic_subscribed(self.node, topic)) + self.assertTopicNotSubscribed(topic) manager.subscribe(client, topic, lambda _: None, self.node, msg_type) time.sleep(0.05) self.assertTrue(topic in manager._subscribers) - self.assertTrue(is_topic_subscribed(self.node, topic)) + self.assertTopicSubscribed(topic) manager.unsubscribe(client, topic) time.sleep(0.05) self.assertFalse(topic in manager._subscribers) - self.assertFalse(is_topic_subscribed(self.node, topic)) + self.assertTopicNotSubscribed(topic) def test_register_subscriber_multiclient(self) -> None: topic = "/test_register_subscriber_multiclient" @@ -63,26 +77,22 @@ def test_register_subscriber_multiclient(self) -> None: client2 = "client_test_register_subscriber_multiclient_2" self.assertFalse(topic in manager._subscribers) - self.assertFalse(is_topic_subscribed(self.node, topic)) + self.assertTopicNotSubscribed(topic) manager.subscribe(client1, topic, lambda _: None, self.node, msg_type) - time.sleep(0.05) self.assertTrue(topic in manager._subscribers) - self.assertTrue(is_topic_subscribed(self.node, topic)) + self.assertTopicSubscribed(topic) manager.subscribe(client2, topic, lambda _: None, self.node, msg_type) - time.sleep(0.05) self.assertTrue(topic in manager._subscribers) - self.assertTrue(is_topic_subscribed(self.node, topic)) + self.assertTopicSubscribed(topic) manager.unsubscribe(client1, topic) - time.sleep(0.05) self.assertTrue(topic in manager._subscribers) - self.assertTrue(is_topic_subscribed(self.node, topic)) + self.assertTopicSubscribed(topic) manager.unsubscribe(client2, topic) - time.sleep(0.05) self.assertFalse(topic in manager._subscribers) - self.assertFalse(is_topic_subscribed(self.node, topic)) + self.assertTopicNotSubscribed(topic) def test_register_publisher_conflicting_types(self) -> None: topic = "/test_register_publisher_conflicting_types" @@ -91,11 +101,10 @@ def test_register_publisher_conflicting_types(self) -> None: client = "client_test_register_publisher_conflicting_types" self.assertFalse(topic in manager._subscribers) - self.assertFalse(is_topic_subscribed(self.node, topic)) + self.assertTopicNotSubscribed(topic) manager.subscribe(client, topic, lambda _: None, self.node, msg_type) - time.sleep(0.05) self.assertTrue(topic in manager._subscribers) - self.assertTrue(is_topic_subscribed(self.node, topic)) + self.assertTopicSubscribed(topic) self.assertRaises( TypeConflictException, @@ -115,43 +124,39 @@ def test_register_multiple_publishers(self) -> None: self.assertFalse(topic1 in manager._subscribers) self.assertFalse(topic2 in manager._subscribers) - self.assertFalse(is_topic_subscribed(self.node, topic1)) - self.assertFalse(is_topic_subscribed(self.node, topic2)) + self.assertTopicNotSubscribed(topic1) + self.assertTopicNotSubscribed(topic2) manager.subscribe(client, topic1, lambda _: None, self.node, msg_type) - time.sleep(0.05) self.assertTrue(topic1 in manager._subscribers) - self.assertTrue(is_topic_subscribed(self.node, topic1)) + self.assertTopicSubscribed(topic1) self.assertFalse(topic2 in manager._subscribers) - self.assertFalse(is_topic_subscribed(self.node, topic2)) + self.assertTopicNotSubscribed(topic2) manager.subscribe(client, topic2, lambda _: None, self.node, msg_type) - time.sleep(0.05) self.assertTrue(topic1 in manager._subscribers) - self.assertTrue(is_topic_subscribed(self.node, topic1)) + self.assertTopicSubscribed(topic1) self.assertTrue(topic2 in manager._subscribers) - self.assertTrue(is_topic_subscribed(self.node, topic2)) + self.assertTopicSubscribed(topic2) manager.unsubscribe(client, topic1) - time.sleep(0.05) self.assertFalse(topic1 in manager._subscribers) - self.assertFalse(is_topic_subscribed(self.node, topic1)) + self.assertTopicNotSubscribed(topic1) self.assertTrue(topic2 in manager._subscribers) - self.assertTrue(is_topic_subscribed(self.node, topic2)) + self.assertTopicSubscribed(topic2) manager.unsubscribe(client, topic2) - time.sleep(0.05) self.assertFalse(topic1 in manager._subscribers) - self.assertFalse(is_topic_subscribed(self.node, topic1)) + self.assertTopicNotSubscribed(topic1) self.assertFalse(topic2 in manager._subscribers) - self.assertFalse(is_topic_subscribed(self.node, topic2)) + self.assertTopicNotSubscribed(topic2) def test_register_no_msgtype(self) -> None: topic = "/test_register_no_msgtype" client = "client_test_register_no_msgtype" self.assertFalse(topic in manager._subscribers) - self.assertFalse(is_topic_subscribed(self.node, topic)) + self.assertTopicNotSubscribed(topic) self.assertRaises( TopicNotEstablishedException, manager.subscribe, client, topic, None, self.node ) @@ -160,7 +165,7 @@ def test_register_infer_topictype(self) -> None: topic = "/test_register_infer_topictype" client = "client_test_register_infer_topictype" - self.assertFalse(is_topic_subscribed(self.node, topic)) + self.assertTopicNotSubscribed(topic) subscriber_qos = QoSProfile( depth=10, @@ -168,18 +173,16 @@ def test_register_infer_topictype(self) -> None: ) self.node.create_subscription(String, topic, lambda *_args: None, subscriber_qos) - self.assertTrue(is_topic_subscribed(self.node, topic)) + self.assertTopicSubscribed(topic) self.assertFalse(topic in manager._subscribers) manager.subscribe(client, topic, lambda _: None, self.node) - time.sleep(0.05) self.assertTrue(topic in manager._subscribers) - self.assertTrue(is_topic_subscribed(self.node, topic)) + self.assertTopicSubscribed(topic) manager.unsubscribe(client, topic) - time.sleep(0.05) self.assertFalse(topic in manager._subscribers) - self.assertTrue(is_topic_subscribed(self.node, topic)) + self.assertTopicSubscribed(topic) def test_register_multiple_notopictype(self) -> None: topic = "/test_register_multiple_notopictype" @@ -188,34 +191,30 @@ def test_register_multiple_notopictype(self) -> None: client2 = "client_test_register_multiple_notopictype_2" self.assertFalse(topic in manager._subscribers) - self.assertFalse(is_topic_subscribed(self.node, topic)) + self.assertTopicNotSubscribed(topic) manager.subscribe(client1, topic, lambda _: None, self.node, msg_type) - time.sleep(0.05) self.assertTrue(topic in manager._subscribers) - self.assertTrue(is_topic_subscribed(self.node, topic)) + self.assertTopicSubscribed(topic) manager.subscribe(client2, topic, lambda _: None, self.node) - time.sleep(0.05) self.assertTrue(topic in manager._subscribers) - self.assertTrue(is_topic_subscribed(self.node, topic)) + self.assertTopicSubscribed(topic) manager.unsubscribe(client1, topic) - time.sleep(0.05) self.assertTrue(topic in manager._subscribers) - self.assertTrue(is_topic_subscribed(self.node, topic)) + self.assertTopicSubscribed(topic) manager.unsubscribe(client2, topic) - time.sleep(0.05) self.assertFalse(topic in manager._subscribers) - self.assertFalse(is_topic_subscribed(self.node, topic)) + self.assertTopicNotSubscribed(topic) def test_subscribe_not_registered(self) -> None: topic = "/test_subscribe_not_registered" client = "client_test_subscribe_not_registered" self.assertFalse(topic in manager._subscribers) - self.assertFalse(is_topic_subscribed(self.node, topic)) + self.assertTopicNotSubscribed(topic) self.assertRaises( TopicNotEstablishedException, manager.subscribe, client, topic, None, self.node ) From d42a850917fe039782dda7393f7d0e8ea97f2c2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82a=C5=BCej=20Sowa?= Date: Wed, 11 Feb 2026 16:56:06 +0000 Subject: [PATCH 5/7] Remove redundant sleeps --- .../test/internal/subscribers/test_subscriber_manager.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/rosbridge_library/test/internal/subscribers/test_subscriber_manager.py b/rosbridge_library/test/internal/subscribers/test_subscriber_manager.py index 0cbc98921..8004a7c59 100755 --- a/rosbridge_library/test/internal/subscribers/test_subscriber_manager.py +++ b/rosbridge_library/test/internal/subscribers/test_subscriber_manager.py @@ -61,12 +61,10 @@ def test_subscribe(self) -> None: self.assertFalse(topic in manager._subscribers) self.assertTopicNotSubscribed(topic) manager.subscribe(client, topic, lambda _: None, self.node, msg_type) - time.sleep(0.05) self.assertTrue(topic in manager._subscribers) self.assertTopicSubscribed(topic) manager.unsubscribe(client, topic) - time.sleep(0.05) self.assertFalse(topic in manager._subscribers) self.assertTopicNotSubscribed(topic) From 6bf451587acafb5570b8644ca967c3697de16ad8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82a=C5=BCej=20Sowa?= Date: Wed, 11 Feb 2026 16:57:21 +0000 Subject: [PATCH 6/7] Remove extra call to destroy service after graceful shutdown --- .../src/rosbridge_library/capabilities/unadvertise_service.py | 1 - 1 file changed, 1 deletion(-) diff --git a/rosbridge_library/src/rosbridge_library/capabilities/unadvertise_service.py b/rosbridge_library/src/rosbridge_library/capabilities/unadvertise_service.py index ab099aa72..a3bbbbef0 100644 --- a/rosbridge_library/src/rosbridge_library/capabilities/unadvertise_service.py +++ b/rosbridge_library/src/rosbridge_library/capabilities/unadvertise_service.py @@ -59,7 +59,6 @@ def unadvertise_service(self, message: dict[str, Any]) -> None: # unregister service in ROS if service_name in self.protocol.external_service_list: self.protocol.external_service_list[service_name].graceful_shutdown() - self.protocol.external_service_list[service_name].service_handle.destroy() del self.protocol.external_service_list[service_name] self.protocol.log("info", f"Unadvertised service {service_name}") else: From 326def01a7954f2aa44fa486dde5c850a6d207ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82a=C5=BCej=20Sowa?= Date: Wed, 11 Feb 2026 16:58:08 +0000 Subject: [PATCH 7/7] Re-raise original exception in handle_request --- .../src/rosbridge_library/capabilities/advertise_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rosbridge_library/src/rosbridge_library/capabilities/advertise_service.py b/rosbridge_library/src/rosbridge_library/capabilities/advertise_service.py index 060095fc0..8f62ce50d 100644 --- a/rosbridge_library/src/rosbridge_library/capabilities/advertise_service.py +++ b/rosbridge_library/src/rosbridge_library/capabilities/advertise_service.py @@ -71,7 +71,7 @@ async def handle_request( "error", f"Error while waiting for response to service request with id {request_id}: {e}", ) - raise e + raise finally: del self.request_futures[request_id]