Post

定时器系统和层级时间轮技术细节和linux下常见的定时器相关接口

定时器系统和层级时间轮技术细节和linux下常见的定时器相关接口

常见的定时器相关接口 / 创建epoll循环驱动的定时器

任何定时器系统,不管是任务队列型定时器,还是时间轮定时器系统,都必然需要一个底层的待回调的高精度的定时器作为驱动。这里给出在Linux下的基于事件循环驱动的毫秒级定时器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <sys/epoll.h>
#include <sys/timerfd.h>
#include <unistd.h>
#include <stdint.h>
#include <stdio.h>
#include <functional>

// 定时器任务回调类型
using TimerCallback = std::function<void()>;

// 定时器任务 首次触发时间,重复间隔,回调函数
void run_ms_timer(int initial_msec, int interval_msec, TimerCallback cb) {
    int epfd = epoll_create1(0);
    int tfd = timerfd_create(CLOCK_MONOTONIC, 0);

    struct itimerspec new_value = {};
    new_value.it_value.tv_nsec = initial_msec * 1000000; // 毫秒转纳秒
    new_value.it_interval.tv_nsec = interval_msec * 1000000; // 毫秒转纳秒
    timerfd_settime(tfd, 0, &new_value, NULL);

    struct epoll_event ev = {};
    ev.events = EPOLLIN;
    ev.data.fd = tfd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, tfd, &ev);

    while (1) {
        struct epoll_event events[1];
        int nfds = epoll_wait(epfd, events, 1, -1);
        if (nfds > 0 && events[0].data.fd == tfd) {
            uint64_t expirations;
            read(tfd, &expirations, sizeof(expirations));
            cb(); // 回调
        }
    }
    close(tfd);
    close(epfd);
}

在给出window系统下的定时器实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 首次触发时间,重复间隔,回调函数
void run_ms_timer_win(int initial_msec, int interval_msec, TimerCallback cb) {
    HANDLE hTimer = CreateWaitableTimer(NULL, FALSE, NULL);
    if (!hTimer) return;

    LARGE_INTEGER liDueTime;
    liDueTime.QuadPart = -initial_msec * 10000LL; // 负值表示相对时间,单位100纳秒

    // 设置定时器
    SetWaitableTimer(hTimer, &liDueTime, interval_msec, NULL, NULL, FALSE);

    while (1) {
        DWORD dwRet = WaitForSingleObject(hTimer, INFINITE);
        if (dwRet == WAIT_OBJECT_0) {
            cb(); // 回调
        }
    }
    CloseHandle(hTimer);
}

堆任务定时器

就是在循环定时器的基础上,添加维护底层任务队列的功能。在任务定时器的底层定时器触发时,检查过期的任务并执行它们。优先级队列按照任务的到期时间进行排序。触发精度是毫秒级的。

首先定义任务结构体:

1
2
3
4
5
struct Task {
    uint16_t tid;
    uint64_t timestamp;
    TimerCallback callback;
};
  • tid: 标记任务,用于懒删除任务
  • timestamp: 任务的到期时间戳
  • callback: 任务的回调函数

然后定义我们的任务处理系统:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// 定时器任务回调类型
using TimerCallback = std::function<void()>;
HANDLE run_ms_timer_win(int initial_msec, int interval_msec, TimerCallback cb);

std::function<bool(const Task&, const Task&)> CMP = [](const Task& a, const Task& b) {
    return a.timestamp > b.timestamp;
};

class TimerQueue {
public:
    TimerQueue() : task_queue(CMP), next_task_id(1) {}
    ~TimerQueue() = default;

    void start() {
        timer_handle = run_ms_timer_win(1, 1, [this](){ update(); });
    }
    void update() {
        auto now = GetTickCount();
        while (!task_queue.empty()) {
            Task task = task_queue.top();
            if (task.timestamp <= now) {
                task.callback();
                task_queue.pop();
            } else {
                break;
            }
        }
    }

    // 添加定时任务 :  毫秒级
    uint64_t add_task(uint64_t delay_ms, TimerCallback cb) {
        uint64_t tid = next_task_id++;
        uint64_t timestamp = GetTickCount() + delay_ms;
        task_queue.push({ tid, timestamp, cb });
        return tid;
    }
private:
    uint64_t next_task_id;
    HANDLE timer_handle;
    std::priority_queue<Task, std::vector<Task>, decltype(CMP)> task_queue;
};

