在 Dart 中创建流
由 Lasse Nielsen 撰写
2013 年 4 月(2021 年 5 月更新)
dart:async
库包含两种对许多 Dart API 至关重要的类型: Stream 和 Future 。Future 表示单个计算的结果,而 Stream 是结果的 序列 。您可以监听 Stream 以获取结果(包括数据和错误)以及 Stream 关闭的通知。您也可以在监听时暂停或在 Stream 完成之前停止监听。
但这篇文章不是关于 使用 Stream 的。它是关于创建您自己的 Stream 的。您可以通过几种方式创建 Stream:
- 转换现有的 Stream。
- 使用
async*
函数从头开始创建 Stream。 - 使用
StreamController
创建 Stream。
本文展示了每种方法的代码,并提供了一些技巧来帮助您正确实现 Stream。
有关使用 Stream 的帮助,请参阅 异步编程:Stream 。
转换现有 Stream
#创建 Stream 的常见情况是您已经有一个 Stream,并且您想根据原始 Stream 的事件创建一个新的 Stream。例如,您可能有一个字节流,您想通过对输入进行 UTF-8 解码将其转换为字符串流。最通用的方法是创建一个新的 Stream,该 Stream 等待原始 Stream 上的事件,然后输出新事件。示例:
/// 将连续字符串的流拆分为行。
///
/// 输入字符串通过 `source` 流以较小的块提供。
Stream<String> lines(Stream<String> source) async* {
// 存储来自上一块的任何部分行。
var partial = '';
// 等待新的块可用,然后处理它。
await for (final chunk in source) {
var lines = chunk.split('\n');
lines[0] = partial + lines[0]; // 附加部分行。
partial = lines.removeLast(); // 删除新的部分行。
for (final line in lines) {
yield line; // 将行添加到输出流。
}
}
// 如果有,将最终的部分行添加到输出流。
if (partial.isNotEmpty) yield partial;
}
对于许多常见的转换,您可以使用 Stream
提供的转换方法,例如 map()
、 where()
、 expand()
和 take()
。
例如,假设您有一个 Stream, counterStream
,它每秒发出一个递增的计数器。以下是它的实现方式:
var counterStream =
Stream<int>.periodic(const Duration(seconds: 1), (x) => x).take(15);
要快速查看事件,您可以使用如下代码:
counterStream.forEach(print); // 每秒打印一个整数,共 15 次。
要转换 Stream 事件,您可以在监听 Stream 之前在其上调用转换方法,例如 map()
。该方法返回一个新的 Stream。
// 将每个事件中的整数加倍。
var doubleCounterStream = counterStream.map((int x) => x * 2);
doubleCounterStream.forEach(print);
您可以使用任何其他转换方法代替 map()
,例如以下方法:
.where((int x) => x.isEven) // 只保留偶数整数事件。
.expand((var x) => [x, x]) // 复制每个事件。
.take(5) // 在前五个事件后停止。
通常,转换方法就足够了。但是,如果您需要对转换进行更多控制,则可以使用 Stream
的 transform()
方法指定一个 StreamTransformer 。平台库为许多常见任务提供了 Stream 变换器。例如,以下代码使用 dart:convert
库提供的 utf8.decoder
和 LineSplitter
变换器。
Stream<List<int>> content = File('someFile.txt').openRead();
List<String> lines = await content
.transform(utf8.decoder)
.transform(const LineSplitter())
.toList();
从头开始创建 Stream
#创建新 Stream 的一种方法是使用异步生成器 (async*
) 函数。当调用该函数时创建 Stream,并且当监听 Stream 时函数体开始运行。当函数返回时,Stream 关闭。在函数返回之前,它可以使用 yield
或 yield*
语句在 Stream 上发出事件。
这是一个发出数字的简单示例,该示例以固定的时间间隔发出数字:
Stream<int> timedCounter(Duration interval, [int? maxCount]) async* {
int i = 0;
while (true) {
await Future.delayed(interval);
yield i++;
if (i == maxCount) break;
}
}
此函数返回一个 Stream
。当监听该 Stream 时,函数体开始运行。它重复延迟请求的时间间隔,然后产生下一个数字。如果省略 maxCount
参数,则循环没有停止条件,因此 Stream 将永远输出越来越大的数字——或者直到监听器取消其订阅。
当监听器取消(通过调用 listen()
方法返回的 StreamSubscription
对象上的 cancel()
方法)时,则下次函数体到达 yield
语句时, yield
将充当 return
语句。执行任何封闭的 finally
块,函数退出。如果函数试图在退出前产生一个值,则该操作失败并充当返回。
当函数最终退出时, cancel()
方法返回的 Future 完成。如果函数以错误退出,则 Future 以该错误完成;否则,它以 null
完成。
另一个更有用的例子是一个将一系列 Future 转换为 Stream 的函数:
Stream<T> streamFromFutures<T>(Iterable<Future<T>> futures) async* {
for (final future in futures) {
var result = await future;
yield result;
}
}
此函数向 futures
可迭代对象请求一个新的 Future,等待该 Future,发出结果值,然后循环。如果 Future 以错误完成,则 Stream 以该错误完成。
很少有 async*
函数从无到有地构建 Stream。它需要从某个地方获取数据,而大多数情况下,这个地方是另一个 Stream。在某些情况下,例如上面的 Future 序列,数据来自其他异步事件源。但是,在许多情况下, async*
函数过于简单,难以轻松处理多个数据源。这就是 StreamController
类发挥作用的地方。
使用 StreamController
#如果 Stream 的事件来自程序的不同部分,而不仅仅是从可以由 async
函数遍历的 Stream 或 Future 中,则使用 StreamController 来创建和填充 Stream。
StreamController
为您提供一个新的 Stream 和一种方法,可以在任何点和任何地方向 Stream 添加事件。Stream 具有处理监听器和暂停所需的所有逻辑。您返回 Stream 并将控制器保留给自己。
以下示例(来自 stream_controller_bad.dart )展示了 StreamController
的基本用法(虽然有缺陷),用于实现前面示例中的 timedCounter()
函数。此代码创建一个要返回的 Stream,然后根据计时器事件(既不是 Future 也不是 Stream 事件)向其中提供数据。
// 注意:此实现有缺陷!
// 它在拥有订阅者之前就开始,并且没有实现暂停。
Stream<int> timedCounter(Duration interval, [int? maxCount]) {
var controller = StreamController<int>();
int counter = 0;
void tick(Timer timer) {
counter++;
controller.add(counter); // 请求 Stream 将计数器值作为事件发送。
if (maxCount != null && counter >= maxCount) {
timer.cancel();
controller.close(); // 请求 Stream 关闭并告知监听器。
}
}
Timer.periodic(interval, tick); // 有缺陷:在拥有订阅者之前就开始。
return controller.stream;
}
和以前一样,您可以像这样使用 timedCounter()
返回的 Stream:
var counterStream = timedCounter(const Duration(seconds: 1), 15);
counterStream.listen(print); // 每秒打印一个整数,共 15 次。
此 timedCounter()
实现有两个问题:
- 它在拥有订阅者之前就开始生成事件。
- 即使订阅者请求暂停,它也会继续生成事件。
如下节所示,您可以通过在创建 StreamController
时指定回调(例如 onListen
和 onPause
)来解决这两个问题。
等待订阅
#通常,Stream 应该在开始工作之前等待订阅者。 async*
函数会自动执行此操作,但是当使用 StreamController
时,您可以完全控制,即使不应该也可以添加事件。当 Stream 没有订阅者时,它的 StreamController
会缓冲事件,如果 Stream 从未获得订阅者,这可能会导致内存泄漏。
尝试将使用 Stream 的代码更改为以下代码:
void listenAfterDelay() async {
var counterStream = timedCounter(const Duration(seconds: 1), 15);
await Future.delayed(const Duration(seconds: 5));
// 5 秒后,添加监听器。
await for (final n in counterStream) {
print(n); // 每秒打印一个整数,共 15 次。
}
}
当此代码运行时,前 5 秒没有任何内容打印,尽管 Stream 正在工作。然后添加监听器,并且一次性打印前 5 个左右的事件,因为它们已由 StreamController
缓冲。
要收到订阅通知,请在创建 StreamController
时指定 onListen
参数。当 Stream 获得其第一个订阅者时,将调用 onListen
回调。如果您指定 onCancel
回调,则在控制器失去其最后一个订阅者时调用它。在前面的示例中, Timer.periodic()
应该移动到 onListen
处理程序,如下节所示。
遵守暂停状态
#当监听器请求暂停时,避免生成事件。 async*
函数在 Stream 订阅暂停时会在 yield
语句处自动暂停。另一方面, StreamController
在暂停期间会缓冲事件。如果提供事件的代码不遵守暂停,则缓冲区的尺寸可能会无限增长。此外,如果监听器在暂停后不久停止监听,
那么创建缓冲区的工作就浪费了。
要查看没有暂停支持会发生什么,请尝试将使用 Stream 的代码更改为以下代码:
void listenWithPause() {
var counterStream = timedCounter(const Duration(seconds: 1), 15);
late StreamSubscription<int> subscription;
subscription = counterStream.listen((int counter) {
print(counter); // 每秒打印一个整数。
if (counter == 5) {
// 5 次滴答后,暂停五秒钟,然后恢复。
subscription.pause(Future.delayed(const Duration(seconds: 5)));
}
});
}
当五秒钟的暂停结束后,这段时间内触发的事件会一次性全部接收。发生这种情况是因为 Stream 的源代码不遵守暂停并继续向 Stream 添加事件。因此 Stream 缓冲事件,然后在 Stream 恢复暂停时清空其缓冲区。
以下版本的 timedCounter()
(来自 stream_controller.dart )使用 StreamController
上的 onListen
、 onPause
、 onResume
和 onCancel
回调来实现暂停。
Stream<int> timedCounter(Duration interval, [int? maxCount]) {
late StreamController<int> controller;
Timer? timer;
int counter = 0;
void tick(_) {
counter++;
controller.add(counter); // 请求 Stream 将计数器值作为事件发送。
if (counter == maxCount) {
timer?.cancel();
controller.close(); // 请求 Stream 关闭并告知监听器。
}
}
void startTimer() {
timer = Timer.periodic(interval, tick);
}
void stopTimer() {
timer?.cancel();
timer = null;
}
controller = StreamController<int>(
onListen: startTimer,
onPause: stopTimer,
onResume: startTimer,
onCancel: stopTimer);
return controller.stream;
}
使用上面的 listenWithPause()
函数运行此代码。您会看到它在暂停时停止计数,并在之后很好地恢复。
您必须使用所有监听器—— onListen
、 onCancel
、 onPause
和 onResume
——才能收到暂停状态更改的通知。原因是如果订阅和暂停状态同时更改,则只调用 onListen
或 onCancel
回调。
最终提示
#在不使用 async*
函数创建 Stream 时,请记住以下提示:
使用同步控制器时要小心——例如,使用
StreamController(sync: true)
创建的控制器。当您在未暂停的同步控制器上发送事件时(例如,使用 EventSink 定义的add()
、addError()
或close()
方法),该事件会立即发送到 Stream 上的所有监听器。Stream
监听器必须在添加监听器的代码完全返回之前才能被调用,并且在错误的时间使用同步控制器可能会破坏此承诺并导致良好的代码失败。避免使用同步控制器。如果您使用
StreamController
,则在listen
调用返回StreamSubscription
之前调用onListen
回调。不要让onListen
回调依赖于订阅已经存在。例如,在以下代码中,onListen
事件在subscription
变量具有有效值之前触发(并调用handler
)。dartsubscription = stream.listen(handler);
StreamController
定义的onListen
、onPause
、onResume
和onCancel
回调在 Stream 的监听器状态更改时由 Stream 调用,但在触发事件期间或调用其他状态更改处理程序期间绝不会调用。在这些情况下,状态更改回调会延迟到上一个回调完成之后。不要尝试自己实现
Stream
接口。事件、回调以及添加和删除监听器之间的交互很容易变得微妙地错误。始终使用现有的 Stream(可能来自StreamController
)来实现新 Stream 的listen
调用。虽然可以通过扩展
Stream
类并实现listen
方法以及顶部的额外功能来创建扩展Stream
并具有更多功能的类,但这通常不推荐,因为它引入了一种用户必须考虑的新类型。您可以经常创建一个 具有Stream
(以及更多)的类,而不是一个 是Stream
(以及更多)的类。
除非另有说明,否则本网站上的文档反映的是 Dart 3.6.0。页面最后更新于 2021-05-16。 查看源代码 或 报告问题.