forEach<S, T> method
- Iterable<
S> elements, - FutureOr<
T> action(- S source
- bool onError(
- S item,
- Object error,
- StackTrace stack
Returns a Stream containing the result of action applied to each
element of elements.
While action is invoked on each element of elements in order,
it's possible the return Stream may have items out-of-order – especially
if the completion time of action varies.
If action throws an error the source item along with the error object
and StackTrace are passed to onError, if it is provided. If onError
returns true, the error is added to the returned Stream, otherwise
it is ignored.
Errors thrown from iterating elements will not be passed to
onError. They will always be added to the returned stream as an error.
Note: all of the resources of the this Pool will be used when the returned Stream is listened to until it is completed or canceled.
Note: if this Pool is closed before the returned Stream is listened to, a StateError is thrown.
Implementation
Stream<T> forEach<S, T>(
Iterable<S> elements, FutureOr<T> Function(S source) action,
{bool Function(S item, Object error, StackTrace stack)? onError}) {
onError ??= (item, e, s) => true;
var cancelPending = false;
Completer? resumeCompleter;
late StreamController<T> controller;
late Iterator<S> iterator;
Future<void> run(int _) async {
while (iterator.moveNext()) {
// caching `current` is necessary because there are async breaks
// in this code and `iterator` is shared across many workers
final current = iterator.current;
_resetTimer();
if (resumeCompleter != null) {
await resumeCompleter!.future;
}
if (cancelPending) {
break;
}
T value;
try {
value = await action(current);
} catch (e, stack) {
if (onError!(current, e, stack)) {
controller.addError(e, stack);
}
continue;
}
controller.add(value);
}
}
Future<void>? doneFuture;
void onListen() {
iterator = elements.iterator;
assert(doneFuture == null);
var futures = Iterable<Future<void>>.generate(
_maxAllocatedResources, (i) => withResource(() => run(i)));
doneFuture = Future.wait(futures, eagerError: true)
.then<void>((_) {})
.catchError(controller.addError);
doneFuture!.whenComplete(controller.close);
}
controller = StreamController<T>(
sync: true,
onListen: onListen,
onCancel: () async {
assert(!cancelPending);
cancelPending = true;
await doneFuture;
},
onPause: () {
assert(resumeCompleter == null);
resumeCompleter = Completer<void>();
},
onResume: () {
assert(resumeCompleter != null);
resumeCompleter!.complete();
resumeCompleter = null;
},
);
return controller.stream;
}