目录

隔离区

此页面讨论了一些使用 Isolate API 实现隔离区的示例。

只要您的应用程序正在处理足以暂时阻塞其他计算的大型计算,就应该使用隔离区。最常见的示例是在 Flutter 应用程序中,当您需要执行可能导致 UI 响应迟钝的大型计算时。

没有关于您 必须 何时使用隔离区的规定,但以下是一些可以使用隔离区的其他情况:

  • 解析和解码异常大的 JSON 数据块。
  • 处理和压缩照片、音频和视频。
  • 转换音频和视频文件。
  • 对大型列表或文件系统执行复杂的搜索和过滤。
  • 执行 I/O 操作,例如与数据库通信。
  • 处理大量网络请求。

实现简单的 worker 隔离区

#

这些示例实现了一个主隔离区,该隔离区会生成一个简单的 worker 隔离区。 Isolate.run() 简化了设置和管理 worker 隔离区的步骤:

  1. 生成(启动和创建)一个隔离区。
  2. 在生成的隔离区上运行一个函数。
  3. 获取结果。
  4. 将结果返回给主隔离区。
  5. 工作完成后终止隔离区。
  6. 检查、捕获并将异常和错误抛回主隔离区。

在新的隔离区中运行现有方法

#
  1. 调用 run() 来生成一个新的隔离区(一个 后台工作线程 ),直接在 主隔离区 中,同时 main() 等待结果:
dart
const String filename = 'with_keys.json';

void main() async {
  // 读取一些数据。
  final jsonData = await Isolate.run(_readAndParseJson);

  // 使用该数据。
  print('JSON 密钥数量:${jsonData.length}');
}
  1. 将您希望 worker 隔离区执行的函数作为其第一个参数传递给 worker 隔离区。在此示例中,它是现有函数 _readAndParseJson()
dart
Future<Map<String, dynamic>> _readAndParseJson() async {
  final fileData = await File(filename).readAsString();
  final jsonData = jsonDecode(fileData) as Map<String, dynamic>;
  return jsonData;
}
  1. Isolate.run() 获取 _readAndParseJson() 返回的结果,并将该值发送回主隔离区,关闭 worker 隔离区。

  2. worker 隔离区将保存结果的内存 转移 到主隔离区。它 不会复制 数据。worker 隔离区执行验证过程以确保允许转移对象。

_readAndParseJson() 是一个现有的异步函数,它同样可以轻松地在主隔离区中直接运行。使用 Isolate.run() 来运行它可以实现并发。worker 隔离区完全抽象了 _readAndParseJson() 的计算。它可以在不阻塞主隔离区的情况下完成。

Isolate.run() 的结果始终是 Future,因为主隔离区中的代码会继续运行。worker 隔离区执行的计算是同步的还是异步的都不会影响主隔离区,因为它无论如何都在并发运行。

有关完整程序,请查看 send_and_receive.dart 示例。

使用隔离区发送闭包

#

您还可以使用主隔离区中的函数字面量或闭包直接使用 run() 创建简单的 worker 隔离区。

dart
const String filename = 'with_keys.json';

void main() async {
  // 读取一些数据。
  final jsonData = await Isolate.run(() async {
    final fileData = await File(filename).readAsString();
    final jsonData = jsonDecode(fileData) as Map<String, dynamic>;
    return jsonData;
  });

  // 使用该数据。
  print('JSON 密钥数量:${jsonData.length}');
}

此示例与之前的示例相同。一个新的隔离区生成,计算一些内容,然后发送回结果。

但是,现在隔离区发送了一个 闭包 。闭包在功能和编写方式上都比典型的命名函数更不局限。在此示例中, Isolate.run() 并发执行看起来像是局部代码。从这个意义上说,您可以想象 run() 就像一个用于“并行运行”的控制流运算符。

使用端口在隔离区之间发送多条消息

#

短暂的隔离区使用方便,但需要性能开销来生成新的隔离区并将对象从一个隔离区复制到另一个隔离区。如果您的代码依赖于使用 Isolate.run 重复运行相同的计算,则可以通过创建不会立即退出的长期隔离区来提高性能。

为此,您可以使用 Isolate.run 抽象化的一些低级隔离区 API:

本节介绍了在新建的隔离区与 主隔离区 之间建立双向通信所需的步骤。第一个示例 基本端口 在高级别介绍了该过程。第二个示例 健壮的端口 逐渐向第一个示例添加更多实用的现实世界功能。

