定时器系统和层级时间轮技术细节和linux下常见的定时器相关接口
常见的定时器相关接口 / 创建epoll循环驱动的定时器
任何定时器系统,不管是任务队列型定时器,还是时间轮定时器系统,都必然需要一个底层的待回调的高精度的定时器作为驱动。这里给出在Linux下的基于事件循环驱动的毫秒级定时器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <sys/epoll.h>
#include <sys/timerfd.h>
#include <unistd.h>
#include <stdint.h>
#include <stdio.h>
#include <functional>
// 定时器任务回调类型
using TimerCallback = std::function<void()>;
// 定时器任务 首次触发时间,重复间隔,回调函数
void run_ms_timer(int initial_msec, int interval_msec, TimerCallback cb) {
int epfd = epoll_create1(0);
int tfd = timerfd_create(CLOCK_MONOTONIC, 0);
struct itimerspec new_value = {};
new_value.it_value.tv_nsec = initial_msec * 1000000; // 毫秒转纳秒
new_value.it_interval.tv_nsec = interval_msec * 1000000; // 毫秒转纳秒
timerfd_settime(tfd, 0, &new_value, NULL);
struct epoll_event ev = {};
ev.events = EPOLLIN;
ev.data.fd = tfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, tfd, &ev);
while (1) {
struct epoll_event events[1];
int nfds = epoll_wait(epfd, events, 1, -1);
if (nfds > 0 && events[0].data.fd == tfd) {
uint64_t expirations;
read(tfd, &expirations, sizeof(expirations));
cb(); // 回调
}
}
close(tfd);
close(epfd);
}
在给出window系统下的定时器实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 首次触发时间,重复间隔,回调函数
void run_ms_timer_win(int initial_msec, int interval_msec, TimerCallback cb) {
HANDLE hTimer = CreateWaitableTimer(NULL, FALSE, NULL);
if (!hTimer) return;
LARGE_INTEGER liDueTime;
liDueTime.QuadPart = -initial_msec * 10000LL; // 负值表示相对时间,单位100纳秒
// 设置定时器
SetWaitableTimer(hTimer, &liDueTime, interval_msec, NULL, NULL, FALSE);
while (1) {
DWORD dwRet = WaitForSingleObject(hTimer, INFINITE);
if (dwRet == WAIT_OBJECT_0) {
cb(); // 回调
}
}
CloseHandle(hTimer);
}
堆任务定时器
就是在循环定时器的基础上,添加维护底层任务队列的功能。在任务定时器的底层定时器触发时,检查过期的任务并执行它们。优先级队列按照任务的到期时间进行排序。触发精度是毫秒级的。
首先定义任务结构体:
1
2
3
4
5
struct Task {
uint16_t tid;
uint64_t timestamp;
TimerCallback callback;
};
- tid: 标记任务,用于懒删除任务
- timestamp: 任务的到期时间戳
- callback: 任务的回调函数
然后定义我们的任务处理系统:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// 定时器任务回调类型
using TimerCallback = std::function<void()>;
HANDLE run_ms_timer_win(int initial_msec, int interval_msec, TimerCallback cb);
std::function<bool(const Task&, const Task&)> CMP = [](const Task& a, const Task& b) {
return a.timestamp > b.timestamp;
};
class TimerQueue {
public:
TimerQueue() : task_queue(CMP), next_task_id(1) {}
~TimerQueue() = default;
void start() {
timer_handle = run_ms_timer_win(1, 1, [this](){ update(); });
}
void update() {
auto now = GetTickCount();
while (!task_queue.empty()) {
Task task = task_queue.top();
if (task.timestamp <= now) {
task.callback();
task_queue.pop();
} else {
break;
}
}
}
// 添加定时任务 : 毫秒级
uint64_t add_task(uint64_t delay_ms, TimerCallback cb) {
uint64_t tid = next_task_id++;
uint64_t timestamp = GetTickCount() + delay_ms;
task_queue.push({ tid, timestamp, cb });
return tid;
}
private:
uint64_t next_task_id;
HANDLE timer_handle;
std::priority_queue<Task, std::vector<Task>, decltype(CMP)> task_queue;
};
// 首次触发时间,重复间隔,回调函数
HANDLE run_ms_timer_win(int initial_msec, int interval_msec, TimerCallback cb) {
HANDLE hTimer = CreateWaitableTimer(NULL, FALSE, NULL);
if (!hTimer) return NULL;
LARGE_INTEGER liDueTime;
liDueTime.QuadPart = -initial_msec * 10000LL; // 负值表示相对时间,单位100纳秒
// 设置定时器
SetWaitableTimer(hTimer, &liDueTime, interval_msec, NULL, NULL, FALSE);
while (1) {
DWORD dwRet = WaitForSingleObject(hTimer, INFINITE);
if (dwRet == WAIT_OBJECT_0) {
cb(); // 回调
}
}
return hTimer;
}
层级时间轮定时器任务系统
详细代码可以参见:层级时间轮定时器实现
这里讲下一些关键的实现。
首先我们要健壮一下我们的定时器任务结构,使其支持取消、重复执行的功能:
1
2
3
4
5
6
7
8
9
10
11
struct TimerTask {
using TimerCallback = std::function<void()>;
TimerTask(size_t i, size_t ms, size_t fr, TimerCallback cb, bool repeat) : interval(i), timeout_ms(ms), timeout_frame(fr), callback(cb), is_repeat(repeat) {}
size_t interval = 0; // 任务间隔
size_t timeout_ms = 0; // 超时时间(毫秒)
size_t timeout_frame = 0; // 超时时间(帧数)
TimerCallback callback;
bool is_repeat = false;
bool is_canceled = false;
};
这里讲下帧数:帧数实际上就是时间轮中的槽代表的时间长度,用于计算某一时间点属于第几帧;
一般的层级时间轮共有三个时间轮组成,每个时间轮由固定大小的槽组成的环;时间轮中含有一个当前索引,指向下一个要处理的槽。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class TimeWheel {
public:
struct Wheel {
using Slot = std::vector<TimerTask*>;
size_t cur_slot_index = 0; // 当前槽索引
std::vector<Slot> slots;
};
private:
size_t cur_frame_ = 0; // 当前计时器运行到的帧数(tick次数),用于记录时间进度
size_t interval_ = 33; // 每一帧的时间间隔,单位为毫秒(即每33ms触发一次tick)
size_t max_slots_ = 1024; // 每个时间轮的槽数量,决定了能管理的最大定时长度
int next_timer_id_ = 0; // 下一个分配的定时器ID,用于唯一标识每个定时任务
Wheel wheels_[WHEEL_COUNT]; // 多级时间轮结构体数组,用于高效管理大量定时任务
std::unordered_map<int, TimerTask*> id_2_timer_task_; // 定时器ID到定时任务对象的映射,方便查找和删除定时任务
size_t left_ms_ = 0; // 上一次tick后剩余未处理的毫秒数,用于时间补偿
size_t ms_since_start_ = 0; // 计时器启动以来累计经过的毫秒数
std::chrono::steady_clock::time_point last_update_clock_; // 上一次调用update的时间点,用于计算实际经过的时间
};
下面讲下timewheel工作的几个重要函数:
update:外部定时器触发函数。在高精度的定时器系统中,外部定时器触发的时间间隔不一定就是33ms,可能会因为系统调度等原因产生抖动。因此,我们需要在update函数中进行时间补偿,确保每个定时任务都能在正确的时间点被触发。时间补偿的机制是这样的,我们需要记录待处理的时间长度 left_ms,并在left_ms内按每帧去处理定时任务,直到left_ms不够一帧的长度。
这里给个伪代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void update() {
size_t now = get_now_ms();
size_t duration = last_update_clock_ - now;
left_ms_ += duration;
last_update_clock_ = now;
while (left_ms_ >= interval_) {
left_ms_ -= interval_;
cur_frame_++; // 更新当前帧数
tick();
}
last_update_clock_ = now;
}
tick: 每一帧 / 33ms 到来时的处理函数。在tick函数中,我们要更新时间轮的当前指针,槽任务的移动,以及执行到期的定时任务。对于一级时间轮内的槽,我们需要遍历当前槽中的所有定时任务,并检查它们是否到期。如果到期,就执行相应的回调函数,并将任务移除。对于高级时间轮的槽,我们需要将其任务取出,并移动到低级的时间轮种。这是tick函数的工作内容。
注意只有当低级的时间轮的指针走了一圈,也就是指向0时,才会触发下一层时间轮的处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void tick() {
for(int i = 0; i < WHEEL_COUNT; i++) {
Wheel& wheel = wheels_[i];
wheel.cur_index = (wheel.cur_index + 1) % max_slots_;
if (i < 1) {
run_tasks(index, wheel.cur_index);
} else {
move_tasks(index, wheel.cur_index);
}
if (wheel.cur_index == 0) {
// 触发下一层时间轮的处理
continue;
}
}
}
run_tasks: 处理当前时间轮槽中的所有定时任务,并执行到期的回调函数。注意下细节,每个槽代表一段时间,也就是内部的任务是有先后顺序的,我们在调用前先排序然后再执行。任务分为非重复任务和重复任务两种,对于重复任务,在执行完后,要计算其下一次的槽的位置,并放入该槽中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void run_tasks(int wheel_index, int slot_index) {
Wheel wheel = wheels[wheel_index];
Slot slot = wheel.slots[slot_index];
sort(slot.begin(), slot.end(), [](TimerTask* a, TimerTask* b) {
return a->timeout_ms < b->timeout_ms
})
for(auto & task : slot.tasks) {
if (task.is_cancel) {
delete task;
continue;
}
task->callback();
if (task->repeat) {
size_t new_wheel_index, new_slot_index;
size_t offset = max((interval_ / 33), 1);
calculate_next_slot(offset, new_wheel_index, new_slot_index);
wheels[new_wheel_index].slots[new_slot_index].push_back(task);
} else {
delete task;
}
}
slot.clear();
}
move_tasks:将高级时间轮中指定槽的任务重新分配到低级时间轮中,就是将槽中的任务重新计算所属的时间轮和槽。
1
2
3
4
5
6
7
8
9
10
11
12
void move_tasks(int wheel_index, int slot_index) {
Wheel wheel = wheels[wheel_index];
Slot slot = wheel.slots[slot_index];
for(auto task : slot.tasks) {
size_t new_wheel_index, new_slot_index;
size_t offset = max((interval_ / 33), 1);
calculate_next_slot(offset, new_wheel_index, new_slot_index);
wheels[new_wheel_index].slots[new_slot_index].push_back(task);
}
}
calculate_next_slot: 非常重要的函数,用于计算距离当前帧offset帧的槽的位置。计算逻辑是这样的,一格高级的时间轮的槽代表低级时间轮所有的槽数量;从低级的时间轮向高级的时间轮转换时,offset要除以低级时间轮的总槽数;下面是代码:
```cpp bool calculate_next_slot(size_t offset, size_t& new_wheel_index, size_t& new_slot_index) { new_wheel_index = 0; new_slot_index = 0;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
for (int i = 0; i < WHEEL_COUNT; i++) {
Wheel& wheel = wheels_[i];
size_t slots_count = wheel.slots.size();
if (slots_count >= max_slot_ && i < WHEEL_COUNT - 1) {
offset /= slots_count;
} else {
new_wheel_index = i;
break;
}
}
if (offset > final_max_slots_count) return false;
new_slot_index = (wheels[i].cur_index + offset) % wheels[i].slot_size();
return true; }
说到这里,核心的方法已经讲完了,剩下一些基本的操作,比如添加定时任务,删除定时任务,逻辑比较简单,就是调用上面的方法去计算新任务的位置然后添加仅需即可。
定时器的运行环境
定时器可以和IO多路复用结合,实现非阻塞式的定时系统。将定时器注册进事件循环中,和网络IO事件共享一个事件循环,就可以具备单核运行能力。如果要多核,就需要单独开一个线程,但此时就存在资源竞争的问题,需要对任务队列的访问进行加锁同步。