“异步编程:流”
:::次要的 意义何在?
- 流提供异步数据序列。
- 数据序列包括用户生成的事件和从文件中读取的数据。
- 你可以使用 await for 或 Stream API 中的
listen()
来处理流。 - 流提供了一种响应错误的方法。
- 流有两种:单订阅或广播。 :::
Dart 中的异步编程以 Future 和 Stream 类为特征。
Future 表示一个不会立即完成的计算。普通函数返回结果,而异步函数返回 Future,Future 最终将包含结果。Future 会告诉你结果何时准备就绪。
流是异步事件的序列。它类似于异步 Iterable——你不用在需要时获取下一个事件,而是流会在事件准备就绪时通知你。
接收流事件
#流可以通过多种方式创建,这将在另一篇文章中讨论,但它们的使用方法都相同:异步 for 循环(通常简称为 await for )像 for 循环 遍历 Iterable 一样遍历流的事件。例如:
Future<int> sumStream(Stream<int> stream) async {
var sum = 0;
await for (final value in stream) {
sum += value;
}
return sum;
}
这段代码简单地接收整数事件流的每个事件,将它们加起来,并返回(一个 Future 的)总和。当循环体结束时,函数会暂停,直到下一个事件到达或流完成。
该函数用 async
关键字标记,在使用 await for 循环时这是必需的。
下面的例子使用 async*
函数生成一个简单的整数流来测试之前的代码:
Future<int> sumStream(Stream<int> stream) async {
var sum = 0;
await for (final value in stream) {
sum += value;
}
return sum;
}
Stream<int> countStream(int to) async* {
for (int i = 1; i <= to; i++) {
yield i;
}
}
void main() async {
var stream = countStream(10);
var sum = await sumStream(stream);
print(sum); // 55
}
错误事件
#当流中没有更多事件时,流就完成了,接收事件的代码会像接收到新事件一样收到通知。当使用 await for 循环读取事件时,循环在流完成时停止。
在某些情况下,错误发生在流完成之前;例如,从远程服务器获取文件时网络失败,或者创建事件的代码存在错误,但有人需要知道它。
流也可以像传递数据事件一样传递错误事件。大多数流在第一个错误之后会停止,但可以有传递多个错误的流,以及在错误事件之后传递更多数据的流。在本文件中,我们只讨论最多传递一个错误的流。
当使用 await for 读取流时,循环语句会抛出错误。这也会结束循环。你可以使用 try-catch 来捕获错误。下面的例子在循环迭代器等于 4 时抛出一个错误:
Future<int> sumStream(Stream<int> stream) async {
var sum = 0;
try {
await for (final value in stream) {
sum += value;
}
} catch (e) {
return -1;
}
return sum;
}
Stream<int> countStream(int to) async* {
for (int i = 1; i <= to; i++) {
if (i == 4) {
throw Exception('Intentional exception');
} else {
yield i;
}
}
}
void main() async {
var stream = countStream(10);
var sum = await sumStream(stream);
print(sum); // -1
}
使用流
#Stream 类包含许多辅助方法,可以为你完成流上的常见操作,类似于 Iterable 上的方法。例如,你可以使用 Stream API 中的 lastWhere()
查找流中的最后一个正整数。
Future<int> lastPositive(Stream<int> stream) =>
stream.lastWhere((x) => x >= 0);
两种流
#有两种流。
单订阅流
#最常见的流类型包含一系列事件,这些事件是更大整体的一部分。事件需要按正确的顺序传递,并且不能缺少任何事件。这是读取文件或接收 Web 请求时获得的流类型。
这种流只能监听一次。稍后再次监听可能会错过初始事件,然后流的其余部分就没有任何意义了。当你开始监听时,数据将被获取并分块提供。
广播流
#另一种流类型用于可以逐个处理的单个消息。例如,这种流可以用于浏览器中的鼠标事件。
你可以随时开始监听这种流,并且你会收到在你监听时触发的事件。多个监听器可以同时监听,你可以在取消之前的订阅后稍后再次监听。
处理流的方法
#Stream<T> 上的以下方法处理流并返回结果:
Future<T> get first;
Future<bool> get isEmpty;
Future<T> get last;
Future<int> get length;
Future<T> get single;
Future<bool> any(bool Function(T element) test);
Future<bool> contains(Object? needle);
Future<E> drain<E>([E? futureValue]);
Future<T> elementAt(int index);
Future<bool> every(bool Function(T element) test);
Future<T> firstWhere(bool Function(T element) test, {T Function()? orElse});
Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine);
Future forEach(void Function(T element) action);
Future<String> join([String separator = '']);
Future<T> lastWhere(bool Function(T element) test, {T Function()? orElse});
Future pipe(StreamConsumer<T> streamConsumer);
Future<T> reduce(T Function(T previous, T element) combine);
Future<T> singleWhere(bool Function(T element) test, {T Function()? orElse});
Future<List<T>> toList();
Future<Set<T>> toSet();
除了 drain()
和 pipe()
之外,所有这些函数都对应于 Iterable 上的类似函数。每个函数都可以通过使用带有 await for 循环的 async
函数轻松编写(或仅使用其他方法之一)。例如,一些实现可以是:
Future<bool> contains(Object? needle) async {
await for (final event in this) {
if (event == needle) return true;
}
return false;
}
Future forEach(void Function(T element) action) async {
await for (final event in this) {
action(event);
}
}
Future<List<T>> toList() async {
final result = <T>[];
await forEach(result.add);
return result;
}
Future<String> join([String separator = '']) async =>
(await toList()).join(separator);
(实际实现稍微复杂一些,但这主要是出于历史原因。)
修改流的方法
#Stream 上的以下方法基于原始流返回一个新流。每个方法都在有人监听新流之前监听原始流。
Stream<R> cast<R>();
Stream<S> expand<S>(Iterable<S> Function(T element) convert);
Stream<S> map<S>(S Function(T event) convert);
Stream<T> skip(int count);
Stream<T> skipWhile(bool Function(T element) test);
Stream<T> take(int count);
Stream<T> takeWhile(bool Function(T element) test);
Stream<T> where(bool Function(T event) test);
上述方法对应于 Iterable 上的类似方法,这些方法将可迭代对象转换为另一个可迭代对象。所有这些都可以使用带有 await for 循环的 async
函数轻松编写。
Stream<E> asyncExpand<E>(Stream<E>? Function(T event) convert);
Stream<E> asyncMap<E>(FutureOr<E> Function(T event) convert);
Stream<T> distinct([bool Function(T previous, T next)? equals]);
asyncExpand()
和 asyncMap()
函数类似于 expand()
和 map()
,但允许它们的函数参数为异步函数。 distinct()
函数在 Iterable
中不存在,但它本可以存在。
Stream<T> handleError(Function onError, {bool Function(dynamic error)? test});
Stream<T> timeout(Duration timeLimit,
{void Function(EventSink<T> sink)? onTimeout});
Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer);
最后三个函数更特殊。它们涉及错误处理,而 await for 循环无法做到这一点——到达循环的第一个错误将结束循环及其对流的订阅。无法从中恢复。以下代码展示了如何在使用 await for 循环之前使用 handleError()
从流中移除错误。
Stream<S> mapLogErrors<S, T>(
Stream<T> stream,
S Function(T event) convert,
) async* {
var streamWithoutErrors = stream.handleError((e) => log(e));
await for (final event in streamWithoutErrors) {
yield convert(event);
}
}
transform() 函数
#transform()
函数不仅仅用于错误处理;它是流的更通用的“map”。普通的 map 对于每个传入事件都需要一个值。但是,特别是对于 I/O 流,可能需要多个传入事件才能产生一个输出事件。 StreamTransformer 可以处理这种情况。例如,解码器如 Utf8Decoder 就是转换器。转换器只需要一个函数, bind() ,它可以很容易地由 async
函数实现。
读取和解码文件
#以下代码读取文件并在流上运行两个转换。它首先将数据从 UTF8 转换为,然后通过 LineSplitter 运行它。所有行都将被打印,除了任何以井号 #
开头的行。
import 'dart:convert';
import 'dart:io';
void main(List<String> args) async {
var file = File(args[0]);
var lines = utf8.decoder
.bind(file.openRead())
.transform(const LineSplitter());
await for (final line in lines) {
if (!line.startsWith('#')) print(line);
}
}
listen() 方法
#Stream 上的最后一个方法是 listen()
。这是一个“低级”方法——所有其他流函数都是根据 listen()
定义的。
StreamSubscription<T> listen(void Function(T event)? onData,
{Function? onError, void Function()? onDone, bool? cancelOnError});
要创建一个新的 Stream
类型,你只需扩展 Stream
类并实现 listen()
方法—— Stream
上的所有其他方法都会调用 listen()
以便工作。
listen()
方法允许你开始监听流。在你这样做之前,流是一个惰性对象,描述你想要看到的事件。当你监听时,会返回一个 StreamSubscription 对象,它表示正在产生事件的活动流。这类似于 Iterable
只是一组对象,但迭代器才是执行实际迭代的对象。
流订阅允许你暂停订阅,暂停后恢复订阅,并完全取消订阅。你可以设置回调函数,以便为每个数据事件或错误事件以及流关闭时调用。
其他资源
#阅读以下文档以了解有关在 Dart 中使用流和异步编程的更多详细信息。
- 在 Dart 中创建流 ,一篇关于创建你自己的流的文章
- Future 和错误处理 ,一篇解释如何使用 Future API 处理错误的文章
- 异步支持 , 语言教程 中的一个章节
- Stream API 参考
除非另有说明,否则本网站上的文档反映的是 Dart 3.6.0。页面最后更新于 2025-02-05。 查看源代码 或 报告问题.