ReceivePortSendPort

#

在隔离区之间设置长期通信需要两个类(除了 Isolate ): ReceivePortSendPort 。这些端口是隔离区彼此通信的唯一方式。

ReceivePort 是一个处理从其他隔离区发送的消息的对象。这些消息是通过 SendPort 发送的。

端口的行为类似于 Stream 对象(事实上,接收端口实现了 Stream !)您可以将 SendPortReceivePort 视为 StreamStreamController 和监听器。 SendPort 类似于 StreamController ,因为您可以使用 SendPort.send() 方法 向其“添加”消息,而这些消息由监听器处理,在本例中是 ReceivePort 。然后, ReceivePort 通过将其作为参数传递给您提供的回调来处理它接收到的消息。

设置端口

#

新生成的隔离区只有通过 Isolate.spawn 调用接收到的信息。如果您需要主隔离区在其初始创建后继续与生成的隔离区通信,则必须设置一个通信通道,以便生成的隔离区可以向主隔离区发送消息。隔离区只能通过消息传递进行通信。它们不能“查看”彼此的内存内部,这就是“隔离区”名称的由来。

要设置此双向通信,首先在主隔离区中创建一个 ReceivePort ,然后在使用 Isolate.spawn 生成新隔离区时,将其 SendPort 作为参数传递给新隔离区。然后,新隔离区创建它自己的 ReceivePort ,并通过主隔离区传递给它的 SendPort SendPort 发送回主隔离区。主隔离区接收此 SendPort ,现在双方都有一个开放的通道来发送和接收消息。

一个显示事件逐一馈送到事件循环的图形

  1. 在主隔离区中创建一个 ReceivePortSendPort 会自动作为 ReceivePort 上的属性创建。
  2. 使用 Isolate.spawn() 生成 worker 隔离区。
  3. 将对 ReceivePort.sendPort 的引用作为第一条消息传递给 worker 隔离区。
  4. 在 worker 隔离区中创建另一个新的 ReceivePort
  5. 将对 worker 隔离区的 ReceivePort.sendPort 的引用作为第一条消息 发送回 主隔离区。

除了创建端口和设置通信外,您还需要告诉端口在接收消息时该做什么。这是使用每个相应 ReceivePort 上的 listen 方法完成的。

一个显示事件逐一馈送到事件循环的图形

  1. 通过主隔离区对 worker 隔离区的 SendPort 的引用发送消息。
  2. 通过 worker 隔离区的 ReceivePort 上的监听器接收和处理消息。这就是您希望将计算移出主隔离区的地方。
  3. 通过 worker 隔离区对主隔离区的 SendPort 的引用发送返回消息。
  4. 通过主隔离区的 ReceivePort 上的监听器接收消息。

基本端口示例

#

此示例演示如何使用在它和主隔离区之间进行双向通信的长期 worker 隔离区。该代码使用将 JSON 文本发送到新隔离区的示例,在该示例中,JSON 将被解析和解码,然后发送回主隔离区。

步骤 1:定义 worker 类

#

首先,为您的后台 worker 隔离区创建一个类。此类包含您需要的所有功能:

  • 生成隔离区。
  • 向该隔离区发送消息。
  • 让隔离区解码一些 JSON。
  • 将解码后的 JSON 发送回主隔离区。 该类公开了两种公共方法:一种用于生成 worker 隔离区,另一种用于处理向该 worker 隔离区发送消息。

本示例中的其余部分将逐步向您展示如何填充类方法。

dart
class Worker {
  Future<void> spawn() async {
    // TODO: 添加生成 worker 隔离区的功能。
  }

  void _handleResponsesFromIsolate(dynamic message) {
    // TODO: 处理从 worker 隔离区发送回的消息。
  }

  static void _startRemoteIsolate(SendPort port) {
    // TODO: 定义应在 worker 隔离区上执行的代码。
  }

  Future<void> parseJson(String message) async {
    // TODO: 定义可用于向 worker 隔离区发送消息的公共方法。
  }
}

步骤 2:生成 worker 隔离区

#

Worker.spawn 方法中,您将把创建 worker 隔离区并确保它可以接收和发送消息的代码组合在一起。

  • 首先,创建一个 ReceivePort 。这允许主隔离区接收从新生成的 worker 隔离区发送的消息。
  • 接下来,向接收端口添加一个监听器以处理 worker 隔离区将发送回的消息。传递给监听器的回调 _handleResponsesFromIsolate 将在 步骤 4 中介绍。
  • 最后,使用 Isolate.spawn 生成 worker 隔离区。它需要两个参数:要在 worker 隔离区上执行的函数(在 步骤 3 中介绍),以及接收端口的 sendPort 属性。