// 首次触发时间,重复间隔,回调函数
HANDLE run_ms_timer_win(int initial_msec, int interval_msec, TimerCallback cb) {
    HANDLE hTimer = CreateWaitableTimer(NULL, FALSE, NULL);
    if (!hTimer) return NULL;

    LARGE_INTEGER liDueTime;
    liDueTime.QuadPart = -initial_msec * 10000LL; // 负值表示相对时间,单位100纳秒

    // 设置定时器
    SetWaitableTimer(hTimer, &liDueTime, interval_msec, NULL, NULL, FALSE);

    while (1) {
        DWORD dwRet = WaitForSingleObject(hTimer, INFINITE);
        if (dwRet == WAIT_OBJECT_0) {
            cb(); // 回调
        }
    }

    return hTimer;
}

层级时间轮定时器任务系统

详细代码可以参见:层级时间轮定时器实现

这里讲下一些关键的实现。

首先我们要健壮一下我们的定时器任务结构,使其支持取消、重复执行的功能:

1
2
3
4
5
6
7
8
9
10
11
struct TimerTask {
    using TimerCallback = std::function<void()>;
    TimerTask(size_t i, size_t ms, size_t fr, TimerCallback cb, bool repeat) : interval(i), timeout_ms(ms), timeout_frame(fr), callback(cb), is_repeat(repeat) {}

    size_t interval = 0; // 任务间隔
    size_t timeout_ms = 0; // 超时时间(毫秒)
    size_t timeout_frame = 0; // 超时时间(帧数)
    TimerCallback callback;
    bool is_repeat = false;
    bool is_canceled = false;
};

这里讲下帧数:帧数实际上就是时间轮中的槽代表的时间长度,用于计算某一时间点属于第几帧;

一般的层级时间轮共有三个时间轮组成,每个时间轮由固定大小的槽组成的环;时间轮中含有一个当前索引,指向下一个要处理的槽。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class TimeWheel {
public:
    struct Wheel {
        using Slot = std::vector<TimerTask*>;
        size_t cur_slot_index = 0; // 当前槽索引
        std::vector<Slot> slots;
    };

private:
    size_t cur_frame_ = 0; // 当前计时器运行到的帧数(tick次数),用于记录时间进度
    size_t interval_ = 33; // 每一帧的时间间隔,单位为毫秒(即每33ms触发一次tick)
    size_t max_slots_ = 1024; // 每个时间轮的槽数量,决定了能管理的最大定时长度
    int next_timer_id_ = 0; // 下一个分配的定时器ID,用于唯一标识每个定时任务

    Wheel wheels_[WHEEL_COUNT]; // 多级时间轮结构体数组,用于高效管理大量定时任务
    std::unordered_map<int, TimerTask*> id_2_timer_task_; // 定时器ID到定时任务对象的映射,方便查找和删除定时任务

    size_t left_ms_ = 0; // 上一次tick后剩余未处理的毫秒数,用于时间补偿
    size_t ms_since_start_ = 0; // 计时器启动以来累计经过的毫秒数
    std::chrono::steady_clock::time_point last_update_clock_; // 上一次调用update的时间点,用于计算实际经过的时间
};

下面讲下timewheel工作的几个重要函数:

update:外部定时器触发函数。在高精度的定时器系统中,外部定时器触发的时间间隔不一定就是33ms,可能会因为系统调度等原因产生抖动。因此,我们需要在update函数中进行时间补偿,确保每个定时任务都能在正确的时间点被触发。时间补偿的机制是这样的,我们需要记录待处理的时间长度 left_ms,并在left_ms内按每帧去处理定时任务,直到left_ms不够一帧的长度。

这里给个伪代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void update() {
    size_t now = get_now_ms();
    size_t duration = last_update_clock_ - now;

    left_ms_ += duration;
    last_update_clock_ = now;

    while (left_ms_ >= interval_) {
        left_ms_ -= interval_;
        cur_frame_++; // 更新当前帧数
        tick();
    }

    last_update_clock_ = now;
}

