目录

在 Dart 中创建流

由 Lasse Nielsen 撰写
2013 年 4 月(2021 年 5 月更新)

dart:async 库包含两种对许多 Dart API 至关重要的类型: StreamFuture 。Future 表示单个计算的结果,而 Stream 是结果的 序列 。您可以监听 Stream 以获取结果(包括数据和错误)以及 Stream 关闭的通知。您也可以在监听时暂停或在 Stream 完成之前停止监听。

但这篇文章不是关于 使用 Stream 的。它是关于创建您自己的 Stream 的。您可以通过几种方式创建 Stream:

  • 转换现有的 Stream。
  • 使用 async* 函数从头开始创建 Stream。
  • 使用 StreamController 创建 Stream。

本文展示了每种方法的代码,并提供了一些技巧来帮助您正确实现 Stream。

有关使用 Stream 的帮助,请参阅 异步编程:Stream

转换现有 Stream

#

创建 Stream 的常见情况是您已经有一个 Stream,并且您想根据原始 Stream 的事件创建一个新的 Stream。例如,您可能有一个字节流,您想通过对输入进行 UTF-8 解码将其转换为字符串流。最通用的方法是创建一个新的 Stream,该 Stream 等待原始 Stream 上的事件,然后输出新事件。示例:

dart
/// 将连续字符串的流拆分为行。
///
/// 输入字符串通过 `source` 流以较小的块提供。
Stream<String> lines(Stream<String> source) async* {
  // 存储来自上一块的任何部分行。
  var partial = '';
  // 等待新的块可用,然后处理它。
  await for (final chunk in source) {
    var lines = chunk.split('\n');
    lines[0] = partial + lines[0]; // 附加部分行。
    partial = lines.removeLast(); // 删除新的部分行。
    for (final line in lines) {
      yield line; // 将行添加到输出流。
    }
  }
  // 如果有,将最终的部分行添加到输出流。
  if (partial.isNotEmpty) yield partial;
}

对于许多常见的转换,您可以使用 Stream 提供的转换方法,例如 map()where()expand()take()

例如,假设您有一个 Stream, counterStream ,它每秒发出一个递增的计数器。以下是它的实现方式:

dart
var counterStream =
    Stream<int>.periodic(const Duration(seconds: 1), (x) => x).take(15);

要快速查看事件,您可以使用如下代码:

dart
counterStream.forEach(print); // 每秒打印一个整数,共 15 次。

要转换 Stream 事件,您可以在监听 Stream 之前在其上调用转换方法,例如 map() 。该方法返回一个新的 Stream。

dart
// 将每个事件中的整数加倍。
var doubleCounterStream = counterStream.map((int x) => x * 2);
doubleCounterStream.forEach(print);

您可以使用任何其他转换方法代替 map() ,例如以下方法:

dart
.where((int x) => x.isEven) // 只保留偶数整数事件。
.expand((var x) => [x, x]) // 复制每个事件。
.take(5) // 在前五个事件后停止。

通常,转换方法就足够了。但是,如果您需要对转换进行更多控制,则可以使用 Streamtransform() 方法指定一个 StreamTransformer 。平台库为许多常见任务提供了 Stream 变换器。例如,以下代码使用 dart:convert 库提供的 utf8.decoderLineSplitter 变换器。

dart
Stream<List<int>> content = File('someFile.txt').openRead();
List<String> lines = await content
    .transform(utf8.decoder)
    .transform(const LineSplitter())
    .toList();

从头开始创建 Stream

#

创建新 Stream 的一种方法是使用异步生成器 (async*) 函数。当调用该函数时创建 Stream,并且当监听 Stream 时函数体开始运行。当函数返回时,Stream 关闭。在函数返回之前,它可以使用 yieldyield* 语句在 Stream 上发出事件。

这是一个发出数字的简单示例,该示例以固定的时间间隔发出数字:

dart
Stream<int> timedCounter(Duration interval, [int? maxCount]) async* {
  int i = 0;
  while (true) {
    await Future.delayed(interval);
    yield i++;
    if (i == maxCount) break;
  }
}

此函数返回一个 Stream 。当监听该 Stream 时,函数体开始运行。它重复延迟请求的时间间隔,然后产生下一个数字。如果省略 maxCount 参数,则循环没有停止条件,因此 Stream 将永远输出越来越大的数字——或者直到监听器取消其订阅。

