diff --git a/rclpy/rclpy/logging_service.py b/rclpy/rclpy/logging_service.py index e4b6fb4b9..a7d22cc9b 100644 --- a/rclpy/rclpy/logging_service.py +++ b/rclpy/rclpy/logging_service.py @@ -23,12 +23,12 @@ from rclpy.validate_topic_name import TOPIC_SEPARATOR_STRING if TYPE_CHECKING: - from rclpy.node import Node + from rclpy.node import BaseNode class LoggingService: - def __init__(self, node: 'Node'): + def __init__(self, node: 'BaseNode'): node_name = node.get_name() get_logger_name_service_name = \ diff --git a/rclpy/rclpy/node.py b/rclpy/rclpy/node.py index 9974fab1a..a7c92eb0d 100644 --- a/rclpy/rclpy/node.py +++ b/rclpy/rclpy/node.py @@ -89,7 +89,7 @@ from rclpy.service import BaseService from rclpy.service import Service from rclpy.service import ServiceCallbackUnion -from rclpy.subscription import GenericSubscriptionCallbackUnion +from rclpy.subscription import BaseSubscription, GenericSubscriptionCallbackUnion from rclpy.subscription import Subscription from rclpy.subscription import SubscriptionCallbackUnion from rclpy.subscription_content_filter_options import ContentFilterOptions @@ -148,7 +148,11 @@ def __init__( use_global_arguments: bool = True, enable_rosout: bool = True, rosout_qos_profile: Union[QoSProfile, int] = qos_profile_rosout_default, + start_parameter_services: bool = True, + parameter_overrides: Optional[List[Parameter[Any]]] = None, allow_undeclared_parameters: bool = False, + automatically_declare_parameters_from_overrides: bool = False, + enable_logger_service: bool = False, ) -> None: self._context = get_default_context() if context is None else context self._parameters: Dict[str, Parameter[Any]] = {} @@ -160,7 +164,6 @@ def __init__( self._allow_undeclared_parameters = allow_undeclared_parameters self._parameter_overrides: Dict[str, Parameter[Any]] = {} self._descriptors: Dict[str, ParameterDescriptor] = {} - self._parameter_event_publisher: Optional[BasePublisher[ParameterEvent]] = None namespace = namespace or '' @@ -195,6 +198,42 @@ def __init__( with self.handle: self._logger = get_logger(self.__node.logger_name()) + self._parameter_event_publisher: Optional[BasePublisher[ParameterEvent]] = \ + self.create_publisher(ParameterEvent, '/parameter_events', + qos_profile_parameter_events) + + with self.handle: + self._parameter_overrides = self.handle.get_parameters(Parameter) + # Combine parameters from params files with those from the node constructor and + # use the set_parameters_atomically API so a parameter event is published. + if parameter_overrides is not None: + self._parameter_overrides.update({p.name: p for p in parameter_overrides}) + + if automatically_declare_parameters_from_overrides: + self.declare_parameters( + '', + [ + (name, param.value, ParameterDescriptor()) + for name, param in self._parameter_overrides.items()], + ignore_override=True, + ) + + # Init a time source. + # Note: parameter overrides and parameter event publisher need to be ready at this point + # to be able to declare 'use_sim_time' if it was not declared yet. + self._time_source = TimeSource(node=self) + self._time_source.attach_clock(self.get_clock()) + + if start_parameter_services: + self._parameter_service = ParameterService(self) + + if enable_logger_service: + self._logger_service = LoggingService(self) + + self._type_description_service = TypeDescriptionService(self) + + self._context.track_node(self) + @property def context(self) -> Context: """Get the context associated with the node.""" @@ -229,6 +268,47 @@ def get_namespace(self) -> str: def get_clock(self) -> BaseClock: ... + @abstractmethod + def create_publisher( + self, + msg_type: Type[MsgT], + topic: str, + qos_profile: Union[QoSProfile, int], + ) -> BasePublisher[MsgT]: + ... + + @abstractmethod + def create_subscription( + self, + msg_type: Type[MsgT], + topic: str, + callback: SubscriptionCallbackUnion[MsgT], + qos_profile: Union[QoSProfile, int], + ) -> BaseSubscription[MsgT]: + ... + + @abstractmethod + def create_service( + self, + srv_type: type[Srv[SrvRequestT, SrvResponseT]], + srv_name: str, + callback: ServiceCallbackUnion[SrvRequestT, SrvResponseT], + *, + qos_profile: QoSProfile = qos_profile_services_default, + ) -> BaseService[SrvRequestT, SrvResponseT]: + ... + + @abstractmethod + def _create_service( + self, + service_impl: '_rclpy.Service[SrvRequestT, SrvResponseT]', + srv_type: type[Srv[SrvRequestT, SrvResponseT]], + srv_name: str, + callback: ServiceCallbackUnion[SrvRequestT, SrvResponseT], + qos_profile: QoSProfile, + ) -> BaseService[SrvRequestT, SrvResponseT]: + ... + def get_logger(self) -> RcutilsLogger: """Get the nodes logger.""" return self._logger @@ -1837,7 +1917,133 @@ def get_servers_info_by_service( no_mangle, _rclpy.rclpy_get_servers_info_by_service) + def _create_publisher_handle( + self, + msg_type: Type[MsgT], + topic: str, + qos_profile: Union[QoSProfile, int], + *, + qos_overriding_options: Optional[QoSOverridingOptions] = None, + ) -> '_rclpy.Publisher[MsgT]': + try: + final_topic = self.resolve_topic_name(topic) + except RuntimeError: + # if it's name validation error, raise a more appropriate exception. + try: + self._validate_topic_or_service_name(topic) + except InvalidTopicNameException as ex: + raise ex from None + # else reraise the previous exception + raise + + if qos_overriding_options is None: + qos_overriding_options = QoSOverridingOptions([]) + _declare_qos_parameters( + Publisher, self, final_topic, qos_profile, qos_overriding_options) + + # this line imports the typesupport for the message module if not already done + failed = False + check_is_valid_msg_type(msg_type) + try: + with self.handle: + publisher_object = _rclpy.Publisher( + self.handle, msg_type, topic, qos_profile.get_c_qos_profile()) + except ValueError: + failed = True + if failed: + self._validate_topic_or_service_name(topic) + + return publisher_object + + def _create_subscription_handle( + self, + msg_type: Type[MsgT], + topic: str, + qos_profile: QoSProfile, + *, + qos_overriding_options: Optional[QoSOverridingOptions] = None, + content_filter_options: Optional[ContentFilterOptions] = None, + acceptable_buffer_backends: Optional[str] = None + ) -> '_rclpy.Subscription[MsgT]': + try: + final_topic = self.resolve_topic_name(topic) + except RuntimeError: + # if it's name validation error, raise a more appropriate exception. + try: + self._validate_topic_or_service_name(topic) + except InvalidTopicNameException as ex: + raise ex from None + # else reraise the previous exception + raise + + if qos_overriding_options is None: + qos_overriding_options = QoSOverridingOptions([]) + _declare_qos_parameters( + Subscription, self, final_topic, qos_profile, qos_overriding_options) + + # this line imports the typesupport for the message module if not already done + failed = False + check_is_valid_msg_type(msg_type) + try: + with self.handle: + subscription_object = _rclpy.Subscription( + self.handle, msg_type, topic, qos_profile.get_c_qos_profile(), + content_filter_options, acceptable_buffer_backends) + except ValueError: + failed = True + if failed: + self._validate_topic_or_service_name(topic) + + return subscription_object + + def _create_service_handle( + self, + srv_type: type[Srv[SrvRequestT, SrvResponseT]], + srv_name: str, + *, + qos_profile: QoSProfile = qos_profile_services_default, + ) -> '_rclpy.Service[SrvRequestT, SrvResponseT]': + check_is_valid_srv_type(srv_type) + failed = False + try: + with self.handle: + service_impl: '_rclpy.Service[SrvRequestT, SrvResponseT]' = _rclpy.Service( + self.handle, + srv_type, + srv_name, + qos_profile.get_c_qos_profile()) + except ValueError: + failed = True + if failed: + self._validate_topic_or_service_name(srv_name, is_service=True) + + return service_impl + + def _create_client_handle( + self, + srv_type: type[Srv[SrvRequestT, SrvResponseT]], + srv_name: str, + *, + qos_profile: QoSProfile = qos_profile_services_default, + ) -> '_rclpy.Client[SrvRequestT, SrvResponseT]': + check_is_valid_srv_type(srv_type) + failed = False + try: + with self.handle: + client_impl = _rclpy.Client( + self.handle, + srv_type, + srv_name, + qos_profile.get_c_qos_profile()) + except ValueError: + failed = True + if failed: + self._validate_topic_or_service_name(srv_name, is_service=True) + + return client_impl + def destroy_node(self) -> None: + self._context.untrack_node(self) self._parameter_event_publisher = None self.handle.destroy_when_not_in_use() @@ -1916,45 +2122,14 @@ def __init__( use_global_arguments=use_global_arguments, enable_rosout=enable_rosout, rosout_qos_profile=rosout_qos_profile, + start_parameter_services=start_parameter_services, + parameter_overrides=parameter_overrides, allow_undeclared_parameters=allow_undeclared_parameters, + automatically_declare_parameters_from_overrides=( + automatically_declare_parameters_from_overrides), + enable_logger_service=enable_logger_service, ) - self._parameter_event_publisher: Optional[Publisher[ParameterEvent]] = \ - self.create_publisher(ParameterEvent, '/parameter_events', - qos_profile_parameter_events) - - with self.handle: - self._parameter_overrides = self.handle.get_parameters(Parameter) - # Combine parameters from params files with those from the node constructor and - # use the set_parameters_atomically API so a parameter event is published. - if parameter_overrides is not None: - self._parameter_overrides.update({p.name: p for p in parameter_overrides}) - - if automatically_declare_parameters_from_overrides: - self.declare_parameters( - '', - [ - (name, param.value, ParameterDescriptor()) - for name, param in self._parameter_overrides.items()], - ignore_override=True, - ) - - # Init a time source. - # Note: parameter overrides and parameter event publisher need to be ready at this point - # to be able to declare 'use_sim_time' if it was not declared yet. - self._time_source = TimeSource(node=self) - self._time_source.attach_clock(self._clock) - - if start_parameter_services: - self._parameter_service = ParameterService(self) - - if enable_logger_service: - self._logger_service = LoggingService(self) - - self._type_description_service = TypeDescriptionService(self) - - self._context.track_node(self) - @property def publishers(self) -> Iterator[Publisher[Any]]: """Get publishers that have been created on this node.""" @@ -2077,34 +2252,12 @@ def create_publisher( callback_group = callback_group or self.default_callback_group - failed = False - try: - final_topic = self.resolve_topic_name(topic) - except RuntimeError: - # if it's name validation error, raise a more appropriate exception. - try: - self._validate_topic_or_service_name(topic) - except InvalidTopicNameException as ex: - raise ex from None - # else reraise the previous exception - raise - - if qos_overriding_options is None: - qos_overriding_options = QoSOverridingOptions([]) - _declare_qos_parameters( - Publisher, self, final_topic, qos_profile, qos_overriding_options) - - # this line imports the typesupport for the message module if not already done - failed = False - check_is_valid_msg_type(msg_type) - try: - with self.handle: - publisher_object = _rclpy.Publisher( - self.handle, msg_type, topic, qos_profile.get_c_qos_profile()) - except ValueError: - failed = True - if failed: - self._validate_topic_or_service_name(topic) + publisher_object = self._create_publisher_handle( + msg_type, + topic, + qos_profile, + qos_overriding_options=qos_overriding_options + ) try: publisher = publisher_class( @@ -2210,34 +2363,14 @@ def create_subscription( callback_group = callback_group or self.default_callback_group - try: - final_topic = self.resolve_topic_name(topic) - except RuntimeError: - # if it's name validation error, raise a more appropriate exception. - try: - self._validate_topic_or_service_name(topic) - except InvalidTopicNameException as ex: - raise ex from None - # else reraise the previous exception - raise - - if qos_overriding_options is None: - qos_overriding_options = QoSOverridingOptions([]) - _declare_qos_parameters( - Subscription, self, final_topic, qos_profile, qos_overriding_options) - - # this line imports the typesupport for the message module if not already done - failed = False - check_is_valid_msg_type(msg_type) - try: - with self.handle: - subscription_object = _rclpy.Subscription( - self.handle, msg_type, topic, qos_profile.get_c_qos_profile(), - content_filter_options, acceptable_buffer_backends) - except ValueError: - failed = True - if failed: - self._validate_topic_or_service_name(topic) + subscription_object = self._create_subscription_handle( + msg_type, + topic, + qos_profile, + qos_overriding_options=qos_overriding_options, + content_filter_options=content_filter_options, + acceptable_buffer_backends=acceptable_buffer_backends + ) try: subscription = Subscription( @@ -2277,19 +2410,12 @@ def create_client( """ if callback_group is None: callback_group = self.default_callback_group - check_is_valid_srv_type(srv_type) - failed = False - try: - with self.handle: - client_impl = _rclpy.Client( - self.handle, - srv_type, - srv_name, - qos_profile.get_c_qos_profile()) - except ValueError: - failed = True - if failed: - self._validate_topic_or_service_name(srv_name, is_service=True) + + client_impl = self._create_client_handle( + srv_type, + srv_name, + qos_profile=qos_profile + ) client = Client( self.context, @@ -2321,22 +2447,32 @@ def create_service( :param callback_group: The callback group for the service server. If ``None``, then the default callback group for the node is used. """ + service_impl = self._create_service_handle( + srv_type, + srv_name, + qos_profile=qos_profile + ) + + return self._create_service( + service_impl, + srv_type, + srv_name, + callback, + qos_profile, + callback_group + ) + + def _create_service( + self, + service_impl: '_rclpy.Service[SrvRequestT, SrvResponseT]', + srv_type: type[Srv[SrvRequestT, SrvResponseT]], + srv_name: str, + callback: ServiceCallbackUnion[SrvRequestT, SrvResponseT], + qos_profile: QoSProfile, + callback_group: Optional[CallbackGroup] = None, + ) -> Service[SrvRequestT, SrvResponseT]: if callback_group is None: callback_group = self.default_callback_group - check_is_valid_srv_type(srv_type) - failed = False - try: - with self.handle: - service_impl: '_rclpy.Service[SrvRequestT, SrvResponseT]' = _rclpy.Service( - self.handle, - srv_type, - srv_name, - qos_profile.get_c_qos_profile()) - except ValueError: - failed = True - if failed: - self._validate_topic_or_service_name(srv_name, is_service=True) - service = Service( service_impl, srv_type, srv_name, callback, qos_profile, diff --git a/rclpy/rclpy/parameter_service.py b/rclpy/rclpy/parameter_service.py index 77d5e4a06..e7fc7432f 100644 --- a/rclpy/rclpy/parameter_service.py +++ b/rclpy/rclpy/parameter_service.py @@ -26,12 +26,12 @@ from rclpy.validate_topic_name import TOPIC_SEPARATOR_STRING if TYPE_CHECKING: - from rclpy.node import Node + from rclpy.node import BaseNode class ParameterService: - def __init__(self, node: 'Node'): + def __init__(self, node: 'BaseNode'): self._node_weak_ref = weakref.ref(node) nodename = node.get_name() @@ -166,7 +166,7 @@ def _set_parameters_atomically_callback( ) return response - def _get_node(self) -> 'Node': + def _get_node(self) -> 'BaseNode': node = self._node_weak_ref() if node is None: raise ReferenceError('Expected valid node weak reference') diff --git a/rclpy/rclpy/time_source.py b/rclpy/rclpy/time_source.py index d11c6ca45..51b86f0ba 100644 --- a/rclpy/rclpy/time_source.py +++ b/rclpy/rclpy/time_source.py @@ -25,8 +25,8 @@ import rosgraph_msgs.msg if TYPE_CHECKING: - from rclpy.node import Node - from rclpy.subscription import Subscription + from rclpy.node import BaseNode + from rclpy.subscription import BaseSubscription CLOCK_TOPIC = '/clock' USE_SIM_TIME_NAME = 'use_sim_time' @@ -34,9 +34,9 @@ class TimeSource: - def __init__(self, *, node: Optional['Node'] = None): - self._clock_sub: Optional['Subscription[rosgraph_msgs.msg.Clock]'] = None - self._node_weak_ref: Optional[weakref.ReferenceType['Node']] = None + def __init__(self, *, node: Optional['BaseNode'] = None): + self._clock_sub: Optional['BaseSubscription[rosgraph_msgs.msg.Clock]'] = None + self._node_weak_ref: Optional[weakref.ReferenceType['BaseNode']] = None self._associated_clocks: Set[BaseClock] = set() # Zero time is a special value that means time is uninitialized self._last_time_set = Time(clock_type=ClockType.ROS_TIME) @@ -59,10 +59,8 @@ def ros_time_is_active(self, enabled: bool) -> None: self._subscribe_to_clock_topic() else: if self._clock_sub is not None: - node = self._get_node() - if node is not None: - node.destroy_subscription(self._clock_sub) - self._clock_sub = None + self._clock_sub.destroy() + self._clock_sub = None def _subscribe_to_clock_topic(self) -> None: if self._clock_sub is None: @@ -75,10 +73,10 @@ def _subscribe_to_clock_topic(self) -> None: QoSProfile(depth=1, reliability=ReliabilityPolicy.BEST_EFFORT) ) - def attach_node(self, node: 'Node') -> None: - from rclpy.node import Node - if not isinstance(node, Node): - raise TypeError('Node must be of type rclpy.node.Node') + def attach_node(self, node: 'BaseNode') -> None: + from rclpy.node import BaseNode + if not isinstance(node, BaseNode): + raise TypeError('Node must be of type rclpy.node.BaseNode') # Remove an existing node. if self._node_weak_ref is not None: self.detach_node() @@ -105,10 +103,7 @@ def attach_node(self, node: 'Node') -> None: def detach_node(self) -> None: # Remove the subscription to the clock topic. if self._clock_sub is not None: - node = self._get_node() - if node is None: - raise RuntimeError('Unable to destroy previously created clock subscription') - node.destroy_subscription(self._clock_sub) + self._clock_sub.destroy() self._clock_sub = None self._node_weak_ref = None @@ -147,7 +142,7 @@ def _on_parameter_event(self, parameter_list: List[Parameter[bool]]) -> SetParam return SetParametersResult(successful=successful, reason=reason) - def _get_node(self) -> Optional['Node']: + def _get_node(self) -> Optional['BaseNode']: if self._node_weak_ref is not None: return self._node_weak_ref() return None diff --git a/rclpy/rclpy/type_description_service.py b/rclpy/rclpy/type_description_service.py index 4caf1f661..59ee59bdb 100644 --- a/rclpy/rclpy/type_description_service.py +++ b/rclpy/rclpy/type_description_service.py @@ -21,14 +21,13 @@ from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy from rclpy.parameter import Parameter from rclpy.qos import qos_profile_services_default -from rclpy.service import Service from rclpy.type_support import check_is_valid_srv_type from rclpy.validate_topic_name import TOPIC_SEPARATOR_STRING from type_description_interfaces.srv import GetTypeDescription if TYPE_CHECKING: - from rclpy.node import Node + from rclpy.node import BaseNode START_TYPE_DESCRIPTION_SERVICE_PARAM = 'start_type_description_service' @@ -40,10 +39,10 @@ class TypeDescriptionService: The service is implemented in rcl, but should be enabled via parameter and have its callbacks handled via end-client execution framework, such as callback groups and waitsets. - This is not intended for use by end users, rather it is a component to be used by Node. + This is not intended for use by end users, rather it is a component to be used by BaseNode. """ - def __init__(self, node: 'Node'): + def __init__(self, node: 'BaseNode'): """Initialize the service, if the parameter is set to true.""" self._node_weak_ref = weakref.ref(node) node_name = node.get_name() @@ -80,20 +79,13 @@ def __init__(self, node: 'Node'): def _start_service(self) -> None: node = self._get_node() self._type_description_srv = _rclpy.TypeDescriptionService(node.handle) - # Because we are creating our own service wrapper, must manually add the service - # to the appropriate parts of Node because we cannot call create_service. check_is_valid_srv_type(GetTypeDescription) - service = Service( - service_impl=self._type_description_srv.impl, - srv_type=GetTypeDescription, - srv_name=self.service_name, - callback=self._service_callback, - on_destroy=node._on_destroy_service, - callback_group=node.default_callback_group, - qos_profile=qos_profile_services_default) - node.default_callback_group.add_entity(service) - node._services.append(service) - node._wake_executor() + node._create_service( + self._type_description_srv.impl, + GetTypeDescription, + self.service_name, + self._service_callback, + qos_profile_services_default) def _service_callback( self, @@ -105,7 +97,7 @@ def _service_callback( return self._type_description_srv.handle_request( request, GetTypeDescription.Response, self._get_node().handle) - def _get_node(self) -> 'Node': + def _get_node(self) -> 'BaseNode': node = self._node_weak_ref() if node is None: raise ReferenceError('Expected valid node weak reference')