Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ async def handle_request(
result = await future
assert result is not None, "Service response cannot be None"
return result
except Exception as e:
self.protocol.log(
"error",
f"Error while waiting for response to service request with id {request_id}: {e}",
)
raise e
Comment thread
bjsowa marked this conversation as resolved.
Outdated
finally:
del self.request_futures[request_id]

Expand Down Expand Up @@ -100,7 +106,7 @@ def graceful_shutdown(self) -> None:
for future_id in self.request_futures:
future = self.request_futures[future_id]
future.set_exception(RuntimeError(f"Service {self.service_name} was unadvertised"))
self.service_handle.destroy()
self.protocol.node_handle.destroy_service(self.service_handle)

Comment thread
bjsowa marked this conversation as resolved.

class AdvertiseService(Capability):
Expand Down
60 changes: 50 additions & 10 deletions rosbridge_library/test/capabilities/test_service_capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def setUp(self) -> None:

protocol_parameters = {
"call_services_in_new_thread": False,
"default_call_service_timeout": 5.0,
"default_call_service_timeout": 0.5,
"send_action_goals_in_new_thread": False,
}

Expand Down Expand Up @@ -257,20 +257,60 @@ def test_unadvertise_with_live_request(self) -> None:
self.assertTrue("id" in self.received_message)

# Now unadvertise the service
# TODO: This raises an exception, likely because of the following rclpy issue:
# https://github.com/ros2/rclpy/issues/1098
unadvertise_msg = loads(dumps({"op": "unadvertise_service", "service": service_path}))
self.received_message = None
self.unadvertise.unadvertise_service(unadvertise_msg)

with self.assertRaises(RuntimeError) as context:
start_time = time.monotonic()
while self.received_message is None:
rclpy.spin_once(self.node, timeout_sec=0.1)
if time.monotonic() - start_time > 0.3:
self.fail("Timed out waiting for unadvertise service message.")
# The in-flight request should time out and return a service_response (result False)
# TODO: Ideally, the in-flight request should be cancelled immediately and return a
# response indicating the service was unadvertised. Right now, the service handler from
# advertise_service does not differentiate between internal and remote service calls and
# rclpy does not provide a way to cancel an in-flight service request, so we just let the
# request time out on the client side and return a timeout response.
self.received_message = None
start_time = time.monotonic()
while self.received_message is None:
rclpy.spin_once(self.node, timeout_sec=0.1)
if time.monotonic() - start_time > 1.0:
self.fail(
"Timed out waiting for service_response after unadvertise (expected timeout response)."
)

self.assertEqual(self.received_message["op"], "service_response")
self.assertFalse(self.received_message["result"])
self.assertEqual(
self.received_message["values"],
"Timeout exceeded while waiting for service response",
)

self.assertTrue(f"Service {service_path} was unadvertised" in context.exception)
# Subsequent calls should be rejected immediately
self.received_message = None
call_msg2 = loads(
dumps(
{
"op": "call_service",
"id": "bar",
"service": service_path,
"args": {"data": False},
}
)
)
Thread(target=self.call_service.call_service, args=(call_msg2,)).start()

start_time = time.monotonic()
while self.received_message is None:
rclpy.spin_once(self.node, timeout_sec=0.1)
if time.monotonic() - start_time > 0.5:
self.fail(
"Timed out waiting for immediate rejection of call to unadvertised service."
)

self.assertEqual(self.received_message["op"], "service_response")
self.assertFalse(self.received_message["result"])
self.assertEqual(
self.received_message["values"],
f"Service {service_path} does not exist",
)


if __name__ == "__main__":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,30 +37,41 @@ def tearDown(self) -> None:
self.executor.shutdown()
rclpy.shutdown()

def assertTopicSubscribed(self, topic: str, timeout: float = 1.0) -> None:
start_time = time.monotonic()
while not is_topic_subscribed(self.node, topic):
time.sleep(0.05)
if time.monotonic() - start_time > timeout:
self.fail(f"Timed out waiting for topic '{topic}' to be subscribed.")

def assertTopicNotSubscribed(self, topic: str, timeout: float = 1.0) -> None:
start_time = time.monotonic()
while is_topic_subscribed(self.node, topic):
time.sleep(0.05)
if time.monotonic() - start_time > timeout:
self.fail(f"Timed out waiting for topic '{topic}' to be unsubscribed.")
Comment thread
bjsowa marked this conversation as resolved.

def test_register_multisubscriber(self) -> None:
"""Register a subscriber on a clean topic with a good msg type."""
topic = "/test_register_multisubscriber"
msg_type = "std_msgs/String"

self.assertFalse(is_topic_subscribed(self.node, topic))
self.assertTopicNotSubscribed(topic)
MultiSubscriber(topic, self.client_id, lambda *_args: None, self.node, msg_type=msg_type)
time.sleep(0.05)
self.assertTrue(is_topic_subscribed(self.node, topic))
self.assertTopicSubscribed(topic)

def test_unregister_multisubscriber(self) -> None:
"""Register and unregister a subscriber on a clean topic with a good msg type."""
topic = "/test_unregister_multisubscriber"
msg_type = "std_msgs/String"

self.assertFalse(is_topic_subscribed(self.node, topic))
self.assertTopicNotSubscribed(topic)
multi: MultiSubscriber[String] = MultiSubscriber(
topic, self.client_id, lambda *_args: None, self.node, msg_type=msg_type
)
time.sleep(0.05)
self.assertTrue(is_topic_subscribed(self.node, topic))
self.assertTopicSubscribed(topic)
multi.unregister()
time.sleep(0.05)
self.assertFalse(is_topic_subscribed(self.node, topic))
self.assertTopicNotSubscribed(topic)

def test_verify_type(self) -> None:
topic = "/test_verify_type"
Expand Down Expand Up @@ -90,12 +101,11 @@ def test_subscribe_unsubscribe(self) -> None:
topic = "/test_subscribe_unsubscribe"
msg_type = "std_msgs/String"

self.assertFalse(is_topic_subscribed(self.node, topic))
self.assertTopicNotSubscribed(topic)
multi: MultiSubscriber[String] = MultiSubscriber(
topic, self.client_id, lambda *_args: None, self.node, msg_type=msg_type
)
time.sleep(0.05)
self.assertTrue(is_topic_subscribed(self.node, topic))
self.assertTopicSubscribed(topic)
self.assertEqual(len(multi.new_subscriptions), 0)

multi.subscribe(self.client_id, lambda _: None)
Expand All @@ -105,8 +115,7 @@ def test_subscribe_unsubscribe(self) -> None:
self.assertEqual(len(multi.new_subscriptions), 0)

multi.unregister()
time.sleep(0.1)
self.assertFalse(is_topic_subscribed(self.node, topic))
self.assertTopicNotSubscribed(topic)

def test_subscribe_receive_json(self) -> None:
topic = "/test_subscribe_receive_json"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,37 @@ def tearDown(self) -> None:
self.executor.shutdown()
rclpy.shutdown()

def assertTopicSubscribed(self, topic: str, timeout: float = 1.0) -> None:
start_time = time.monotonic()
while not is_topic_subscribed(self.node, topic):
time.sleep(0.05)
if time.monotonic() - start_time > timeout:
self.fail(f"Timed out waiting for topic '{topic}' to be subscribed.")

def assertTopicNotSubscribed(self, topic: str, timeout: float = 1.0) -> None:
start_time = time.monotonic()
while is_topic_subscribed(self.node, topic):
time.sleep(0.05)
if time.monotonic() - start_time > timeout:
self.fail(f"Timed out waiting for topic '{topic}' to be unsubscribed.")
Comment thread
bjsowa marked this conversation as resolved.

def test_subscribe(self) -> None:
"""Register a publisher on a clean topic with a good msg type."""
topic = "/test_subscribe"
msg_type = "std_msgs/String"
client = "client_test_subscribe"

self.assertFalse(topic in manager._subscribers)
self.assertFalse(is_topic_subscribed(self.node, topic))
self.assertTopicNotSubscribed(topic)
manager.subscribe(client, topic, lambda _: None, self.node, msg_type)
time.sleep(0.05)
Comment thread
bjsowa marked this conversation as resolved.
Outdated
self.assertTrue(topic in manager._subscribers)
self.assertTrue(is_topic_subscribed(self.node, topic))
self.assertTopicSubscribed(topic)

manager.unsubscribe(client, topic)
time.sleep(0.05)
self.assertFalse(topic in manager._subscribers)
self.assertFalse(is_topic_subscribed(self.node, topic))
self.assertTopicNotSubscribed(topic)

def test_register_subscriber_multiclient(self) -> None:
topic = "/test_register_subscriber_multiclient"
Expand All @@ -63,26 +77,22 @@ def test_register_subscriber_multiclient(self) -> None:
client2 = "client_test_register_subscriber_multiclient_2"

self.assertFalse(topic in manager._subscribers)
self.assertFalse(is_topic_subscribed(self.node, topic))
self.assertTopicNotSubscribed(topic)
manager.subscribe(client1, topic, lambda _: None, self.node, msg_type)
time.sleep(0.05)
self.assertTrue(topic in manager._subscribers)
self.assertTrue(is_topic_subscribed(self.node, topic))
self.assertTopicSubscribed(topic)

manager.subscribe(client2, topic, lambda _: None, self.node, msg_type)
time.sleep(0.05)
self.assertTrue(topic in manager._subscribers)
self.assertTrue(is_topic_subscribed(self.node, topic))
self.assertTopicSubscribed(topic)

manager.unsubscribe(client1, topic)
time.sleep(0.05)
self.assertTrue(topic in manager._subscribers)
self.assertTrue(is_topic_subscribed(self.node, topic))
self.assertTopicSubscribed(topic)

manager.unsubscribe(client2, topic)
time.sleep(0.05)
self.assertFalse(topic in manager._subscribers)
self.assertFalse(is_topic_subscribed(self.node, topic))
self.assertTopicNotSubscribed(topic)

def test_register_publisher_conflicting_types(self) -> None:
topic = "/test_register_publisher_conflicting_types"
Expand All @@ -91,11 +101,10 @@ def test_register_publisher_conflicting_types(self) -> None:
client = "client_test_register_publisher_conflicting_types"

self.assertFalse(topic in manager._subscribers)
self.assertFalse(is_topic_subscribed(self.node, topic))
self.assertTopicNotSubscribed(topic)
manager.subscribe(client, topic, lambda _: None, self.node, msg_type)
time.sleep(0.05)
self.assertTrue(topic in manager._subscribers)
self.assertTrue(is_topic_subscribed(self.node, topic))
self.assertTopicSubscribed(topic)

self.assertRaises(
TypeConflictException,
Expand All @@ -115,43 +124,39 @@ def test_register_multiple_publishers(self) -> None:

self.assertFalse(topic1 in manager._subscribers)
self.assertFalse(topic2 in manager._subscribers)
self.assertFalse(is_topic_subscribed(self.node, topic1))
self.assertFalse(is_topic_subscribed(self.node, topic2))
self.assertTopicNotSubscribed(topic1)
self.assertTopicNotSubscribed(topic2)

manager.subscribe(client, topic1, lambda _: None, self.node, msg_type)
time.sleep(0.05)
self.assertTrue(topic1 in manager._subscribers)
self.assertTrue(is_topic_subscribed(self.node, topic1))
self.assertTopicSubscribed(topic1)
self.assertFalse(topic2 in manager._subscribers)
self.assertFalse(is_topic_subscribed(self.node, topic2))
self.assertTopicNotSubscribed(topic2)

manager.subscribe(client, topic2, lambda _: None, self.node, msg_type)
time.sleep(0.05)
self.assertTrue(topic1 in manager._subscribers)
self.assertTrue(is_topic_subscribed(self.node, topic1))
self.assertTopicSubscribed(topic1)
self.assertTrue(topic2 in manager._subscribers)
self.assertTrue(is_topic_subscribed(self.node, topic2))
self.assertTopicSubscribed(topic2)

manager.unsubscribe(client, topic1)
time.sleep(0.05)
self.assertFalse(topic1 in manager._subscribers)
self.assertFalse(is_topic_subscribed(self.node, topic1))
self.assertTopicNotSubscribed(topic1)
self.assertTrue(topic2 in manager._subscribers)
self.assertTrue(is_topic_subscribed(self.node, topic2))
self.assertTopicSubscribed(topic2)

manager.unsubscribe(client, topic2)
time.sleep(0.05)
self.assertFalse(topic1 in manager._subscribers)
self.assertFalse(is_topic_subscribed(self.node, topic1))
self.assertTopicNotSubscribed(topic1)
self.assertFalse(topic2 in manager._subscribers)
self.assertFalse(is_topic_subscribed(self.node, topic2))
self.assertTopicNotSubscribed(topic2)

def test_register_no_msgtype(self) -> None:
topic = "/test_register_no_msgtype"
client = "client_test_register_no_msgtype"

self.assertFalse(topic in manager._subscribers)
self.assertFalse(is_topic_subscribed(self.node, topic))
self.assertTopicNotSubscribed(topic)
self.assertRaises(
TopicNotEstablishedException, manager.subscribe, client, topic, None, self.node
)
Expand All @@ -160,26 +165,24 @@ def test_register_infer_topictype(self) -> None:
topic = "/test_register_infer_topictype"
client = "client_test_register_infer_topictype"

self.assertFalse(is_topic_subscribed(self.node, topic))
self.assertTopicNotSubscribed(topic)

subscriber_qos = QoSProfile(
depth=10,
durability=DurabilityPolicy.TRANSIENT_LOCAL,
)
self.node.create_subscription(String, topic, lambda *_args: None, subscriber_qos)

self.assertTrue(is_topic_subscribed(self.node, topic))
self.assertTopicSubscribed(topic)
self.assertFalse(topic in manager._subscribers)

manager.subscribe(client, topic, lambda _: None, self.node)
time.sleep(0.05)
self.assertTrue(topic in manager._subscribers)
self.assertTrue(is_topic_subscribed(self.node, topic))
self.assertTopicSubscribed(topic)

manager.unsubscribe(client, topic)
time.sleep(0.05)
self.assertFalse(topic in manager._subscribers)
self.assertTrue(is_topic_subscribed(self.node, topic))
self.assertTopicSubscribed(topic)

def test_register_multiple_notopictype(self) -> None:
topic = "/test_register_multiple_notopictype"
Expand All @@ -188,34 +191,30 @@ def test_register_multiple_notopictype(self) -> None:
client2 = "client_test_register_multiple_notopictype_2"

self.assertFalse(topic in manager._subscribers)
self.assertFalse(is_topic_subscribed(self.node, topic))
self.assertTopicNotSubscribed(topic)

manager.subscribe(client1, topic, lambda _: None, self.node, msg_type)
time.sleep(0.05)
self.assertTrue(topic in manager._subscribers)
self.assertTrue(is_topic_subscribed(self.node, topic))
self.assertTopicSubscribed(topic)

manager.subscribe(client2, topic, lambda _: None, self.node)
time.sleep(0.05)
self.assertTrue(topic in manager._subscribers)
self.assertTrue(is_topic_subscribed(self.node, topic))
self.assertTopicSubscribed(topic)

manager.unsubscribe(client1, topic)
time.sleep(0.05)
self.assertTrue(topic in manager._subscribers)
self.assertTrue(is_topic_subscribed(self.node, topic))
self.assertTopicSubscribed(topic)

manager.unsubscribe(client2, topic)
time.sleep(0.05)
self.assertFalse(topic in manager._subscribers)
self.assertFalse(is_topic_subscribed(self.node, topic))
self.assertTopicNotSubscribed(topic)

def test_subscribe_not_registered(self) -> None:
topic = "/test_subscribe_not_registered"
client = "client_test_subscribe_not_registered"

self.assertFalse(topic in manager._subscribers)
self.assertFalse(is_topic_subscribed(self.node, topic))
self.assertTopicNotSubscribed(topic)
self.assertRaises(
TopicNotEstablishedException, manager.subscribe, client, topic, None, self.node
)
Expand Down
Loading