coroutine
传统意义上的协程是指:将比较耗时的任务通过切片的方式,将几个这样的任务做到逻辑上的平行执行,来达到增强运行效率的目的:
假设有两个因为IO或者网络需要各自阻塞一定时间的任务,它们分别需要:
A : 10s CPU时间 > 10s IO时间 > 10s CPU时间 > 10s IO时间
B : 10s CPU时间 > 10s IO时间 > 10s CPU时间 > 10s IO时间
现在假设只有一个CPU,如果现在需要发起A、B,并需要等待AB两个任务的结果再继续执行下一步,如果使用传统的多线程,可能会发生这样的事情:
A : 10s CPU时间 > 10s IO时间 > 10s CPU时间 > 10s IO时间
B : 10s CPU时间 > 10s IO时间 > 10s CPU时间 > 10s IO时间
| 多线程执行,| 两边同时IO | 假设A先完成 | A在上一步先完
| AB争夺片轮 | 对IO操作发 |IO操作,领先B| 成操作,因此A
|忽略切换损耗 |生阻塞,假设|所以没有IO争夺| 在这一步领先B
| 总耗时20s | A先完成操作| 各自花费10s | 无冲突,各自耗时10s
| | A花费10s |
| | B花费20s |
A耗时 20 + 10 + 10 + 10 = 50s
B耗时 20 + 20 + 10 + 10 = 60s
由于需要等待两个任务全部完成 整个操作耗时60s
现在如果我们能够这样安排任务
A : 10s CPU时间 > 10s IO时间 > 10s CPU时间 > 10s IO时间
| A执行CPU | A 执行IO | A 执行CPU | A 执行IO
| 花费 10s | 10s | 10s | 10s
| B等待,10s | IO与CPU不冲 | B 执行IO | B 执行CPU |B 执行IO
| | 突,10s | 10s | 10s |10s
B : (轮空) > 10s CPU时间 > 10s IO时间 > 10s CPU时间 > 10s IO时间
A耗时 10 + 10 + 10 + 10 = 40s
B耗时 10 + 10 + 10 + 10 + 10 = 50s
由于需要等待两个任务全部完成 整个操作耗时50s
可以看到,在一些情况下(高IO 高并发等),合理安排任务的终止和等待,比起直接一股脑地多线程效率要高。
GS提供的协程机制是一个综合性较强的、横跨语言各个功能的综合性系统。当开发者在尝试调用一些占用资源的耗时操作(文件、IO、网络等)时,GS虚拟机在一定程度上会自动对协程进行调度,保证资源被合理利用。
协程、线程、域
实际上GS和golang的设计在这一方面类似,都在模糊协程与线程的关系。尽可能让协程和线程的选择以及协程的切换能够自动完成,降低开发人员的心智负担。
从底层来看,GS的协程在域(Domain)内执行,协程在运行时,会占用一个域,这意味着:
- 协程在域(Domain)上执行
- 一个域可以有多个协程尝试执行
- 同一域的两个协程
不能
同时执行 - 不同域的两个协程
可以
同时执行
另外,当一个协程休眠
(或者叫挂起
)时,会释放占有的域,此时其他协程即可继续运行。
协程(coroutine)的基本语法:
int add_func(int a, int b)
{
return a + b;
}
int a = 2, b = 10;
coroutine co = coroutine.create("hello", (: add_func , a, b :));
协程的创建方法
协程可以通过coroutine.create系列的3个函数创建;除了必须指定的协程名称(可以是匿名)和入口函数相同以外,这几个函数的区别在于可以指定协程的执行域和指定调度这个协程的OS线程.
-
coroutine.create: 创建协程时,使用入口函数的参数域做为协程执行域,当入口函数的参数域为nil时,使用创建时的当前域(this_domain())做为协程的执行域;使用默认的OS线程调度
// test.gs
#pragma parallel
public void test()
{
// 故意指定函数的参数域
function fn1 = (: co_entry :);
fn1.bind_arg_domain(domain.create());
// 无参数域
function fn2 = (: co_entry :);
// 创建协程
coroutine co1 = coroutine.create(nil, fn1);
coroutine co2 = coroutine.create(nil, fn2);
// 协程执行域就是函数的参数域
printf("----------------------\n");
printf("this domain:%M\n", this_domain());
printf("co1 entry domain: %M\n", co1.get_entry_domain());
printf("fn1 arg domain: %M\n", fn1.get_arg_domain());
printf("co2 entry domain: %M\n", co2.get_entry_domain());
printf("fn2 arg domain: %M\n", fn2.get_arg_domain());
printf("----------------------\n");
}
void co_entry()
{
printf("+++++++++++++++++++++++++++\n");
coroutine co = this_coroutine();
printf("entry domain: %M\n", co.get_entry_domain());
printf("+++++++++++++++++++++++++++\n");
}import .test;
test.test();输出结果在此:
----------------------
this domain:domain[0:v1]zero
co1 entry domain: domain[11:v1]d1
fn1 arg domain: domain[11:v1]d1
co2 entry domain: domain[0:v1]zero
fn2 arg domain: nil
----------------------
+++++++++++++++++++++++++++
Shell> entry domain: domain[11:v1]d1
+++++++++++++++++++++++++++
+++++++++++++++++++++++++++
entry domain: domain[0:v1]zero
+++++++++++++++++++++++++++ -
coroutine.create_with_domain: 创建协程时,使用参数指定的域做为协程执行域(entry domain),使用默认的OS线程调度;其中,入口函数的参数域必须与协程执行域一致(相同或者为nil)
// test.gs
// test.gs
#pragma parallel
public void test()
{
domain d = domain.create();
// 故意指定函数的参数域
function fn1 = (: co_entry :);
fn1.bind_arg_domain(d);
// 无参数域
function fn2 = (: co_entry :);
// 有参数域但是和协程执行域不一致
function fn3 = (: co_entry :);
fn3.bind_arg_domain(this_domain());
// 创建协程
coroutine co1 = coroutine.create_with_domain(nil, d, fn1);
coroutine co2 = coroutine.create_with_domain(nil, d, fn2);
// 这个创建要发生错误
coroutine co3;
catch(co3 = coroutine.create_with_domain(nil, d, fn3));
// 协程执行域就是函数的参数域
printf("----------------------\n");
printf("this domain:%M\n", this_domain());
printf("co1 entry domain: %M\n", co1.get_entry_domain());
printf("fn1 arg domain: %M\n", fn1.get_arg_domain());
printf("co2 entry domain: %M\n", co2.get_entry_domain());
printf("fn2 arg domain: %M\n", fn2.get_arg_domain());
printf("----------------------\n");
}
void co_entry()
{
coroutine.sleep(1);
printf("+++++++++++++++++++++++++++\n");
coroutine co = this_coroutine();
printf("entry domain: %M\n", co.get_entry_domain());
printf("+++++++++++++++++++++++++++\n");
}import .test;
test.test();输出结果在此:
Error(-1): The function 'co_entry' can not be called in domain[11:v1]d1, expected to be called in domain[0:v1]zero
Coroutine: co[12:v1]shell
Domain: domain[0:v1]zero [local call]
At unknown:0 (create_with_domain) in object[31:v2]
Argument name = nil
Argument domain = domain[11:v1]d1
Argument name_or_func = (: .co_entry :)object[31:v2]/test.gs<Parallel>
Domain: domain[0:v1]zero [local call]
At /test.gs:25 (test) in object[31:v2]
Variable d = domain[11:v1]d1
Variable fn1 = (: .co_entry :)object[31:v2]/test.gs<Parallel>
Variable fn2 = (: .co_entry :)object[31:v2]/test.gs<Parallel>
Variable fn3 = (: .co_entry :)object[31:v2]/test.gs<Parallel>
Variable co1 = co[13:v2]anonymous
Variable co2 = co[14:v1]anonymous
Variable co3 = nil
Variable $catch_ret_0 = nil
Domain: domain[0:v1]zero [local call]
At /@shell_1.gs:8 (::entry) in object[32:v1]
Domain: domain[0:v1]zero [local call]
Domain: domain[0:v1]zero [local call]
Domain: domain[0:v1]zero [local call]
Domain: domain[0:v1]zero [local call]
At <Internal routine>
----------------------
this domain:domain[0:v1]zero
co1 entry domain: domain[11:v1]d1
fn1 arg domain: domain[11:v1]d1
co2 entry domain: domain[11:v1]d1
fn2 arg domain: nil
----------------------
Shell> +++++++++++++++++++++++++++
entry domain: domain[11:v1]d1
+++++++++++++++++++++++++++
+++++++++++++++++++++++++++
entry domain: domain[11:v1]d1
+++++++++++++++++++++++++++ -
coroutine.create_in_thread:创建协程时,优先使用入口函数的参数域做为协程的执行域,当入口函数参数域为nil时,使用参数指定的域做为协程执行域;特别的,为这个协程准备一个单独的OS线程进行调度
// test.gs
#pragma parallel
public void test()
{
domain d = domain.create();
// 故意指定函数的参数域
function fn1 = (: co_entry :);
domain d1 = domain.create();
fn1.bind_arg_domain(d1);
// 无参数域
function fn2 = (: co_entry :);
// 创建协程
coroutine co1 = coroutine.create_in_thread(nil, d, fn1);
coroutine co2 = coroutine.create_in_thread(nil, d, fn2);
// 协程执行域就是函数的参数域
printf("----------------------\n");
printf("this domain:%M, d: %M\n", this_domain(), d);
printf("co1 entry domain: %M\n", co1.get_entry_domain());
printf("fn1 arg domain: %M\n", fn1.get_arg_domain());
printf("co2 entry domain: %M\n", co2.get_entry_domain());
printf("fn2 arg domain: %M\n", fn2.get_arg_domain());
printf("----------------------\n");
}
void co_entry()
{
coroutine.sleep(1);
printf("+++++++++++++++++++++++++++\n");
coroutine co = this_coroutine();
printf("entry domain: %M\n", co.get_entry_domain());
printf("+++++++++++++++++++++++++++\n");
}import .test;
test.test();----------------------
this domain:domain[0:v1]zero, d: domain[21:v1]d11
co1 entry domain: domain[22:v1]d12
fn1 arg domain: domain[22:v1]d12
co2 entry domain: domain[21:v1]d11
fn2 arg domain: nil
----------------------
Shell> +++++++++++++++++++++++++++
entry domain: domain[22:v1]d12
+++++++++++++++++++++++++++
+++++++++++++++++++++++++++
entry domain: domain[21:v1]d11
+++++++++++++++++++++++++++ -
协程的调度规则
- 相同执行域的协程在任一时刻最多只会有一个协程正在运行:这些协程由相同的OS线程调度,必然不可能同时运行;
- 不同执行域的协程是可以并发的:不同执行域的协程可能由相同的OS线程调度也可能不是,当不同执行域的协程由相同的OS线程调度时,这些协程是不会并发执行的;反之,则可以
// test.gs
#pragma parallel
readonly int _count := 0;
// 演示多协程串行执行(计数不会错)
public void test()
{
// co1和co2的执行域是相同的
// 根据协程调度规则:相同执行域的协程在任一时刻只有一个在运行
// 因此能保证_count的计数不会错
int N = 10000;
_count := 0;;
coroutine co1 = coroutine.create(nil, (: co_entry, N :));
coroutine co2 = coroutine.create(nil, (: co_entry, N :));
co1.wait();
co2.wait();
// 绝对不可能报错
assert(_count == N * 2);
}
// 演示多协程并发执行(计数会错)
public void test2()
{
// co1和co2的执行域是不相同的
// 根据协程调度规则:不同执行域的协程是可以并发执行的
// 因此不能保证_count的计数不会错
int N = 10000;
_count := 0;
coroutine co1 = coroutine.create_in_thread(nil, domain.create(), (: co_entry, N :));
coroutine co2 = coroutine.create_in_thread(nil, domain.create(), (: co_entry, N :));
co1.wait();
co2.wait();
// 这里9999.9999%会报错
assert(_count == N * 2);
}
void co_entry(int n)
{
for (int i = 1 upto n)
{
if (i % 100 == 0)
coroutine.sleep(0);
_count := _count + 1;
}
}
获取Coroutine返回值
目前已支持在协程结束后,通过co获取协程的返回值,只要协程入口函数有申明返回值且在协程预处CoState.ENDING状态之后,就可以通过co.get_ret()获取这个返回值,这样大大方便了多线程相关业务的开发,避免需要额外利用queue、share_value或额外的回调方法去同步协程的结果
map co_get_data()
{
coroutine.sleep(0.5);
return make_parallel(get_system_info());
}
coroutine co = coroutine.create(nil, (: co_get_data :));
co.wait();
write(co.get_ret());
可以看到获取了system_info,结果为:
{ /* sizeof() == 56 */
"version" : "1.0.220829.01",
"run_in_main_thread" : false,
"first_execution" : true,
"git_commit_version" : "784cd0ccb2dc980199160a99e6d9661a5de609d2 @ origin/master @ 2022-09-16 07:27:53 +0000",
...后续略...
}
要注意的是,返回值如果是引用类型,必须为parallel或constant,以避免产生并发访问问题
Coroutine常用外部函数
下面列出coroutine类型一些常用的外部函数以及用法。
1. 协程休眠
函数原型:
void coroutine.sleep(int/real wait_time)
使用方法:
static函数,休眠当前线程(this_coroutine())
2. 等待协程结束
函数原型:
bool coroutine.wait(mixed wait_time = -1)
使用方法:
成员函数,等待协程结束
3. 当前协程
函数原型:
coroutine system.core.this_coroutine();
使用方法:
获取当前调用所处的协程
4. 所有协程
函数原型:
array coroutine.get_all();
使用方法:
获取系统中所有协程,调试时常用
5. 获取协程的调用栈信息
函数原型:
array? coroutine.get_call_stacks(handle co, bool detail = false, bool with_ob_domain = false)
使用方法:
成员函数,获取协程的调用栈信息,调试时常用
6. 获取协程执行时所在的域
函数原型:
domain coroutine.get_entry_domain(coroutine co)
使用方法:
这里entry不是指的创建协程时传入的函数;实际上当协程执行时所在的域和创建时传入的函数的参数域不一致时,协程执行时会报错
协程执行域(entry domain)示例:
public void test()
{
printf("current domain: %M\n", this_domain());
coroutine c0 = coroutine.create(nil, (: printf, "c0\n" :));
printf("c0 entry domain: %M\n", c0.get_entry_domain());
c0.wait();
coroutine c1 = coroutine.create_with_domain(nil, domain.create(), (: printf, "c1\n" :));
printf("c1 entry domain: %M\n", c1.get_entry_domain());
c1.wait();
coroutine c2 = coroutine.create_in_thread(nil, domain.create(), (: printf, "c2\n" :));
printf("c1 entry domain: %M\n", c2.get_entry_domain());
c2.wait();
}
协程执行域和传入函数的参数域不一致时报错示例:
public void test()
{
printf("current domain: %M\n", this_domain());
function fn = (: printf, "%O\n", { 1 : 2 } :);
printf("function arg domain:%M\n", fn.get_arg_domain());
coroutine c = coroutine.create_with_domain(nil, domain.create(), fn);
printf("entry domain: %M\n", c.get_entry_domain());
c.wait();
}