ob_sync
简介
提供一种同步机制,让处于不同域的对象之间进行数据同步
数据同步的简单流程:
一组待同步的处于不同域的对象,绑定一个相同的数据同步管理器对象
一个对象通过管理器以事件的方式将同步给其他对象的数据进行发布
其他对象接收到事件后择机对同步事件进行处理,以便达到数据同步的目的
其中,同步数据的发布和同步事件的处理都在对象自己所处的域,整个同步流程不切域
组件接口
FObSyncBase.gs
数据同步机制中,进行数据同步的对象必须包含的组件
函数原型 | 函数作用 |
---|---|
void sync_bind(object sync_ob) | 和一个数据同步管理器对象绑定 |
void sync_unbind() | 和已绑定的数据同步管理器对象解绑 |
object get_sync_ob() | 获取已绑定的数据同步管理器对象 |
void sync_heartbeat() | 主动进行一次数据同步心跳(立刻处理本对象待处理的同步事件并清空) |
ObSyncMgr.gs
同步机制中的数据同步对象
协调管理一组处于不同域的对象之间进行数据同步
函数原型 | 函数作用 |
---|---|
void bind(object ob) | 将一个需要同步数据的对象绑定到本管理器上来 |
void unbind(object ob) | 将一个已经绑定到本管理器的对象解绑 |
void flush(object ob) | 将一个对象未处理的同步事件立刻处理并清空 |
void sync(object ob, string event, ...) | 将某个对象的数据同步给本管理器内的其他对象 |
void sync_to(array others, object ob, string event, ...) | 将某个对象的数据同步给本管理器内的一组指定对象 |
ob_sync.gs
函数原型 | 函数作用 |
---|---|
object create_sync_ob() | 创建一个数据同步管理器对象 |
void start_sync(...) | 一组处于不同域的对象开始同步 |
void stop_sync(...) | 一组处于不同域的对象停止同步 |
样例
import gs.lang.*;
import gs.util.*;
component pkg.ob_sync.FObSyncBase;
map _dbase = {};
public mixed get(string key)
{
return _dbase[key];
}
public void set(string key, mixed val)
{
if (get_sync_ob())
get_sync_ob().sync(this, "set", key, val);
_dbase[key] = val;
printf("%O===>%s=%M\n", this, "set", $<);
}
override void FObSyncBase.on_sync_event(string event, ...)
{
switch (event)
{
case "set":
_dbase[$2] = $3;
printf("%O<===%s=%M\n", this, "set", $<[1..<1]);
break;
}
}
#pragma parallel
import gs.lang.*;
import gs.util.*;
import pkg.ob_sync;
import .c3;
public void test()
{
int n = 2;
array obs = [];
for (int i = 1 upto n)
{
object c = new_object(c3, domain.create());
c=>set("hp", random(0x7FFFFFF));
obs.push_back(c);
}
defer {
for (object c : obs)
c.close();
}
ob_sync.start_sync(unpack(obs));
array cos = [];
for (object c : obs)
{
coroutine co;
if (__index == 0)
co = coroutine.create_in_thread(nil, domain.create(), (: co_entry, c :));
else
co = coroutine.create_in_thread(nil, domain.create(), (: co_entry_p, c, cos[0] :));
cos.push_back(co);
}
for (coroutine co : cos)
co.wait();
ob_sync.stop_sync(unpack(obs));
int hp = -1;
for (object c : obs)
{
int c_hp = c=>get("hp");
if (hp == -1)
hp = c_hp;
printf("[%O] hp=%d\n", c, c_hp);
assert(c_hp == hp, "%d != %d", c_hp, hp);
}
}
void co_entry(object c)
{
defer {
this_coroutine().get_entry_domain().close();
}
int n = 100;
for (int i = 1 upto n)
{
c=>set("hp", random(0x7FFFFFF));
coroutine.sleep(1);
}
}
void co_entry_p(object c, coroutine co)
{
defer {
this_coroutine().get_entry_domain().close();
}
while (co)
{
c=>sync_heartbeat();
coroutine.sleep(2);
}
}