dart
Future<void> spawn() async {
  final receivePort = ReceivePort();
  receivePort.listen(_handleResponsesFromIsolate);
  await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}

当在 worker 隔离区上调用时, receivePort.sendPort 参数将作为参数传递给回调( _startRemoteIsolate )。这是确保 worker 隔离区有方法将消息发送回主隔离区的第一个步骤。

步骤 3:在 worker 隔离区上执行代码

#

在此步骤中,您定义了发送到 worker 隔离区的方法 _startRemoteIsolate ,以便在它生成时执行。此方法类似于 worker 隔离区的“main”方法。

  • 首先,创建一个新的 ReceivePort 。此端口接收来自主隔离区的未来消息。
  • 接下来,将该端口的 SendPort 发送回主隔离区。
  • 最后,向新的 ReceivePort 添加一个监听器。此监听器处理主隔离区发送到 worker 隔离区的消息。
dart
static void _startRemoteIsolate(SendPort port) {
  final receivePort = ReceivePort();
  port.send(receivePort.sendPort);

  receivePort.listen((dynamic message) async {
    if (message is String) {
      final transformed = jsonDecode(message);
      port.send(transformed);
    }
  });
}

worker 的 ReceivePort 上的监听器解码从主隔离区传递的 JSON,然后将解码后的 JSON 发送回主隔离区。

此监听器是从主隔离区到 worker 隔离区发送消息的入口点。 这是您唯一有机会告诉 worker 隔离区将来执行哪些代码的机会。

步骤 4:处理主隔离区上的消息

#

最后,您需要告诉主隔离区如何处理从 worker 隔离区发送回主隔离区的消息。为此,您需要填写 _handleResponsesFromIsolate 方法。回想一下,此方法传递给 receivePort.listen 方法,如 步骤 2 中所述:

dart
Future<void> spawn() async {
  final receivePort = ReceivePort();
  receivePort.listen(_handleResponsesFromIsolate);
  await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}

还记得您在 步骤 3 中将 SendPort 发送回主隔离区吗?此方法处理接收该 SendPort 以及处理未来消息(这将是解码后的 JSON)。

  • 首先,检查消息是否为 SendPort 。如果是,则将该端口分配给类的 _sendPort 属性,以便以后可以使用它来发送消息。
  • 接下来,检查消息的类型是否为 Map<String, dynamic> ,这是解码后的 JSON 的预期类型。如果是,则使用您的应用程序特定逻辑来处理该消息。在此示例中,将打印该消息。
dart
void _handleResponsesFromIsolate(dynamic message) {
  if (message is SendPort) {
    _sendPort = message;
    _isolateReady.complete();
  } else if (message is Map<String, dynamic>) {
    print(message);
  }
}

步骤 5:添加 completer 以确保您的隔离区已设置

#

要完成该类,请定义一个名为 parseJson 的公共方法,该方法负责向 worker 隔离区发送消息。它还需要确保在隔离区完全设置之前可以发送消息。要处理此问题,请使用 Completer

  • 首先,添加一个名为 Completer 的类级属性,并将其命名为 _isolateReady
  • 接下来,如果消息是 SendPort ,则在 _handleResponsesFromIsolate 方法(在 步骤 4 中创建)中添加对 completer 上的 complete() 的调用。
  • 最后,在 parseJson 方法中,在添加 _sendPort.send 之前添加 await _isolateReady.future 。这确保在 worker 隔离区生成 将其 SendPort 发送回主隔离区之前,不会向 worker 隔离区发送任何消息。
dart
Future<void> parseJson(String message) async {
  await _isolateReady.future;
  _sendPort.send(message);
}

完整示例

#
展开以查看完整示例
dart
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';

void main() async {
  final worker = Worker();
  await worker.spawn();
  await worker.parseJson('{"key":"value"}');
}

class Worker {
  late SendPort _sendPort;
  final Completer<void> _isolateReady = Completer.sync();

  Future<void> spawn() async {
    final receivePort = ReceivePort();
    receivePort.listen(_handleResponsesFromIsolate);
    await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
  }

