可休眠的并发安全的任务队列设计与实现
如果读者所在的领域是后端,应该对消息队列并不陌生。存在这样一种队列,它内部也是消息队列,但会对消息执行处理函数,这样的队列就被称为任务队列。
为什么要在本篇文章介绍任务队列呢?可能大部分读者认为任务队列的作用就是提供异步执行的手段,其实并不然,任务队列甚至可以成为系统并发机制的核心。这并不是本篇文章介绍的重点,笔者会在之后的文章中介绍如何依托任务队列来实现系统的并发机制。
在介绍如何实现可休眠的任务队列之前,我们需要先了解无锁的环形队列如何实现,和一般队列不同,无锁的环形队列在推入和弹出元素的过程的线程安全的。
无锁环形队列
无锁的环形队列的关键点在于使用原子操作来保证推入和弹出元素的线程安全。我们知道,环形队列写入数据和读取数据的关键点在于头指针和尾指针的移动,如果这两个指针的移动是并发安全的,那么数据的写入和读取就可以在多线程环境中安全进行。指针移动带来的临界区肯定比写入数据带来的临界区要小。
1
2
3
4
5
6
7
8
9
10
import (
"sync/atomic"
)
type FIFO[T any] struct {
head atomic.Uint64
tail atomic.Uint64
buffer []T
size uint64
}
所以我们的无锁队列的数据结构是这样的,下面我们来讲解下push和pop这两个关键函数。
pop
首先我们获取head和buffer头部值,然后拷贝一份buffer头部值,最后尝试原子操作head后移一位,如果失败回滚重新尝试;
1
2
3
4
5
6
7
8
9
10
11
12
13
func (f *FIFO[T]) pop() (T, error) {
var curHead uint64
var popValue T
do {
curHead = f.head.Load()
if curHead == f.tail.Load() {
return nil, errors.New("queue is empty")
}
popValue = f.buffer[curHead]
} while(!f.head.CompareAndSwap(curHead, (curHead + 1)%size))
return popValue
}
push
push操作也类似,先判断是否又空位置,如果有获取tail,然后尝试后移一位tail,失败则回滚获取的tail继续尝试
1
2
3
4
5
6
7
8
9
10
11
12
func (f *FIFO[T]) push(value T) error {
var curTail uint64
do {
curTail = f.tail.Load()
if (curTail + 1) % size == f.head.Load() {
return errors.New("queue is full")
}
} while(!f.tail.CompareAndSwap(curTail, (curTail + 1)%size))
f.buffer[curTail] = value
return nil
}
任务队列
了解无锁队列实现后,再介绍任务队列的定义,任务队列是FIFO的数据结构,底层使用无锁的环形队列来实现。每个任务队列会绑定一个消息处理函数,当存在线程向任务队列push消息时,会尝试开启线程执行任务或者推入消息让工作中的线程处理 。
我们不希望任务队列的任务执行通过while循环的方式来展现,这样会产生一个计算密集的协程,在没有任务时也会浪费CPU资源。因此任务队列的任务执行是通过外部push来主动触发的。当一个协程向任务队列push消息时,如果任务队列不处于活动状态(存在协程处理消息),任务队列会创建一个新协程来执行任务,在执行任务的期间,有其他的协程向任务队列push消息时,这个协程会继续执行任务,直到任务队列为空。
这里处理函数并不一定只有一个协程,可以设定协程池来处理队列中的任务,读者自由发挥即可。但是协程池的数量不是越多越好的,具体原因这里不做过多解释。
如果有经验丰富的读者读到这里会发现并发问题,没错,由于队列是多协程共享的,存在并发问题。如何保证只有一个协程在处理队列中的消息呢?
队列存在两个状态,running态和idle态。running态表示队列中存在协程在处理消息,idle态表示队列中没有协程在处理消息。假设有两个协程A和B几乎同时向任务队列push消息,A协程调用push,检查队列状态为idle并创建一个新协程来处理消息,B协程也调用push,检查队列状态为idle并创建一个新协程来处理消息。这样就会导致两个协程都创建了新的处理协程,造成资源浪费。
下面是无锁解决这个问题的方案,将状态原子化。通过go中的sync.atomic来创建原子变量,这样就可以保证状态的原子性。
1
2
atomic.CompareAndSwapInt32(&t.state, idle, running)
atomic.CompareAndSwapInt32(&t.state, running, idle)
这样就不会出现线程读到过期的idle状态。本质上就是读改的原子化。
实现
接下来来介绍实现。
队列大致由三个部分组成:环形队列、任务函数、状态。
1
2
3
4
5
6
7
8
9
10
11
const (
idle = iota // 空闲状态
running // 运行状态
)
type FuncQueue[T] struct {
state int32
count uint32
queue *ring.queue[T] // 环形队列 [可以自己实现或者选择成熟的库]
f func(msg T) // 任务函数
}
任务队列提供的接口就一个:Dispatch方法。向队列推送消息并尝试唤醒任务协程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
type FuncQueue[T any] struct {
state atomic.Int32
count atomic.Int64
queue *Ring[T]
fn func(T)
}
const (
idle int32 = iota
running
)
func (q *FuncQueue[T]) Dispatch(v T) error {
if err := q.queue.Push(v); err != nil {
return err
}
q.count.Add(1)
q.tryStart()
return nil
}
func (q *FuncQueue[T]) tryStart() {
if q.state.CompareAndSwap(idle, running) {
go q.run()
}
}
func (q *FuncQueue[T]) run() {
for {
q.drain()
q.state.CompareAndSwap(running, idle)
// 再检查是否有新任务(竞争窗口内的突发任务)
if q.count.Load() > 0 && q.state.CompareAndSwap(idle, running) {
continue
}
return
}
}
func (q *FuncQueue[T]) drain() {
for {
v, ok := q.queue.Pop()
if !ok {
return
}
q.count.Add(-1)
func(msg T) {
defer func() {
if r := recover(); r != nil {
// 记录日志
}
}()
q.fn(msg)
}(v)
}
}
下面来介绍一下实现的细节。
读者可能会对方法中大量的原子操作表示不理解,这些原子操作都是为了保证仅有一个协程处理队列中的消息。为什么一定要保证只有一个协程来处理队列消息呢?这是因为这样的话,队列的执行环境就是同步的,没有并发问题。这也是使用这样的队列实现系统并发机制的核心。
处理完队列中的消息后,原子的检查队列消息的数量,如果数量大于0(说明处理完队列消息后存在其他协程向队列推送消息),则继续原子检查队列的状态(因为此时可能有别的协程去启动队列的执行状态),原子的替换状态为running,这个时候就可以放心地继续处理队列中的消息了。上述两个条件存在一个不为真,说明无需处理或者有已经有协程在处理,这是就可以直接返回了。
讲到这里,弹性创建单协程处理的任务队列的实现机制读者应该就非常熟悉了,无锁技术是比较复杂的难处理的技术,关键在于使用CAS语义对共享资源进行并发控制,读者要明晰无锁技术的核心是对变量的原子操作(读和写不可打断,原子化)。