跳到主要内容

redis_net

简介

基于redis_queue实现的redis数据通讯。 为每个session创建收发两个方向的redis_queue。 可以通过创建两个session对象互为发送方和接收方,来实现两个对象之间的通讯。 通过为session对象设置收消息的回调函数来处理消息。

组件接口

redis_net.gs

函数原型函数作用
object new_session(string from, string to, map para)*

Session.gs

实现一个基于redis_queue的全双工会话。

函数原型函数作用
void run()*
void set_callback(string type, function fn)*
bool send(...)*

样例

#pragma parallel

import gs.lang.*;
import gs.util.*;

import pkg.redis_net;

import pkg.hiredis;
import pkg.redis_pool;

void create()
{
map redis_cfg = {
"thread_num" : 1,
};
hiredis.setup_redis(redis_cfg);
map reidis_dbs = {
"redis_mq" : { // db
"driver": "redis",
"host": "127.0.0.1",
"port": 6379,
"passwd": "",
"db_index": 3,
"is_cluster": false,
},
};
for (string db, map db_config : reidis_dbs)
redis_pool=>set_db_config(db, db_config);
}

public void run()
{
coroutine.create_with_domain(nil, domain.create(), (: co_entry :));
}

void co_entry()
{
//创建两个session对象,ab的发送队列是ba的接收队列,ab的接收队列是ba的发送队列
object ab = create_session_ab();
object ba = create_session_ba();
//相互发送数据
ab.send("123");
ba.send("321");
coroutine.sleep(10);

ab.close();
ba.close();
}

public object create_session_ab()
{
map sp = {
"db" : "redis_mq",
};
object s = redis_net.new_session("a", "b", sp);
s.set_callback("message", (: on_any_msg :));
s.run();

return s;
}

public object create_session_ba()
{
map sp = {
"db" : "redis_mq",
};
object s = redis_net.new_session("b", "a", sp);
s.set_callback("message", (: on_any_msg :));
s.run();

return s;
}

void on_any_msg(object ob, string cmd, ...)
{
if (cmd != "msg_recv")
ob.send("msg_recv", unpack(1));

printf("session=%O, msg: %M\n", ob, $<[1..<1]);
}