diff --git a/rclpy/rclpy/clock.py b/rclpy/rclpy/clock.py index 8cef3b2c3..db9ebb78f 100644 --- a/rclpy/rclpy/clock.py +++ b/rclpy/rclpy/clock.py @@ -13,10 +13,10 @@ # limitations under the License. from types import TracebackType -from typing import Callable, Literal, Optional, overload, Type, TYPE_CHECKING, TypedDict +from typing import Callable, Optional, Type, TypedDict from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy -from typing_extensions import TypeAlias +from typing_extensions import deprecated, TypeAlias from .clock_type import ClockType from .context import Context @@ -87,7 +87,7 @@ class TimeJumpDictionary(TypedDict): class JumpHandle: - def __init__(self, *, clock: 'Clock', threshold: JumpThreshold, + def __init__(self, *, clock: 'BaseClock', threshold: JumpThreshold, pre_callback: Optional[JumpHandlePreCallbackType], post_callback: Optional[Callable[[TimeJumpDictionary], None]]) -> None: """ @@ -106,7 +106,7 @@ def __init__(self, *, clock: 'Clock', threshold: JumpThreshold, raise ValueError('pre_callback must be callable if given') if post_callback is not None and not callable(post_callback): raise ValueError('post_callback must be callable if given') - self._clock: Optional[Clock] = clock + self._clock: Optional[BaseClock] = clock self._pre_callback = pre_callback self._post_callback = post_callback @@ -137,37 +137,13 @@ def __exit__(self, t: Optional[Type[BaseException]], self.unregister() -class Clock: +class BaseClock: - if TYPE_CHECKING: - __clock: _rclpy.Clock - _clock_type: ClockType - - @overload - def __new__( - cls, - *, - clock_type: Literal[ClockType.ROS_TIME] - ) -> 'ROSClock': ... - - @overload - def __new__( - cls, - *, - clock_type: ClockType = ClockType.SYSTEM_TIME - ) -> 'Clock': ... - - def __new__(cls, *, - clock_type: ClockType = ClockType.SYSTEM_TIME) -> 'Clock': + def __init__(self, *, clock_type: ClockType = ClockType.SYSTEM_TIME) -> None: if not isinstance(clock_type, ClockType): raise TypeError('Clock type must be a ClockType enum') - if clock_type is ClockType.ROS_TIME: - self: 'Clock' = super().__new__(ROSClock) - else: - self = super().__new__(cls) self.__clock = _rclpy.Clock(clock_type.value) self._clock_type = clock_type - return self @property def clock_type(self) -> ClockType: @@ -183,7 +159,7 @@ def handle(self) -> _rclpy.Clock: return self.__clock def __repr__(self) -> str: - return 'Clock(clock_type={0})'.format(self.clock_type.name) + return f'{type(self).__name__}(clock_type={self.clock_type.name})' def now(self) -> Time: """Return the current time of this clock.""" @@ -225,11 +201,41 @@ def callback_shim(jump_dict: TimeJumpDictionary) -> None: clock=self, threshold=threshold, pre_callback=pre_callback, post_callback=post_callback_time_jump_dictionary) + @property + def ros_time_is_active(self) -> bool: + """ + Return ``True`` if ROS time is currently active. + + :raises RCLError: if the clock is not of type ``ROS_TIME``. + """ + with self.handle: + return self.handle.get_ros_time_override_is_enabled() + + def _set_ros_time_is_active(self, enabled: bool) -> None: + # This is not public because it is only to be called by a TimeSource managing the Clock + with self.handle: + self.handle.set_ros_time_override_is_enabled(enabled) + + def set_ros_time_override(self, time: Time) -> None: + """ + Set the next time the ROS clock will report when ROS time is active. + + :raises RCLError: if the clock is not of type ``ROS_TIME``. + """ + if not isinstance(time, Time): + raise TypeError( + 'Time must be specified as rclpy.time.Time. Received type: {0}'.format(type(time))) + with self.handle: + self.handle.set_ros_time_override(time._time_handle) + + +class Clock(BaseClock): + def sleep_until(self, until: Time, context: Optional[Context] = None) -> bool: """ Sleep until a specific time on this Clock is reached. - When using a ``ROSClock``, this may sleep forever if the ``TimeSource`` is misconfigured + When using ``ROS_TIME``, this may sleep forever if the ``TimeSource`` is misconfigured and the context is never shut down. ROS time being activated or deactivated causes this function to cease sleeping and return ``False``. @@ -276,11 +282,11 @@ def on_time_jump(time_jump: TimeJump) -> None: on_clock_change=True) with self.create_jump_callback(threshold, post_callback=on_time_jump): if ClockType.SYSTEM_TIME == self._clock_type: - event.wait_until_system(self.__clock, until._time_handle) + event.wait_until_system(self.handle, until._time_handle) elif ClockType.STEADY_TIME == self._clock_type: - event.wait_until_steady(self.__clock, until._time_handle) + event.wait_until_steady(self.handle, until._time_handle) elif ClockType.ROS_TIME == self._clock_type: - event.wait_until_ros(self.__clock, until._time_handle) + event.wait_until_ros(self.handle, until._time_handle) if not context.ok() or time_source_changed: return False @@ -298,7 +304,7 @@ def sleep_for(self, rel_time: Duration, context: Optional[Context] = None) -> bo clock.sleep_until(clock.now() + rel_time, context) - When using a ``ROSClock``, this may sleep forever if the ``TimeSource`` is misconfigured + When using a ``ROS_TIME``, this may sleep forever if the ``TimeSource`` is misconfigured and the context is never shut down. ROS time being activated or deactivated causes this function to cease sleeping and return False. @@ -313,28 +319,8 @@ def sleep_for(self, rel_time: Duration, context: Optional[Context] = None) -> bo return self.sleep_until(self.now() + rel_time, context) +@deprecated('Use Clock(clock_type=ClockType.ROS_TIME) instead') class ROSClock(Clock): - def __new__(cls) -> 'ROSClock': - self = super().__new__(Clock, clock_type=ClockType.ROS_TIME) - assert isinstance(self, ROSClock) - return self - - @property - def ros_time_is_active(self) -> bool: - """Return ``True`` if ROS time is currently active.""" - with self.handle: - return self.handle.get_ros_time_override_is_enabled() - - def _set_ros_time_is_active(self, enabled: bool) -> None: - # This is not public because it is only to be called by a TimeSource managing the Clock - with self.handle: - self.handle.set_ros_time_override_is_enabled(enabled) - - def set_ros_time_override(self, time: Time) -> None: - """Set the next time the ROS clock will report when ROS time is active.""" - if not isinstance(time, Time): - raise TypeError( - 'Time must be specified as rclpy.time.Time. Received type: {0}'.format(type(time))) - with self.handle: - self.handle.set_ros_time_override(time._time_handle) + def __init__(self) -> None: + super().__init__(clock_type=ClockType.ROS_TIME) diff --git a/rclpy/rclpy/impl/rcutils_logger.py b/rclpy/rclpy/impl/rcutils_logger.py index 044e744bc..7c3bb49cf 100644 --- a/rclpy/rclpy/impl/rcutils_logger.py +++ b/rclpy/rclpy/impl/rcutils_logger.py @@ -28,7 +28,7 @@ from typing import TypedDict from typing import Union -from rclpy.clock import Clock +from rclpy.clock import BaseClock from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy from rclpy.impl.logging_severity import LoggingSeverity from typing_extensions import Unpack @@ -103,7 +103,7 @@ class OnceContext(RcutilsLoggerContext): class ThrottleContext(RcutilsLoggerContext): throttle_duration_sec: float - throttle_time_source_type: Clock + throttle_time_source_type: BaseClock throttle_last_logged: int @@ -114,14 +114,14 @@ class SkipFirstContext(RcutilsLoggerContext): class LoggingFilterArgs(TypedDict, total=False): once: bool throttle_duration_sec: float - throttle_time_source_type: Clock + throttle_time_source_type: BaseClock skip_first: bool class LoggingFilterParams(TypedDict, total=False): once: Optional[bool] throttle_duration_sec: Optional[float] - throttle_time_source_type: Clock + throttle_time_source_type: BaseClock skip_first: Optional[bool] @@ -192,7 +192,7 @@ class Throttle(LoggingFilter): params: ClassVar[LoggingFilterParams] = { 'throttle_duration_sec': None, - 'throttle_time_source_type': Clock(), + 'throttle_time_source_type': BaseClock(), } @classmethod @@ -201,7 +201,7 @@ def initialize_context(cls, context: RcutilsLoggerContext, context = cast(ThrottleContext, context) super(Throttle, cls).initialize_context(context, **kwargs) context['throttle_last_logged'] = 0 - if not isinstance(context['throttle_time_source_type'], Clock): + if not isinstance(context['throttle_time_source_type'], BaseClock): raise ValueError( 'Received throttle_time_source_type of "{0}" ' 'is not a clock instance' diff --git a/rclpy/rclpy/node.py b/rclpy/rclpy/node.py index cf173ac4a..4ff17c452 100644 --- a/rclpy/rclpy/node.py +++ b/rclpy/rclpy/node.py @@ -52,7 +52,7 @@ from rclpy.callback_groups import ReentrantCallbackGroup from rclpy.client import Client from rclpy.clock import Clock -from rclpy.clock import ROSClock +from rclpy.clock_type import ClockType from rclpy.constants import S_TO_NS from rclpy.context import Context from rclpy.endpoint_info import ServiceEndpointInfo, TopicEndpointInfo @@ -253,7 +253,7 @@ def __init__( self._parameter_overrides.update({p.name: p for p in parameter_overrides}) # Clock that has support for ROS time. - self._clock = ROSClock() + self._clock = Clock(clock_type=ClockType.ROS_TIME) if automatically_declare_parameters_from_overrides: self.declare_parameters( diff --git a/rclpy/rclpy/time_source.py b/rclpy/rclpy/time_source.py index 707e6d89b..d11c6ca45 100644 --- a/rclpy/rclpy/time_source.py +++ b/rclpy/rclpy/time_source.py @@ -16,7 +16,7 @@ import weakref from rcl_interfaces.msg import SetParametersResult -from rclpy.clock import ROSClock +from rclpy.clock import BaseClock from rclpy.clock_type import ClockType from rclpy.parameter import Parameter from rclpy.qos import QoSProfile @@ -37,7 +37,7 @@ class TimeSource: def __init__(self, *, node: Optional['Node'] = None): self._clock_sub: Optional['Subscription[rosgraph_msgs.msg.Clock]'] = None self._node_weak_ref: Optional[weakref.ReferenceType['Node']] = None - self._associated_clocks: Set[ROSClock] = set() + self._associated_clocks: Set[BaseClock] = set() # Zero time is a special value that means time is uninitialized self._last_time_set = Time(clock_type=ClockType.ROS_TIME) self._ros_time_is_active = False @@ -112,8 +112,8 @@ def detach_node(self) -> None: self._clock_sub = None self._node_weak_ref = None - def attach_clock(self, clock: ROSClock) -> None: - if not isinstance(clock, ROSClock): + def attach_clock(self, clock: BaseClock) -> None: + if clock.clock_type != ClockType.ROS_TIME: raise ValueError('Only clocks with type ROS_TIME can be attached.') clock.set_ros_time_override(self._last_time_set) diff --git a/rclpy/test/test_clock.py b/rclpy/test/test_clock.py index f89f2e823..0c2e9bcfb 100644 --- a/rclpy/test/test_clock.py +++ b/rclpy/test/test_clock.py @@ -20,7 +20,7 @@ import pytest import rclpy -from rclpy.clock import Clock +from rclpy.clock import BaseClock, Clock from rclpy.clock import JumpHandle from rclpy.clock import JumpThreshold from rclpy.clock import ROSClock @@ -28,6 +28,7 @@ from rclpy.context import Context from rclpy.duration import Duration from rclpy.exceptions import NotInitializedException +from rclpy.subscription import RCLError from rclpy.time import Time from rclpy.utilities import get_default_context @@ -66,22 +67,23 @@ def test_invalid_jump_threshold() -> None: class TestClock(unittest.TestCase): def test_clock_construction(self) -> None: - clock = Clock() - - with self.assertRaises(TypeError): - clock = Clock(clock_type='STEADY_TIME') # type: ignore[call-overload] - - clock = Clock(clock_type=ClockType.STEADY_TIME) - assert clock.clock_type == ClockType.STEADY_TIME - clock = Clock(clock_type=ClockType.SYSTEM_TIME) - assert clock.clock_type == ClockType.SYSTEM_TIME - # A subclass ROSClock is returned if ROS_TIME is specified. - clock = Clock(clock_type=ClockType.ROS_TIME) - assert clock.clock_type == ClockType.ROS_TIME - assert isinstance(clock, ROSClock) - - # Direct instantiation of a ROSClock is also possible. - clock = ROSClock() + for cls in [BaseClock, Clock]: + with self.subTest(cls=cls): + clock = cls() + + with self.assertRaises(TypeError): + clock = cls(clock_type='STEADY_TIME') # type: ignore[call-overload] + + clock = cls(clock_type=ClockType.STEADY_TIME) + assert clock.clock_type == ClockType.STEADY_TIME + clock = cls(clock_type=ClockType.SYSTEM_TIME) + assert clock.clock_type == ClockType.SYSTEM_TIME + clock = cls(clock_type=ClockType.ROS_TIME) + assert clock.clock_type == ClockType.ROS_TIME + + def test_ros_clock_deprecation(self) -> None: + with self.assertWarns(DeprecationWarning): + clock = ROSClock() assert clock.clock_type == ClockType.ROS_TIME def test_clock_now(self) -> None: @@ -108,12 +110,30 @@ def test_clock_now(self) -> None: now = now2 def test_ros_time_is_active(self) -> None: - clock = ROSClock() + clock = Clock(clock_type=ClockType.ROS_TIME) clock._set_ros_time_is_active(True) assert clock.ros_time_is_active clock._set_ros_time_is_active(False) assert not clock.ros_time_is_active + def test_ros_time_is_active_raises_when_clock_type_is_not_ros_time(self) -> None: + clock = Clock(clock_type=ClockType.SYSTEM_TIME) + with self.assertRaises(RCLError): + clock.ros_time_is_active + + clock = Clock(clock_type=ClockType.STEADY_TIME) + with self.assertRaises(RCLError): + clock.ros_time_is_active + + def test_set_ros_time_override_raises_when_clock_type_is_not_ros_time(self) -> None: + clock = Clock(clock_type=ClockType.SYSTEM_TIME) + with self.assertRaises(RCLError): + clock.set_ros_time_override(Time(seconds=1)) + + clock = Clock(clock_type=ClockType.STEADY_TIME) + with self.assertRaises(RCLError): + clock.set_ros_time_override(Time(seconds=1)) + def test_triggered_time_jump_callbacks(self) -> None: one_second = Duration(seconds=1) half_second = Duration(seconds=0.5) @@ -130,7 +150,7 @@ def test_triggered_time_jump_callbacks(self) -> None: pre_callback2 = Mock() post_callback2 = Mock() - clock = ROSClock() + clock = Clock(clock_type=ClockType.ROS_TIME) handler1 = clock.create_jump_callback( threshold1, pre_callback=pre_callback1, post_callback=post_callback1) handler2 = clock.create_jump_callback( @@ -182,7 +202,7 @@ def test_triggered_clock_change_callbacks(self) -> None: pre_callback3 = Mock() post_callback3 = Mock() - clock = ROSClock() + clock = Clock(clock_type=ClockType.ROS_TIME) handler1 = clock.create_jump_callback( threshold1, pre_callback=pre_callback1, post_callback=post_callback1) handler2 = clock.create_jump_callback( @@ -307,7 +327,7 @@ def test_sleep_for_negative_duration(default_context: Context, clock_type: Clock @pytest.mark.parametrize('ros_time_enabled', (True, False)) def test_sleep_until_ros_time_toggled(default_context: Context, ros_time_enabled: bool) -> None: - clock = ROSClock() + clock = Clock(clock_type=ClockType.ROS_TIME) clock._set_ros_time_is_active(not ros_time_enabled) retval = None @@ -335,7 +355,7 @@ def run() -> None: @pytest.mark.parametrize('ros_time_enabled', (True, False)) def test_sleep_for_ros_time_toggled(default_context: Context, ros_time_enabled: bool) -> None: - clock = ROSClock() + clock = Clock(clock_type=ClockType.ROS_TIME) clock._set_ros_time_is_active(not ros_time_enabled) retval = None @@ -413,7 +433,7 @@ def run() -> None: def test_sleep_until_ros_time_enabled(default_context: Context) -> None: - clock = ROSClock() + clock = Clock(clock_type=ClockType.ROS_TIME) clock._set_ros_time_is_active(True) start_time = Time(seconds=1, clock_type=ClockType.ROS_TIME) @@ -444,7 +464,7 @@ def run() -> None: def test_sleep_for_ros_time_enabled(default_context: Context) -> None: - clock = ROSClock() + clock = Clock(clock_type=ClockType.ROS_TIME) clock._set_ros_time_is_active(True) start_time = Time(seconds=1, clock_type=ClockType.ROS_TIME) @@ -476,7 +496,7 @@ def run() -> None: def test_with_jump_handle() -> None: - clock = ROSClock() + clock = Clock(clock_type=ClockType.ROS_TIME) clock._set_ros_time_is_active(False) post_callback = Mock() diff --git a/rclpy/test/test_logging.py b/rclpy/test/test_logging.py index 4b2b586ab..ff11cb17d 100644 --- a/rclpy/test/test_logging.py +++ b/rclpy/test/test_logging.py @@ -19,7 +19,8 @@ import unittest import rclpy -from rclpy.clock import Clock, ROSClock +from rclpy.clock import Clock +from rclpy.clock_type import ClockType from rclpy.logging import LoggingSeverity from rclpy.time import Time from rclpy.time_source import TimeSource @@ -125,7 +126,7 @@ def test_log_throttle(self) -> None: def test_log_throttle_ros_clock(self) -> None: message_was_logged = [] - ros_clock = ROSClock() + ros_clock = Clock(clock_type=ClockType.ROS_TIME) time_source = TimeSource() time_source.attach_clock(ros_clock) time_source.ros_time_is_active = True diff --git a/rclpy/test/test_time_source.py b/rclpy/test/test_time_source.py index 2a5697110..f8e4e577c 100644 --- a/rclpy/test/test_time_source.py +++ b/rclpy/test/test_time_source.py @@ -17,10 +17,9 @@ from unittest.mock import Mock import rclpy -from rclpy.clock import Clock +from rclpy.clock import BaseClock, Clock from rclpy.clock import ClockChange from rclpy.clock import JumpThreshold -from rclpy.clock import ROSClock from rclpy.clock_type import ClockType from rclpy.duration import Duration from rclpy.parameter import Parameter @@ -84,23 +83,23 @@ def set_use_sim_time_parameter(self, value: bool) -> bool: return use_sim_time_param.value == value def test_time_source_attach_clock(self) -> None: - time_source = TimeSource(node=self.node) - - # ROSClock is a specialization of Clock with ROS time methods. - time_source.attach_clock(ROSClock()) + for cls in [BaseClock, Clock]: + with self.subTest(cls=cls): + time_source = TimeSource(node=self.node) + time_source.attach_clock(cls(clock_type=ClockType.ROS_TIME)) - # Other clock types are not supported. - with self.assertRaises(ValueError): - time_source.attach_clock( - Clock(clock_type=ClockType.SYSTEM_TIME)) # type: ignore[arg-type] + # Other clock types are not supported. + with self.assertRaises(ValueError): + time_source.attach_clock( + cls(clock_type=ClockType.SYSTEM_TIME)) - with self.assertRaises(ValueError): - time_source.attach_clock( - Clock(clock_type=ClockType.STEADY_TIME)) # type: ignore[arg-type] + with self.assertRaises(ValueError): + time_source.attach_clock( + cls(clock_type=ClockType.STEADY_TIME)) def test_time_source_not_using_sim_time(self) -> None: time_source = TimeSource(node=self.node) - clock = ROSClock() + clock = Clock(clock_type=ClockType.ROS_TIME) time_source.attach_clock(clock) # When not using sim time, ROS time should look like system time @@ -118,7 +117,7 @@ def test_time_source_not_using_sim_time(self) -> None: # Whether or not an attached clock is using ROS time should be determined by the time # source managing it. self.assertFalse(time_source.ros_time_is_active) - clock2 = ROSClock() + clock2 = Clock(clock_type=ClockType.ROS_TIME) clock2._set_ros_time_is_active(True) time_source.attach_clock(clock2) self.assertFalse(clock2.ros_time_is_active) @@ -126,7 +125,7 @@ def test_time_source_not_using_sim_time(self) -> None: def test_time_source_using_sim_time(self) -> None: time_source = TimeSource(node=self.node) - clock = ROSClock() + clock = Clock(clock_type=ClockType.ROS_TIME) time_source.attach_clock(clock) # Setting ROS time active on a time source should also cause attached clocks' use of ROS @@ -164,7 +163,7 @@ def test_time_source_using_sim_time(self) -> None: def test_forwards_jump(self) -> None: time_source = TimeSource(node=self.node) - clock = ROSClock() + clock = Clock(clock_type=ClockType.ROS_TIME) time_source.attach_clock(clock) assert self.set_use_sim_time_parameter(True) @@ -184,7 +183,7 @@ def test_forwards_jump(self) -> None: def test_backwards_jump(self) -> None: time_source = TimeSource(node=self.node) - clock = ROSClock() + clock = Clock(clock_type=ClockType.ROS_TIME) time_source.attach_clock(clock) assert self.set_use_sim_time_parameter(True) @@ -204,7 +203,7 @@ def test_backwards_jump(self) -> None: def test_clock_change(self) -> None: time_source = TimeSource(node=self.node) - clock = ROSClock() + clock = Clock(clock_type=ClockType.ROS_TIME) time_source.attach_clock(clock) assert self.set_use_sim_time_parameter(True) @@ -230,7 +229,7 @@ def test_clock_change(self) -> None: def test_no_pre_callback(self) -> None: time_source = TimeSource(node=self.node) - clock = ROSClock() + clock = Clock(clock_type=ClockType.ROS_TIME) time_source.attach_clock(clock) assert self.set_use_sim_time_parameter(True) @@ -246,7 +245,7 @@ def test_no_pre_callback(self) -> None: def test_no_post_callback(self) -> None: time_source = TimeSource(node=self.node) - clock = ROSClock() + clock = Clock(clock_type=ClockType.ROS_TIME) time_source.attach_clock(clock) assert self.set_use_sim_time_parameter(True)