-
Notifications
You must be signed in to change notification settings - Fork 275
Expand file tree
/
Copy pathdelay.dart
More file actions
103 lines (85 loc) · 3.22 KB
/
delay.dart
File metadata and controls
103 lines (85 loc) · 3.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import 'dart:async';
import 'dart:collection';
import 'package:rxdart/src/rx.dart';
import 'package:rxdart/src/utils/forwarding_sink.dart';
import 'package:rxdart/src/utils/forwarding_stream.dart';
class _DelayStreamSink<S> extends ForwardingSink<S, S> {
final Duration _duration;
var _inputClosed = false;
final _subscriptions = Queue<StreamSubscription<void>>();
_DelayStreamSink(this._duration);
@override
void add(EventSink<S> sink, S data) {
final subscription = Rx.timer<void>(null, _duration).listen((_) {
_subscriptions.removeFirst();
sink.add(data);
if (_inputClosed && _subscriptions.isEmpty) {
sink.close();
}
});
_subscriptions.addLast(subscription);
}
@override
void addError(EventSink<S> sink, Object error, StackTrace st) =>
sink.addError(error, st);
@override
void close(EventSink<S> sink) {
_inputClosed = true;
if (_subscriptions.isEmpty) {
sink.close();
}
}
@override
FutureOr<void> onCancel(EventSink<S> sink) {
if (_subscriptions.isNotEmpty) {
return Future.wait(_subscriptions.map((t) => t.cancel()))
.whenComplete(() => _subscriptions.clear());
}
}
@override
void onListen(EventSink<S> sink) {}
@override
void onPause(EventSink<S> sink) => _subscriptions.forEach((s) => s.pause());
@override
void onResume(EventSink<S> sink) => _subscriptions.forEach((s) => s.resume());
}
/// The Delay operator modifies its source Stream by pausing for
/// a particular increment of time (that you specify) before emitting
/// each of the source Stream’s items.
/// This has the effect of shifting the entire sequence of items emitted
/// by the Stream forward in time by that specified increment.
///
/// [Interactive marble diagram](http://rxmarbles.com/#delay)
///
/// ### Example
///
/// Stream.fromIterable([1, 2, 3, 4])
/// .delay(Duration(seconds: 1))
/// .listen(print); // [after one second delay] prints 1, 2, 3, 4 immediately
class DelayStreamTransformer<S> extends StreamTransformerBase<S, S> {
/// The delay used to pause initial emission of events by
final Duration duration;
/// Constructs a [StreamTransformer] which will first pause for [duration] of time,
/// before submitting events from the source [Stream].
DelayStreamTransformer(this.duration);
@override
Stream<S> bind(Stream<S> stream) => ForwardedStream(
inner: stream, connectedSink: _DelayStreamSink<S>(duration));
}
/// Extends the Stream class with the ability to delay events being emitted
extension DelayExtension<T> on Stream<T> {
/// The Delay operator modifies its source Stream by pausing for a particular
/// increment of time (that you specify) before emitting each of the source
/// Stream’s items. This has the effect of shifting the entire sequence of
/// items emitted by the Stream forward in time by that specified increment.
///
/// [Interactive marble diagram](http://rxmarbles.com/#delay)
///
/// ### Example
///
/// Stream.fromIterable([1, 2, 3, 4])
/// .delay(Duration(seconds: 1))
/// .listen(print); // [after one second delay] prints 1, 2, 3, 4 immediately
Stream<T> delay(Duration duration) =>
transform(DelayStreamTransformer<T>(duration));
}