网络框架及消息处理系统--MMO服务器开发技术点记录
本篇文章记录MMO服务器项目开发中学习的技术点和遇到的问题。大致分成几个模块进行记录。
包管理和构建工具
采用cocan和cmake进行依赖库的管理和源代码的构建工作。
网络框架
任何联网应用离不开网络通信协议,该服务器选用TCP作为通信协议。通过IO多路复用实现多客户端的并发通信,为了减轻开发负担(服务器的网络框架和普通互联网的网络框架本质上没有太大的区别),这里采用libevent框架作为网络库进行封装。TcpServer监听端口,对每个建立的连接创建一个TcpConnection进行读写。
TcpServer的启动函数 start
在指定的event_base(libevent的基本使用会单独开篇文章进行讲解,这里不做深入)上创建监听器listener监听ipv4端口。创建函数:
1
2
3
_listener = evconnlistener_new_bind(base, accept_conn_cb, this,
LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE, -1,
(struct sockaddr*)&addr_v4, sizeof(addr_v4));
参数解释:
- base: libevent 事件循环基础结构
- accept_conn_cb: 新连接回调函数
- ctx: 上下文,传递回调函数的用户数据指针
LEV_OPT_CLOSE_ON_FREE LEV_OPT_REUSEABLE: - LEV_OPT_CLOSE_ON_FREE: 释放监听器时自动关闭套接字
- LEV_OPT_REUSEABLE: 允许地址重用(SO_REUSEADDR) -1: 监听队列长度(-1 表示使用系统默认值)
地址重用:支持快速重启,防止老socket占用端口未释放导致重新启动服务器时bind失败
TcpServer的停止函数 stop
调用libevent的释放函数释放监听体
1
2
evconnlistener_disable(_listener);
evconnlistener_free(_listener);
TcpConnection 重点
设计高性能的TcpConnection的重点是做到读写分离,互不影响。借助libevent实现这一功能。
libevent中使用bufferevent对象实现IO多路复用式读写,需要注册读写事件回调。首先使用和Server相同的事件循环体base创建bufferevent:
1
_bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
然后注册读写回调,特殊事件(关闭事件、错误事件等)回调,这里写回调不设置,因为暂时没有这方面的需求。
1
bufferevent_setcb(_bev, socket_read_cb, NULL, socket_event_cb, (void*)this);
读回调中像缓冲区每次读取2048字节的数据,上传到上层处理。读取的字节流通过recv buffer解析成string数组。注意,我们的读写都要遵循7-bits格式,因为我们的客户端也是通过这个格式来解析的。7-bits编码怎么工作的,我们放在下一章和消息一起讲。
消息处理系统
服务器性能很大程度上依赖于一个高效的消息处理系统。对于我们的MMO服务器,该如何设计这个消息处理系统呢?首先socket从网络中收取的是字节流,我们需要将其解析成我们想要的格式,同时解决tcp的沾包问题。
RecvBuffer 字节流解析器
RecvBuffe类实现了一个基于长度前缀的消息解析器,采用状态机模式来处理 TCP 流数据的粘包和分包问题。
首先定义消息格式:Length-Prefixed模式。 ` [长度字段][消息数据] `
- 长度字段使用 7-bit 编码(类似 Protocol Buffers 的 varint 编码)
- 消息数据为实际的业务数据,后续要交由protobuf进行解析。
状态机设计
两个解析阶段
1
2
3
4
enum class ParseStage {
LEN, // 解析长度字段
DATA // 解析消息数据
};
在类中定义需要的常量:
1
2
3
4
5
6
7
8
9
// 长度解析相关
char _len_bytes[MAX_LEN_SIZE] = { 0 }; // 存储长度字节(最多5字节)
size_t _len_bytes_position = 0; // 当前长度字节位置
size_t _need_bytes = 1; // 还需要读取的字节数
// 数据解析相关
ParseStage _parse_stage = ParseStage::LEN; // 当前解析阶段
size_t _position = 0; // 数据缓冲区当前位置
char* _buffer = nullptr; // 消息数据缓冲区
解析流程详细解释:
- 阶段1:解析消息长度字段
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
char* _len_bytes; // char:1字节
case ParseStage::LEN:
_len_bytes[_len_bytes_position++] = bytes[bindex]; // 读取首位进_len_bytes
if ((bytes[bindex] & 0x80) == 0) // 检查最高位,0表示长度字段结束 0x80 = 10000000
_need_bytes = 0;
bindex += 1;
if (_need_bytes == 0) {
_parse_stage = ParseStage::DATA;
_need_bytes = calc_package_data_length(); // 计算消息数据长度
_len_bytes_position = 0;
assert(_need_bytes <= MAX_PACKAGE_SIZE);
_buffer = new char[_need_bytes]; // 分配消息数据缓冲区
}
break;
解释下7-bits编码规则,这是常用的分包手段。
- 每个字节的最高1位作为是否继续的标志
- 1:还有后续字节,说明1位不足以存储长度数据
- 0:最后一个字节
示例:
长度 127: 0x7F (一个字节:01111111) 长度 128: 0x80 0x01 (两个字节:10000000 00000001)
读取长度数据进字符数组,然后计算其数值。给个计算思路的示范:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
size_t calc_package_data_length() {
size_t result = 0;
size_t shift = 0;
for (size_t i = 0; i < _len_bytes_position; i++) {
// 取低7位的值
size_t byte_value = _len_bytes[i] & 0x7F; // 0x7F = 01111111
// 左移相应位数后累加
result |= (byte_value << shift);
// 每个字节贡献7位,所以shift增加7
shift += 7;
}
return result;
}
- 阶段2:数据阶段 这段代码是TCP连接中解析数据包的DATA阶段,用于读取消息的实际内容。让我详细解释一下:
- 计算剩余字节数
1
size_t leftBytesNum = n - bindex;
n
是本次接收到的总字节数bindex
是当前处理到的位置leftBytesNum
是还未处理的字节数
- 数据不足的情况
1
2
3
4
5
6
7
if (leftBytesNum < _need_bytes)
{
memcpy(_buffer + _position, bytes + bindex, leftBytesNum);
_need_bytes -= leftBytesNum;
bindex += leftBytesNum;
_position += leftBytesNum;
}
当剩余数据不足以完成当前消息时:
- 将所有剩余数据拷贝到缓冲区
- 更新还需要的字节数 (
_need_bytes
) - 更新读取位置 (
bindex
) - 更新缓冲区写入位置 (
_position
)
保存解析器的状态,下次socket接收到数据时,会继续完成解析工作。
- 数据充足的情况
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
else
{
memcpy(_buffer + _position, bytes + bindex, _need_bytes);
bindex += _need_bytes;
// finish one msg
std::string msg{ _buffer, _position + _need_bytes };
msgs.push_back(std::move(msg));
// reset to initial state
_parse_stage = ParseStage::LEN;
_need_bytes = 1;
delete[] _buffer;
_buffer = nullptr;
_position = 0;
}
当数据足够完成当前消息时:
- 只拷贝需要的字节数到缓冲区
- 构造完整的消息字符串
- 将消息添加到结果向量中
- 重置解析器状态:
- 回到长度解析阶段 (
ParseStage::LEN
) - 重置需要字节数为1(准备读取下一个消息的长度)
- 释放数据缓冲区
- 重置位置指针
- 回到长度解析阶段 (
消息结构设计
处理完tcp的沾包问题和分包工作后,要开始定义我们的消息结构。每个消息定义为两部分:消息长度+消息内容,分两次发送。所以我们的发送消息流是这样的:
消息长度+消息内容+消息长度+消息内容 ···
对应recv buffer中的两个阶段:Len Data。讲到这里,我们的并发网络框架就搭建完了,我们的服务器已经是个基本的tcp服务器哩。下一篇文章,我们讲开始踏入MMO服务器的大门。
缓冲区
在网络层和应用层之间,我们需要放置读写缓冲区,将两层的IO操作解耦开来,提高系统的性能和可维护性,可扩展性。因为如果拿一般的字符数组作为缓冲区,提供的功能就太简单了。但本质上,我们的缓冲区也就是一个具有自定义功能的char数组。
BitStream
BitStream 分为用于序列化和反序列化数据的 OutputBitStream 和 InputBitStream 类,这些类允许将数据写入和读取到位级别,这在网络编程中对于有效地利用带宽非常有用。这里的序列化是指将具体类型的数据序列化为字符数组,反序列化是指将字符数组反序列化为具体类型。
序列化和反序列化通常伴随错误,我们需要定义自己的错误类。
StreamException 类
- StreamException 是一个自定义异常类,继承自 std::exception。
- 它用于在流操作中发生错误时抛出异常,例如读取超出流的末尾。
- 它包含一个 message_ 成员变量,用于存储错误消息。
- what() 方法返回错误消息的 C 风格字符串。
同时要设计一个约束,实现这个约束的类说明具备网络序列化的能力。
NetSerializable
NetSerializable 是一个概念,用于约束可以进行网络序列化的类型。
- 它要求类型 T 必须是一个类,并且必须具有 net_serialize 和 net_delta_serialize 成员函数,分别用于完整序列化和增量序列化。
- net_serialize 函数接受一个 OutputBitStream 对象和一个 bool 值 to_self 作为参数,并返回 void。
- net_delta_serialize 函数接受一个 OutputBitStream 对象和一个 bool 值 to_self 作为参数,并返回 bool,表示数据是否已更改。
OutputBitStream
用于将数据写入位流。
- 它包含以下成员变量:
- m_buffer: 指向用于存储数据的缓冲区的指针。
- m_pos: 当前写入位置(以字节为单位)。
- m_capacity: 缓冲区的容量(以字节为单位)。
- 它提供以下方法:
- OutputBitStream(): 默认构造函数。
- ~OutputBitStream(): 析构函数,释放缓冲区。
- tellp(): 返回当前写入位置。
- seekp(size_t pos): 将写入位置移动到指定位置。
- get_buffer(): 返回指向缓冲区的指针。
- write(const void* data, size_t bytes): 将指定数量的字节写入流。
- write(const std::string& str): 将字符串写入流。
- write(const T& data) (NetSerializable): 使用 net_serialize 方法将可网络序列化的对象写入流。
- write(T data) (arithmetic): 将算术类型的数据写入流。
- write(const std::vector
& v): 将 std::vector 写入流。首先写入向量的大小,然后写入每个元素。 - write(const std::array<T, N>& v): 将 std::array 写入流。
- net_delta_serialize(T& data): 使用 net_delta_serialize 方法将可网络序列化的对象写入流。
- realloc_buffer(size_t new_length): 重新分配缓冲区,使其具有指定的大小。
- OutputBitStream(): 默认构造函数。
InputBitStream
将位流解析入数据中
- InputBitStream类用于从位流读取数据。
- 它包含以下成员变量:
- m_buffer: 指向用于存储数据的缓冲区的指针。
- m_pos: 当前读取位置(以字节为单位)。
- m_capacity: 缓冲区的容量(以字节为单位)。
- 它提供以下方法:
- InputBitStream(const char* data, size_t n): 构造函数,使用指定的数据和大小初始化流。
is_end()
: 检查是否已到达流的末尾。bypass(size_t bytes)
: 将读取位置移动指定数量的字节。tellp()
: 返回当前读取位置。seekp(size_t pos)
: 将读取位置移动到指定位置。get_buffer()
: 返回指向缓冲区的指针。check_overflow(size_t bytes)
: 检查读取是否会超出流的末尾。- read(void* dest, size_t bytes): 从流中读取指定数量的字节。
- read(std::string& str): 从流中读取字符串。
read_string()
: 从流中读取字符串。- read() (NetSerializable): 使用
net_serialize
方法从流中读取可网络序列化的对象。 - read(T& ref) (NetSerializable): 使用
net_serialize
方法从流中读取可网络序列化的对象。 - read() (arithmetic): 从流中读取算术类型的数据。
- read(T& ref) (arithmetic): 从流中读取算术类型的数据。
- read(std::vector& v): 从流中读取
std::vector
。首先读取向量的大小,然后读取每个元素。 - read(std::array& v): 从流中读取
std::array
。