From 76e7baa7df38113ee59f09b7924157b62339274f Mon Sep 17 00:00:00 2001 From: Petrus Nguyen Thai Hoc Date: Mon, 3 Feb 2025 12:36:28 +0700 Subject: [PATCH 1/2] use Queue in startWith --- .../lib/src/transformers/start_with.dart | 48 +++++++++++++++++-- 1 file changed, 43 insertions(+), 5 deletions(-) diff --git a/packages/rxdart/lib/src/transformers/start_with.dart b/packages/rxdart/lib/src/transformers/start_with.dart index 33583305..ee62498c 100644 --- a/packages/rxdart/lib/src/transformers/start_with.dart +++ b/packages/rxdart/lib/src/transformers/start_with.dart @@ -1,28 +1,66 @@ import 'dart:async'; +import 'dart:collection'; +import 'package:rxdart/rxdart.dart'; import 'package:rxdart/src/utils/forwarding_sink.dart'; import 'package:rxdart/src/utils/forwarding_stream.dart'; class _StartWithStreamSink extends ForwardingSink { final S _startValue; + late final queue = Queue>() + ..add(StreamNotification.data(_startValue)); + var _isCancelled = false; + _StartWithStreamSink(this._startValue); @override - void onData(S data) => sink.add(data); + void onData(S data) { + if (queue.isEmpty) { + sink.add(data); + } else { + queue.add(StreamNotification.data(data)); + } + } @override - void onError(Object e, StackTrace st) => sink.addError(e, st); + void onError(Object e, StackTrace st) { + if (queue.isEmpty) { + sink.addError(e, st); + } else { + queue.add(StreamNotification.error(e, st)); + } + } @override - void onDone() => sink.close(); + void onDone() { + if (queue.isEmpty) { + sink.close(); + } else { + queue.add(DoneNotification()); + } + } @override - FutureOr onCancel() {} + FutureOr onCancel() { + _isCancelled = true; + } @override void onListen() { - sink.add(_startValue); + scheduleMicrotask(() { + final add = sink.add; + final addError = sink.addError; + final close = sink.close; + + while (queue.isNotEmpty) { + if (_isCancelled) { + queue.clear(); + return; + } + queue.removeFirst().when(data: add, error: addError, done: close); + } + }); } @override From 49da3e3596a31a89ad54e448bc6bd6d4c237a492 Mon Sep 17 00:00:00 2001 From: Petrus Nguyen Thai Hoc Date: Mon, 3 Feb 2025 15:21:19 +0700 Subject: [PATCH 2/2] refactor --- .../backpressure/backpressure.dart | 32 +++++++------ .../rxdart/lib/src/transformers/delay.dart | 8 ++-- .../lib/src/transformers/delay_when.dart | 13 ++--- packages/rxdart/lib/src/transformers/do.dart | 24 +++++----- .../lib/src/transformers/exhaust_map.dart | 12 ++--- .../rxdart/lib/src/transformers/flat_map.dart | 11 +++-- .../rxdart/lib/src/transformers/group_by.dart | 8 ++-- .../lib/src/transformers/on_error_resume.dart | 10 ++-- .../lib/src/transformers/skip_last.dart | 6 +-- .../lib/src/transformers/skip_until.dart | 8 ++-- .../lib/src/transformers/start_with.dart | 47 ++---------------- .../src/transformers/start_with_error.dart | 8 ++-- .../lib/src/transformers/start_with_many.dart | 8 ++-- .../lib/src/transformers/switch_if_empty.dart | 12 ++--- .../lib/src/transformers/switch_map.dart | 14 +++--- .../lib/src/transformers/take_last.dart | 6 +-- .../lib/src/transformers/take_until.dart | 8 ++-- .../lib/src/transformers/time_interval.dart | 6 +-- .../src/transformers/with_latest_from.dart | 10 ++-- .../rxdart/lib/src/utils/forwarding_sink.dart | 48 +++++++++++++++++-- .../lib/src/utils/forwarding_stream.dart | 32 ++++++++----- 21 files changed, 174 insertions(+), 157 deletions(-) diff --git a/packages/rxdart/lib/src/transformers/backpressure/backpressure.dart b/packages/rxdart/lib/src/transformers/backpressure/backpressure.dart index 18c2439a..c9dadb32 100644 --- a/packages/rxdart/lib/src/transformers/backpressure/backpressure.dart +++ b/packages/rxdart/lib/src/transformers/backpressure/backpressure.dart @@ -72,7 +72,7 @@ class _BackpressureStreamSink extends ForwardingSink { } @override - void onError(Object e, StackTrace st) => sink.addError(e, st); + void onError(Object e, StackTrace st) => sink.addErrorSync(e, st); @override void onDone() { @@ -93,7 +93,7 @@ class _BackpressureStreamSink extends ForwardingSink { queue.clear(); _windowSubscription?.cancel(); - sink.close(); + sink.closeSync(); } @override @@ -108,7 +108,7 @@ class _BackpressureStreamSink extends ForwardingSink { @override void onResume() => _windowSubscription?.resume(); - void maybeCreateWindow(S event, EventSink sink) { + void maybeCreateWindow(S event, EnhancedEventSink sink) { switch (_strategy) { // for example throttle case WindowStrategy.eventAfterLastWindow: @@ -142,29 +142,30 @@ class _BackpressureStreamSink extends ForwardingSink { } } - void maybeCloseWindow(EventSink sink) { + void maybeCloseWindow(EnhancedEventSink sink) { if (_closeWindowWhen != null && _closeWindowWhen!(unmodifiableQueue)) { resolveWindowEnd(sink); } } - StreamSubscription singleWindow(S event, EventSink sink) => + StreamSubscription singleWindow( + S event, EnhancedEventSink sink) => buildStream(event, sink).take(1).listen( null, - onError: sink.addError, + onError: sink.addErrorSync, onDone: () => resolveWindowEnd(sink, _mainClosed), ); // opens a new Window which is kept open until the main Stream // closes. - StreamSubscription multiWindow(S event, EventSink sink) => + StreamSubscription multiWindow(S event, EnhancedEventSink sink) => buildStream(event, sink).listen( (dynamic _) => resolveWindowEnd(sink), - onError: sink.addError, + onError: sink.addErrorSync, onDone: () => resolveWindowEnd(sink), ); - Stream buildStream(S event, EventSink sink) { + Stream buildStream(S event, EnhancedEventSink sink) { Stream stream; _windowSubscription?.cancel(); @@ -174,27 +175,28 @@ class _BackpressureStreamSink extends ForwardingSink { return stream; } - void resolveWindowStart(S event, EventSink sink) { + void resolveWindowStart(S event, EnhancedEventSink sink) { if (_onWindowStart != null) { - sink.add(_onWindowStart!(event)); + sink.addSync(_onWindowStart!(event)); } } - void resolveWindowEnd(EventSink sink, [bool isControllerClosing = false]) { + void resolveWindowEnd(EnhancedEventSink sink, + [bool isControllerClosing = false]) { if (isControllerClosing && _strategy == WindowStrategy.eventAfterLastWindow) { if (_dispatchOnClose && _hasData && queue.length > 1 && _onWindowEnd != null) { - sink.add(_onWindowEnd!(unmodifiableQueue)); + sink.addSync(_onWindowEnd!(unmodifiableQueue)); } queue.clear(); _windowSubscription?.cancel(); _windowSubscription = null; - sink.close(); + sink.closeSync(); return; } @@ -211,7 +213,7 @@ class _BackpressureStreamSink extends ForwardingSink { if (_hasData && (queue.isNotEmpty || !_ignoreEmptyWindows)) { if (_onWindowEnd != null) { - sink.add(_onWindowEnd!(unmodifiableQueue)); + sink.addSync(_onWindowEnd!(unmodifiableQueue)); } // prepare the buffer for the next window. diff --git a/packages/rxdart/lib/src/transformers/delay.dart b/packages/rxdart/lib/src/transformers/delay.dart index 658042ab..ce738e38 100644 --- a/packages/rxdart/lib/src/transformers/delay.dart +++ b/packages/rxdart/lib/src/transformers/delay.dart @@ -18,10 +18,10 @@ class _DelayStreamSink extends ForwardingSink { final subscription = Rx.timer(null, _duration).listen((_) { _subscriptions.removeFirst(); - sink.add(data); + sink.addSync(data); if (_inputClosed && _subscriptions.isEmpty) { - sink.close(); + sink.closeSync(); } }); @@ -29,14 +29,14 @@ class _DelayStreamSink extends ForwardingSink { } @override - void onError(Object error, StackTrace st) => sink.addError(error, st); + void onError(Object error, StackTrace st) => sink.addErrorSync(error, st); @override void onDone() { _inputClosed = true; if (_subscriptions.isEmpty) { - sink.close(); + sink.closeSync(); } } diff --git a/packages/rxdart/lib/src/transformers/delay_when.dart b/packages/rxdart/lib/src/transformers/delay_when.dart index 6ee9bedc..0cc694cf 100644 --- a/packages/rxdart/lib/src/transformers/delay_when.dart +++ b/packages/rxdart/lib/src/transformers/delay_when.dart @@ -17,15 +17,16 @@ class _DelayWhenStreamSink extends ForwardingSink { @override void onData(T data) { - final subscription = - itemDelaySelector(data).take(1).listen(null, onError: sink.addError); + final subscription = itemDelaySelector(data) + .take(1) + .listen(null, onError: sink.addErrorSync); subscription.onDone(() { subscriptions.remove(subscription); - sink.add(data); + sink.addSync(data); if (subscriptions.isEmpty && closed) { - sink.close(); + sink.closeSync(); } }); @@ -33,13 +34,13 @@ class _DelayWhenStreamSink extends ForwardingSink { } @override - void onError(Object error, StackTrace st) => sink.addError(error, st); + void onError(Object error, StackTrace st) => sink.addErrorSync(error, st); @override void onDone() { closed = true; if (subscriptions.isEmpty) { - sink.close(); + sink.closeSync(); } } diff --git a/packages/rxdart/lib/src/transformers/do.dart b/packages/rxdart/lib/src/transformers/do.dart index 2953880e..adb1d7c4 100644 --- a/packages/rxdart/lib/src/transformers/do.dart +++ b/packages/rxdart/lib/src/transformers/do.dart @@ -30,14 +30,14 @@ class _DoStreamSink extends ForwardingSink { try { _onData?.call(data); } catch (e, s) { - sink.addError(e, s); + sink.addErrorSync(e, s); } try { _onEach?.call(StreamNotification.data(data)); } catch (e, s) { - sink.addError(e, s); + sink.addErrorSync(e, s); } - sink.add(data); + sink.addSync(data); } @override @@ -45,14 +45,14 @@ class _DoStreamSink extends ForwardingSink { try { _onError?.call(e, st); } catch (e, s) { - sink.addError(e, s); + sink.addErrorSync(e, s); } try { _onEach?.call(StreamNotification.error(e, st)); } catch (e, s) { - sink.addError(e, s); + sink.addErrorSync(e, s); } - sink.addError(e, st); + sink.addErrorSync(e, st); } @override @@ -60,14 +60,14 @@ class _DoStreamSink extends ForwardingSink { try { _onDone?.call(); } catch (e, s) { - sink.addError(e, s); + sink.addErrorSync(e, s); } try { _onEach?.call(StreamNotification.done()); } catch (e, s) { - sink.addError(e, s); + sink.addErrorSync(e, s); } - sink.close(); + sink.closeSync(); } @override @@ -78,7 +78,7 @@ class _DoStreamSink extends ForwardingSink { try { _onListen?.call(); } catch (e, s) { - sink.addError(e, s); + sink.addErrorSync(e, s); } } @@ -87,7 +87,7 @@ class _DoStreamSink extends ForwardingSink { try { _onPause?.call(); } catch (e, s) { - sink.addError(e, s); + sink.addErrorSync(e, s); } } @@ -96,7 +96,7 @@ class _DoStreamSink extends ForwardingSink { try { _onResume?.call(); } catch (e, s) { - sink.addError(e, s); + sink.addErrorSync(e, s); } } } diff --git a/packages/rxdart/lib/src/transformers/exhaust_map.dart b/packages/rxdart/lib/src/transformers/exhaust_map.dart index eadf7a6e..ea289fc5 100644 --- a/packages/rxdart/lib/src/transformers/exhaust_map.dart +++ b/packages/rxdart/lib/src/transformers/exhaust_map.dart @@ -20,31 +20,31 @@ class _ExhaustMapStreamSink extends ForwardingSink { try { mappedStream = _mapper(data); } catch (e, s) { - sink.addError(e, s); + sink.addErrorSync(e, s); return; } _mapperSubscription = mappedStream.listen( - sink.add, - onError: sink.addError, + sink.addSync, + onError: sink.addErrorSync, onDone: () { _mapperSubscription = null; if (_inputClosed) { - sink.close(); + sink.closeSync(); } }, ); } @override - void onError(Object e, StackTrace st) => sink.addError(e, st); + void onError(Object e, StackTrace st) => sink.addErrorSync(e, st); @override void onDone() { _inputClosed = true; - _mapperSubscription ?? sink.close(); + _mapperSubscription ?? sink.closeSync(); } @override diff --git a/packages/rxdart/lib/src/transformers/flat_map.dart b/packages/rxdart/lib/src/transformers/flat_map.dart index 602e5f28..ee0fa532 100644 --- a/packages/rxdart/lib/src/transformers/flat_map.dart +++ b/packages/rxdart/lib/src/transformers/flat_map.dart @@ -29,32 +29,33 @@ class _FlatMapStreamSink extends ForwardingSink { try { mappedStream = _mapper(data); } catch (e, s) { - sink.addError(e, s); + sink.addErrorSync(e, s); return; } - final subscription = mappedStream.listen(sink.add, onError: sink.addError); + final subscription = + mappedStream.listen(sink.addSync, onError: sink.addErrorSync); subscription.onDone(() { _subscriptions.remove(subscription); if (queue.isNotEmpty) { listenInner(queue.removeFirst()); } else if (_inputClosed && _subscriptions.isEmpty) { - sink.close(); + sink.closeSync(); } }); _subscriptions.add(subscription); } @override - void onError(Object e, StackTrace st) => sink.addError(e, st); + void onError(Object e, StackTrace st) => sink.addErrorSync(e, st); @override void onDone() { _inputClosed = true; if (_subscriptions.isEmpty) { - sink.close(); + sink.closeSync(); } } diff --git a/packages/rxdart/lib/src/transformers/group_by.dart b/packages/rxdart/lib/src/transformers/group_by.dart index 18ec60c2..f3b29565 100644 --- a/packages/rxdart/lib/src/transformers/group_by.dart +++ b/packages/rxdart/lib/src/transformers/group_by.dart @@ -37,7 +37,7 @@ class _GroupByStreamSink extends ForwardingSink> { ); } - sink.add(groupByStream); + sink.addSync(groupByStream); return groupedController; } @@ -47,7 +47,7 @@ class _GroupByStreamSink extends ForwardingSink> { try { key = grouper(data); } catch (e, s) { - sink.addError(e, s); + sink.addErrorSync(e, s); return; } @@ -55,12 +55,12 @@ class _GroupByStreamSink extends ForwardingSink> { } @override - void onError(e, st) => sink.addError(e, st); + void onError(e, st) => sink.addErrorSync(e, st); @override void onDone() { _closeAll(); - sink.close(); + sink.closeSync(); } @override diff --git a/packages/rxdart/lib/src/transformers/on_error_resume.dart b/packages/rxdart/lib/src/transformers/on_error_resume.dart index 85fd9778..60312f15 100644 --- a/packages/rxdart/lib/src/transformers/on_error_resume.dart +++ b/packages/rxdart/lib/src/transformers/on_error_resume.dart @@ -12,7 +12,7 @@ class _OnErrorResumeStreamSink extends ForwardingSink { _OnErrorResumeStreamSink(this._recoveryFn); @override - void onData(S data) => sink.add(data); + void onData(S data) => sink.addSync(data); @override void onError(Object e, StackTrace st) { @@ -21,16 +21,16 @@ class _OnErrorResumeStreamSink extends ForwardingSink { try { recoveryStream = _recoveryFn(e, st); } catch (newError, newSt) { - sink.addError(newError, newSt); + sink.addErrorSync(newError, newSt); return; } final subscription = - recoveryStream.listen(sink.add, onError: sink.addError); + recoveryStream.listen(sink.addSync, onError: sink.addErrorSync); subscription.onDone(() { _recoverySubscriptions.remove(subscription); if (closed && _recoverySubscriptions.isEmpty) { - sink.close(); + sink.closeSync(); } }); _recoverySubscriptions.add(subscription); @@ -40,7 +40,7 @@ class _OnErrorResumeStreamSink extends ForwardingSink { void onDone() { closed = true; if (_recoverySubscriptions.isEmpty) { - sink.close(); + sink.closeSync(); } } diff --git a/packages/rxdart/lib/src/transformers/skip_last.dart b/packages/rxdart/lib/src/transformers/skip_last.dart index dd1b0cfc..b01b0531 100644 --- a/packages/rxdart/lib/src/transformers/skip_last.dart +++ b/packages/rxdart/lib/src/transformers/skip_last.dart @@ -15,15 +15,15 @@ class _SkipLastStreamSink extends ForwardingSink { } @override - void onError(Object e, StackTrace st) => sink.addError(e, st); + void onError(Object e, StackTrace st) => sink.addErrorSync(e, st); @override void onDone() { final limit = queue.length - count; if (limit > 0) { - queue.sublist(0, limit).forEach(sink.add); + queue.sublist(0, limit).forEach(sink.addSync); } - sink.close(); + sink.closeSync(); } @override diff --git a/packages/rxdart/lib/src/transformers/skip_until.dart b/packages/rxdart/lib/src/transformers/skip_until.dart index 5407620d..742f6abb 100644 --- a/packages/rxdart/lib/src/transformers/skip_until.dart +++ b/packages/rxdart/lib/src/transformers/skip_until.dart @@ -13,17 +13,17 @@ class _SkipUntilStreamSink extends ForwardingSink { @override void onData(S data) { if (_canAdd) { - sink.add(data); + sink.addSync(data); } } @override - void onError(Object e, StackTrace st) => sink.addError(e, st); + void onError(Object e, StackTrace st) => sink.addErrorSync(e, st); @override void onDone() { _otherSubscription?.cancel(); - sink.close(); + sink.closeSync(); } @override @@ -32,7 +32,7 @@ class _SkipUntilStreamSink extends ForwardingSink { @override void onListen() => _otherSubscription = _otherStream .take(1) - .listen(null, onError: sink.addError, onDone: () => _canAdd = true); + .listen(null, onError: sink.addErrorSync, onDone: () => _canAdd = true); @override void onPause() => _otherSubscription?.pause(); diff --git a/packages/rxdart/lib/src/transformers/start_with.dart b/packages/rxdart/lib/src/transformers/start_with.dart index ee62498c..b6592e36 100644 --- a/packages/rxdart/lib/src/transformers/start_with.dart +++ b/packages/rxdart/lib/src/transformers/start_with.dart @@ -1,5 +1,4 @@ import 'dart:async'; -import 'dart:collection'; import 'package:rxdart/rxdart.dart'; import 'package:rxdart/src/utils/forwarding_sink.dart'; @@ -8,59 +7,23 @@ import 'package:rxdart/src/utils/forwarding_stream.dart'; class _StartWithStreamSink extends ForwardingSink { final S _startValue; - late final queue = Queue>() - ..add(StreamNotification.data(_startValue)); - var _isCancelled = false; - _StartWithStreamSink(this._startValue); @override - void onData(S data) { - if (queue.isEmpty) { - sink.add(data); - } else { - queue.add(StreamNotification.data(data)); - } - } + void onData(S data) => sink.addSync(data); @override - void onError(Object e, StackTrace st) { - if (queue.isEmpty) { - sink.addError(e, st); - } else { - queue.add(StreamNotification.error(e, st)); - } - } + void onError(Object e, StackTrace st) => sink.addErrorSync(e, st); @override - void onDone() { - if (queue.isEmpty) { - sink.close(); - } else { - queue.add(DoneNotification()); - } - } + void onDone() => sink.closeSync(); @override - FutureOr onCancel() { - _isCancelled = true; - } + FutureOr onCancel() {} @override void onListen() { - scheduleMicrotask(() { - final add = sink.add; - final addError = sink.addError; - final close = sink.close; - - while (queue.isNotEmpty) { - if (_isCancelled) { - queue.clear(); - return; - } - queue.removeFirst().when(data: add, error: addError, done: close); - } - }); + sink.add(_startValue); } @override diff --git a/packages/rxdart/lib/src/transformers/start_with_error.dart b/packages/rxdart/lib/src/transformers/start_with_error.dart index 3731def8..0fccca06 100644 --- a/packages/rxdart/lib/src/transformers/start_with_error.dart +++ b/packages/rxdart/lib/src/transformers/start_with_error.dart @@ -10,20 +10,20 @@ class _StartWithErrorStreamSink extends ForwardingSink { _StartWithErrorStreamSink(this._e, this._st); @override - void onData(S data) => sink.add(data); + void onData(S data) => sink.addSync(data); @override - void onError(Object e, StackTrace st) => sink.addError(e, st); + void onError(Object e, StackTrace st) => sink.addErrorSync(e, st); @override - void onDone() => sink.close(); + void onDone() => sink.closeSync(); @override FutureOr onCancel() {} @override void onListen() { - sink.addError(_e, _st); + sink.addErrorSync(_e, _st); } @override diff --git a/packages/rxdart/lib/src/transformers/start_with_many.dart b/packages/rxdart/lib/src/transformers/start_with_many.dart index d7f484ea..6f61fe78 100644 --- a/packages/rxdart/lib/src/transformers/start_with_many.dart +++ b/packages/rxdart/lib/src/transformers/start_with_many.dart @@ -9,20 +9,20 @@ class _StartWithManyStreamSink extends ForwardingSink { _StartWithManyStreamSink(this._startValues); @override - void onData(S data) => sink.add(data); + void onData(S data) => sink.addSync(data); @override - void onError(Object e, StackTrace st) => sink.addError(e, st); + void onError(Object e, StackTrace st) => sink.addErrorSync(e, st); @override - void onDone() => sink.close(); + void onDone() => sink.closeSync(); @override FutureOr onCancel() {} @override void onListen() { - _startValues.forEach(sink.add); + _startValues.forEach(sink.addSync); } @override diff --git a/packages/rxdart/lib/src/transformers/switch_if_empty.dart b/packages/rxdart/lib/src/transformers/switch_if_empty.dart index 2213f069..030468ba 100644 --- a/packages/rxdart/lib/src/transformers/switch_if_empty.dart +++ b/packages/rxdart/lib/src/transformers/switch_if_empty.dart @@ -14,24 +14,24 @@ class _SwitchIfEmptyStreamSink extends ForwardingSink { @override void onData(S data) { _isEmpty = false; - sink.add(data); + sink.addSync(data); } @override void onError(Object error, StackTrace st) { - sink.addError(error, st); + sink.addErrorSync(error, st); } @override void onDone() { if (_isEmpty) { _fallbackSubscription = _fallbackStream.listen( - sink.add, - onError: sink.addError, - onDone: sink.close, + sink.addSync, + onError: sink.addErrorSync, + onDone: sink.closeSync, ); } else { - sink.close(); + sink.closeSync(); } } diff --git a/packages/rxdart/lib/src/transformers/switch_map.dart b/packages/rxdart/lib/src/transformers/switch_map.dart index 233a8eef..4842c100 100644 --- a/packages/rxdart/lib/src/transformers/switch_map.dart +++ b/packages/rxdart/lib/src/transformers/switch_map.dart @@ -17,7 +17,7 @@ class _SwitchMapStreamSink extends ForwardingSink { try { mappedStream = _mapper(data); } catch (e, s) { - sink.addError(e, s); + sink.addErrorSync(e, s); return; } @@ -32,7 +32,7 @@ class _SwitchMapStreamSink extends ForwardingSink { pauseSubscription(); mapperSubscription.cancel().onError((e, s) { if (!_isCancelled) { - sink.addError(e, s); + sink.addErrorSync(e, s); } }).whenComplete(() => resumeAndListenToInner(mappedStream)); } @@ -50,13 +50,13 @@ class _SwitchMapStreamSink extends ForwardingSink { assert(_mapperSubscription == null); _mapperSubscription = mappedStream.listen( - sink.add, - onError: sink.addError, + sink.addSync, + onError: sink.addErrorSync, onDone: () { _mapperSubscription = null; if (_inputClosed) { - sink.close(); + sink.closeSync(); } }, ); @@ -71,13 +71,13 @@ class _SwitchMapStreamSink extends ForwardingSink { } @override - void onError(Object e, StackTrace st) => sink.addError(e, st); + void onError(Object e, StackTrace st) => sink.addErrorSync(e, st); @override void onDone() { _inputClosed = true; - _mapperSubscription ?? sink.close(); + _mapperSubscription ?? sink.closeSync(); } @override diff --git a/packages/rxdart/lib/src/transformers/take_last.dart b/packages/rxdart/lib/src/transformers/take_last.dart index 2ccf3ec9..0ae119d5 100644 --- a/packages/rxdart/lib/src/transformers/take_last.dart +++ b/packages/rxdart/lib/src/transformers/take_last.dart @@ -21,14 +21,14 @@ class _TakeLastStreamSink extends ForwardingSink { } @override - void onError(Object e, StackTrace st) => sink.addError(e, st); + void onError(Object e, StackTrace st) => sink.addErrorSync(e, st); @override void onDone() { if (queue.isNotEmpty) { - queue.toList(growable: false).forEach(sink.add); + queue.toList(growable: false).forEach(sink.addSync); } - sink.close(); + sink.closeSync(); } @override diff --git a/packages/rxdart/lib/src/transformers/take_until.dart b/packages/rxdart/lib/src/transformers/take_until.dart index eff62da9..a84c8365 100644 --- a/packages/rxdart/lib/src/transformers/take_until.dart +++ b/packages/rxdart/lib/src/transformers/take_until.dart @@ -10,15 +10,15 @@ class _TakeUntilStreamSink extends ForwardingSink { _TakeUntilStreamSink(this._otherStream); @override - void onData(S data) => sink.add(data); + void onData(S data) => sink.addSync(data); @override - void onError(Object e, StackTrace st) => sink.addError(e, st); + void onError(Object e, StackTrace st) => sink.addErrorSync(e, st); @override void onDone() { _otherSubscription?.cancel(); - sink.close(); + sink.closeSync(); } @override @@ -27,7 +27,7 @@ class _TakeUntilStreamSink extends ForwardingSink { @override void onListen() => _otherSubscription = _otherStream .take(1) - .listen(null, onError: sink.addError, onDone: sink.close); + .listen(null, onError: sink.addErrorSync, onDone: sink.closeSync); @override void onPause() => _otherSubscription?.pause(); diff --git a/packages/rxdart/lib/src/transformers/time_interval.dart b/packages/rxdart/lib/src/transformers/time_interval.dart index 3ac65458..81ed9eea 100644 --- a/packages/rxdart/lib/src/transformers/time_interval.dart +++ b/packages/rxdart/lib/src/transformers/time_interval.dart @@ -9,7 +9,7 @@ class _TimeIntervalStreamSink extends ForwardingSink> { @override void onData(S data) { _stopwatch.stop(); - sink.add( + sink.addSync( TimeInterval( data, Duration( @@ -23,10 +23,10 @@ class _TimeIntervalStreamSink extends ForwardingSink> { } @override - void onError(Object e, StackTrace st) => sink.addError(e, st); + void onError(Object e, StackTrace st) => sink.addErrorSync(e, st); @override - void onDone() => sink.close(); + void onDone() => sink.closeSync(); @override FutureOr onCancel() {} diff --git a/packages/rxdart/lib/src/transformers/with_latest_from.dart b/packages/rxdart/lib/src/transformers/with_latest_from.dart index 5644b666..5b75dcc2 100644 --- a/packages/rxdart/lib/src/transformers/with_latest_from.dart +++ b/packages/rxdart/lib/src/transformers/with_latest_from.dart @@ -22,18 +22,18 @@ class _WithLatestFromStreamSink extends ForwardingSink { try { combinedValue = _combiner(data, List.unmodifiable(_latestValues!)); } catch (e, s) { - sink.addError(e, s); + sink.addErrorSync(e, s); return; } - sink.add(combinedValue); + sink.addSync(combinedValue); } } @override - void onError(Object e, StackTrace st) => sink.addError(e, st); + void onError(Object e, StackTrace st) => sink.addErrorSync(e, st); @override - void onDone() => sink.close(); + void onDone() => sink.closeSync(); @override Future? onCancel() { @@ -58,7 +58,7 @@ class _WithLatestFromStreamSink extends ForwardingSink { } _latestValues![index] = value; }, - onError: sink.addError, + onError: sink.addErrorSync, ); } diff --git a/packages/rxdart/lib/src/utils/forwarding_sink.dart b/packages/rxdart/lib/src/utils/forwarding_sink.dart index 8b2765f5..1a8bfe2b 100644 --- a/packages/rxdart/lib/src/utils/forwarding_sink.dart +++ b/packages/rxdart/lib/src/utils/forwarding_sink.dart @@ -2,10 +2,52 @@ import 'dart:async'; import 'package:rxdart/src/utils/error_and_stacktrace.dart'; -/// A enhanced [EventSink] that allows to check if the sink is paused. -abstract class EnhancedEventSink implements EventSink { +/// An interface that is similar to [EventSink] and [MultiStreamController], but with additional features. +/// See also [EventSink] and [MultiStreamController]. +/// +/// Acts like a normal asynchronous controller, but also allows +/// adding events synchronously. +/// As with any synchronous event delivery, the sender should be very careful +/// to not deliver events at times when a new listener might not +/// be ready to receive them. +/// That usually means only delivering events synchronously in response to other +/// asynchronous events, because that is a time when an asynchronous event could +/// happen. +abstract class EnhancedEventSink { /// Whether the subscription would need to buffer events. bool get isPaused; + + /// Adds a data [event] to the sink. + /// + /// Must not be called on a closed sink. + void add(T event); + + /// Adds and delivers an event. + /// + /// Adds an event like [add] and attempts to deliver it immediately. + /// Delivery can be delayed if other previously added events are + /// still pending delivery, if the subscription is paused, + /// or if the subscription isn't listening yet. + void addSync(T value); + + /// Adds and delivers an error event. + /// + /// Adds an error like [addErrorSync] and attempts to deliver it immediately. + /// Delivery can be delayed if other previously added events are + /// still pending delivery, if the subscription is paused, + /// or if the subscription isn't listening yet. + void addErrorSync(Object error, StackTrace? stackTrace); + + /// Closes the controller and delivers a done event. + /// + /// Closes the controller like [closeSync] and attempts to deliver a "done" + /// event immediately. + /// Delivery can be delayed if other previously added events are + /// still pending delivery, if the subscription is paused, + /// or if the subscription isn't listening yet. + /// If it's necessary to know whether the "done" event has been delivered, + /// [done] future will complete when that has happened. + void closeSync(); } /// A [Sink] that supports event hooks. @@ -47,7 +89,7 @@ abstract class ForwardingSink { /// @nonVirtual void resumeSubscription() => _subscription?.resume(); - /// -------------------------------------------------------------------------- + /// ----------------------------- Lifecycle ----------------------------- /// Handle data event void onData(T data); diff --git a/packages/rxdart/lib/src/utils/forwarding_stream.dart b/packages/rxdart/lib/src/utils/forwarding_stream.dart index 1050eca4..a2602959 100644 --- a/packages/rxdart/lib/src/utils/forwarding_stream.dart +++ b/packages/rxdart/lib/src/utils/forwarding_stream.dart @@ -126,40 +126,48 @@ Stream _forward( return controller.stream; } -class _MultiControllerSink implements EventSink, EnhancedEventSink { - final MultiStreamController controller; +class _MultiControllerSink implements EnhancedEventSink { + final MultiStreamController _controller; - _MultiControllerSink(this.controller); + _MultiControllerSink(this._controller); @override - void add(T event) => controller.addSync(event); + void addSync(T event) => _controller.addSync(event); @override - void addError(Object error, [StackTrace? stackTrace]) => - controller.addErrorSync(error, stackTrace); + void addErrorSync(Object error, [StackTrace? stackTrace]) => + _controller.addErrorSync(error, stackTrace); @override - void close() => controller.closeSync(); + void closeSync() => _controller.closeSync(); @override - bool get isPaused => controller.isPaused; + bool get isPaused => _controller.isPaused; + + @override + void add(T event) => _controller.add(event); } class _EnhancedEventSink implements EnhancedEventSink { final StreamController _controller; - _EnhancedEventSink(this._controller); + _EnhancedEventSink(this._controller) { + assert(_controller.runtimeType.toString().startsWith('_Sync')); + } @override - void add(T event) => _controller.add(event); + void addSync(T event) => _controller.add(event); @override - void addError(Object error, [StackTrace? stackTrace]) => + void addErrorSync(Object error, [StackTrace? stackTrace]) => _controller.addError(error, stackTrace); @override - void close() => _controller.close(); + void closeSync() => _controller.close(); @override bool get isPaused => _controller.isPaused; + + @override + void add(T event) => _controller.add(event); }