跳到主要内容
版本:master

队列(queue)

1. 概述

本文档系统性地介绍了 GS 语言中的核心同步数据结构——队列(queue)。队列作为一种线程安全的先进先出(FIFO)机制,是 GS 并发编程中实现数据传递、任务调度和跨域通信的关键工具。文档内容涵盖从队列的基本概念、创建方法、基本操作(发送、接收、查看)到其核心的同步特性,并通过一个完整的生产者-消费者示例演示了队列在实际并发场景中的应用。

本文档适用于所有 GS 语言开发者,特别是那些需要进行多协程并发编程、设计异步任务系统或实现跨域通信的开发者。

通过阅读本文档,您将能够理解队列在 GS 语言中的概念、特性及其重要性。掌握队列的创建、数据发送(send_raw/send_dup)、数据接收(receive)和数据查看(peek)等核心操作。深刻理解队列的同步机制及其如何协调不同协程间的执行。学会使用队列构建经典的生产者-消费者模型,实现高效的并发程序。

2. 基础概念

2.1 队列的概念

队列(queue)是GS语言中重要的同步数据结构,它提供了一种线程安全的先进先出(FIFO)数据存储和通信机制。作为派生自handle的类型,队列内置同步锁,广泛应用于协程间的数据传递、任务调度和跨域通信场景。

队列是一种先进先出(First-In-First-Out, FIFO)的线性数据结构,类似于现实生活中的排队场景。在GS中,队列不仅具备传统队列的基本特性,还集成了同步机制,使其成为并发编程中的重要工具。

核心特征

  • 数据从队尾进入,从队头取出。
  • 保证元素的处理顺序与到达顺序一致。
  • 内置线程安全机制,支持多协程并发或并行访问。

2.2 队列的创建

GS 中的队列通过外部函数queue.create(string name, int size = 0)创建,基本语法如下:

queue queue_handle = queue.create("queu_name", size);

参数说明

  • queue_name:字符串类型,队列名称用于标识队列(可选)
  • size:整数类型,指定队列能够容纳的最大元素数量
    • 在不指定size时,默认的队列大小为(1 << 24) - 1

创建示例:

// 创建容量为10的队列
queue msg_queue = queue.create("message_queue", 10);

// 创建默认大小的队列
queue temp_queue := queue.create("temp_queue");

示例2-1:队列的创建

3. 队列的基本操作

队列的基本操作分为三类:数据发送 send、数据接收 receive、数据查看 peek。

3.1 数据发送

队列的数据发送主要分为两种类型,发送初始数据及拷贝数据:

  • queue.send_dup(mixed val):发送拷贝数据至队列尾部。
  • 数据的发送域与接收域可能不同时使用。数据仅在发送时进行拷贝,接收时拷贝数据所有权转移至接收域。
  • queue.send_raw(mixed val):发送初始数据至队列尾部,数据在原域中。
  • 数据的发送域与接收域必须相同

示例如下:

// 创建容量为10的队列
queue msg_queue = queue.create("message_queue", 10);

map data = {"data":0};
map data1 = {"data1":1};

// 将数据发送到队列中, 数据无需复制,仍保留在原域中
msg_queue.send_raw(data);

// 将数据发送到队列中, 数据拷贝
msg_queue.send_dup(data1);

writeln("Receive raw data, test equal result is : ", msg_queue.receive() == data);

writeln("Receive dup data, test equal result is : ", msg_queue.receive() == data1);

示例3-1:队列的发送操作示例

​ 从示例的输出结果可以看到,send_raw发送的引用类型数据接收到的就是原数据,而通过send_dup发送的数据接收到的是原数据的拷贝而不是原数据。

3.2 数据接收

queue.receive(int|real wait_time = -1):接收队列的队首数据,数据会从队列队首被移除。

  • 原数据为send_raw发送时会检查接收域与发送域是否相同。
  • 原数据为send_dup发送时则不需要考虑域不同的问题。
  • 队列中无数据时,未设置等待时间的receive操作会阻塞协程。
  • 队列中无数据时,设置了等待时间的receive超时时返回nil