当监听器取消(通过调用 listen() 方法返回的 StreamSubscription 对象上的 cancel() 方法)时,则下次函数体到达 yield 语句时, yield 将充当 return 语句。执行任何封闭的 finally 块,函数退出。如果函数试图在退出前产生一个值,则该操作失败并充当返回。

当函数最终退出时, cancel() 方法返回的 Future 完成。如果函数以错误退出,则 Future 以该错误完成;否则,它以 null 完成。

另一个更有用的例子是一个将一系列 Future 转换为 Stream 的函数:

dart
Stream<T> streamFromFutures<T>(Iterable<Future<T>> futures) async* {
  for (final future in futures) {
    var result = await future;
    yield result;
  }
}

此函数向 futures 可迭代对象请求一个新的 Future,等待该 Future,发出结果值,然后循环。如果 Future 以错误完成,则 Stream 以该错误完成。

很少有 async* 函数从无到有地构建 Stream。它需要从某个地方获取数据,而大多数情况下,这个地方是另一个 Stream。在某些情况下,例如上面的 Future 序列,数据来自其他异步事件源。但是,在许多情况下, async* 函数过于简单,难以轻松处理多个数据源。这就是 StreamController 类发挥作用的地方。

使用 StreamController

#

如果 Stream 的事件来自程序的不同部分,而不仅仅是从可以由 async 函数遍历的 Stream 或 Future 中,则使用 StreamController 来创建和填充 Stream。

StreamController 为您提供一个新的 Stream 和一种方法,可以在任何点和任何地方向 Stream 添加事件。Stream 具有处理监听器和暂停所需的所有逻辑。您返回 Stream 并将控制器保留给自己。

以下示例(来自 stream_controller_bad.dart )展示了 StreamController 的基本用法(虽然有缺陷),用于实现前面示例中的 timedCounter() 函数。此代码创建一个要返回的 Stream,然后根据计时器事件(既不是 Future 也不是 Stream 事件)向其中提供数据。

baddart
// 注意:此实现有缺陷!
// 它在拥有订阅者之前就开始,并且没有实现暂停。
Stream<int> timedCounter(Duration interval, [int? maxCount]) {
  var controller = StreamController<int>();
  int counter = 0;
  void tick(Timer timer) {
    counter++;
    controller.add(counter); // 请求 Stream 将计数器值作为事件发送。
    if (maxCount != null && counter >= maxCount) {
      timer.cancel();
      controller.close(); // 请求 Stream 关闭并告知监听器。
    }
  }

  Timer.periodic(interval, tick); // 有缺陷:在拥有订阅者之前就开始。
  return controller.stream;
}

和以前一样,您可以像这样使用 timedCounter() 返回的 Stream:

dart
var counterStream = timedCounter(const Duration(seconds: 1), 15);
counterStream.listen(print); // 每秒打印一个整数,共 15 次。

timedCounter() 实现有两个问题:

  • 它在拥有订阅者之前就开始生成事件。
  • 即使订阅者请求暂停,它也会继续生成事件。

如下节所示,您可以通过在创建 StreamController 时指定回调(例如 onListenonPause )来解决这两个问题。

等待订阅

#

通常,Stream 应该在开始工作之前等待订阅者。 async* 函数会自动执行此操作,但是当使用 StreamController 时,您可以完全控制,即使不应该也可以添加事件。当 Stream 没有订阅者时,它的 StreamController 会缓冲事件,如果 Stream 从未获得订阅者,这可能会导致内存泄漏。

尝试将使用 Stream 的代码更改为以下代码:

dart
void listenAfterDelay() async {
  var counterStream = timedCounter(const Duration(seconds: 1), 15);
  await Future.delayed(const Duration(seconds: 5));

  // 5 秒后,添加监听器。
  await for (final n in counterStream) {
    print(n); // 每秒打印一个整数,共 15 次。
  }
}

当此代码运行时,前 5 秒没有任何内容打印,尽管 Stream 正在工作。然后添加监听器,并且一次性打印前 5 个左右的事件,因为它们已由 StreamController 缓冲。

要收到订阅通知,请在创建 StreamController 时指定 onListen 参数。当 Stream 获得其第一个订阅者时,将调用 onListen 回调。如果您指定 onCancel 回调,则在控制器失去其最后一个订阅者时调用它。在前面的示例中, Timer.periodic() 应该移动到 onListen 处理程序,如下节所示。

