关于发布订阅者模式
发布-订阅模式是一种行为设计模式,它允许多个插件或实例通过事件的发布和订阅来进行通信。
在这种模式中,发布者(又称为主题)负责发布事件,而订阅者(也称为观察者)则通过订阅主题来接收这些事件。
这种模式使得应用程序的不同部分能够松散耦合,并且可以动态地添加或删除订阅者。
kokoro-flume-channel
鉴于发布-订阅模式支持动态特性,Kokoro 也采用了这种模式。我们提供了 kokoro-flume-channel
(以下简称 flume-channel
),它基于 flume 库实现了进程内的发布-订阅模式。flume
库的详细特性可在其官方仓库中查看。我们对 flume
进行了封装,以简化发布-订阅模式的实现。
示例用法
// flume-channel 提供的函数,用于创建一个 `Mode` 为 `MPSC` 的 `Context`,其 `Resources` 为 `()`
let ctx = channel_ctx();
// 注册一个 `Subscriber`
ctx.subscribe(..);
Subscriber
是一个 trait
,任何实现了 trait Subscriber
的类型都可以注册为订阅者。默认实现包括:
FnMut()
FnMut(&Context)
FnMut(Query)
FnMut(&Context, Query)
订阅者被执行的时机由 Query
决定,有关 Query
的相关信息,请参阅 关于订阅查询 (待补充)。
// 定义事件 `Hello`
#[derive(Event)]
struct Hello(String);
fn foo(e: &Hello) {
println!("{}", e.0);
}
let ctx = channel_ctx();
ctx.subscribe(foo); // 订阅事件 `Hello`
ctx.publish(Hello("Hello World".to_string())); // 发布事件 `Hello`
在上述代码中,订阅者不会立即执行,因为发布操作本质上是 sender.send
,还需要调用 receiver.recv
。因此,我们提供了以下方法:
ctx.run()
- 迭代receiver
(会阻塞线程)ctx.next()
- 单次recv
(暂未实现)ctx.run_no_block()
- 非阻塞迭代receiver
(暂未实现)
最终,我们实现了一个简单的发布-订阅示例 Hello World:
#[derive(Event)]
struct Hello(String);
fn foo(e: &Hello) {
println!("{}", e.0);
}
let ctx = channel_ctx();
ctx.subscribe(foo);
ctx.publish(Hello("Hello World".to_string()));
// 运行
ctx.run();
// 输出:Hello World
在 kokoro-flume-channel
中,一个事件可以由多个发布者发布,并且可以由多个订阅者订阅。需要注意的是,这是一个广播系统,而不是单一消费者模型。
在 Hello World 示例中,发布操作并不是常规的发布方式。发布者应在单独的线程中工作,以便与 ctx.run()
协同运行。关于线程的生成和终止时机,请参阅 关于线程 (待补充)。