隔离区
此页面讨论了一些使用 Isolate
API 实现隔离区的示例。
只要您的应用程序正在处理足以暂时阻塞其他计算的大型计算,就应该使用隔离区。最常见的示例是在 Flutter 应用程序中,当您需要执行可能导致 UI 响应迟钝的大型计算时。
没有关于您 必须 何时使用隔离区的规定,但以下是一些可以使用隔离区的其他情况:
- 解析和解码异常大的 JSON 数据块。
- 处理和压缩照片、音频和视频。
- 转换音频和视频文件。
- 对大型列表或文件系统执行复杂的搜索和过滤。
- 执行 I/O 操作,例如与数据库通信。
- 处理大量网络请求。
实现简单的 worker 隔离区
#这些示例实现了一个主隔离区,该隔离区会生成一个简单的 worker 隔离区。 Isolate.run()
简化了设置和管理 worker 隔离区的步骤:
- 生成(启动和创建)一个隔离区。
- 在生成的隔离区上运行一个函数。
- 获取结果。
- 将结果返回给主隔离区。
- 工作完成后终止隔离区。
- 检查、捕获并将异常和错误抛回主隔离区。
在新的隔离区中运行现有方法
#const String filename = 'with_keys.json';
void main() async {
// 读取一些数据。
final jsonData = await Isolate.run(_readAndParseJson);
// 使用该数据。
print('JSON 密钥数量:${jsonData.length}');
}
- 将您希望 worker 隔离区执行的函数作为其第一个参数传递给 worker 隔离区。在此示例中,它是现有函数
_readAndParseJson()
:
Future<Map<String, dynamic>> _readAndParseJson() async {
final fileData = await File(filename).readAsString();
final jsonData = jsonDecode(fileData) as Map<String, dynamic>;
return jsonData;
}
Isolate.run()
获取_readAndParseJson()
返回的结果,并将该值发送回主隔离区,关闭 worker 隔离区。worker 隔离区将保存结果的内存 转移 到主隔离区。它 不会复制 数据。worker 隔离区执行验证过程以确保允许转移对象。
_readAndParseJson()
是一个现有的异步函数,它同样可以轻松地在主隔离区中直接运行。使用 Isolate.run()
来运行它可以实现并发。worker 隔离区完全抽象了 _readAndParseJson()
的计算。它可以在不阻塞主隔离区的情况下完成。
Isolate.run()
的结果始终是 Future,因为主隔离区中的代码会继续运行。worker 隔离区执行的计算是同步的还是异步的都不会影响主隔离区,因为它无论如何都在并发运行。
有关完整程序,请查看 send_and_receive.dart 示例。
使用隔离区发送闭包
#您还可以使用主隔离区中的函数字面量或闭包直接使用 run()
创建简单的 worker 隔离区。
const String filename = 'with_keys.json';
void main() async {
// 读取一些数据。
final jsonData = await Isolate.run(() async {
final fileData = await File(filename).readAsString();
final jsonData = jsonDecode(fileData) as Map<String, dynamic>;
return jsonData;
});
// 使用该数据。
print('JSON 密钥数量:${jsonData.length}');
}
此示例与之前的示例相同。一个新的隔离区生成,计算一些内容,然后发送回结果。
但是,现在隔离区发送了一个 闭包 。闭包在功能和编写方式上都比典型的命名函数更不局限。在此示例中, Isolate.run()
并发执行看起来像是局部代码。从这个意义上说,您可以想象 run()
就像一个用于“并行运行”的控制流运算符。
使用端口在隔离区之间发送多条消息
#短暂的隔离区使用方便,但需要性能开销来生成新的隔离区并将对象从一个隔离区复制到另一个隔离区。如果您的代码依赖于使用 Isolate.run
重复运行相同的计算,则可以通过创建不会立即退出的长期隔离区来提高性能。
为此,您可以使用 Isolate.run
抽象化的一些低级隔离区 API:
本节介绍了在新建的隔离区与 主隔离区 之间建立双向通信所需的步骤。第一个示例 基本端口 在高级别介绍了该过程。第二个示例 健壮的端口 逐渐向第一个示例添加更多实用的现实世界功能。
ReceivePort
和 SendPort
#在隔离区之间设置长期通信需要两个类(除了 Isolate
): ReceivePort
和 SendPort
。这些端口是隔离区彼此通信的唯一方式。
ReceivePort
是一个处理从其他隔离区发送的消息的对象。这些消息是通过 SendPort
发送的。
端口的行为类似于 Stream
对象(事实上,接收端口实现了 Stream
!)您可以将 SendPort
和 ReceivePort
视为 Stream
的 StreamController
和监听器。 SendPort
类似于 StreamController
,因为您可以使用 SendPort.send()
方法 向其“添加”消息,而这些消息由监听器处理,在本例中是 ReceivePort
。然后, ReceivePort
通过将其作为参数传递给您提供的回调来处理它接收到的消息。
设置端口
#新生成的隔离区只有通过 Isolate.spawn
调用接收到的信息。如果您需要主隔离区在其初始创建后继续与生成的隔离区通信,则必须设置一个通信通道,以便生成的隔离区可以向主隔离区发送消息。隔离区只能通过消息传递进行通信。它们不能“查看”彼此的内存内部,这就是“隔离区”名称的由来。
要设置此双向通信,首先在主隔离区中创建一个 ReceivePort
,然后在使用 Isolate.spawn
生成新隔离区时,将其 SendPort
作为参数传递给新隔离区。然后,新隔离区创建它自己的 ReceivePort
,并通过主隔离区传递给它的 SendPort
将 其 SendPort
发送回主隔离区。主隔离区接收此 SendPort
,现在双方都有一个开放的通道来发送和接收消息。
- 在主隔离区中创建一个
ReceivePort
。SendPort
会自动作为ReceivePort
上的属性创建。 - 使用
Isolate.spawn()
生成 worker 隔离区。 - 将对
ReceivePort.sendPort
的引用作为第一条消息传递给 worker 隔离区。 - 在 worker 隔离区中创建另一个新的
ReceivePort
。 - 将对 worker 隔离区的
ReceivePort.sendPort
的引用作为第一条消息 发送回 主隔离区。
除了创建端口和设置通信外,您还需要告诉端口在接收消息时该做什么。这是使用每个相应 ReceivePort
上的 listen
方法完成的。
- 通过主隔离区对 worker 隔离区的
SendPort
的引用发送消息。 - 通过 worker 隔离区的
ReceivePort
上的监听器接收和处理消息。这就是您希望将计算移出主隔离区的地方。 - 通过 worker 隔离区对主隔离区的
SendPort
的引用发送返回消息。 - 通过主隔离区的
ReceivePort
上的监听器接收消息。
基本端口示例
#此示例演示如何使用在它和主隔离区之间进行双向通信的长期 worker 隔离区。该代码使用将 JSON 文本发送到新隔离区的示例,在该示例中,JSON 将被解析和解码,然后发送回主隔离区。
步骤 1:定义 worker 类
#首先,为您的后台 worker 隔离区创建一个类。此类包含您需要的所有功能:
- 生成隔离区。
- 向该隔离区发送消息。
- 让隔离区解码一些 JSON。
- 将解码后的 JSON 发送回主隔离区。 该类公开了两种公共方法:一种用于生成 worker 隔离区,另一种用于处理向该 worker 隔离区发送消息。
本示例中的其余部分将逐步向您展示如何填充类方法。
class Worker {
Future<void> spawn() async {
// TODO: 添加生成 worker 隔离区的功能。
}
void _handleResponsesFromIsolate(dynamic message) {
// TODO: 处理从 worker 隔离区发送回的消息。
}
static void _startRemoteIsolate(SendPort port) {
// TODO: 定义应在 worker 隔离区上执行的代码。
}
Future<void> parseJson(String message) async {
// TODO: 定义可用于向 worker 隔离区发送消息的公共方法。
}
}
步骤 2:生成 worker 隔离区
#在 Worker.spawn
方法中,您将把创建 worker 隔离区并确保它可以接收和发送消息的代码组合在一起。
- 首先,创建一个
ReceivePort
。这允许主隔离区接收从新生成的 worker 隔离区发送的消息。 - 接下来,向接收端口添加一个监听器以处理 worker 隔离区将发送回的消息。传递给监听器的回调
_handleResponsesFromIsolate
将在 步骤 4 中介绍。 - 最后,使用
Isolate.spawn
生成 worker 隔离区。它需要两个参数:要在 worker 隔离区上执行的函数(在 步骤 3 中介绍),以及接收端口的sendPort
属性。
Future<void> spawn() async {
final receivePort = ReceivePort();
receivePort.listen(_handleResponsesFromIsolate);
await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}
当在 worker 隔离区上调用时, receivePort.sendPort
参数将作为参数传递给回调( _startRemoteIsolate
)。这是确保 worker 隔离区有方法将消息发送回主隔离区的第一个步骤。
步骤 3:在 worker 隔离区上执行代码
#在此步骤中,您定义了发送到 worker 隔离区的方法 _startRemoteIsolate
,以便在它生成时执行。此方法类似于 worker 隔离区的“main”方法。
- 首先,创建一个新的
ReceivePort
。此端口接收来自主隔离区的未来消息。 - 接下来,将该端口的
SendPort
发送回主隔离区。 - 最后,向新的
ReceivePort
添加一个监听器。此监听器处理主隔离区发送到 worker 隔离区的消息。
static void _startRemoteIsolate(SendPort port) {
final receivePort = ReceivePort();
port.send(receivePort.sendPort);
receivePort.listen((dynamic message) async {
if (message is String) {
final transformed = jsonDecode(message);
port.send(transformed);
}
});
}
worker 的 ReceivePort
上的监听器解码从主隔离区传递的 JSON,然后将解码后的 JSON 发送回主隔离区。
此监听器是从主隔离区到 worker 隔离区发送消息的入口点。 这是您唯一有机会告诉 worker 隔离区将来执行哪些代码的机会。
步骤 4:处理主隔离区上的消息
#最后,您需要告诉主隔离区如何处理从 worker 隔离区发送回主隔离区的消息。为此,您需要填写 _handleResponsesFromIsolate
方法。回想一下,此方法传递给 receivePort.listen
方法,如 步骤 2 中所述:
Future<void> spawn() async {
final receivePort = ReceivePort();
receivePort.listen(_handleResponsesFromIsolate);
await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}
还记得您在 步骤 3 中将 SendPort
发送回主隔离区吗?此方法处理接收该 SendPort
以及处理未来消息(这将是解码后的 JSON)。
- 首先,检查消息是否为
SendPort
。如果是,则将该端口分配给类的_sendPort
属性,以便以后可以使用它来发送消息。 - 接下来,检查消息的类型是否为
Map<String, dynamic>
,这是解码后的 JSON 的预期类型。如果是,则使用您的应用程序特定逻辑来处理该消息。在此示例中,将打印该消息。
void _handleResponsesFromIsolate(dynamic message) {
if (message is SendPort) {
_sendPort = message;
_isolateReady.complete();
} else if (message is Map<String, dynamic>) {
print(message);
}
}
步骤 5:添加 completer 以确保您的隔离区已设置
#要完成该类,请定义一个名为 parseJson
的公共方法,该方法负责向 worker 隔离区发送消息。它还需要确保在隔离区完全设置之前可以发送消息。要处理此问题,请使用 Completer
。
- 首先,添加一个名为
Completer
的类级属性,并将其命名为_isolateReady
。 - 接下来,如果消息是
SendPort
,则在_handleResponsesFromIsolate
方法(在 步骤 4 中创建)中添加对 completer 上的complete()
的调用。 - 最后,在
parseJson
方法中,在添加_sendPort.send
之前添加await _isolateReady.future
。这确保在 worker 隔离区生成 并 将其SendPort
发送回主隔离区之前,不会向 worker 隔离区发送任何消息。
Future<void> parseJson(String message) async {
await _isolateReady.future;
_sendPort.send(message);
}
完整示例
#展开以查看完整示例
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';
void main() async {
final worker = Worker();
await worker.spawn();
await worker.parseJson('{"key":"value"}');
}
class Worker {
late SendPort _sendPort;
final Completer<void> _isolateReady = Completer.sync();
Future<void> spawn() async {
final receivePort = ReceivePort();
receivePort.listen(_handleResponsesFromIsolate);
await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}
void _handleResponsesFromIsolate(dynamic message) {
if (message is SendPort) {
_sendPort = message;
_isolateReady.complete();
} else if (message is Map<String, dynamic>) {
print(message);
}
}
static void _startRemoteIsolate(SendPort port) {
final receivePort = ReceivePort();
port.send(receivePort.sendPort);
receivePort.listen((dynamic message) async {
if (message is String) {
final transformed = jsonDecode(message);
port.send(transformed);
}
});
}
Future<void> parseJson(String message) async {
await _isolateReady.future;
_sendPort.send(message);
}
}
健壮的端口示例
#前面的示例 解释了设置具有双向通信的长期隔离区所需的基本构建块。如前所述,该示例缺少一些重要功能,例如错误处理、在不再使用端口时关闭端口的能力以及某些情况下消息排序不一致的问题。
此示例通过创建一个具有这些附加功能和更多功能的长期 worker 隔离区并遵循更好的设计模式来扩展第一个示例中的信息。尽管此代码与第一个示例有相似之处,但它不是该示例的扩展。
步骤 1:定义 worker 类
#首先,为您的后台 worker 隔离区创建一个类。此类包含您需要的所有功能:
- 生成隔离区。
- 向该隔离区发送消息。
- 让隔离区解码一些 JSON。
- 将解码后的 JSON 发送回主隔离区。
该类公开了三种公共方法:一种用于创建 worker 隔离区,一种用于处理向该 worker 隔离区发送消息,以及一种用于在不再使用端口时关闭端口。
class Worker {
final SendPort _commands;
final ReceivePort _responses;
Future<Object?> parseJson(String message) async {
// TODO: 确保端口仍然打开。
_commands.send(message);
}
static Future<Worker> spawn() async {
// TODO: 添加创建具有与生成的隔离区连接的新 Worker 对象的功能。
throw UnimplementedError();
}
Worker._(this._responses, this._commands) {
// TODO: 初始化主隔离区接收端口监听器。
}
void _handleResponsesFromIsolate(dynamic message) {
// TODO: 处理从 worker 隔离区发送回的消息。
}
static void _handleCommandsToIsolate(ReceivePort rp, SendPort sp) async {
// TODO: 处理从 worker 隔离区发送回的消息。
}
static void _startRemoteIsolate(SendPort sp) {
// TODO: 初始化 worker 隔离区的端口。
}
}
步骤 2:在 Worker.spawn
方法中创建一个 RawReceivePort
#在生成隔离区之前,您需要创建一个 RawReceivePort
,它是一个较低级别的 ReceivePort
。使用 RawReceivePort
是首选模式,因为它允许您将隔离区启动逻辑与处理隔离区上消息传递的逻辑分开。
在 Worker.spawn
方法中:
- 首先,创建
RawReceivePort
。此ReceivePort
仅负责接收来自 worker 隔离区的初始消息,该消息将是SendPort
。 - 接下来,创建一个
Completer
,它将指示隔离区何时准备好接收消息。完成后,它将返回一个记录,其中包含ReceivePort
和SendPort
。 - 接下来,定义
RawReceivePort.handler
属性。此属性是一个Function?
,其行为类似于ReceivePort.listener
。当此端口收到消息时,将调用该函数。 - 在处理程序函数中,调用
connection.complete()
。此方法期望一个包含ReceivePort
和SendPort
作为参数的 记录 。SendPort
是从 worker 隔离区发送的初始消息,它将在下一步分配给名为_commands
的类级别SendPort
。 - 然后,使用
ReceivePort.fromRawReceivePort
构造函数创建一个新的ReceivePort
,并将initPort
传递进去。
class Worker {
final SendPort _commands;
final ReceivePort _responses;
static Future<Worker> spawn() async {
// 创建一个接收端口并添加其初始消息处理程序。
final initPort = RawReceivePort();
final connection = Completer<(ReceivePort, SendPort)>.sync();
initPort.handler = (initialMessage) {
final commandPort = initialMessage as SendPort;
connection.complete((
ReceivePort.fromRawReceivePort(initPort),
commandPort,
));
};
// ···
}
通过首先创建一个 RawReceivePort
,然后创建一个 ReceivePort
,您将能够稍后向 ReceivePort.listen
添加新的回调。相反,如果您直接创建一个 ReceivePort
,则只能添加一个 listener
,因为 ReceivePort
实现的是 Stream
,而不是 BroadcastStream
。
实际上,这允许您将隔离区启动逻辑与在完成通信设置后处理接收消息的逻辑分开。随着其他方法中逻辑的增长,此优势将变得更加明显。
步骤 3:使用 Isolate.spawn
生成 worker 隔离区
#此步骤继续填充 Worker.spawn
方法。您将添加生成隔离区所需的代码,并从此类返回 Worker
的实例。在此示例中,对 Isolate.spawn
的调用包装在一个 try
/ catch
块 中,这确保如果隔离区启动失败,则会关闭 initPort
,并且不会创建 Worker
对象。
- 首先,尝试在一个
try
/catch
块中生成一个 worker 隔离区。如果生成 worker 隔离区失败,请关闭上一步中创建的接收端口。传递给Isolate.spawn
的方法将在后面的步骤中介绍。 - 接下来,等待
connection.future
,并从其返回的记录中解构发送端口和接收端口。 - 最后,通过调用其私有构造函数并传入该 completer 中的端口来返回
Worker
的实例。
class Worker {
final SendPort _commands;
final ReceivePort _responses;
static Future<Worker> spawn() async {
// 创建一个接收端口并添加其初始消息处理程序
final initPort = RawReceivePort();
final connection = Completer<(ReceivePort, SendPort)>.sync();
initPort.handler = (initialMessage) {
final commandPort = initialMessage as SendPort;
connection.complete((
ReceivePort.fromRawReceivePort(initPort),
commandPort,
));
};
// 生成隔离区。
try {
await Isolate.spawn(_startRemoteIsolate, (initPort.sendPort));
} on Object {
initPort.close();
rethrow;
}
final (ReceivePort receivePort, SendPort sendPort) =
await connection.future;
return Worker._(receivePort, sendPort);
}
请注意,在此示例中(与 前面的示例 相比), Worker.spawn
充当此类的异步静态构造函数,并且是创建 Worker
实例的唯一方法。这简化了 API,使创建 Worker
实例的代码更加简洁。
步骤 4:完成隔离区设置过程
#在此步骤中,您将完成基本的隔离区设置过程。这几乎完全与 前面的示例 相对应,没有新的概念。有一点细微的变化,代码被分解成更多的方法,这是一种设计实践,可以帮助您在此示例的其余部分添加更多功能。有关设置隔离区的详细基本过程,请参见 基本端口示例 。
首先,创建从 Worker.spawn
方法返回的私有构造函数。在构造函数体中,向主隔离区使用的接收端口添加一个监听器,并将一个尚未定义的方法传递给该监听器,该方法称为 _handleResponsesFromIsolate
。
class Worker {
final SendPort _commands;
final ReceivePort _responses;
// ···
Worker._(this._responses, this._commands) {
_responses.listen(_handleResponsesFromIsolate);
}
接下来,添加 _startRemoteIsolate
中负责初始化 worker 隔离区上的端口的代码。 回想一下 此方法在 Worker.spawn
方法中传递给 Isolate.spawn
,并且它将主隔离区的 SendPort
作为参数传递。
- 创建一个新的
ReceivePort
。 - 将该端口的
SendPort
发送回主隔离区。 - 调用一个名为
_handleCommandsToIsolate
的新方法,并将新的ReceivePort
和来自主隔离区的SendPort
作为参数传递。
static void _startRemoteIsolate(SendPort sendPort) {
final receivePort = ReceivePort();
sendPort.send(receivePort.sendPort);
_handleCommandsToIsolate(receivePort, sendPort);
}
接下来,添加 _handleCommandsToIsolate
方法,该方法负责接收来自主隔离区的消息,在 worker 隔离区解码 json,并将解码后的 json 发送回作为响应。
- 首先,在 worker 隔离区的
ReceivePort
上声明一个监听器。 - 在添加到监听器的回调中,尝试在一个
try
/catch
块 中解码从主隔离区传递的 JSON。如果解码成功,则将解码后的 JSON 发送回主隔离区。 - 如果发生错误,则发送回
RemoteError
。
static void _handleCommandsToIsolate(
ReceivePort receivePort, SendPort sendPort) {
receivePort.listen((message) {
try {
final jsonData = jsonDecode(message as String);
sendPort.send(jsonData);
} catch (e) {
sendPort.send(RemoteError(e.toString(), ''));
}
});
}
接下来,添加 _handleResponsesFromIsolate
方法的代码。
- 首先,检查消息是否为
RemoteError
,如果是,则应throw
该错误。 - 否则,打印消息。在后面的步骤中,您将更新此代码以返回消息而不是打印它们。
void _handleResponsesFromIsolate(dynamic message) {
if (message is RemoteError) {
throw message;
} else {
print(message);
}
}
最后,添加 parseJson
方法,这是一个公共方法,允许外部代码将 JSON 发送到 worker 隔离区进行解码。
Future<Object?> parseJson(String message) async {
_commands.send(message);
}
您将在下一步更新此方法。
步骤 5:同时处理多条消息
#目前,如果您快速向 worker 隔离区发送消息,隔离区将 按完成顺序 发送解码后的 json 响应,而不是按发送顺序发送。您无法确定哪个响应对应于哪个消息。
在此步骤中,您将通过为每条消息指定一个 ID 并使用 Completer
对象来解决此问题,以确保当外部代码调用 parseJson
时,返回给该调用者的响应是正确的响应。
首先,向 Worker
添加两个类级别属性:
Map<int, Completer<Object?>> _activeRequests
int _idCounter
class Worker {
final SendPort _commands;
final ReceivePort _responses;
final Map<int, Completer<Object?>> _activeRequests = {};
int _idCounter = 0;
_activeRequests
映射将发送到 worker 隔离区的消息与 Completer
关联起来。 _activeRequests
中使用的键取自 _idCounter
,随着发送更多消息,它将增加。
接下来,更新 parseJson
方法,以便在它向 worker 隔离区发送消息之前创建 completer。
- 首先创建一个
Completer
。 - 接下来,递增
_idCounter
,以便每个Completer
都与一个唯一的数字关联。 - 向
_activeRequests
映射中添加一个条目,其中键是_idCounter
的当前数字,值是 completer。 - 将消息与 ID 一起发送到 worker 隔离区。因为您只能通过
SendPort
发送一个值,所以将 ID 和消息包装在一个 记录 中。 - 最后,返回 completer 的 future,该 future 最终将包含来自 worker 隔离区的响应。
Future<Object?> parseJson(String message) async {
final completer = Completer<Object?>.sync();
final id = _idCounter++;
_activeRequests[id] = completer;
_commands.send((id, message));
return await completer.future;
}
您还需要更新 _handleResponsesFromIsolate
和 _handleCommandsToIsolate
来处理此系统。
在 _handleCommandsToIsolate
中,您需要考虑 message
是一个包含两个值的记录,而不仅仅是 json 文本。通过从 message
解构值来做到这一点。
然后,在解码 json 后,更新对 sendPort.send
的调用以将 ID 和解码后的 json 发送回主隔离区,再次使用记录。
static void _handleCommandsToIsolate(
ReceivePort receivePort, SendPort sendPort) {
receivePort.listen((message) {
final (int id, String jsonText) = message as (int, String); // New
try {
final jsonData = jsonDecode(jsonText);
sendPort.send((id, jsonData)); // Updated
} catch (e) {
sendPort.send((id, RemoteError(e.toString(), '')));
}
});
}
最后,更新 _handleResponsesFromIsolate
。
- 首先,再次从
message
参数解构 ID 和响应。 - 然后,从
_activeRequests
映射中删除与该请求对应的 completer。 - 最后,不是抛出错误或打印解码后的 json,而是完成 completer,传入响应。完成后,响应将返回到在主隔离区上调用
parseJson
的代码。
void _handleResponsesFromIsolate(dynamic message) {
final (int id, Object? response) = message as (int, Object?); // New
final completer = _activeRequests.remove(id)!; // New
if (response is RemoteError) {
completer.completeError(response); // Updated
} else {
completer.complete(response); // Updated
}
}
步骤 6:添加关闭端口的功能
#当您的代码不再使用隔离区时,您应该关闭主隔离区和 worker 隔离区上的端口。
- 首先,添加一个类级别布尔值来跟踪端口是否已关闭。
- 然后,添加
Worker.close
方法。在此方法中:- 将
_closed
更新为 true。 - 向 worker 隔离区发送最终消息。 此消息是一个读取“shutdown”的
String
,但它可以是您喜欢的任何对象。 您将在下一段代码中使用它。
- 将
- 最后,检查
_activeRequests
是否为空。如果是,则关闭名为_responses
的主隔离区的ReceivePort
。
class Worker {
bool _closed = false;
// ···
void close() {
if (!_closed) {
_closed = true;
_commands.send('shutdown');
if (_activeRequests.isEmpty) _responses.close();
print('--- port closed --- ');
}
}
- 接下来,您需要在 worker 隔离区处理“shutdown”消息。将以下代码添加到
_handleCommandsToIsolate
方法中。此代码将检查消息是否为读取“shutdown”的String
。如果是,它将关闭 worker 隔离区的ReceivePort
并返回。
static void _handleCommandsToIsolate(
ReceivePort receivePort,
SendPort sendPort,
) {
receivePort.listen((message) {
// New if-block.
if (message == 'shutdown') {
receivePort.close();
return;
}
final (int id, String jsonText) = message as (int, String);
try {
final jsonData = jsonDecode(jsonText);
sendPort.send((id, jsonData));
} catch (e) {
sendPort.send((id, RemoteError(e.toString(), '')));
}
});
}
- 最后,您应该添加代码来检查端口是否已关闭,然后再尝试发送消息。在
Worker.parseJson
方法中添加一行。
Future<Object?> parseJson(String message) async {
if (_closed) throw StateError('Closed'); // New
final completer = Completer<Object?>.sync();
final id = _idCounter++;
_activeRequests[id] = completer;
_commands.send((id, message));
return await completer.future;
}
完整示例
#在此展开以查看完整示例
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';
void main() async {
final worker = await Worker.spawn();
print(await worker.parseJson('{"key":"value"}'));
print(await worker.parseJson('"banana"'));
print(await worker.parseJson('[true, false, null, 1, "string"]'));
print(
await Future.wait([worker.parseJson('"yes"'), worker.parseJson('"no"')]));
worker.close();
}
class Worker {
final SendPort _commands;
final ReceivePort _responses;
final Map<int, Completer<Object?>> _activeRequests = {};
int _idCounter = 0;
bool _closed = false;
Future<Object?> parseJson(String message) async {
if (_closed) throw StateError('Closed');
final completer = Completer<Object?>.sync();
final id = _idCounter++;
_activeRequests[id] = completer;
_commands.send((id, message));
return await completer.future;
}
static Future<Worker> spawn() async {
// 创建一个接收端口并添加其初始消息处理程序
final initPort = RawReceivePort();
final connection = Completer<(ReceivePort, SendPort)>.sync();
initPort.handler = (initialMessage) {
final commandPort = initialMessage as SendPort;
connection.complete((
ReceivePort.fromRawReceivePort(initPort),
commandPort,
));
};
// 生成隔离区。
try {
await Isolate.spawn(_startRemoteIsolate, (initPort.sendPort));
} on Object {
initPort.close();
rethrow;
}
final (ReceivePort receivePort, SendPort sendPort) =
await connection.future;
return Worker._(receivePort, sendPort);
}
Worker._(this._responses, this._commands) {
_responses.listen(_handleResponsesFromIsolate);
}
void _handleResponsesFromIsolate(dynamic message) {
final (int id, Object? response) = message as (int, Object?);
final completer = _activeRequests.remove(id)!;
if (response is RemoteError) {
completer.completeError(response);
} else {
completer.complete(response);
}
if (_closed && _activeRequests.isEmpty) _responses.close();
}
static void _handleCommandsToIsolate(
ReceivePort receivePort,
SendPort sendPort,
) {
receivePort.listen((message) {
if (message == 'shutdown') {
receivePort.close();
return;
}
final (int id, String jsonText) = message as (int, String);
try {
final jsonData = jsonDecode(jsonText);
sendPort.send((id, jsonData));
} catch (e) {
sendPort.send((id, RemoteError(e.toString(), '')));
}
});
}
static void _startRemoteIsolate(SendPort sendPort) {
final receivePort = ReceivePort();
sendPort.send(receivePort.sendPort);
_handleCommandsToIsolate(receivePort, sendPort);
}
void close() {
if (!_closed) {
_closed = true;
_commands.send('shutdown');
if (_activeRequests.isEmpty) _responses.close();
print('--- port closed --- ');
}
}
}
除非另有说明,否则本网站上的文档反映的是 Dart 3.6.0。页面最后更新于 2025-02-05。 查看源代码 或 报告问题.