队列(queue)
1. 概述
本文档系统性地介绍了 GS 语言中的核心同步数据结构— —队列(queue)。队列作为一种线程安全的先进先出(FIFO)机制,是 GS 并发编程中实现数据传递、任务调度和跨域通信的关键工具。文档内容涵盖从队列的基本概念、创建方法、基本操作(发送、接收、查看)到其核心的同步特性,并通过一个完整的生产者-消费者示例演示了队列在实际并发场景中的应用。
本文档适用于所有 GS 语言开发者,特别是那些需要进行多协程并发编程、设计异步任务系统或实现跨域通信的开发者。
通过阅读本文档,您将能够理解队列在 GS 语言中的概念、特性及其重要性。掌握队列的创建、数据发送(send_raw/send_dup)、数据接收(receive)和数据查看(peek)等核心操作。深刻理解队列的同步机制及其如何协调不同协程间的执行。学会使用队列构建经典的生产者-消费者模型,实现高效的并发程序。
2. 基础概念
2.1 队列的概念
队列(queue)是GS语言中重要的同步数据结构,它提供了一种线程安全的先进先出(FIFO)数据存储和通信机制。作为派生自handle的类型,队列内置同步锁,广泛应用于协程间的数据传递、任务调度和跨域通信场景。
队列是一种先进先出(First-In-First-Out, FIFO)的线性数据结构,类似于现实生活中的排队场景。在GS中,队列不仅具备传统队列的基本特性,还集成了同步机制,使其成为并发编程中的重要工具。
核心特征:
- 数据从队尾进入,从队头取出。
- 保证元素的处理顺序与到达顺序一致。
- 内置线程安全机制,支持多协程并发或并行访问。
2.2 队列的创建
GS 中的队列通过外部函数queue.create(string name, int size = 0)创建,基本语法如下:
queue queue_handle = queue.create("queu_name", size);
参数说明:
queue_name:字符串类型,队列名称用于标识队列(可选)size:整数类型,指定队列能够容纳的最大元素数量- 在不指定
size时,默认的队列大小为(1 << 24) - 1
- 在不指定
创建示例:
// 创建容量为10的队列
queue msg_queue = queue.create("message_queue", 10);
// 创建默认大小的队列
queue temp_queue := queue.create("temp_queue");
示例2-1:队列的创建
3. 队列的基本操作
队列的基本操作分为三类:数据发送 send、数据接收 receive、数据查看 peek。
3.1 数据发送
队列的数据发送主要分为两种类型,发送初始数据及拷贝数据:
queue.send_dup(mixed val):发送拷贝数据至队列尾部。- 数据的发送域与接收域可能不同时使用。数据仅在发送时进行拷贝,接收时拷贝数据所有权转移至接收域。
queue.send_raw(mixed val):发送初始数据至队列尾部,数据在原域中。- 数据的发送域与接收域必须相同。
示例如下:
// 创建容量为10的队列
queue msg_queue = queue.create("message_queue", 10);
map data = {"data":0};
map data1 = {"data1":1};
// 将数据发送到队列中, 数据无需复制,仍保留在原域中
msg_queue.send_raw(data);
// 将数据发送到队列中, 数据拷贝
msg_queue.send_dup(data1);
writeln("Receive raw data, test equal result is : ", msg_queue.receive() == data);
writeln("Receive dup data, test equal result is : ", msg_queue.receive() == data1);
示例3-1:队列的发送操作示例
从示例的输出结果可以看到,send_raw发送的引用类型数据接收到的就是原数据,而通过send_dup发送的数据接收到的是原数据的拷贝而不是原数据。
3.2 数据接收
queue.receive(int|real wait_time = -1):接收队列的队首数据,数据会从队列队首被移除。
- 原数据为
send_raw发送时会检查接收域与发送域是否相同。 - 原数据为
send_dup发送时则不需要考虑域不同的问题。 - 队列中无数据时,未设置等待时间的
receive操作会阻塞协程。 - 队列中无数据时,设置了等待时间的
receive超时时返回nil。
队列的接收操作示例如下:
parallel void receive_data(queue q)
{
q.receive();
}
void test()
{
// 创建容量为10的队列
queue msg_queue = queue.create("message_queue", 10);
// 带超时的接收 - 在指定时间内等待数据,超时时返回nil
printf("Queue receive time out return: %O\n", msg_queue.receive(1));
timer.create(4,"Delay queue send",(:msg_queue.send_raw, []:));
// 基本接收 - 队列空导致阻塞,直至延迟8s后数据被发送入队列中
printf(HIG"Coroutine blocking, wait for 4s\n"NOR);
mixed val = msg_queue.receive();
printf("msg_queue received %O\n", val);
// 由 send_raw 发送receive时检查域一致性
msg_queue.send_raw([]);
// 尝试在另一个域里接收 send_raw 发送数据,会报错 Expected receive in owner zero, but current domain is domain[3058944:v0]xxx
coroutine.create_with_domain(nil, domain.create("xxx"), (:receive_data, msg_queue:));
}
test();
示例3-2:队列的接收操作示例
从示例的输出结构可以看到,当队列中为空时,队列的receive会阻塞当前协程,直至队列中有数据可以接收或者receive操作超过设置的等待时间。以及接收由send_raw发送的引用类型数据在接收时会检查数据域与当前域的一致性。
3.3 数据查看
queue.peek(int|real wait_time = -1):查看队列队首数据,与receive的区别在于队首数据不会从队列中移除,以及不会阻塞或等待。
- 原数据为
send_raw发送时会检查接收域与发送域是否相同。 - 原数据为
send_dup发送时则不需要考虑域不同的问题。 - 队列中无数据时,直接返回
nil。
parallel void peek_data(queue q)
{
q.peek();
}
void test()
{
// 创建容量为10的队列
queue msg_queue = queue.create("message_queue", 10);
// 基本接收 - 队列中无数据则直接返回 nil
printf("Queue peek empty queue return: %O\n", msg_queue.peek());
// 由 send_raw 发送,peek时检查域一致性
msg_queue.send_raw([]);
// 尝试在另一个域里查看 send_raw 发送数据,会报错 Error(-1): Got bad element (domain_id = zero) in queue when poping.
coroutine.create_with_domain(nil, domain.create("xxx"), (:peek_data, msg_queue:));
}
test();
示例3-3:队列 peek 操作实例
3.4 其他操作
-
queue_handle.send_raw_n(array_datas):向队列中发送多个原始数据 -
queue_handle.get_queue_size():获取队列中数据数量 -
queue_handle.is_empty():查询缺队列中是否为空,在内置脚本/gs/lang/handle.gs中定义。 -
queue.find("queue_name"):通过队列名称查找队列,在内置脚本/gs/lang/handle.gs中定义。 -
queue_handle.clear(bool shrink = false):清空队列
import gs.lang.handle; // 导入内置脚本定义的队列方法
queue msg_queue = queue.create("message_queue", 10);
// 向队列中发送多个数据
msg_queue.send_raw_n([1,2,3,2]);
// 获取队列中数据数量
printlnf("Current queue size is %d", msg_queue.get_queue_size());
// 查询缺队列中是否为空
printlnf("Current queue is empty? %O", msg_queue.is_empty());
// 通过队列名称查找队列
printlnf("Find queue by name 'message_queue' reuslt is %O", queue.find("message_queue"));
// 清空队列
msg_queue.clear();
printlnf("Cleared queue size is %d", msg_queue.get_queue_size());