Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions rclpy/rclpy/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from collections.abc import Callable
import threading
import time
from types import TracebackType
Expand Down Expand Up @@ -46,12 +47,16 @@ def __init__(
srv_type: type[Srv[SrvRequestT, SrvResponseT]],
srv_name: str,
qos_profile: QoSProfile,
*,
on_destroy: Optional[Callable[['BaseClient[SrvRequestT, SrvResponseT]'], None]] = None,
) -> None:
self.context = context
self.__client = client_impl
self.srv_type = srv_type
self.srv_name = srv_name
self.qos_profile = qos_profile
self._on_destroy = on_destroy
self._destroyed = False

def service_is_ready(self) -> bool:
"""
Expand Down Expand Up @@ -95,13 +100,17 @@ def logger_name(self) -> str:
return self.__client.get_logger_name()

def destroy(self) -> None:
"""
Destroy a container for a ROS service client.

.. warning:: Users should not destroy a service client with this destructor, instead they
should call :meth:`.Node.destroy_client`.
"""
self.__client.destroy_when_not_in_use()
"""Destroy the client, notifying the owning node and releasing the handle."""
if self._destroyed:
return
self._destroyed = True
if self._on_destroy is not None:
self._on_destroy(self)
self._on_destroy = None
self._destroy()

def _destroy(self) -> None:
self.handle.destroy_when_not_in_use()

def __enter__(self) -> Self:
return self
Expand All @@ -124,6 +133,7 @@ def __init__(
srv_name: str,
qos_profile: QoSProfile,
*,
on_destroy: Optional[Callable[['BaseClient[SrvRequestT, SrvResponseT]'], None]] = None,
callback_group: CallbackGroup
) -> None:
"""
Expand All @@ -145,7 +155,8 @@ def __init__(
client_impl=client_impl,
srv_type=srv_type,
srv_name=srv_name,
qos_profile=qos_profile
qos_profile=qos_profile,
on_destroy=on_destroy
)
# Key is a sequence number, value is an instance of a Future
self._pending_requests: Dict[int, Future[SrvResponseT]] = {}
Expand Down
5 changes: 5 additions & 0 deletions rclpy/rclpy/lifecycle/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,34 +144,39 @@ def __init__(
self._state_machine.service_change_state.name,
self.__on_change_state,
QoSProfile(**self._state_machine.service_change_state.qos),
on_destroy=self._on_destroy_service,
callback_group=callback_group)
self._service_get_state = Service(
self._state_machine.service_get_state,
lifecycle_msgs.srv.GetState,
self._state_machine.service_get_state.name,
self.__on_get_state,
QoSProfile(**self._state_machine.service_get_state.qos),
on_destroy=self._on_destroy_service,
callback_group=callback_group)
self._service_get_available_states = Service(
self._state_machine.service_get_available_states,
lifecycle_msgs.srv.GetAvailableStates,
self._state_machine.service_get_available_states.name,
self.__on_get_available_states,
QoSProfile(**self._state_machine.service_get_available_states.qos),
on_destroy=self._on_destroy_service,
callback_group=callback_group)
self._service_get_available_transitions = Service(
self._state_machine.service_get_available_transitions,
lifecycle_msgs.srv.GetAvailableTransitions,
self._state_machine.service_get_available_transitions.name,
self.__on_get_available_transitions,
QoSProfile(**self._state_machine.service_get_available_transitions.qos),
on_destroy=self._on_destroy_service,
callback_group=callback_group)
self._service_get_transition_graph = Service(
self._state_machine.service_get_transition_graph,
lifecycle_msgs.srv.GetAvailableTransitions,
self._state_machine.service_get_transition_graph.name,
self.__on_get_transition_graph,
QoSProfile(**self._state_machine.service_get_transition_graph.qos),
on_destroy=self._on_destroy_service,
callback_group=callback_group)

