跳到主要内容

simple_subscribe

简介

这是一个实现订阅/发布机制的功能模块 几点说明:

  1. 为了便于使用,订阅者应该包含组件pkg.simple_subscribe.FSubscriber;通过subscribe方法进行订阅

  2. 通过simple_subscribe.publish接口发布消息

网络频道的几点说明:

1.订阅者订阅/取消订阅网络频道的id格式为: id@host:port

2.订阅者只能以只读方式订阅网络频道消息(也就是不能通过网络发布消息)

3.网络频道的承载主机需要开启网络服务(simple_subscribe.start_server)

组件接口

Channel.gs

处理消息的频道对象

函数原型函数作用
void publish(...)发布消息
void remove_subscriber(object ob)移除一个订阅者
void do_heartbeat()心跳函数

FSubscriber.gs

订阅者基础组件

函数原型函数作用
void subscribe(string channel, function func)订阅
void unsubscribe(string channel)取消订阅
void unsubscribe_all()取消所有订阅

simple_subscribe.gs

函数原型函数作用
void publish(string channel, ...)发布消息
bool flush(string channel, mixed wait_time = -1)等待频道消息清空
void dump()调试函数,显示所有频道
void subscribe(string channel, object ob, function func)对象ob订阅频道消息
void unsubscribe(string channel, object ob)对象ob取消订阅频道
void destroy(string channel)销毁指定频道
void user_invoke(function func, ...)执行用户自定义函数调用(在异步消息处理协程对象的协程里执行)
bool start_server(int port, map para = )开启订阅功能的网络服务
void stop_server()停止订阅功能的网络服务
bool is_local_channel(string channel)判断是不是一个本地频道

样例

import pkg.simple_subscribe;
public void pkg_sample()
{
simple_subscribe.start_server(9999);
coroutine.sleep(0.5);
defer {
simple_subscribe.stop_server();
}

object ob = new_object(FSubscriber, domain.create());
defer {
ob.close();
}
printf("---SUBSCRIBE CHANNEL0\n");
ob.subscribe("CHANNEL0", (: on_local_message :));

printf("---SUBSCRIBE CHANNEL0 VIA NETWORK\n");
ob.subscribe("CHANNEL0@127.0.0.1:9999", (: on_remote_message :));
coroutine.sleep(0.5);

printf("---PUBLISH CHANNEL0 -> 1\n");
for (int i = 1 upto 100)
simple_subscribe.publish("CHANNEL0", i);

coroutine.sleep(1);
ob.unsubscribe_all();
printf("---UNSUBSCRIBE ALL\n");

printf("---PUBLISH CHANNEL0 -> 2\n");
for (int i = 1 upto 100)
simple_subscribe.publish("CHANNEL0", i);
coroutine.sleep(1);
printf("---FINISH\n");
}

parallel void on_local_message(string channel, object ob, ...)
{
printf("CHANNEL [%s] LOCAL MESSAGE: %M\n", channel, unpack(2));
}

parallel void on_remote_message(string channel, object ob, ...)
{
printf("CHANNEL [%s] REMOTE MESSAGE: %M\n", channel, unpack(2));
}