1313# limitations under the License.
1414
1515import asyncio
16- from typing import Optional , Set
16+ from typing import Dict , Optional
1717
18- from rclpy .clock import BaseClock , ClockChange , JumpThreshold , TimeJump
18+ from rclpy .clock import BaseClock , ClockChange , JumpHandle , JumpThreshold , TimeJump
1919from rclpy .clock_type import ClockType
2020from rclpy .duration import Duration
2121from rclpy .exceptions import TimeSourceChangedError
22+ from rclpy .time import Time
2223
2324
2425class AsyncClock (BaseClock ):
2526 """Clock with asyncio-compatible sleep support."""
2627
2728 def __init__ (self , * , clock_type : ClockType = ClockType .SYSTEM_TIME ) -> None :
2829 super ().__init__ (clock_type = clock_type )
29- self ._pending_sleeps : Set [asyncio .Future ] = set ()
30+ self ._pending_sleeps : Dict [asyncio .Future , Optional [ Time ]] = {}
3031 self ._destroyed = False
3132
33+ threshold = JumpThreshold (
34+ min_forward = Duration (nanoseconds = 1 ),
35+ min_backward = None ,
36+ on_clock_change = True ,
37+ )
38+ self ._jump_handle : JumpHandle = self .create_jump_callback (
39+ threshold , post_callback = self ._on_jump )
40+
41+ @staticmethod
42+ def _resolve_future (future : asyncio .Future ) -> None :
43+ if not future .done ():
44+ future .set_result (None )
45+
46+ def _on_jump (self , time_jump : TimeJump ) -> None :
47+ if time_jump .clock_change in (
48+ ClockChange .ROS_TIME_ACTIVATED ,
49+ ClockChange .ROS_TIME_DEACTIVATED ,
50+ ):
51+ for future in self ._pending_sleeps :
52+ if not future .done ():
53+ future .set_exception (TimeSourceChangedError ())
54+ elif time_jump .clock_change == ClockChange .ROS_TIME_NO_CHANGE :
55+ now = self .now ()
56+ for future , target in self ._pending_sleeps .items ():
57+ if target is not None and now >= target :
58+ self ._resolve_future (future )
59+
3260 def _destroy (self ) -> None :
3361 """Cancel all pending sleeps. Called by AsyncNode.destroy_node()."""
62+ if self ._destroyed :
63+ return
3464 self ._destroyed = True
35- for future in list (self ._pending_sleeps ):
65+ self ._jump_handle .unregister ()
66+ for future in self ._pending_sleeps :
3667 future .cancel ()
68+ self .handle .destroy_when_not_in_use ()
3769
3870 async def sleep (self , duration_sec : float ) -> None :
3971 """
@@ -51,40 +83,18 @@ async def sleep(self, duration_sec: float) -> None:
5183 loop = asyncio .get_running_loop ()
5284 future : asyncio .Future [None ] = loop .create_future ()
5385 timer_handle : Optional [asyncio .TimerHandle ] = None
54- target = None
55-
56- def _resolve () -> None :
57- if not future .done ():
58- future .set_result (None )
59-
60- def _reject () -> None :
61- if not future .done ():
62- future .set_exception (TimeSourceChangedError ())
86+ target : Optional [Time ] = None
6387
6488 if self .ros_time_is_active :
6589 target = self .now () + Duration (nanoseconds = int (duration_sec * 1e9 ))
6690 else :
67- timer_handle = loop .call_later (duration_sec , _resolve )
91+ timer_handle = loop .call_later (
92+ duration_sec , AsyncClock ._resolve_future , future )
6893
69- def _on_jump (time_jump : TimeJump ) -> None :
70- if time_jump .clock_change in (
71- ClockChange .ROS_TIME_ACTIVATED ,
72- ClockChange .ROS_TIME_DEACTIVATED ,
73- ):
74- _reject ()
75- elif target is not None and self .now () >= target :
76- _resolve ()
77-
78- threshold = JumpThreshold (
79- min_forward = Duration (nanoseconds = 1 ),
80- min_backward = None ,
81- on_clock_change = True ,
82- )
83- with self .create_jump_callback (threshold , post_callback = _on_jump ):
84- self ._pending_sleeps .add (future )
85- try :
86- await future
87- finally :
88- self ._pending_sleeps .discard (future )
89- if timer_handle is not None :
90- timer_handle .cancel ()
94+ self ._pending_sleeps [future ] = target
95+ try :
96+ await future
97+ finally :
98+ self ._pending_sleeps .pop (future )
99+ if timer_handle is not None :
100+ timer_handle .cancel ()
0 commit comments