  void _handleResponsesFromIsolate(dynamic message) {
    if (message is SendPort) {
      _sendPort = message;
      _isolateReady.complete();
    } else if (message is Map<String, dynamic>) {
      print(message);
    }
  }

  static void _startRemoteIsolate(SendPort port) {
    final receivePort = ReceivePort();
    port.send(receivePort.sendPort);

    receivePort.listen((dynamic message) async {
      if (message is String) {
        final transformed = jsonDecode(message);
        port.send(transformed);
      }
    });
  }

  Future<void> parseJson(String message) async {
    await _isolateReady.future;
    _sendPort.send(message);
  }
}

健壮的端口示例

#

前面的示例 解释了设置具有双向通信的长期隔离区所需的基本构建块。如前所述,该示例缺少一些重要功能,例如错误处理、在不再使用端口时关闭端口的能力以及某些情况下消息排序不一致的问题。

此示例通过创建一个具有这些附加功能和更多功能的长期 worker 隔离区并遵循更好的设计模式来扩展第一个示例中的信息。尽管此代码与第一个示例有相似之处,但它不是该示例的扩展。

步骤 1:定义 worker 类

#

首先,为您的后台 worker 隔离区创建一个类。此类包含您需要的所有功能:

  • 生成隔离区。
  • 向该隔离区发送消息。
  • 让隔离区解码一些 JSON。
  • 将解码后的 JSON 发送回主隔离区。

该类公开了三种公共方法:一种用于创建 worker 隔离区,一种用于处理向该 worker 隔离区发送消息,以及一种用于在不再使用端口时关闭端口。

dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;

  Future<Object?> parseJson(String message) async {
    // TODO: 确保端口仍然打开。
    _commands.send(message);
  }

  static Future<Worker> spawn() async {
    // TODO: 添加创建具有与生成的隔离区连接的新 Worker 对象的功能。
    throw UnimplementedError();
  }

  Worker._(this._responses, this._commands) {
    // TODO: 初始化主隔离区接收端口监听器。
  }

  void _handleResponsesFromIsolate(dynamic message) {
    // TODO: 处理从 worker 隔离区发送回的消息。
  }

  static void _handleCommandsToIsolate(ReceivePort rp, SendPort sp) async {
    // TODO: 处理从 worker 隔离区发送回的消息。
  }

  static void _startRemoteIsolate(SendPort sp) {
    // TODO: 初始化 worker 隔离区的端口。
  }
}

步骤 2:在 Worker.spawn 方法中创建一个 RawReceivePort

#

在生成隔离区之前,您需要创建一个 RawReceivePort ,它是一个较低级别的 ReceivePort 。使用 RawReceivePort 是首选模式,因为它允许您将隔离区启动逻辑与处理隔离区上消息传递的逻辑分开。

Worker.spawn 方法中:

  • 首先,创建 RawReceivePort 。此 ReceivePort 仅负责接收来自 worker 隔离区的初始消息,该消息将是 SendPort
  • 接下来,创建一个 Completer ,它将指示隔离区何时准备好接收消息。完成后,它将返回一个记录,其中包含 ReceivePortSendPort
  • 接下来,定义 RawReceivePort.handler 属性。此属性是一个 Function? ,其行为类似于 ReceivePort.listener 。当此端口收到消息时,将调用该函数。
  • 在处理程序函数中,调用 connection.complete() 。此方法期望一个包含 ReceivePortSendPort 作为参数的 记录SendPort 是从 worker 隔离区发送的初始消息,它将在下一步分配给名为 _commands 的类级别 SendPort
  • 然后,使用 ReceivePort.fromRawReceivePort 构造函数创建一个新的 ReceivePort ,并将 initPort 传递进去。
dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;

  static Future<Worker> spawn() async {
    // 创建一个接收端口并添加其初始消息处理程序。
    final initPort = RawReceivePort();
    final connection = Completer<(ReceivePort, SendPort)>.sync();
    initPort.handler = (initialMessage) {
      final commandPort = initialMessage as SendPort;
      connection.complete((
        ReceivePort.fromRawReceivePort(initPort),
        commandPort,
      ));
    };
// ···
  }

通过首先创建一个 RawReceivePort ,然后创建一个 ReceivePort ,您将能够稍后向 ReceivePort.listen 添加新的回调。相反,如果您直接创建一个 ReceivePort ,则只能添加一个 listener ,因为 ReceivePort 实现的是 Stream ,而不是 BroadcastStream