遵守暂停状态

#

当监听器请求暂停时,避免生成事件。 async* 函数在 Stream 订阅暂停时会在 yield 语句处自动暂停。另一方面, StreamController 在暂停期间会缓冲事件。如果提供事件的代码不遵守暂停,则缓冲区的尺寸可能会无限增长。此外,如果监听器在暂停后不久停止监听,

那么创建缓冲区的工作就浪费了。

要查看没有暂停支持会发生什么,请尝试将使用 Stream 的代码更改为以下代码:

dart
void listenWithPause() {
  var counterStream = timedCounter(const Duration(seconds: 1), 15);
  late StreamSubscription<int> subscription;

  subscription = counterStream.listen((int counter) {
    print(counter); // 每秒打印一个整数。
    if (counter == 5) {
      // 5 次滴答后,暂停五秒钟,然后恢复。
      subscription.pause(Future.delayed(const Duration(seconds: 5)));
    }
  });
}

当五秒钟的暂停结束后,这段时间内触发的事件会一次性全部接收。发生这种情况是因为 Stream 的源代码不遵守暂停并继续向 Stream 添加事件。因此 Stream 缓冲事件,然后在 Stream 恢复暂停时清空其缓冲区。

以下版本的 timedCounter() (来自 stream_controller.dart )使用 StreamController 上的 onListenonPauseonResumeonCancel 回调来实现暂停。

dart
Stream<int> timedCounter(Duration interval, [int? maxCount]) {
  late StreamController<int> controller;
  Timer? timer;
  int counter = 0;

  void tick(_) {
    counter++;
    controller.add(counter); // 请求 Stream 将计数器值作为事件发送。
    if (counter == maxCount) {
      timer?.cancel();
      controller.close(); // 请求 Stream 关闭并告知监听器。
    }
  }

  void startTimer() {
    timer = Timer.periodic(interval, tick);
  }

  void stopTimer() {
    timer?.cancel();
    timer = null;
  }

  controller = StreamController<int>(
      onListen: startTimer,
      onPause: stopTimer,
      onResume: startTimer,
      onCancel: stopTimer);

  return controller.stream;
}

使用上面的 listenWithPause() 函数运行此代码。您会看到它在暂停时停止计数,并在之后很好地恢复。

您必须使用所有监听器—— onListenonCancelonPauseonResume ——才能收到暂停状态更改的通知。原因是如果订阅和暂停状态同时更改,则只调用 onListenonCancel 回调。

最终提示

#

在不使用 async* 函数创建 Stream 时,请记住以下提示:

  • 使用同步控制器时要小心——例如,使用 StreamController(sync: true) 创建的控制器。当您在未暂停的同步控制器上发送事件时(例如,使用 EventSink 定义的 add()addError()close() 方法),该事件会立即发送到 Stream 上的所有监听器。 Stream 监听器必须在添加监听器的代码完全返回之前才能被调用,并且在错误的时间使用同步控制器可能会破坏此承诺并导致良好的代码失败。避免使用同步控制器。

  • 如果您使用 StreamController ,则在 listen 调用返回 StreamSubscription 之前调用 onListen 回调。不要让 onListen 回调依赖于订阅已经存在。例如,在以下代码中, onListen 事件在 subscription 变量具有有效值之前触发(并调用 handler )。

    dart
    subscription = stream.listen(handler);
  • StreamController 定义的 onListenonPauseonResumeonCancel 回调在 Stream 的监听器状态更改时由 Stream 调用,但在触发事件期间或调用其他状态更改处理程序期间绝不会调用。在这些情况下,状态更改回调会延迟到上一个回调完成之后。

  • 不要尝试自己实现 Stream 接口。事件、回调以及添加和删除监听器之间的交互很容易变得微妙地错误。始终使用现有的 Stream(可能来自 StreamController )来实现新 Stream 的 listen 调用。

  • 虽然可以通过扩展 Stream 类并实现 listen 方法以及顶部的额外功能来创建扩展 Stream 并具有更多功能的类,但这通常不推荐,因为它引入了一种用户必须考虑的新类型。您可以经常创建一个 具有 Stream (以及更多)的类,而不是一个 Stream (以及更多)的类。