定时器系统和层级时间轮技术细节和linux下常见的定时器相关接口
常见的定时器相关接口 / 创建epoll循环驱动的定时器
任何定时器系统,不管是任务队列型定时器,还是时间轮定时器系统,都必然需要一个底层的待回调的高精度的定时器作为驱动。这里给出在Linux下的基于事件循环驱动的毫秒级定时器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <sys/epoll.h>
#include <sys/timerfd.h>
#include <unistd.h>
#include <stdint.h>
#include <stdio.h>
#include <functional>
// 定时器任务回调类型
using TimerCallback = std::function<void()>;
// 定时器任务 首次触发时间,重复间隔,回调函数
void run_ms_timer(int initial_msec, int interval_msec, TimerCallback cb) {
int epfd = epoll_create1(0);
int tfd = timerfd_create(CLOCK_MONOTONIC, 0);
struct itimerspec new_value = {};
new_value.it_value.tv_nsec = initial_msec * 1000000; // 毫秒转纳秒
new_value.it_interval.tv_nsec = interval_msec * 1000000; // 毫秒转纳秒
timerfd_settime(tfd, 0, &new_value, NULL);
struct epoll_event ev = {};
ev.events = EPOLLIN;
ev.data.fd = tfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, tfd, &ev);
while (1) {
struct epoll_event events[1];
int nfds = epoll_wait(epfd, events, 1, -1);
if (nfds > 0 && events[0].data.fd == tfd) {
uint64_t expirations;
read(tfd, &expirations, sizeof(expirations));
cb(); // 回调
}
}
close(tfd);
close(epfd);
}
timerfd 系列函数
1
int timerfd_create(int clockid, int flags);
- 功能:创建一个定时器文件描述符(fd),用于后续定时器操作。
- 参数:
clockid
:指定定时器使用的时钟类型。常用值:CLOCK_REALTIME
:系统实时时钟,受系统时间影响。CLOCK_MONOTONIC
:单调递增时钟,不受系统时间调整影响,推荐用于定时器。
flags
:文件描述符属性。常用值:TFD_NONBLOCK
:非阻塞模式。TFD_CLOEXEC
:执行 exec 时自动关闭 fd。
- 返回值:成功返回定时器 fd,失败返回 -1。
1
int timerfd_settime(int fd, int flags, const struct itimerspec *new_value, struct itimerspec *old_value);
- 功能:设置定时器的初始超时时间和周期性超时时间。
- 参数:
fd
:由timerfd_create
返回的定时器文件描述符。flags
:常用 0,或TFD_TIMER_ABSTIME
(绝对时间)。new_value
:定时器新设置,结构体如下:1 2 3 4
struct itimerspec { struct timespec it_interval; // 周期性触发间隔 struct timespec it_value; // 首次触发时间 };
timespec
结构体:1 2 3 4
struct timespec { time_t tv_sec; // 秒 long tv_nsec; // 纳秒 };
old_value
:可选,保存定时器之前的设置。
- 返回值:成功返回 0,失败返回 -1。
1
int timerfd_gettime(int fd, struct itimerspec *curr_value);
- 功能:获取定时器当前剩余时间和周期设置。
- 参数:
fd
:定时器文件描述符。curr_value
:用于保存当前定时器状态。
- 返回值:成功返回 0,失败返回 -1。
4. 事件读取
定时器到期后,需用 read
读取触发次数:
1
2
uint64_t expirations;
read(fd, &expirations, sizeof(expirations));
- 说明:每次定时器到期,
read
返回自上次读取以来的触发次数(uint64_t 类型)。
epoll 系统函数
1
int epoll_create1(int flags);
- 功能:创建一个 epoll 实例,返回 epoll 文件描述符。
- 参数:
flags
:常用 0 或EPOLL_CLOEXEC
(exec 时自动关闭)。
- 返回值:成功返回 epoll fd,失败返回 -1。
1
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
- 功能:管理 epoll 监听的文件描述符。
- 参数:
epfd
:epoll 文件描述符。op
:操作类型:EPOLL_CTL_ADD
:添加监听 fd。EPOLL_CTL_MOD
:修改监听 fd。EPOLL_CTL_DEL
:删除监听 fd。
fd
:要操作的文件描述符。event
:监听事件类型及用户数据。1 2 3 4
struct epoll_event { uint32_t events; // 事件类型,如 EPOLLIN、EPOLLOUT epoll_data_t data; // 用户数据,可存 fd 或指针 };
- 返回值:成功返回 0,失败返回 -1。
1
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
- 功能:等待 epoll 监听的事件发生。
- 参数:
epfd
:epoll 文件描述符。events
:用于保存发生的事件数组。maxevents
:数组大小,建议小于等于 1024。timeout
:等待超时时间(毫秒),-1 表示无限等待。
- 返回值:返回发生事件的数量,失败返回 -1。
在给出window系统下的定时器实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 首次触发时间,重复间隔,回调函数
void run_ms_timer_win(int initial_msec, int interval_msec, TimerCallback cb) {
HANDLE hTimer = CreateWaitableTimer(NULL, FALSE, NULL);
if (!hTimer) return;
LARGE_INTEGER liDueTime;
liDueTime.QuadPart = -initial_msec * 10000LL; // 负值表示相对时间,单位100纳秒
// 设置定时器
SetWaitableTimer(hTimer, &liDueTime, interval_msec, NULL, NULL, FALSE);
while (1) {
DWORD dwRet = WaitForSingleObject(hTimer, INFINITE);
if (dwRet == WAIT_OBJECT_0) {
cb(); // 回调
}
}
CloseHandle(hTimer);
}
堆任务定时器
就是在循环定时器的基础上,添加维护底层任务队列的功能。在任务定时器的底层定时器触发时,检查过期的任务并执行它们。优先级队列按照任务的到期时间进行排序。触发精度是毫秒级的。
首先定义任务结构体:
1
2
3
4
5
struct Task {
uint16_t tid;
uint64_t timestamp;
TimerCallback callback;
};
- tid: 标记任务,用于懒删除任务
- timestamp: 任务的到期时间戳
- callback: 任务的回调函数
然后定义我们的任务处理系统:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// 定时器任务回调类型
using TimerCallback = std::function<void()>;
HANDLE run_ms_timer_win(int initial_msec, int interval_msec, TimerCallback cb);
std::function<bool(const Task&, const Task&)> CMP = [](const Task& a, const Task& b) {
return a.timestamp > b.timestamp;
};
class TimerQueue {
public:
TimerQueue() : task_queue(CMP), next_task_id(1) {}
~TimerQueue() = default;
void start() {
timer_handle = run_ms_timer_win(1, 1, [this](){ update(); });
}
void update() {
auto now = GetTickCount();
while (!task_queue.empty()) {
Task task = task_queue.top();
if (task.timestamp <= now) {
task.callback();
task_queue.pop();
} else {
break;
}
}
}
// 添加定时任务 : 毫秒级
uint64_t add_task(uint64_t delay_ms, TimerCallback cb) {
uint64_t tid = next_task_id++;
uint64_t timestamp = GetTickCount() + delay_ms;
task_queue.push({ tid, timestamp, cb });
return tid;
}
private:
uint64_t next_task_id;
HANDLE timer_handle;
std::priority_queue<Task, std::vector<Task>, decltype(CMP)> task_queue;
};
// 首次触发时间,重复间隔,回调函数
HANDLE run_ms_timer_win(int initial_msec, int interval_msec, TimerCallback cb) {
HANDLE hTimer = CreateWaitableTimer(NULL, FALSE, NULL);
if (!hTimer) return NULL;
LARGE_INTEGER liDueTime;
liDueTime.QuadPart = -initial_msec * 10000LL; // 负值表示相对时间,单位100纳秒
// 设置定时器
SetWaitableTimer(hTimer, &liDueTime, interval_msec, NULL, NULL, FALSE);
while (1) {
DWORD dwRet = WaitForSingleObject(hTimer, INFINITE);
if (dwRet == WAIT_OBJECT_0) {
cb(); // 回调
}
}
return hTimer;
}
层级时间轮定时器任务系统
详细代码可以参见:层级时间轮定时器实现
这里讲下一些关键的实现。
首先我们要健壮一下我们的定时器任务结构,使其支持取消、重复执行的功能:
1
2
3
4
5
6
7
8
9
10
11
struct TimerTask {
using TimerCallback = std::function<void()>;
TimerTask(size_t i, size_t ms, size_t fr, TimerCallback cb, bool repeat) : interval(i), timeout_ms(ms), timeout_frame(fr), callback(cb), is_repeat(repeat) {}
size_t interval = 0; // 任务间隔
size_t timeout_ms = 0; // 超时时间(毫秒)
size_t timeout_frame = 0; // 超时时间(帧数)
TimerCallback callback;
bool is_repeat = false;
bool is_canceled = false;
};
这里讲下帧数:帧数实际上就是时间轮中的槽代表的时间长度,用于计算某一时间点属于第几帧;
一般的层级时间轮共有三个时间轮组成,每个时间轮由固定大小的槽组成的环;时间轮中含有一个当前索引,指向下一个要处理的槽。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class TimeWheel {
public:
struct Wheel {
using Slot = std::vector<TimerTask*>;
size_t cur_slot_index = 0; // 当前槽索引
std::vector<Slot> slots;
};
private:
size_t cur_frame_ = 0; // 当前计时器运行到的帧数(tick次数),用于记录时间进度
size_t interval_ = 33; // 每一帧的时间间隔,单位为毫秒(即每33ms触发一次tick)
size_t max_slots_ = 1024; // 每个时间轮的槽数量,决定了能管理的最大定时长度
int next_timer_id_ = 0; // 下一个分配的定时器ID,用于唯一标识每个定时任务
Wheel wheels_[WHEEL_COUNT]; // 多级时间轮结构体数组,用于高效管理大量定时任务
std::unordered_map<int, TimerTask*> id_2_timer_task_; // 定时器ID到定时任务对象的映射,方便查找和删除定时任务
size_t left_ms_ = 0; // 上一次tick后剩余未处理的毫秒数,用于时间补偿
size_t ms_since_start_ = 0; // 计时器启动以来累计经过的毫秒数
std::chrono::steady_clock::time_point last_update_clock_; // 上一次调用update的时间点,用于计算实际经过的时间
};
下面讲下timewheel工作的几个重要函数:
update:外部定时器触发函数。在高精度的定时器系统中,外部定时器触发的时间间隔不一定就是33ms,可能会因为系统调度等原因产生抖动。因此,我们需要在update函数中进行时间补偿,确保每个定时任务都能在正确的时间点被触发。时间补偿的机制是这样的,我们需要记录待处理的时间长度 left_ms,并在left_ms内按每帧去处理定时任务,直到left_ms不够一帧的长度。
这里给个伪代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void update() {
size_t now = get_now_ms();
size_t duration = last_update_clock_ - now;
left_ms_ += duration;
last_update_clock_ = now;
while (left_ms_ >= interval_) {
left_ms_ -= interval_;
cur_frame_++; // 更新当前帧数
tick();
}
last_update_clock_ = now;
}
tick: 每一帧 / 33ms 到来时的处理函数。在tick函数中,我们要更新时间轮的当前指针,槽任务的移动,以及执行到期的定时任务。对于一级时间轮内的槽,我们需要遍历当前槽中的所有定时任务,并检查它们是否到期。如果到期,就执行相应的回调函数,并将任务移除。对于高级时间轮的槽,我们需要将其任务取出,并移动到低级的时间轮种。这是tick函数的工作内容。
注意只有当低级的时间轮的指针走了一圈,也就是指向0时,才会触发下一层时间轮的处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void tick() {
for(int i = 0; i < WHEEL_COUNT; i++) {
Wheel& wheel = wheels_[i];
wheel.cur_index = (wheel.cur_index + 1) % max_slots_;
if (i < 1) {
run_tasks(index, wheel.cur_index);
} else {
move_tasks(index, wheel.cur_index);
}
if (wheel.cur_index == 0) {
// 触发下一层时间轮的处理
continue;
}
}
}
run_tasks: 处理当前时间轮槽中的所有定时任务,并执行到期的回调函数。注意下细节,每个槽代表一段时间,也就是内部的任务是有先后顺序的,我们在调用前先排序然后再执行。任务分为非重复任务和重复任务两种,对于重复任务,在执行完后,要计算其下一次的槽的位置,并放入该槽中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void run_tasks(int wheel_index, int slot_index) {
Wheel wheel = wheels[wheel_index];
Slot slot = wheel.slots[slot_index];
sort(slot.begin(), slot.end(), [](TimerTask* a, TimerTask* b) {
return a->timeout_ms < b->timeout_ms
})
for(auto & task : slot.tasks) {
if (task.is_cancel) {
delete task;
continue;
}
task->callback();
if (task->repeat) {
size_t new_wheel_index, new_slot_index;
size_t offset = max((interval_ / 33), 1);
calculate_next_slot(offset, new_wheel_index, new_slot_index);
wheels[new_wheel_index].slots[new_slot_index].push_back(task);
} else {
delete task;
}
}
slot.clear();
}
move_tasks:将高级时间轮中指定槽的任务重新分配到低级时间轮中,就是将槽中的任务重新计算所属的时间轮和槽。
1
2
3
4
5
6
7
8
9
10
11
12
void move_tasks(int wheel_index, int slot_index) {
Wheel wheel = wheels[wheel_index];
Slot slot = wheel.slots[slot_index];
for(auto task : slot.tasks) {
size_t new_wheel_index, new_slot_index;
size_t offset = max((interval_ / 33), 1);
calculate_next_slot(offset, new_wheel_index, new_slot_index);
wheels[new_wheel_index].slots[new_slot_index].push_back(task);
}
}
calculate_next_slot: 非常重要的函数,用于计算距离当前帧offset帧的槽的位置。计算逻辑是这样的,一格高级的时间轮的槽代表低级时间轮所有的槽数量;从低级的时间轮向高级的时间轮转换时,offset要除以低级时间轮的总槽数;下面是代码:
```cpp bool calculate_next_slot(size_t offset, size_t& new_wheel_index, size_t& new_slot_index) { new_wheel_index = 0; new_slot_index = 0;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
for (int i = 0; i < WHEEL_COUNT; i++) {
Wheel& wheel = wheels_[i];
size_t slots_count = wheel.slots.size();
if (slots_count >= max_slot_ && i < WHEEL_COUNT - 1) {
offset /= slots_count;
} else {
new_wheel_index = i;
break;
}
}
if (offset > final_max_slots_count) return false;
new_slot_index = (wheels[i].cur_index + offset) % wheels[i].slot_size();
return true; }
说到这里,核心的方法已经讲完了,剩下一些基本的操作,比如添加定时任务,删除定时任务,逻辑比较简单,就是调用上面的方法去计算新任务的位置然后添加仅需即可。
定时器的运行环境
定时器可以和IO多路复用结合,实现非阻塞式的定时系统。将定时器注册进事件循环中,和网络IO事件共享一个事件循环,就可以具备单核运行能力。如果要多核,就需要单独开一个线程,但此时就存在资源竞争的问题,需要对任务队列的访问进行加锁同步。