实际上,这允许您将隔离区启动逻辑与在完成通信设置后处理接收消息的逻辑分开。随着其他方法中逻辑的增长,此优势将变得更加明显。

步骤 3:使用 Isolate.spawn 生成 worker 隔离区

#

此步骤继续填充 Worker.spawn 方法。您将添加生成隔离区所需的代码,并从此类返回 Worker 的实例。在此示例中,对 Isolate.spawn 的调用包装在一个 try / catch 中,这确保如果隔离区启动失败,则会关闭 initPort ,并且不会创建 Worker 对象。

  • 首先,尝试在一个 try / catch 块中生成一个 worker 隔离区。如果生成 worker 隔离区失败,请关闭上一步中创建的接收端口。传递给 Isolate.spawn 的方法将在后面的步骤中介绍。
  • 接下来,等待 connection.future ,并从其返回的记录中解构发送端口和接收端口。
  • 最后,通过调用其私有构造函数并传入该 completer 中的端口来返回 Worker 的实例。
dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;

  static Future<Worker> spawn() async {
    // 创建一个接收端口并添加其初始消息处理程序
    final initPort = RawReceivePort();
    final connection = Completer<(ReceivePort, SendPort)>.sync();
    initPort.handler = (initialMessage) {
      final commandPort = initialMessage as SendPort;
      connection.complete((
        ReceivePort.fromRawReceivePort(initPort),
        commandPort,
      ));
    };
    // 生成隔离区。
    try {
      await Isolate.spawn(_startRemoteIsolate, (initPort.sendPort));
    } on Object {
      initPort.close();
      rethrow;
    }

    final (ReceivePort receivePort, SendPort sendPort) =
        await connection.future;

    return Worker._(receivePort, sendPort);
  }

请注意,在此示例中(与 前面的示例 相比), Worker.spawn 充当此类的异步静态构造函数,并且是创建 Worker 实例的唯一方法。这简化了 API,使创建 Worker 实例的代码更加简洁。

步骤 4:完成隔离区设置过程

#

在此步骤中,您将完成基本的隔离区设置过程。这几乎完全与 前面的示例 相对应,没有新的概念。有一点细微的变化,代码被分解成更多的方法,这是一种设计实践,可以帮助您在此示例的其余部分添加更多功能。有关设置隔离区的详细基本过程,请参见 基本端口示例

首先,创建从 Worker.spawn 方法返回的私有构造函数。在构造函数体中,向主隔离区使用的接收端口添加一个监听器,并将一个尚未定义的方法传递给该监听器,该方法称为 _handleResponsesFromIsolate

dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;
// ···
  Worker._(this._responses, this._commands) {
    _responses.listen(_handleResponsesFromIsolate);
  }

接下来,添加 _startRemoteIsolate 中负责初始化 worker 隔离区上的端口的代码。 回想一下 此方法在 Worker.spawn 方法中传递给 Isolate.spawn ,并且它将主隔离区的 SendPort 作为参数传递。

  • 创建一个新的 ReceivePort
  • 将该端口的 SendPort 发送回主隔离区。
  • 调用一个名为 _handleCommandsToIsolate 的新方法,并将新的 ReceivePort 和来自主隔离区的 SendPort 作为参数传递。
dart
static void _startRemoteIsolate(SendPort sendPort) {
  final receivePort = ReceivePort();
  sendPort.send(receivePort.sendPort);
  _handleCommandsToIsolate(receivePort, sendPort);
}

接下来,添加 _handleCommandsToIsolate 方法,该方法负责接收来自主隔离区的消息,在 worker 隔离区解码 json,并将解码后的 json 发送回作为响应。

  • 首先,在 worker 隔离区的 ReceivePort 上声明一个监听器。
  • 在添加到监听器的回调中,尝试在一个 try / catch 中解码从主隔离区传递的 JSON。如果解码成功,则将解码后的 JSON 发送回主隔离区。
  • 如果发生错误,则发送回 RemoteError
dart
static void _handleCommandsToIsolate(
    ReceivePort receivePort, SendPort sendPort) {
  receivePort.listen((message) {
    try {
      final jsonData = jsonDecode(message as String);
      sendPort.send(jsonData);
    } catch (e) {
      sendPort.send(RemoteError(e.toString(), ''));
    }
  });
}

接下来,添加 _handleResponsesFromIsolate 方法的代码。

  • 首先,检查消息是否为 RemoteError ,如果是,则应 throw 该错误。
  • 否则,打印消息。在后面的步骤中,您将更新此代码以返回消息而不是打印它们。
