simple_subscribe
简介
这是一个实现订阅/发布机制的功能模块 几点说明:
-
为了便于使用,订阅者应该包含组件pkg.simple_subscribe.FSubscriber;通过subscribe方法进行订阅
-
通过simple_subscribe.publish接口发布消息
网络频道的几点说明:
1.订阅者订阅/取消订阅网络频道的id格式为: id@host:port
2.订阅者只能以只读方式订阅网络频道消息(也就是不能通过网络发布消息)
3.网络频道的承载主机需要开启网络服务(simple_subscribe.start_server)
组件接口
Channel.gs
处理消息的频道对象
函数原型 | 函数作用 |
---|---|
void publish(...) | 发布消息 |
void remove_subscriber(object ob) | 移除一个订阅者 |
void do_heartbeat() | 心跳函数 |
FSubscriber.gs
订阅者基础组件
函数原型 | 函数作用 |
---|---|
void subscribe(string channel, function func) | 订阅 |
void unsubscribe(string channel) | 取消订阅 |
void unsubscribe_all() | 取消所有订阅 |
simple_subscribe.gs
函数原型 | 函数作用 |
---|---|
void publish(string channel, ...) | 发布消息 |
bool flush(string channel, mixed wait_time = -1) | 等待频道消息清空 |
void dump() | 调试函数,显示所有频道 |
void subscribe(string channel, object ob, function func) | 对象ob订阅频道消息 |
void unsubscribe(string channel, object ob) | 对象ob取消订阅频道 |
void destroy(string channel) | 销毁指定频道 |
void user_invoke(function func, ...) | 执行用户自定义函数调用(在异步消息处理协程对象的协程里执行) |
bool start_server(int port, map para = ) | 开启订阅功能的网络服务 |
void stop_server() | 停止订阅功能的网络服务 |
bool is_local_channel(string channel) | 判断是不是一个本地频道 |
样例
import pkg.simple_subscribe;
public void pkg_sample()
{
simple_subscribe.start_server(9999);
coroutine.sleep(0.5);
defer {
simple_subscribe.stop_server();
}
object ob = new_object(FSubscriber, domain.create());
defer {
ob.close();
}
printf("---SUBSCRIBE CHANNEL0\n");
ob.subscribe("CHANNEL0", (: on_local_message :));
printf("---SUBSCRIBE CHANNEL0 VIA NETWORK\n");
ob.subscribe("CHANNEL0@127.0.0.1:9999", (: on_remote_message :));
coroutine.sleep(0.5);
printf("---PUBLISH CHANNEL0 -> 1\n");
for (int i = 1 upto 100)
simple_subscribe.publish("CHANNEL0", i);
coroutine.sleep(1);
ob.unsubscribe_all();
printf("---UNSUBSCRIBE ALL\n");
printf("---PUBLISH CHANNEL0 -> 2\n");
for (int i = 1 upto 100)
simple_subscribe.publish("CHANNEL0", i);
coroutine.sleep(1);
printf("---FINISH\n");
}
parallel void on_local_message(string channel, object ob, ...)
{
printf("CHANNEL [%s] LOCAL MESSAGE: %M\n", channel, unpack(2));
}
parallel void on_remote_message(string channel, object ob, ...)
{
printf("CHANNEL [%s] REMOTE MESSAGE: %M\n", channel, unpack(2));
}