tick: 每一帧 / 33ms 到来时的处理函数。在tick函数中,我们要更新时间轮的当前指针,槽任务的移动,以及执行到期的定时任务。对于一级时间轮内的槽,我们需要遍历当前槽中的所有定时任务,并检查它们是否到期。如果到期,就执行相应的回调函数,并将任务移除。对于高级时间轮的槽,我们需要将其任务取出,并移动到低级的时间轮种。这是tick函数的工作内容。

注意只有当低级的时间轮的指针走了一圈,也就是指向0时,才会触发下一层时间轮的处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void tick() {
    for(int i = 0; i < WHEEL_COUNT; i++) {
        Wheel& wheel = wheels_[i];
        wheel.cur_index = (wheel.cur_index + 1) % max_slots_;

        if (i < 1) {
            run_tasks(index, wheel.cur_index);
        } else {
            move_tasks(index, wheel.cur_index);
        }

        if (wheel.cur_index == 0) {
            // 触发下一层时间轮的处理
            continue;
        }
    }
}

run_tasks: 处理当前时间轮槽中的所有定时任务,并执行到期的回调函数。注意下细节,每个槽代表一段时间,也就是内部的任务是有先后顺序的,我们在调用前先排序然后再执行。任务分为非重复任务和重复任务两种,对于重复任务,在执行完后,要计算其下一次的槽的位置,并放入该槽中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void run_tasks(int wheel_index, int slot_index) {
    Wheel wheel = wheels[wheel_index];
    Slot slot = wheel.slots[slot_index];

    sort(slot.begin(), slot.end(), [](TimerTask* a, TimerTask* b) {
        return a->timeout_ms < b->timeout_ms
    })

    for(auto & task : slot.tasks) {
        if (task.is_cancel) {
            delete task;
            continue;
        }

        task->callback();
        if (task->repeat) {
            size_t new_wheel_index, new_slot_index;
            size_t offset = max((interval_ / 33), 1);
            calculate_next_slot(offset, new_wheel_index, new_slot_index);

            wheels[new_wheel_index].slots[new_slot_index].push_back(task);
        } else {
            delete task;
        }
    }

    slot.clear();
}

move_tasks:将高级时间轮中指定槽的任务重新分配到低级时间轮中,就是将槽中的任务重新计算所属的时间轮和槽。

1
2
3
4
5
6
7
8
9
10
11
12
void move_tasks(int wheel_index, int slot_index) {
    Wheel wheel = wheels[wheel_index];
    Slot slot = wheel.slots[slot_index];

    for(auto task : slot.tasks) {
        size_t new_wheel_index, new_slot_index;
        size_t offset = max((interval_ / 33), 1);
        calculate_next_slot(offset, new_wheel_index, new_slot_index);

        wheels[new_wheel_index].slots[new_slot_index].push_back(task);
    }
}

calculate_next_slot: 非常重要的函数,用于计算距离当前帧offset帧的槽的位置。计算逻辑是这样的,一格高级的时间轮的槽代表低级时间轮所有的槽数量;从低级的时间轮向高级的时间轮转换时,offset要除以低级时间轮的总槽数;下面是代码:

```cpp bool calculate_next_slot(size_t offset, size_t& new_wheel_index, size_t& new_slot_index) { new_wheel_index = 0; new_slot_index = 0;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
for (int i = 0; i < WHEEL_COUNT; i++) {
    Wheel& wheel = wheels_[i];
    size_t slots_count = wheel.slots.size();

    if (slots_count >= max_slot_ && i < WHEEL_COUNT - 1) {
        offset /= slots_count;
    } else {
        new_wheel_index = i;
        break;
    }
}

if (offset > final_max_slots_count) return false;

new_slot_index = (wheels[i].cur_index + offset) % wheels[i].slot_size();
return true; }

说到这里,核心的方法已经讲完了,剩下一些基本的操作,比如添加定时任务,删除定时任务,逻辑比较简单,就是调用上面的方法去计算新任务的位置然后添加仅需即可。

定时器的运行环境

定时器可以和IO多路复用结合,实现非阻塞式的定时系统。将定时器注册进事件循环中,和网络IO事件共享一个事件循环,就可以具备单核运行能力。如果要多核,就需要单独开一个线程,但此时就存在资源竞争的问题,需要对任务队列的访问进行加锁同步。

This post is licensed under CC BY 4.0 by the author.