dart
void _handleResponsesFromIsolate(dynamic message) {
  if (message is RemoteError) {
    throw message;
  } else {
    print(message);
  }
}

最后,添加 parseJson 方法,这是一个公共方法,允许外部代码将 JSON 发送到 worker 隔离区进行解码。

dart
Future<Object?> parseJson(String message) async {
  _commands.send(message);
}

您将在下一步更新此方法。

步骤 5:同时处理多条消息

#

目前,如果您快速向 worker 隔离区发送消息,隔离区将 按完成顺序 发送解码后的 json 响应,而不是按发送顺序发送。您无法确定哪个响应对应于哪个消息。

在此步骤中,您将通过为每条消息指定一个 ID 并使用 Completer 对象来解决此问题,以确保当外部代码调用 parseJson 时,返回给该调用者的响应是正确的响应。

首先,向 Worker 添加两个类级别属性:

  • Map<int, Completer<Object?>> _activeRequests
  • int _idCounter
dart
class Worker {
  final SendPort _commands;
  final ReceivePort _responses;
  final Map<int, Completer<Object?>> _activeRequests = {};
  int _idCounter = 0;

_activeRequests 映射将发送到 worker 隔离区的消息与 Completer 关联起来。 _activeRequests 中使用的键取自 _idCounter ,随着发送更多消息,它将增加。

接下来,更新 parseJson 方法,以便在它向 worker 隔离区发送消息之前创建 completer。

  • 首先创建一个 Completer
  • 接下来,递增 _idCounter ,以便每个 Completer 都与一个唯一的数字关联。
  • _activeRequests 映射中添加一个条目,其中键是 _idCounter 的当前数字,值是 completer。
  • 将消息与 ID 一起发送到 worker 隔离区。因为您只能通过 SendPort 发送一个值,所以将 ID 和消息包装在一个 记录 中。
  • 最后,返回 completer 的 future,该 future 最终将包含来自 worker 隔离区的响应。
dart
Future<Object?> parseJson(String message) async {
  final completer = Completer<Object?>.sync();
  final id = _idCounter++;
  _activeRequests[id] = completer;
  _commands.send((id, message));
  return await completer.future;
}

您还需要更新 _handleResponsesFromIsolate_handleCommandsToIsolate 来处理此系统。

_handleCommandsToIsolate 中,您需要考虑 message 是一个包含两个值的记录,而不仅仅是 json 文本。通过从 message 解构值来做到这一点。

然后,在解码 json 后,更新对 sendPort.send 的调用以将 ID 和解码后的 json 发送回主隔离区,再次使用记录。

dart
static void _handleCommandsToIsolate(
    ReceivePort receivePort, SendPort sendPort) {
  receivePort.listen((message) {
    final (int id, String jsonText) = message as (int, String); // New
    try {
      final jsonData = jsonDecode(jsonText);
      sendPort.send((id, jsonData)); // Updated
    } catch (e) {
      sendPort.send((id, RemoteError(e.toString(), '')));
    }
  });
}

最后,更新 _handleResponsesFromIsolate

  • 首先,再次从 message 参数解构 ID 和响应。
  • 然后,从 _activeRequests 映射中删除与该请求对应的 completer。
  • 最后,不是抛出错误或打印解码后的 json,而是完成 completer,传入响应。完成后,响应将返回到在主隔离区上调用 parseJson 的代码。
dart
void _handleResponsesFromIsolate(dynamic message) {
  final (int id, Object? response) = message as (int, Object?); // New
  final completer = _activeRequests.remove(id)!; // New

  if (response is RemoteError) {
    completer.completeError(response); // Updated
  } else {
    completer.complete(response); // Updated
  }
}

步骤 6:添加关闭端口的功能

#

当您的代码不再使用隔离区时,您应该关闭主隔离区和 worker 隔离区上的端口。

