-
Notifications
You must be signed in to change notification settings - Fork 275
Expand file tree
/
Copy pathstart_with_error.dart
More file actions
81 lines (66 loc) · 2.27 KB
/
start_with_error.dart
File metadata and controls
81 lines (66 loc) · 2.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import 'dart:async';
import 'package:rxdart/src/utils/forwarding_sink.dart';
import 'package:rxdart/src/utils/forwarding_stream.dart';
/// A [ForwardingSink] that puts a fixed error in front of the events forwarded
/// from the source stream.
///
/// The error (with its optional stack trace) is added to the downstream sink
/// either by the microtask scheduled in [onListen] or right before the first
/// forwarded data, error, or close event, whichever runs first. Once it has
/// been added it is never added again.
class _StartWithErrorStreamSink<S> extends ForwardingSink<S, S> {
  final Object _startError;
  final StackTrace? _startStackTrace;
  var _startErrorEmitted = false;

  _StartWithErrorStreamSink(this._startError, this._startStackTrace);

  @override
  void add(EventSink<S> sink, S data) => _primed(sink).add(data);

  @override
  void addError(EventSink<S> sink, Object e, StackTrace st) =>
      _primed(sink).addError(e, st);

  @override
  void close(EventSink<S> sink) => _primed(sink).close();

  @override
  FutureOr onCancel(EventSink<S> sink) {}

  @override
  void onListen(EventSink<S> sink) {
    // The starting error is deliberately not added synchronously here; see
    // the note on [_primed].
    scheduleMicrotask(() {
      _primed(sink);
    });
  }

  @override
  void onPause(EventSink<S> sink) {}

  @override
  void onResume(EventSink<S> sink) {}

  // Returns [sink] after making sure the starting error has been added to it.
  //
  // Adding the starting error immediately when onListen triggers can result
  // in an exception (possibly a bug in dart:async), so onListen defers the
  // call with scheduleMicrotask. Because source events may arrive before that
  // microtask runs, every forwarding method goes through here first, which
  // guarantees the starting error precedes any other event. The flag is set
  // only after the error has been added, and from then on this is a no-op
  // that simply hands back [sink].
  EventSink<S> _primed(EventSink<S> sink) {
    if (!_startErrorEmitted) {
      sink.addError(_startError, _startStackTrace);
      _startErrorEmitted = true;
    }
    return sink;
  }
}
/// A [StreamTransformer] that prepends an error to the source [Stream].
///
/// ### Example
///
/// ```dart
/// Stream.fromIterable([2])
///     .transform(StartWithErrorStreamTransformer('error'))
///     .listen(null, onError: (e) => print(e)); // prints 'error'
/// ```
class StartWithErrorStreamTransformer<S> extends StreamTransformerBase<S, S> {
  /// The error emitted ahead of the source [Stream]'s events.
  final Object error;

  /// The stack trace delivered together with [error]; may be `null`.
  final StackTrace? stackTrace;

  /// Creates a [StreamTransformer] that starts with the given [error]
  /// (and optional [stackTrace]) and then outputs all events from the
  /// source [Stream].
  StartWithErrorStreamTransformer(this.error, [this.stackTrace]);

  @override
  Stream<S> bind(Stream<S> stream) {
    // A fresh sink per bind call, so each bound stream tracks on its own
    // whether the starting error has been emitted.
    final startingSink = _StartWithErrorStreamSink<S>(error, stackTrace);
    return ForwardedStream(inner: stream, connectedSink: startingSink);
  }
}