GS中的多线程
基础概念
- 并发和并行
- 吃饭吃到一半,电话来了,一直到吃完了以后才去接,这就说明你不支持并发也不支持并行。
- 吃饭吃到一半,电话来了,停了下来接了电话,接完后继续吃饭,这说明你支持并发。
- 吃饭吃到一半,电话来了,一边打电话一边吃饭,这说明你支持并行。
- 线程和协程:GS支持多线程,多协程,默认的调度器中有N个线程调度M个协程(一般M远大于N),一般N的个数会配置为略大于核数,启动时通过 /s 指定,如果过大,一个进程开了远多于核数的线程,会增加线程间切换的开销,对性能不会提升反而会下降;配得太小,无法充分利用多核性能,所以是略大于核数,具体还是看使用场景,如果有非常大量的IO操作,多开点线程还是有好处的
- 跨域/域锁:GS使用域(domain)来保证数据的多线程安全,一个域可以看成是一个互斥锁,当访问某个对象的时候,就会加锁,读写操作结束后再释放锁,这个过程称作跨域,
- RW/RO对象:普通对象都是RW(read-write)对象,即一定会和某个域绑定在一起,确保访问数据的线程安全,如果对象的文件中有#pragma parallel,说明是RO(read-only)对象,只能有parallel或readonly的成员变量,且所有方法都是parallel的
正确的并行
先来一个常见的错误的并行执行的示例,我们使用 最裸不带优化的计算素数作为示例:
void run()
{
// 希望10个协程并行计算不同阶段的素数
int step = 10000;
int co_num = 10;
array cos = [];
for (int i = 1 upto co_num)
cos.push_back(coroutine.create(0, (: foo, (i - 1) * step, i * step - 1 :)));
printf("Calc prime num (%d)\n", step * co_num);
int t = time.time_ms();
int sum = 0;
for (coroutine co : cos)
{
co.wait();
sum += co.get_ret();
}
printf("ret = %d, cost = %dms\n", sum, time.time_ms() - t);
}
int foo(int from, int to)
{
int num = 0;
for (int i = from; i <= to; ++i)
if (is_prime(i))
num ++;
return num;
}
bool is_prime(int v)
{
if (v < 2)
return false;
for (int i = 2 upto v - 1)
if (v % i == 0)
return false;
return true;
}
run();
输出的结果是:
Calc prime num (100000)
ret = 9592, cost = 3439ms
我们没有声明对象parallel,所以默认大家都跑在一个RW对象上,显然所以这些协程都会卡在当前这个对象的域,不能并行
那我们在对象中增加 #pragma parallell,是不是就可以了呢?跑了一下,结果还是一样,因为我们创建协程使用的coroutine.create
默认会把当前域作为协程的入口域,虽然对象是parallel的,方法是parallel的,但是大家的入口是同一个域,所以还是不能并行
正确的做法是改成coroutine.create_with_domain
正确的示例:
#pragma parallel
void run()
{
// 希望10个协程并行计算不同阶段的素数
int step = 10000;
int co_num = 10;
array cos = [];
for (int i = 1 upto co_num)
cos.push_back(coroutine.create_with_domain(0, domain.create(), (: foo, (i - 1) * step, i * step - 1 :)));
printf("Calc prime num (%d)\n", step * co_num);
int t = time.time_ms();
int sum = 0;
for (coroutine co : cos)
{
co.wait();
sum += co.get_ret();
}
printf("ret = %d, cost = %dms\n", sum, time.time_ms() - t);
}
int foo(int from, int to)
{
int num = 0;
for (int i = from; i <= to; ++i)
if (is_prime(i))
num ++;
return num;
}
bool is_prime(int v)
{
if (v < 2)
return false;
for (int i = 2 upto v - 1)
if (v % i == 0)
return false;
return true;
}
run();
这样跑下来的结果是:
Calc prime num (100000)
ret = 9592, cost = 908ms
有个简化的写法,使用语法糖:@new:foo(),实际转调的是 coroutine.create_for_post_new
线程与协程
协程底层
GS语言中,线程和协程的底层实现在gcoroutine这个工程中,我们封装了一套和GS本身无关的协程库,最底层的协程调度有一套自己用汇编实现的,也有一套依赖ucontext和fiber的实现,目前实际使用的是依赖ucontext和fiber的实现,感觉更可靠一些,核心的底层resume/yield的代码见:
ContextWin ContextLinux 很简单,没几行代码,其余具体实现见:gcoroutine
协程调度
我们在上层封装了一层通过不同的调度器对协程进行调度的机制,比较常用的是并行调度器ParallelScheuler,顾名思义,使用多个线程去并行调度很多个协程,同一个协程可能在短时间内被不同的线程调度,反复横跳,底层也做了一些如果太久没有被调度到,就插到其他调度队列的实现,进行基础的负载均衡
上层通过coroutine.create或coroutine.create_with_domain来创建
同时我们也有另一个调度器的实现,每个线程精准调度一个协程,保证这个协程不会因为调度而不能被及时执行,特别是在CPU比较忙的情况下,GS中通过coroutine.create_in_thread来创建
关于线程如何调度协程的细节,我们的实现和GO是类似的,但这些细节对于我们使用的影响不大,我们只要知道,协程是可以在真的不同的系统线程中执行的,只要执行时所在的域不同,大家就可以真的并行
只读与并行
对应GS中readonly和parallell两个关键字
readonly可以修饰成员变量
parallel可以修饰成员变量和方法
只读变量
readonly的变量,每次赋值都会深拷贝,如果写得很频繁,开销很大,出现过很多次ro变量频繁赋值导致的性能问题
void VmBase::do_RO_ASSIGN(Object* this_ob, ValueType type, ValueSubType sub_type, ValueAttrib attr, Value* p1, Value* p2)
{
// RO assign do not need to call check_before_write,
// since multiple assignments in different threads may occur
// at the same time, which will cause assertion in check_before_write
// Call this_ob->_check_after_written_no_assert() directly
// Assign to readonly object's var
// Allocate new readonly value if the value is reference value
if (p2->is_reference_value() && p2->m_reference->is_modifiable())
{
// Clone the source value & make it readonly
auto* current_domain = Coroutine::get_current_co_domain();
CO_VALUE(cloned_val);
p2->clone_entirely(&cloned_val, current_domain, current_domain);
ReferenceRoParallel::make_readonly(&cloned_val);
_check_and_assign(type, sub_type, attr, p1, &cloned_val);
}
else
_check_and_assign(type, sub_type, attr, p1, p2);
// Notify this_ob was changed if it an old unit
this_ob->_check_after_written_no_assert();
}
举个例子:
map m = get_system_info();
void run()
{
map s = get_system_info();
int t = time.time_ms();
for (int i = 1 upto 1000000)
m = s;
t = time.time_ms() - t;
printf("t = %dms\n", t);
}
run();
输出结果:
t = 12ms
readonly map m := get_system_info();
void run()
{
map s = get_system_info();
int t = time.time_ms();
for (int i = 1 upto 1000000)
m := s;
t = time.time_ms() - t;
printf("t = %dms\n", t);
}
run();
输出结果:
t = 1273ms
并行变量
parallel变量,如果是个容器(map或者array),可以并行读写容器中的某个元素,但是如果容器本身要改变(添加或减少元素)就需要重新类似RO变量那样做一次深拷贝
parallel变量实现的目的就是改进RO变量的性能,在很多场景下,我们容器中元素的数量是稳定的,只是元素要频繁变动,这种情况没必要每次写都深拷贝,可以使用share_value,但是使用起来麻烦,并且也有读写加锁的开销,及时有读写锁,还是比直接访问慢不少
举个例子,我们把每一段的结果分别存起来,存在一个数组中,这是个比较常见的应用场景
#pragma parallel
const int SLICE = 10;
parallel array _multi_results := make_parallel(array.allocate(SLICE, 0));
int run()
{
int step = 10000;
int co_num = SLICE;
array cos = [];
for (int i = 1 upto co_num)
cos.push_back(coroutine.create_with_domain(0, domain.create(), (: foo, i - 1, (i - 1) * step, i * step - 1 :)));
printf("Calc prime num (%d)\n", step * co_num);
int t = time.time_ms();
int sum = 0;
for (coroutine co : cos)
{
co.wait();
sum += co.get_ret();
}
printf("ret = %d, cost = %dms\n", sum, time.time_ms() - t);
printf("rets = %O\n", _multi_results);
return sum;
}
int foo(int n, int from, int to)
{
int num = 0;
for (int i = from; i <= to; ++i)
if (is_prime(i))
num ++;
_multi_results[n] := num;
return num;
}
bool is_prime(int v)
{
if (v < 2)
return false;
for (int i = 2 upto v - 1)
if (v % i == 0)
return false;
return true;
}
run();
关于性能方面,有些情况下会比RO变量好非常多
readonly map m := get_system_info();
void run()
{
int t = time.time_ms();
for (int i = 1 upto 1000000)
{
m := m + {"jit_type" : i};
}
t = time.time_ms() - t;
printf("t = %dms\n", t);
}
run();
运行结果是:
t = 2540ms
parallel map m := make_parallel(get_system_info());
void run()
{
int t = time.time_ms();
for (int i = 1 upto 1000000)
{
m["jit_type"] := i;
}
t = time.time_ms() - t;
printf("t = %dms\n", t);
}
run();
运行结果是:
t = 25ms