队列的接收操作示例如下:

parallel void receive_data(queue q)
{
q.receive();
}

void test()
{
// 创建容量为10的队列
queue msg_queue = queue.create("message_queue", 10);

// 带超时的接收 - 在指定时间内等待数据,超时时返回nil
printf("Queue receive time out return: %O\n", msg_queue.receive(1));

timer.create(4,"Delay queue send",(:msg_queue.send_raw, []:));

// 基本接收 - 队列空导致阻塞,直至延迟8s后数据被发送入队列中
printf(HIG"Coroutine blocking, wait for 4s\n"NOR);
mixed val = msg_queue.receive();
printf("msg_queue received %O\n", val);

// 由 send_raw 发送receive时检查域一致性
msg_queue.send_raw([]);

// 尝试在另一个域里接收 send_raw 发送数据,会报错 Expected receive in owner zero, but current domain is domain[3058944:v0]xxx
coroutine.create_with_domain(nil, domain.create("xxx"), (:receive_data, msg_queue:));
}
test();

示例3-2:队列的接收操作示例

从示例的输出结构可以看到,当队列中为空时,队列的receive会阻塞当前协程,直至队列中有数据可以接收或者receive操作超过设置的等待时间。以及接收由send_raw发送的引用类型数据在接收时会检查数据域与当前域的一致性。

3.3 数据查看

queue.peek(int|real wait_time = -1):查看队列队首数据,与receive的区别在于队首数据不会从队列中移除,以及不会阻塞或等待。

  • 原数据为send_raw发送时会检查接收域与发送域是否相同。
  • 原数据为send_dup发送时则不需要考虑域不同的问题。
  • 队列中无数据时,直接返回nil
parallel void peek_data(queue q)
{
q.peek();
}

void test()
{
// 创建容量为10的队列
queue msg_queue = queue.create("message_queue", 10);

// 基本接收 - 队列中无数据则直接返回 nil
printf("Queue peek empty queue return: %O\n", msg_queue.peek());

// 由 send_raw 发送,peek时检查域一致性
msg_queue.send_raw([]);

// 尝试在另一个域里查看 send_raw 发送数据,会报错 Error(-1): Got bad element (domain_id = zero) in queue when poping.
coroutine.create_with_domain(nil, domain.create("xxx"), (:peek_data, msg_queue:));
}
test();

示例3-3:队列 peek 操作实例

3.4 其他操作

  • queue_handle.send_raw_n(array_datas):向队列中发送多个原始数据

  • queue_handle.get_queue_size():获取队列中数据数量

  • queue_handle.is_empty():查询缺队列中是否为空,在内置脚本 /gs/lang/handle.gs中定义。

  • queue.find("queue_name"):通过队列名称查找队列,在内置脚本 /gs/lang/handle.gs中定义。

  • queue_handle.clear(bool shrink = false):清空队列

import gs.lang.handle; // 导入内置脚本定义的队列方法
queue msg_queue = queue.create("message_queue", 10);
// 向队列中发送多个数据
msg_queue.send_raw_n([1,2,3,2]);
// 获取队列中数据数量
printlnf("Current queue size is %d", msg_queue.get_queue_size());
// 查询缺队列中是否为空
printlnf("Current queue is empty? %O", msg_queue.is_empty());
// 通过队列名称查找队列
printlnf("Find queue by name 'message_queue' reuslt is %O", queue.find("message_queue"));
// 清空队列
msg_queue.clear();
printlnf("Cleared queue size is %d", msg_queue.get_queue_size());

示例3-4:队列的其他操作示例

4 队列的同步

队列持有同步锁,作为一种消息同步机制广泛应用于跨域操作中。其同步行为体现在:

发送端行为

  • 当队列已满时,发送数据的协程将会挂起并等待队列空间。
  • 直到有其他协程从队列中取出数据或超时后协程恢复。

接收端行为

  • 当队列为空时,接收数据的协程将会挂起并等待数据。
  • 直到有其他协程向队列发送数据或超时后协程恢复。