  • 首先,添加一个类级别布尔值来跟踪端口是否已关闭。
  • 然后,添加 Worker.close 方法。在此方法中:
    • _closed 更新为 true。
    • 向 worker 隔离区发送最终消息。 此消息是一个读取“shutdown”的 String ,但它可以是您喜欢的任何对象。 您将在下一段代码中使用它。
  • 最后,检查 _activeRequests 是否为空。如果是,则关闭名为 _responses 的主隔离区的 ReceivePort
dart
class Worker {
  bool _closed = false;
// ···
  void close() {
    if (!_closed) {
      _closed = true;
      _commands.send('shutdown');
      if (_activeRequests.isEmpty) _responses.close();
      print('--- port closed --- ');
    }
  }
  • 接下来,您需要在 worker 隔离区处理“shutdown”消息。将以下代码添加到 _handleCommandsToIsolate 方法中。此代码将检查消息是否为读取“shutdown”的 String 。如果是,它将关闭 worker 隔离区的 ReceivePort 并返回。
dart
static void _handleCommandsToIsolate(
  ReceivePort receivePort,
  SendPort sendPort,
) {
  receivePort.listen((message) {
    // New if-block.
    if (message == 'shutdown') {
      receivePort.close();
      return;
    }
    final (int id, String jsonText) = message as (int, String);
    try {
      final jsonData = jsonDecode(jsonText);
      sendPort.send((id, jsonData));
    } catch (e) {
      sendPort.send((id, RemoteError(e.toString(), '')));
    }
  });
}
  • 最后,您应该添加代码来检查端口是否已关闭,然后再尝试发送消息。在 Worker.parseJson 方法中添加一行。
dart
Future<Object?> parseJson(String message) async {
  if (_closed) throw StateError('Closed'); // New
  final completer = Completer<Object?>.sync();
  final id = _idCounter++;
  _activeRequests[id] = completer;
  _commands.send((id, message));
  return await completer.future;
}

完整示例

#
在此展开以查看完整示例
dart
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';

void main() async {
  final worker = await Worker.spawn();
  print(await worker.parseJson('{"key":"value"}'));
  print(await worker.parseJson('"banana"'));
  print(await worker.parseJson('[true, false, null, 1, "string"]'));
  print(
      await Future.wait([worker.parseJson('"yes"'), worker.parseJson('"no"')]));
  worker.close();
}

class Worker {
  final SendPort _commands;
  final ReceivePort _responses;
  final Map<int, Completer<Object?>> _activeRequests = {};
  int _idCounter = 0;
  bool _closed = false;

  Future<Object?> parseJson(String message) async {
    if (_closed) throw StateError('Closed');
    final completer = Completer<Object?>.sync();
    final id = _idCounter++;
    _activeRequests[id] = completer;
    _commands.send((id, message));
    return await completer.future;
  }

  static Future<Worker> spawn() async {
    // 创建一个接收端口并添加其初始消息处理程序
    final initPort = RawReceivePort();
    final connection = Completer<(ReceivePort, SendPort)>.sync();
    initPort.handler = (initialMessage) {
      final commandPort = initialMessage as SendPort;
      connection.complete((
        ReceivePort.fromRawReceivePort(initPort),
        commandPort,
      ));
    };

    // 生成隔离区。
    try {
      await Isolate.spawn(_startRemoteIsolate, (initPort.sendPort));
    } on Object {
      initPort.close();
      rethrow;
    }

    final (ReceivePort receivePort, SendPort sendPort) =
        await connection.future;

    return Worker._(receivePort, sendPort);
  }

  Worker._(this._responses, this._commands) {
    _responses.listen(_handleResponsesFromIsolate);
  }

  void _handleResponsesFromIsolate(dynamic message) {
    final (int id, Object? response) = message as (int, Object?);
    final completer = _activeRequests.remove(id)!;

    if (response is RemoteError) {
      completer.completeError(response);
    } else {
      completer.complete(response);
    }

    if (_closed && _activeRequests.isEmpty) _responses.close();
  }

  static void _handleCommandsToIsolate(
    ReceivePort receivePort,
    SendPort sendPort,
  ) {
    receivePort.listen((message) {
      if (message == 'shutdown') {
        receivePort.close();
        return;
      }
      final (int id, String jsonText) = message as (int, String);
      try {
        final jsonData = jsonDecode(jsonText);
        sendPort.send((id, jsonData));
      } catch (e) {
        sendPort.send((id, RemoteError(e.toString(), '')));
      }
    });
  }

  static void _startRemoteIsolate(SendPort sendPort) {
    final receivePort = ReceivePort();
    sendPort.send(receivePort.sendPort);
    _handleCommandsToIsolate(receivePort, sendPort);
  }

  void close() {
    if (!_closed) {
      _closed = true;
      _commands.send('shutdown');
      if (_activeRequests.isEmpty) _responses.close();
      print('--- port closed --- ');
    }
  }
}