任务队列设计与实现
如果读者所在的领域是后端,应该对消息队列并不陌生。存在这样一种队列,它内部也是消息队列,但会对消息执行处理函数,这样的队列就被称为任务队列。
为什么要在本篇文章介绍任务队列呢?可能大部分读者认为任务队列的作用就是提供异步执行的手段,其实并不然,任务队列甚至可以成为系统并发机制的核心。这并不是本篇文章介绍的重点,笔者会在之后的文章中介绍如何依托任务队列来实现系统的并发机制。
任务队列
先介绍任务队列的定义,任务队列是FIFO的数据结构,底层使用环形队列来实现。每个任务队列会绑定一个消息处理函数,当环形队列中存在消息时,任务队列会pop消息并对消息执行处理函数。
我们不希望任务队列的任务执行通过while循环的方式来展现,这样会产生一个计算密集的协程,在没有任务时也会浪费CPU资源。因此任务队列的任务执行是通过外部push来主动触发的。当一个协程向任务队列push消息时,如果任务队列不处于活动状态(存在协程处理消息),任务队列会创建一个新协程来执行任务,在执行任务的期间,有其他的协程向任务队列push消息时,这个协程会继续执行任务,直到任务队列为空。
这里处理函数并不一定只有一个协程,可以设定协程池来处理队列中的任务,读者自由发挥即可。但是协程池的数量不是越多越好的,具体原因这里不做过多解释。
如果有经验丰富的读者读到这里会发现并发问题,没错,由于队列是多协程共享的,存在并发问题。
队列存在两个状态,running态和idle态。running态表示队列中存在协程在处理消息,idle态表示队列中没有协程在处理消息。假设有两个协程A和B几乎同时向任务队列push消息,A协程调用push,检查队列状态为idle并创建一个新协程来处理消息,B协程也调用push,检查队列状态为idle并创建一个新协程来处理消息。这样就会导致两个协程都创建了新的处理协程,造成资源浪费。
下面是无锁解决这个问题的方案,将状态原子化。通过go中的sync.atomic来创建原子变量,这样就可以保证状态的原子性。
1
2
atomic.CompareAndSwapInt32(&t.state, idle, running)
atomic.CompareAndSwapInt32(&t.state, running, idle)
这样就不会出现线程读到过期的idle状态。本质上就是读改的原子化。
实现
接下来来介绍实现。
队列大致由三个部分组成:环形队列、任务函数、状态。
1
2
3
4
5
6
7
8
9
10
11
const (
idle = iota // 空闲状态
running // 运行状态
)
type FuncQueue[T] struct {
state int32
count uint32
queue *ring.queue[T] // 环形队列 [可以自己实现或者选择成熟的库]
f func(msg T) // 任务函数
}
任务队列提供的接口就一个:Dispatch方法。向队列推送消息并尝试唤醒任务协程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func (fq *FuncQueue) Dispatch(msg T) {
queue.push(msg)
atomic.AddInt32(&fq.count, 1)
fq.dispatch()
}
func (fq *FuncQueue) dispatch() {
if atomic.CompareAndSwapInt32(&fq.state, idle, running) {
go fq.run()
}
}
func (fq *FuncQueue) run() {
begin:
processHandle()
atomic.CompareAndSwapInt32(&fq.state, running, idle)
usercount := atomic.LoadInt32(&fq.count)
if usercount > 0 {
if atomic.CompareAndSwapInt32(&fq.state, idle, running) {
goto begin
}
}
}
func (fq *FuncQueue) processHandle() {
var msg any
var ok bool
for {
if msg, ok := fq.queue.pop(); ok == nil {
atomic.AddInt32(&fq.count, -1)
if msg, ok := msg.(T); ok {
defer func(){
if reason := recover(); reason != nil {
// 处理panic,记录日志或其他操作
}
}()
fq.f(msg)
}
} else {
return
}
}
}
下面来介绍一下实现的细节。
读者可能会对方法中大量的原子操作表示不理解,这些原子操作都是为了保证仅有一个协程处理队列中的消息。为什么一定要保证只有一个协程来处理队列消息呢?这是因为这样的话,队列的执行环境就是同步的,没有并发问题。这也是使用这样的队列实现系统并发机制的核心。
处理完队列中的消息后,原子的检查队列消息的数量,如果数量大于0(说明处理完队列消息后存在其他协程向队列推送消息),则继续原子检查队列的状态(因为此时可能有别的协程去启动队列的执行状态),原子的替换状态为running,这个时候就可以放心地继续处理队列中的消息了。上述两个条件存在一个不为真,说明无需处理或者有已经有协程在处理,这是就可以直接返回了。
讲到这里,弹性创建单协程处理的任务队列的实现机制读者应该就非常熟悉了,无锁技术是比较复杂的难处理的技术,读者要明晰无锁技术的核心是对变量的原子操作(读和写不可打断,原子化)。