net
简介
这是一个基于tcp协议的网络库
对driver自有网络接口(通过man network查看)的封装,构建一个易于使用的网络库
提供接入服务功能的服务对象: socket_server
提供客户端接入的客户端侧连接对象: socket_client
服务对象接受客户端连接后创建的服务端测连接对象: socket_server_client
通过注册回调函数的方式,处理网络事件/消息
组件接口
FSocketBase.gs
socket的基础组件
函数原型 | 函数作用 |
---|---|
int get_max_send_size() | 获取允许的单次数据发送大小的最大值(发送数据经由msgpack打包后计算出来的值) |
void set_max_send_size(int size) | 设置允许的单次数据发送大小的最大值(默认值为512K) |
void set_max_sending_queue_length(int len) | 设置底层发送队列允许积压的数据包的最大数量(默认值为无限制) |
function get_pack_func() | 返回当前的打包函数 |
function get_unpack_func() | 获得当前的解包函数 |
int get_port_recv_size() | 获取接收缓冲区的大小 |
void set_port_recv_size(int port_recv_size) | 设置接收缓冲区的大小(默认值值为32K) |
void bind_value(mixed value) | 绑定一个关联值 |
mixed get_bind_value() | 获取和本对象绑定的值 |
void set_callback(string type, function callback) | 注册指定类型的回调函数 |
socket get_socket() | 返回关联的socket |
FSocketClientBase.gs
socket的基础组件
函数原型 | 函数作用 |
---|---|
string remote_address() | 返回对端地址 |
bool is_connected() | 是否已经连接 |
int send(mixed payload) | 发送数据 |
int send_raw(buffer payload) | 直接发送buffer类型元数据 |
int send_raw_batch(array payloads) | 批量发送buffer类型元数据 |
int send_binary(buffer payload) | 发送buffer类型的数据 |
int send_text(string payload) | 发送string类型的数据 |
int send_ping() | ping/pong机制中用于发送一个PING包 |
int send_pong() | ping/pong机制中用于发送一个PONG包 |
void disconnect() | 主动断开连接 |
void set_recv_key(string key) | 设置接收时网络数据的解密密钥 |
string get_recv_key() | 获取当前已设置的网络数据解密密钥 |
void ignore_recv_msg(bool flag = true) | 设置或者取消“忽略所有已接收数据" |
bool recv_msg_ignored() | 返回是否忽略所有已接收数据 |
net.gs
函数原型 | 函数作用 |
---|---|
object create_server(map para = ) | 创建一个提供网络连接接入服务的对象 |
object create_client(map para = ) | 创建用于接入网络连接服务的对象 |
SocketClient.gs
用于接入服务的客户端对象
包含组件socket_base
函数原型 | 函数作用 |
---|---|
bool is_socket_client() | 我是socket client对象 |
bool connect(string ip, int port, int timeout = 5) | 连接指定的主机 |
SocketServer.gs
提供网络连接接入服务的对象
包含组件socket_base
函数原型 | 函数作用 |
---|---|
bool is_socket_server() | 我是socket server对象 |
bool listen(int port, int backlog = 128) | 启动接入服务,侦听指定端口 |
void stop_listen() | 停止监听服务(已建立的连接不断开) |
void add(object server_client_ob) | 记录一个服务侧客户端对象 |
void remove(object server_client_ob) | 移除一个服务侧客户端对象记录 |
bool clients_empty() | 客户端列表是否为空 |
void dump_clients() | 列出当前服务侧客户端列表 |
SocketServerClient.gs
socket_server接受客户端连接后创建的服务器侧连接对象
包含组件socket_base
函数原型 | 函数作用 |
---|---|
bool is_socket_server_client() | 我是服务器侧的客户端对象 |
常量
PORT_RECV_SIZE
用于接收网络数据的缓冲区大小,这是默认值,可以通过接口修改
public const int PORT_RECV_SIZE = "1024 * 32"
MAX_SEND_SIZE
单次允许发送的数据大小(经过msgpack打包后的大小)
public const int MAX_SEND_SIZE = "1024 * 512"
枚举
SocketEvent
网络事件
枚举成员 | 值 | 描述 |
---|---|---|
Open | ||
Close | ||
Message | ||
Ping | ||
Pong |
样例
// sample.gs
#pragma parallel
import pkg.net;
void sample()
{
int port = 12345;
int total = 0;
sync_object sem = sync_object.create_semaphore();
function msg_func = [&total](object server_client, mixed data) {
if (data == -1)
{
server_client.send(-1);
sem.give();
}
else
{
total += data;
server_client.send(data);
}
};
object s = start_server(port, msg_func);
int totalc = 0;
sync_object semc = sync_object.create_semaphore();
function client_msg_func = [&totalc](object client, mixed data) {
if (data == -1)
semc.give();
else
{
totalc += data;
}
};
let object c, queue q = create_client("localhost", port, client_msg_func);
defer {
if (q)
q.close();
if (c)
c.close();
if (s)
s.close();
if (sem)
sem.close();
if (semc)
semc.close();
}
for (int i = 1 upto 1000)
q.send_dup(i);
q.send_dup(-1);
sem.take();
semc.take();
printf("total: %d, totac: %d\n", total, totalc);
}
object start_server(int port, function message_func)
{
map server_para = {
"port_recv_size" : 1024 * 1024,
"max_send_size" : 1024 * 1024,
};
object s = net.create_server(server_para);
bool success = false;
defer {
if (! success)
s.close();
}
s.set_callback("event", (: on_server_event :));
s.set_callback("message", (: on_server_message, message_func :));
s.set_callback("pingpong", (: on_server_pingpong :));
if (! s.listen(port))
return nil;
success = true;
return s;
}
array create_client(string host, int port, function message_func)
{
map client_para = {
"port_recv_size" : 1024 * 1024,
"max_send_size" : 1024 * 1024,
};
object c = net.create_client(client_para);
bool success = false;
defer {
if (! success)
c.close();
}
c.set_callback("event", (: on_client_event :));
c.set_callback("message", (: on_client_message, message_func :));
if (! c.connect(host, port))
return [ nil, nil ];
queue q = queue.create("");
coroutine co = coroutine.create_with_domain(nil, domain.create(), (: co_entry, c, q :));
success = true;
return [ c, q ];
}
void co_entry(object c, queue q)
{
defer {
this_coroutine().get_entry_domain().close();
};
int n = 0;
while (true)
{
mixed msg = q.receive();
if (is_nil(msg))
break;
c.send(msg);
if (msg == -1)
break;
if (++n % 100 == 0)
coroutine.sleep(0.01);
}
}
void on_server_event(SocketEvent evt, object server, object server_client)
{
switch (evt)
{
case SocketEvent.Open:
break;
case SocketEvent.Close:
server_client.close();
break;
}
}
void on_server_message(function message_func, object server, object server_client, mixed data)
{
message_func.call(server_client, data);
}
void on_server_pingpong(object server, bool ping, object server_client)
{
if (ping)
server_client.send_pong();
else
server_client.send_ping();
}
void on_client_event(SocketEvent evt, object client)
{
switch (evt)
{
case SocketEvent.Close:
client.close();
break;
}
}
void on_client_message(function message_func, object client, mixed data)
{
message_func.call(client, data);
}