4.1 实战应用:生产者-消费者模型

一个简单的生产者消费者的并发编程示例如下,用户作为生产者发送登录请求,服务器作为消费者处理登录请求,示例如下:

#pragma parallel
readonly queue _login_queue;

void do_login(array req)
{
string username = req[0];
int user_id = req[1];
write(HIY"user " + username + "(" + user_id + ") start login...\n"NOR);
coroutine.sleep(1); // 模拟登录处理
write(HIG"user " + username + " loging success\n"NOR);
}

void login_daemon(string daemon_name)
{
while(true)
{
array req = _login_queue.receive();
do_login(req);
}
}

void user_daemon(array login_request)
{
string username = login_request[0];
write(HIC"usr " + username + " send login request.\n"NOR);
_login_queue.send_dup(login_request);
}

void setup_login_system()
{
// 创建登录队列,容量为5
_login_queue := queue.create("login_queue", 5);

// 创建2个登录处理协程
for(int i = 0; i < 2; i++)
{
string name = "login_daemon_" + i;
domain d = domain.create(name);
coroutine.create_in_thread(name, d, make_parallel((:login_daemon, name:)));
}

// 创建10个用户协程发送登录请求
for(int i = 0; i < 10; i++)
{
string name = "user_" + i;
domain d = domain.create(name);
array request = [name, i];
coroutine.create_in_thread(name, d, make_parallel((:user_daemon, request:)));
}
}

setup_login_system();

示例4-1:生产者消费者示例 该示例模拟了一个登录系统,其中多个用户(生产者)发送登录请求,而多个登录守护进程(消费者)处理这些请求。

  • 创建了一个容量为5的队列。

  • 创建2个登录处理协程(消费者),每个协程在一个循环中不断从队列中取出请求并处理。

  • 创建10个用户协程(生产者),每个用户协程向队列发送一个登录请求。

队列机制体现

  1. 同步机制
    • 当队列为空时,消费者协程在receive()处阻塞,直到有数据可用。
    • 当队列满时,生产者协程在send_dup()处阻塞,直到有空间可用。 在这个示例中,队列容量为5,而生产者有10个,消费者有2个。因此,当生产者快速生产时,队列会被填满,后续的生产者会阻塞直到消费者处理完一些请求后队列有空间。
  2. 跨域通信
    • 使用send_dup()发送数据,这意味着数据会被拷贝到队列中,因此可以在不同的域(domain)中安全传递。示例中生产者和消费者在不同的域中,所以使用send_dup()是合适的。
  3. 并发与负载均衡
    • 多个消费者协程同时等待同一个队列,当有请求到来时,只有其中一个消费者会取到请求并处理。这样实现了负载均衡,即多个消费者共同处理请求,提高了处理能力。
  4. 生产者-消费者模式
    • 通过队列解耦了生产者和消费者,它们不需要知道彼此的存在,只需要通过队列交互。

5. 拓展阅读

更多队列相关内容与实现细节请参照如下文档:

6. 总结

本章对 GS 语言中的队列进行了全面而深入的剖析。文档始于队列作为 “先进先出”数据结构的基本定义,强调了其作为 “内置同步机制的 handle 派生类型” 在并发编程中的独特价值。核心部分通过清晰的代码示例,详细对比和演示了关键操作:明确了 send_raw(同域传递所有权)send_dup(跨域传递拷贝) 的本质区别及其对数据域一致性的要求;区分了 receive(消费性获取并可能阻塞)peek(非消费性查看) 的不同行为。文档重点阐释了队列最核心的同步特性,即通过自动管理协程在队列空/满时的挂起与恢复,来安全、有序地实现不同执行流间的协作,这构成了其作为消息同步机制的基石。

通过通过一个示例展现了如何利用队列构建生产者-消费者模型,从而实现任务解耦和负载均衡。可以说,熟练且恰当地使用队列,是编写出高效、健壮 GS 并发程序的必备技能。本文档不仅提供了具体的 API 参考,更旨在引导开发者建立正确的并发编程思维模型。