同步对象(sync_object)
1. 概述
本章节介绍了GS中的同步机制,包括三种核心同步对象:互斥锁(Mutex)、信号量(Semaphore)和事件(Event)。GS中,锁(Mutex)、信号量(Semaphore)、事件(Event)都属于同步对象(SyncObject),同步对象被用于在并行逻辑中保证数据完整性、操作原子性或者维持工作的同步和等待关系。所有的同步对象都通过 sync_object.create_* 方法进行创建。
本教程主要面向需要学习和使用同步机制来编写并发安全的代码的同学,需要处理多协程/多线程环境下的数据同步和任务协调的同学。
阅读完本教程,应能够使用mutex实现资源的互斥访问,能够使用semaphore进行精确的任务同步控制,能够使用event实现一对多的协程唤醒机制。
2. 锁(mutex)
2.1 概念
互斥锁(Mutex)是最基本的同步机制,用于保护共享资源,防止多个协程同时访问或修改造成数据不一致。其特性就像现实生活中的"一把钥匙开一把锁"——谁拿到了钥匙(锁),谁就能进入房间(访问资源),其他人必须等待。
核心特性:
- 互斥性:同一时刻只有一个协程能持有锁。
- 递归锁:同一个协程可以多次加锁,但需要对应次数的解锁。
- 阻塞机制:其他协程对这个锁的加锁操作均会阻塞,直到锁因为
unlock重新回到空闲状态。
2.2 创建与操作
互斥锁(mutex)属于同步对象(sync_object)的一种。通过外部 函数sync_object.create_mutex(string? name = nil)创建,互斥锁调用sync_object.lock(int|real wait_time = -1)外部函数加锁,调用sync_object.unlock()外部函数解锁。在加锁与解锁之间做一些锁保护的逻辑或数据操作内容。
基本操作:
lock():互斥锁上锁,同一协程中可多次上锁。unlock():释放互斥锁,需要对应加锁次数的释放。
一个简单的示例如下:
// 创建互斥锁
sync_object mutex = sync_object.create_mutex();
// 带名称的创建(便于调试和监控)
sync_object named_mutex = sync_object.create_mutex("resource_lock");
// 互斥锁加锁
mutex.lock();
// 做一些所保护的逻辑或数据操作......
mutex.unlock();
do
{
named_mutex.lock();
defer named_mutex.unlock(); // 通过defer语句在代码块结束时自动释放锁
// 做一些所保护的逻辑或数据操作......
};
示例2-1:声明与使用互斥锁简单示例
需要小心使用的 Mutex
如果有其它语言的编程经验,就知道互斥锁使用时必须遵循如下两点:
- 在使用数据前必须先获取锁
- 在数据使用完成后,必须及时释放锁,比如最初的例子中,使用defer语句块就是为了保证及时的释放锁
这两点看起来不起眼,但要正确的使用,其实是相当不简单的,忘记释放锁是经常发生的事情...。
2.4 并发编程
在之前的只读和并行(readonly and parallel)章节中列出的数据竞争问题我们就可以使用锁来解决,通过锁来保证多个并行协程对计数器 读->操作->写入的原子性,示例如下:
#pragma parallel
parallel int pa_num := 0;
parallel sync_object counter_mutex := sync_object.create_mutex();
void add_pa()
{
for(int i = 0; i < 10000; i++)
{
counter_mutex.lock();
pa_num := pa_num + 1;
counter_mutex.unlock();
}
}
void test()
{
array co_list = [];
for(int i = 0 upto 7)
{
// 开启8个并行协程为 parallel 变量做加法
co_list.push_back(coroutine.create_with_domain(nil, domain.create(), (:add_pa:)));
}
// 等待所有协程结束
for(coroutine co:co_list)
{
co.wait();
}
printf("Parallel number after add is %d\n", pa_num);
}
test();
示例2-2:并行计数使用锁保护示例
输出结果如下:
Parallel number after add is 80000
可以看到我们通过锁保证了计数器读取->计数器加法->计数器写回操作的原子性,我们得到了正确的计数结果80000如果我们移除锁的保护,并行计数结果就会远小于目标值80000了。
2.5 死锁
死锁是并发编程中最常见且最棘手的问题之一。它发生在两个或多个协程互相等待对方持有的资源,导致所有相关协程 都无法继续执行。典型的死锁场景(哲学家就餐问题):
#pragma parallel
parallel sync_object chopstick1 := sync_object.create_mutex();
parallel sync_object chopstick2 := sync_object.create_mutex();
void philosopher_A() {
chopstick1.lock(); // 拿到筷子1
coroutine.sleep(0.1); // 模拟一些操作
chopstick2.lock(); // 等待筷子2(但可能被B拿着)
// 就餐...
printf(HIG"Unlock all\n"NOR);
chopstick2.unlock();
chopstick1.unlock();
}
void philosopher_B() {
chopstick2.lock(); // 拿到筷子2
coroutine.sleep(0.1); // 模拟一些操作
chopstick1.lock(); // 等待筷子1(但被A拿着)
// 就餐...
printf(HIG"Unlock all\n"NOR);
chopstick1.unlock();
chopstick2.unlock();
}
void test_deadlock() {
coroutine co1 = coroutine.create_with_domain(0, domain.create(), (:philosopher_A:));
// coroutine.sleep(0.2); // 模拟极端情况下协程创建或执行延迟
coroutine co2 = coroutine.create_with_domain(0, domain.create(), (:philosopher_B:));
co1.wait();
co2.wait();
write(HIG"run success\n"NOR);
}
test_deadlock();
示例2-3:死锁问题
运行上述示例,死锁问题就极大概率发生。但不是必然发生的,部分极端情况下,由于协程的初始化顺序和执行速度并不确定,我们无法确定哪个线程中的锁先被执行,因此也无法确定两个线程对锁的具体使用顺序。有可能协程co1入口函数全部执行完成,其占用的所有锁均被释放后,协程co2的入口函数才开始执行,这种情况下死锁不会发生。
当死锁发生的情况下协程 1 锁住了chopstick1并且线程2锁住了chopstick2,然后线程 1 试图去访问chopstick2,同时线程2试图去访问chopstick1,就会死锁。 因为线程 2 需要等待线程 1 释放chopstick1后,才会释放chopstick2,而与此同时,线程 1 需要等待线程 2 释放chopstick2后才能释放chopstick1,这种情况造成了两个线程都无法释放对方需要的锁,最终死锁。
2.6 死锁的预防与避免
死锁形成有如下的四个必要条件
-
互斥:某种资源一次只允许一个进程访问,即该资源一旦分配给某个进程,其他进程就不能再访问,直到该进程访问结束。
-
占有且等待:一个进程本身占有资源(一种或多种)同时还有资源未得到满足,正在等待其他进程释放该资源。
-
不可抢占:别人已经占有了某项资源,你不能因为自己也需要该资源,就去把别人的资源抢过来。
-
循环等待:存在一个进程链,使得每个进程都占有下一个进程所需的至少一种资源。
在设计锁的使用时破坏上述条件的任意一个即可避免死锁的发生。GS 的域(Domain)通过在跨域时释放原有域破坏了第二个占有且等待条件,所以GS 的域(Domain)保护的资源不会出现死锁问题。
2.6.1 锁顺序一致性
示例2-3中,我们可以确保所有协程以相同的顺序获取锁,避免循环等待。如始终保持先锁chopstick1再锁chopstick2的顺序,示例如下:
#pragma parallel
parallel sync_object chopstick1 := sync_object.create_mutex();
parallel sync_object chopstick2 := sync_object.create_mutex();
void philosopher_A() {
chopstick1.lock(); // 拿到筷子1
coroutine.sleep(0.1); // 模拟一些操作
chopstick2.lock(); // 等待筷子2
// 就餐...
printf(HIG"Unlock all\n"NOR);
chopstick2.unlock();
chopstick1.unlock();
}
void philosopher_B() {
chopstick1.lock(); // 拿到筷子1
coroutine.sleep(0.1); // 模拟一些操作
chopstick2.lock(); // 等待筷子2
// 就餐...
printf(HIG"Unlock all\n"NOR);
chopstick2.unlock();
chopstick1.unlock();
}
void test_deadlock() {
coroutine co1 = coroutine.create_with_domain(0, domain.create(), (:philosopher_A:));
// coroutine.sleep(0.2); // 模拟极端情况下协程创建或执行延迟
coroutine co2 = coroutine.create_with_domain(0, domain.create(), (:philosopher_B:));
co1.wait();
co2.wait();
write(HIG"run success\n"NOR);
}
test_deadlock();
示例2-4:破坏循环等待条件示例
在保证了同样顺序加锁的情况下,死锁不再可能产生,两位哲学家就可以愉快的就餐了。
2.6.2 使用超时机制
可以通过超时释放锁,可以避免永久阻塞的问题。示例如下:
#pragma parallel
parallel sync_object chopstick1 := sync_object.create_mutex();
parallel sync_object chopstick2 := sync_object.create_mutex();
void philosopher_A()
{
bool eat_enough = false;
while(!eat_enough)
{
chopstick1.lock(1); // 拿到筷子1
coroutine.sleep(0.1); // 模拟一些操作
int has_chopstick2 = chopstick2.lock(random(4)); // 等待筷子2(但可能被B拿着)
if(has_chopstick2 != 0)
{
chopstick1.unlock(); // 不吃了,扔掉筷子
printlnf("I won't eat without chopsticks.");
continue;
}
// 就餐...
printf(HIG"Unlock all\n"NOR);
eat_enough = true;
chopstick2.unlock();
chopstick1.unlock();
}
}
void philosopher_B()
{
bool eat_enough = false;
while(!eat_enough)
{
chopstick2.lock(1); // 拿到筷子2
coroutine.sleep(0.1); // 模拟一些操作
int has_chopstick1 = chopstick1.lock(random(4)); // 等待筷子1(但被A拿着)
if(has_chopstick1 != 0)
{
chopstick2.unlock(); // 不吃了,扔掉筷子
printlnf("I won't eat without chopsticks.");
continue;
}
// 就餐...
printf(HIG"Unlock all\n"NOR);
eat_enough = true;
chopstick1.unlock();
chopstick2.unlock();
}
}
void test_deadlock() {
coroutine co1 = coroutine.create_with_domain(0, domain.create(), (:philosopher_A:));
// coroutine.sleep(0.2); // 模拟极端情况下协程创建或执行延迟
coroutine co2 = coroutine.create_with_domain(0, domain.create(), (:philosopher_B:));
co1.wait();
co2.wait();
write(HIG"run success\n"NOR);
}
test_deadlock();
示例2-5:超时机制避免死锁
使用超时机制,当我申请一个资源超时的情况下,释放已持有的所有资源,避免了死锁的产生。只需要循环重试几次,两人都有机会就餐。
2.6.3 defer 解锁
在之前我们已经知道,及时的释放互斥锁资源非常重要,但锁保护的逻辑中,有些可能会报错的逻辑会进入异常处理流程,导致直接跳过后续的锁释放逻辑,锁资源就无法被释放。示例如下:
#pragma parallel
parallel sync_object mutex := sync_object.create_mutex();
void some_may_exception_logic()
{
if(random(8) == 6)
error("Error occured");
else
printf(HIG"Task finished\n"NOR);
}
void lock_and_dosomthing()
{
int lock_result = mutex.lock(4);// 尝试加锁,超时时间4s
if(lock_result != ErrorCode.OK)
{
// 尝试获取锁超时,其他携程占有锁后未及时释放
printlnf(HIR"Who is stuck? Release the lock quickly!!!!"NOR);
return;
}
some_may_exception_logic();// 释放锁资源
mutex.unlock();
}
for(int i = 0 upto 32)
{
coroutine.create_with_domain("co" + i, domain.create(), (:lock_and_dosomthing:));
}
示例2-6:异常处理导致锁无法释放示例
运行实例,在报错发生后等待几秒就有会有其他携程上锁失败的输出信息显示。解锁过程直接被报错+异常处理过程跳过导致了该问题。此时考虑使用defer语句释放锁资源,异常处理在回溯调用栈的过程中会执行相应的defer语句处理。示例如下:
#pragma parallel
parallel sync_object mutex := sync_object.create_mutex();
void some_may_exception_logic()
{
if(random(8) == 6)
{
error("Error occured");
}
else
printf(HIG"Task finished\n"NOR);
}
void lock_and_dosomthing()
{
int lock_result = mutex.lock(4);// 尝试加锁,超时时间4s
defer
{ // defer 处理锁的释放逻辑
if(lock_result == ErrorCode.OK)
mutex.unlock();// 释放锁资源
}
if(lock_result != ErrorCode.OK)
{
// 尝试获取锁超时,其他携程占有锁后未及时释放
printlnf(HIR"Who is stuck? Release the lock quickly!!!!"NOR);
return;
}
some_may_exception_logic();
}
for(int i = 0 upto 32)
{
coroutine.create_with_domain("co" + i, domain.create(), (:lock_and_dosomthing:));
}
示例2-7:defer进行锁释放示例
自行运行示例,从输出结果中可以看到,尽管报错仍在发生,但却再也没有协程因为上锁超时导 致的问题产生了。defer 语句在异常处理流程中也会执行,保证了锁的及时释放。
3. 信号量(semaphore)
3.1 概念
信号量(Semaphore)是一种计数器,用于控制对有限资源的访问。其特性就像一个初始是满的数量固定的停车场,当有车辆驶出give()后导致停车场有空位时,其他车辆才可以take()进入空位。
- 计数机制:记录可用的资源数量。
- 等待队列:当资源不足时,协程进入等待。
- 一对一唤醒:每次
give()只唤醒一个等待的协程。
3.2 创建与操作
信号量可以通过sync_object.create_semaphore(string? name = nil, int max_count = -1)外部函数创建、支持两种创建方式,在max_count最大容量参数不指定时,默认最大容量为0x7FFFFFFF,示例如下:
// 创建匿名信号量,初始计数为0,需要至少give一次后才能进行take操作
sync_object sem = sync_object.create_semaphore();
// 创建命名信号量,指定最大容量为8
sync_object named_sem = sync_object.create_semaphore("resource_sem", 8);
// 释放信号量许可
sem.give();
// 获取信号量许可
sem.take();
// 获取信号量许可超时,
writeln(sem.take(1) == ErrorCode.WAIT_ERR_TIMEOUT);
基本操作:
take(int|real wait_time = -1):获取信号量许可,如果计数为0则阻塞等待。超时返回give():释放信号量许可,增加计数并唤醒一个等待协程。
3.3 使用示例
一个简单的以用户登入登出为例的使用示例如下:
// 创建一个容量为2,且初始为满的服务器
#pragma parallel
#define MAX_SERVER_PLAYER_NUM 2
parallel sync_object sem := sync_object.create_semaphore("server", MAX_SERVER_PLAYER_NUM);
void log_in(int i)
{
sem.take(); //获取信号量许可
printlnf(HIG "Player %d log in."NOR, i);
coroutine.sleep(random(8));
sem.give();
printlnf(HIY"Player %d log out."NOR, i);
}
void init_server()
{
// 初始化,服务器准备玩家登入资源
for(int i = 0; i < MAX_SERVER_PLAYER_NUM; i++)
{
sem.give(); // 释放信号量许可
printlnf("Server resource for player %d is ready", i);
}
}
void login_test()
{
array cos = [];
for(int i = 0 upto 8)
{
cos.push_back(coroutine.create_with_domain("user"+ i, domain.create(), (:log_in, i:)));
}
for(coroutine co: cos)
{
co.wait();
}
}
init_server();
login_test();
示例3-1:semphore 的创建与使用示例
示例代码创建了一个容量为 2 的信号量作为服务器玩家数量上限,在初始化时准备相应资源并释放信号量许可。之后创建8个玩家协程登录服务器。当正在登入的玩家超过 2 时,剩下的玩家需要等待正在服务器内的玩家登出并减少信号量到 2 以内后,才能登入。这里的关键就在于:信号量的申请和归还。进入服务器前需要take申请信号量,如果没有信号量许可了,就需要等待;使用后需要释放give信号量,以便其它等待者可以继续。
4. 事件(event)
4.1 概念
事件(Event)用于协程间的通知机制,允许一个协程通知多个等待协程某个条件已经满足。与信号量不同,事件是"广播式"的——一次触发唤醒所有等待者。学校下课铃——铃声一响(raise),所有班级的学生(等待协程)同时开始活动。
核心特性:
- 广播机制:一次触发唤醒所有等待协程。
- 状态记忆:事件触发后,后续的等待者会立即继续执行。
- 手动重置:需要通过
reset()方法清除事件状态。
4.2 创建与操作
事件通过create_event(string? name = nil)创建:
// 创建匿名事件
sync_object evt = sync_object.create_event();
// 创建命名事件
sync_object named_evt = sync_object.create_event("game_start");
// 事件尚未触发,等待超时,返回 ErrorCode.WAIT_ERR_TIMEOUT
writeln(evt.wait(1) == ErrorCode.WAIT_ERR_TIMEOUT);
// 触发事件
evt.raise();
// 事件已触发,事件等待不会再被阻塞或超时
writeln(evt.wait(1) == ErrorCode.OK);
writeln(evt.wait() == ErrorCode.OK);
// 重置事件至未触发状态
evt.wait();
基本操作:
sync_object.wait(int|real wait_time = -1):等待事件触发 ,如果事件已触发则立即返回,超时等待也返回。sync_object.raise():触发事件,唤醒所有等待协程。sync_object.reset():重置事件状态,使后续等待者阻塞。
4.3 使用示例
一个简单的事件使用示例如下,我们使用事件来通知所有的玩家游戏开始:
// 游戏场景:多个玩家等待游戏开始信号,然后同时开始游戏
void player(string player_name, sync_object game_start_event, sync_object player_ready_sem)
{
coroutine.sleep(random(4)); // 模拟准备过程
printf("%s player ready...\n", player_name);
// 通知主协程玩家已准备好
player_ready_sem.give();
// 等待游戏开始信号
game_start_event.wait();
// 游戏开始后的逻辑
printf(HIG"%s game start!\n"NOR, player_name);
coroutine.sleep(random(4)); // 模拟游戏过程
printf(HIY"%s game over!\n"NOR, player_name);
}
void test_game_start()
{
sync_object game_start = sync_object.create_event(); // 游戏开始事件
sync_object players_ready = sync_object.create_semaphore(); // 玩家准备就绪信号量
array player_names = ["Warrior", "Mage", "Rogue", "Priest", "Hunter"];
array co_list = [];
printf("=== Game preparation ===\n");
// 创建所有玩家协程
for(string name:player_names)
{
co_list.push_back(coroutine.create(0, (: player, name, game_start, players_ready :)));
}
// 等待所有玩家准备就绪
printf("Waiting for all players to prepare...\n");
for(int i = 0; i < player_names.length(); i++) {
players_ready.take();
}
printf("All players prepared!\n");
printf("=== Game start! ===\n");
game_start.raise(); // 广播游戏开始信号
// 等待所有玩家游戏结束
for(coroutine co:co_list) {
co.wait();
}
printf(HIC"=== game over ===\n"NOR);
}
test_game_start();
示例4-1:event事件使用示例
这个示例重点演示了事件(event)在多线程/协程同步中的核心作用。事件对象game_start_event在这里扮演着全局触发器的关键角色,其工作流程表现为:
- 阻塞等待机制:所有玩家协程执行到
game_start_event.wait()时会被挂起,形成同步屏障。此时5个玩家线程全部进入等待状态,就像运动员在起跑线等待发令枪。 - **广播唤醒:**当主线程调用
game_start.raise()时,事件对象会一次性唤醒所有等待的玩家线程(类似发令枪响)。这是事件最核心的特性——一对多广播机制,与信号量/互斥锁的点对点通信形成鲜明对比。 - 无竞争触发:由于事件唤醒操作不涉及共享数据修改,完全避免了传统锁机制带来的竞态条件风险。玩家线程被唤醒后直接从等待处继续执行,不需要重新获取锁。
5 拓展阅读
更多有关同步对象的内容或细节信息请参照以下文档:
6 总结
本章详细介绍了 GS 语言中三种核心同步对象:互斥锁(Mutex)、信号量(Semaphore)和事件(Event),它们是构建安全、可靠并发程序的基石。
三种同步对象各有其核心应用场景。互斥锁 (Mutex) 主要用于提供对共享资源的互斥访问,解决数据竞争问题,保证临界区代码的原子性;使用时需严格遵循“加锁后必解锁”原则,并警惕死锁,可通过保持锁顺序、使用超时机制和defer语句来避免。信号量 (Semaphore) 作为一个计数器,用于精确控制对有限资源(如连接池、任务队列)的访问,通过take和give操作管理资源许可数量。而事件 (Event) 则提供广播式的通知机制,用于协调多个协程基于某个条件进行同步,一次raise操作可唤醒所有等待者,适用于“一对多”的场景,如游戏开始或系统初始化完成事件。