lifecycle_services = [
Expand Down
4 changes: 3 additions & 1 deletion rclpy/rclpy/lifecycle/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@

from __future__ import annotations

from typing import Generic, Tuple, Type, TYPE_CHECKING, TypedDict, Union
from collections.abc import Callable
from typing import Generic, Optional, Tuple, Type, TYPE_CHECKING, TypedDict, Union

from rclpy.callback_groups import CallbackGroup
from rclpy.event_handler import PublisherEventCallbacks
Expand All @@ -37,6 +38,7 @@ class LifecyclePublisherKWArgs(TypedDict, Generic[MsgT]):
msg_type: Type[MsgT]
topic: str
qos_profile: QoSProfile
on_destroy: Optional[Callable[['Publisher[MsgT]'], None]]
event_callbacks: PublisherEventCallbacks
callback_group: CallbackGroup

Expand Down
74 changes: 37 additions & 37 deletions rclpy/rclpy/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
from rclpy.callback_groups import CallbackGroup
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.client import Client
from rclpy.client import BaseClient, Client
from rclpy.clock import Clock
from rclpy.clock_type import ClockType
from rclpy.constants import S_TO_NS
Expand Down Expand Up @@ -85,14 +85,15 @@
from rclpy.qos import QoSProfile
from rclpy.qos_overriding_options import _declare_qos_parameters
from rclpy.qos_overriding_options import QoSOverridingOptions
from rclpy.service import BaseService
from rclpy.service import Service
from rclpy.service import ServiceCallbackUnion
from rclpy.subscription import GenericSubscriptionCallbackUnion
from rclpy.subscription import Subscription
from rclpy.subscription import SubscriptionCallbackUnion
from rclpy.subscription_content_filter_options import ContentFilterOptions
from rclpy.time_source import TimeSource
from rclpy.timer import Rate
from rclpy.timer import BaseTimer, Rate
from rclpy.timer import Timer
from rclpy.timer import TimerCallbackUnion
from rclpy.type_description_service import TypeDescriptionService
Expand Down Expand Up @@ -1622,6 +1623,7 @@ def create_publisher(
try:
publisher = publisher_class(
publisher_object, msg_type, topic, qos_profile,
on_destroy=self._on_destroy_publisher,
event_callbacks=event_callbacks or PublisherEventCallbacks(),
callback_group=callback_group)
except Exception:
Expand Down Expand Up @@ -1748,6 +1750,7 @@ def create_subscription(
subscription = Subscription(
subscription_object, msg_type,
topic, callback, qos_profile, raw,
on_destroy=self._on_destroy_subscription,
callback_group=callback_group,
event_callbacks=event_callbacks or SubscriptionEventCallbacks())
except Exception:
Expand Down Expand Up @@ -1798,6 +1801,7 @@ def create_client(
client = Client(
self.context,
client_impl, srv_type, srv_name, qos_profile,
on_destroy=self._on_destroy_client,
callback_group=callback_group)
callback_group.add_entity(client)
self._clients.append(client)
Expand Down Expand Up @@ -1843,6 +1847,7 @@ def create_service(
service = Service(
service_impl,
srv_type, srv_name, callback, qos_profile,
on_destroy=self._on_destroy_service,
callback_group=callback_group)
callback_group.add_entity(service)
self._services.append(service)
Expand Down Expand Up @@ -1881,6 +1886,7 @@ def create_timer(
timer = Timer(
callback, timer_period_nsec, clock,
callback_group=callback_group,
on_destroy=self._on_destroy_timer,
context=self.context,
autostart=autostart
)
Expand Down Expand Up @@ -1934,21 +1940,38 @@ def create_rate(
timer = self.create_timer(period, callback, group, clock)
return Rate(timer, context=self.context)

def _on_destroy_publisher(self, publisher: Publisher) -> None:
self._publishers.remove(publisher)
for event_handler in publisher.event_handlers:
self.__waitables.remove(event_handler)
self._wake_executor()

def _on_destroy_subscription(self, subscription: Subscription[Any]) -> None:
self._subscriptions.remove(subscription)
for event_handler in subscription.event_handlers:
self.__waitables.remove(event_handler)
self._wake_executor()

def _on_destroy_client(self, client: BaseClient[Any, Any]) -> None:
self._clients.remove(client)
self._wake_executor()

def _on_destroy_service(self, service: BaseService[Any, Any]) -> None:
self._services.remove(service)
self._wake_executor()

def _on_destroy_timer(self, timer: BaseTimer) -> None:
self._timers.remove(timer)
self._wake_executor()

def destroy_publisher(self, publisher: Publisher[Any]) -> bool:
"""
Destroy a publisher created by the node.

:return: ``True`` if successful, ``False`` otherwise.
"""
if publisher in self._publishers:
self._publishers.remove(publisher)
for event_handler in publisher.event_handlers:
self.__waitables.remove(event_handler)
try:
publisher.destroy()
except InvalidHandle:
return False
self._wake_executor()
publisher.destroy()
return True
return False

Expand All @@ -1959,14 +1982,7 @@ def destroy_subscription(self, subscription: Subscription[Any]) -> bool:
:return: ``True`` if successful, ``False`` otherwise.
"""
if subscription in self._subscriptions:
self._subscriptions.remove(subscription)
for event_handler in subscription.event_handlers:
self.__waitables.remove(event_handler)
try:
subscription.destroy()
except InvalidHandle:
return False
self._wake_executor()
subscription.destroy()
return True
return False

Expand All @@ -1977,12 +1993,7 @@ def destroy_client(self, client: Client[Any, Any]) -> bool:
:return: ``True`` if successful, ``False`` otherwise.
"""
if client in self._clients:
self._clients.remove(client)
try:
client.destroy()
except InvalidHandle:
return False
self._wake_executor()
client.destroy()
return True
return False

Expand All @@ -1993,12 +2004,7 @@ def destroy_service(self, service: Service[Any, Any]) -> bool:
:return: ``True`` if successful, ``False`` otherwise.
"""
if service in self._services:
self._services.remove(service)
try:
service.destroy()
except InvalidHandle:
return False
self._wake_executor()
service.destroy()
return True
return False

Expand All @@ -2009,12 +2015,7 @@ def destroy_timer(self, timer: Timer) -> bool:
:return: ``True`` if successful, ``False`` otherwise.
"""
if timer in self._timers:
self._timers.remove(timer)
try:
timer.destroy()
except InvalidHandle:
return False
self._wake_executor()
timer.destroy()
return True
return False

Expand Down Expand Up @@ -2078,7 +2079,6 @@ def destroy_node(self) -> None:
self.destroy_timer(self._timers[0])
while self._guards:
self.destroy_guard_condition(self._guards[0])
self._type_description_service.destroy()
self.__node.destroy_when_not_in_use()
self._wake_executor()

Expand Down
31 changes: 21 additions & 10 deletions rclpy/rclpy/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from collections.abc import Callable
from types import TracebackType
from typing import Generic, List, Optional, Type, TypeVar, Union

Expand All @@ -37,11 +38,15 @@ def __init__(
msg_type: Type[MsgT],
topic: str,
qos_profile: QoSProfile,
*,
on_destroy: Optional[Callable[['BasePublisher[MsgT]'], None]] = None,
) -> None:
self.__publisher = publisher_impl
self.msg_type = msg_type
self.topic = topic
self.qos_profile = qos_profile
self._on_destroy = on_destroy
self._destroyed = False

def publish(self, msg: Union[MsgT, bytes]) -> None:
"""
Expand Down Expand Up @@ -80,7 +85,17 @@ def logger_name(self) -> str:
return self.__publisher.get_logger_name()

def destroy(self) -> None:
self.__publisher.destroy_when_not_in_use()
"""Destroy the publisher, notifying the owning node and releasing the handle."""
if self._destroyed:
return
self._destroyed = True
if self._on_destroy is not None:
self._on_destroy(self)
self._on_destroy = None
self._destroy()

def _destroy(self) -> None:
self.handle.destroy_when_not_in_use()

def assert_liveliness(self) -> None:
"""
Expand Down Expand Up @@ -113,6 +128,7 @@ def __init__(
topic: str,
qos_profile: QoSProfile,
*,
on_destroy: Optional[Callable[['Publisher[MsgT]'], None]] = None,
event_callbacks: PublisherEventCallbacks,
callback_group: CallbackGroup,
) -> None:
Expand All @@ -134,8 +150,9 @@ def __init__(
publisher_impl=publisher_impl,
msg_type=msg_type,
topic=topic,
qos_profile=qos_profile
qos_profile=qos_profile,
)
self._on_destroy = on_destroy

self.event_handlers: List[EventHandler] = event_callbacks.create_event_handlers(
callback_group, publisher_impl, topic)
Expand All @@ -160,13 +177,7 @@ def wait_for_all_acked(self, timeout: Duration = Duration(seconds=-1)) -> bool:
with self.handle:
return self.handle.wait_for_all_acked(timeout._duration_handle)

def destroy(self) -> None:
"""
Destroy a container for a ROS publisher.

.. warning:: Users should not destroy a publisher with this method, instead they should
call :meth:`.Node.destroy_publisher`.
"""
def _destroy(self) -> None:
for handler in self.event_handlers:
handler.destroy()
super().destroy()
super()._destroy()
Loading