Post

网络框架及消息处理系统--MMO服务器开发技术点记录

网络框架及消息处理系统--MMO服务器开发技术点记录

本篇文章记录MMO服务器项目开发中学习的技术点和遇到的问题。大致分成几个模块进行记录。

包管理和构建工具

采用cocan和cmake进行依赖库的管理和源代码的构建工作。

网络框架

任何联网应用离不开网络通信协议,该服务器选用TCP作为通信协议。通过IO多路复用实现多客户端的并发通信,为了减轻开发负担(服务器的网络框架和普通互联网的网络框架本质上没有太大的区别),这里采用libevent框架作为网络库进行封装。TcpServer监听端口,对每个建立的连接创建一个TcpConnection进行读写。

TcpServer的启动函数 start

在指定的event_base(libevent的基本使用会单独开篇文章进行讲解,这里不做深入)上创建监听器listener监听ipv4端口。创建函数:

1
2
3
_listener = evconnlistener_new_bind(base, accept_conn_cb, this,
    LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE, -1,
    (struct sockaddr*)&addr_v4, sizeof(addr_v4));

参数解释:

  • base: libevent 事件循环基础结构
  • accept_conn_cb: 新连接回调函数
  • ctx: 上下文,传递回调函数的用户数据指针
  • LEV_OPT_CLOSE_ON_FREELEV_OPT_REUSEABLE:
    • LEV_OPT_CLOSE_ON_FREE: 释放监听器时自动关闭套接字
    • LEV_OPT_REUSEABLE: 允许地址重用(SO_REUSEADDR) -1: 监听队列长度(-1 表示使用系统默认值)

地址重用:支持快速重启,防止老socket占用端口未释放导致重新启动服务器时bind失败

TcpServer的停止函数 stop

调用libevent的释放函数释放监听体

1
2
    evconnlistener_disable(_listener);
    evconnlistener_free(_listener);

TcpConnection 重点

设计高性能的TcpConnection的重点是做到读写分离,互不影响。借助libevent实现这一功能。

libevent中使用bufferevent对象实现IO多路复用式读写,需要注册读写事件回调。首先使用和Server相同的事件循环体base创建bufferevent:

1
_bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);

然后注册读写回调,特殊事件(关闭事件、错误事件等)回调,这里写回调不设置,因为暂时没有这方面的需求。

1
bufferevent_setcb(_bev, socket_read_cb, NULL, socket_event_cb, (void*)this);

读回调中像缓冲区每次读取2048字节的数据,上传到上层处理。读取的字节流通过recv buffer解析成string数组。注意,我们的读写都要遵循7-bits格式,因为我们的客户端也是通过这个格式来解析的。7-bits编码怎么工作的,我们放在下一章和消息一起讲。

消息处理系统

服务器性能很大程度上依赖于一个高效的消息处理系统。对于我们的MMO服务器,该如何设计这个消息处理系统呢?首先socket从网络中收取的是字节流,我们需要将其解析成我们想要的格式,同时解决tcp的沾包问题。

RecvBuffer 字节流解析器

RecvBuffe类实现了一个基于长度前缀的消息解析器,采用状态机模式来处理 TCP 流数据的粘包和分包问题。

首先定义消息格式:Length-Prefixed模式。 ` [长度字段][消息数据] `

  • 长度字段使用 7-bit 编码(类似 Protocol Buffers 的 varint 编码)
  • 消息数据为实际的业务数据,后续要交由protobuf进行解析。

状态机设计

两个解析阶段

1
2
3
4
enum class ParseStage {
    LEN,    // 解析长度字段
    DATA    // 解析消息数据
};

在类中定义需要的常量:

1
2
3
4
5
6
7
8
9
// 长度解析相关
char _len_bytes[MAX_LEN_SIZE] = { 0 };  // 存储长度字节(最多5字节)
size_t _len_bytes_position = 0;         // 当前长度字节位置
size_t _need_bytes = 1;                 // 还需要读取的字节数

// 数据解析相关
ParseStage _parse_stage = ParseStage::LEN;  // 当前解析阶段
size_t _position = 0;                       // 数据缓冲区当前位置
char* _buffer = nullptr;                    // 消息数据缓冲区

解析流程详细解释:

  • 阶段1:解析消息长度字段
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
char* _len_bytes; // char:1字节

case ParseStage::LEN:
    _len_bytes[_len_bytes_position++] = bytes[bindex]; // 读取首位进_len_bytes
    if ((bytes[bindex] & 0x80) == 0)  // 检查最高位,0表示长度字段结束 0x80 = 10000000
        _need_bytes = 0;
    
    bindex += 1;
    if (_need_bytes == 0) {
        _parse_stage = ParseStage::DATA;
        _need_bytes = calc_package_data_length();  // 计算消息数据长度
        _len_bytes_position = 0;
        
        assert(_need_bytes <= MAX_PACKAGE_SIZE);
        _buffer = new char[_need_bytes];  // 分配消息数据缓冲区
    }
    break;

解释下7-bits编码规则,这是常用的分包手段。

  • 每个字节的最高1位作为是否继续的标志
    • 1:还有后续字节,说明1位不足以存储长度数据
    • 0:最后一个字节

示例:

长度 127: 0x7F (一个字节:01111111) 长度 128: 0x80 0x01 (两个字节:10000000 00000001)

读取长度数据进字符数组,然后计算其数值。给个计算思路的示范:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
size_t calc_package_data_length() {
    size_t result = 0;
    size_t shift = 0;
    
    for (size_t i = 0; i < _len_bytes_position; i++) {
        // 取低7位的值
        size_t byte_value = _len_bytes[i] & 0x7F;  // 0x7F = 01111111
        
        // 左移相应位数后累加
        result |= (byte_value << shift);
        
        // 每个字节贡献7位,所以shift增加7
        shift += 7;
    }
    
    return result;
}
  • 阶段2:数据阶段 这段代码是TCP连接中解析数据包的DATA阶段,用于读取消息的实际内容。让我详细解释一下:
  1. 计算剩余字节数
1
size_t leftBytesNum = n - bindex;
  • n 是本次接收到的总字节数
  • bindex 是当前处理到的位置
  • leftBytesNum 是还未处理的字节数
  1. 数据不足的情况
1
2
3
4
5
6
7
if (leftBytesNum < _need_bytes)
{
    memcpy(_buffer + _position, bytes + bindex, leftBytesNum);
    _need_bytes -= leftBytesNum;
    bindex += leftBytesNum;
    _position += leftBytesNum;
}

当剩余数据不足以完成当前消息时:

  • 将所有剩余数据拷贝到缓冲区
  • 更新还需要的字节数 (_need_bytes)
  • 更新读取位置 (bindex)
  • 更新缓冲区写入位置 (_position)

保存解析器的状态,下次socket接收到数据时,会继续完成解析工作。

  1. 数据充足的情况
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
else
{
    memcpy(_buffer + _position, bytes + bindex, _need_bytes);
    bindex += _need_bytes;

    // finish one msg
    std::string msg{ _buffer, _position + _need_bytes };
    msgs.push_back(std::move(msg));

    // reset to initial state
    _parse_stage = ParseStage::LEN;
    _need_bytes = 1;

    delete[] _buffer;
    _buffer = nullptr;
    _position = 0;
}

当数据足够完成当前消息时:

  • 只拷贝需要的字节数到缓冲区
  • 构造完整的消息字符串
  • 将消息添加到结果向量中
  • 重置解析器状态
    • 回到长度解析阶段 (ParseStage::LEN)
    • 重置需要字节数为1(准备读取下一个消息的长度)
    • 释放数据缓冲区
    • 重置位置指针

消息结构设计

处理完tcp的沾包问题和分包工作后,要开始定义我们的消息结构。每个消息定义为两部分:消息长度+消息内容,分两次发送。所以我们的发送消息流是这样的:

消息长度+消息内容+消息长度+消息内容 ···

对应recv buffer中的两个阶段:Len Data。讲到这里,我们的并发网络框架就搭建完了,我们的服务器已经是个基本的tcp服务器哩。下一篇文章,我们讲开始踏入MMO服务器的大门。

缓冲区

在网络层和应用层之间,我们需要放置读写缓冲区,将两层的IO操作解耦开来,提高系统的性能和可维护性,可扩展性。因为如果拿一般的字符数组作为缓冲区,提供的功能就太简单了。但本质上,我们的缓冲区也就是一个具有自定义功能的char数组。

BitStream

BitStream 分为用于序列化和反序列化数据的 OutputBitStream 和 InputBitStream 类,这些类允许将数据写入和读取到位级别,这在网络编程中对于有效地利用带宽非常有用。这里的序列化是指将具体类型的数据序列化为字符数组,反序列化是指将字符数组反序列化为具体类型。

序列化和反序列化通常伴随错误,我们需要定义自己的错误类。

StreamException 类

  • StreamException 是一个自定义异常类,继承自 std::exception。
  • 它用于在流操作中发生错误时抛出异常,例如读取超出流的末尾。
  • 它包含一个 message_ 成员变量,用于存储错误消息。
  • what() 方法返回错误消息的 C 风格字符串。

同时要设计一个约束,实现这个约束的类说明具备网络序列化的能力。

NetSerializable

NetSerializable 是一个概念,用于约束可以进行网络序列化的类型。

  • 它要求类型 T 必须是一个类,并且必须具有 net_serialize 和 net_delta_serialize 成员函数,分别用于完整序列化和增量序列化。
    • net_serialize 函数接受一个 OutputBitStream 对象和一个 bool 值 to_self 作为参数,并返回 void。
    • net_delta_serialize 函数接受一个 OutputBitStream 对象和一个 bool 值 to_self 作为参数,并返回 bool,表示数据是否已更改。

OutputBitStream

用于将数据写入位流。

  • 它包含以下成员变量:
    • m_buffer: 指向用于存储数据的缓冲区的指针。
    • m_pos: 当前写入位置(以字节为单位)。
    • m_capacity: 缓冲区的容量(以字节为单位)。
  • 它提供以下方法:
    • OutputBitStream(): 默认构造函数。
      • ~OutputBitStream(): 析构函数,释放缓冲区。
      • tellp(): 返回当前写入位置。
      • seekp(size_t pos): 将写入位置移动到指定位置。
      • get_buffer(): 返回指向缓冲区的指针。
      • write(const void* data, size_t bytes): 将指定数量的字节写入流。
      • write(const std::string& str): 将字符串写入流。
      • write(const T& data) (NetSerializable): 使用 net_serialize 方法将可网络序列化的对象写入流。
      • write(T data) (arithmetic): 将算术类型的数据写入流。
      • write(const std::vector& v): 将 std::vector 写入流。首先写入向量的大小,然后写入每个元素。
      • write(const std::array<T, N>& v): 将 std::array 写入流。
      • net_delta_serialize(T& data): 使用 net_delta_serialize 方法将可网络序列化的对象写入流。
      • realloc_buffer(size_t new_length): 重新分配缓冲区,使其具有指定的大小。

InputBitStream

将位流解析入数据中

  • InputBitStream类用于从位流读取数据。
  • 它包含以下成员变量:
    • m_buffer: 指向用于存储数据的缓冲区的指针。
    • m_pos: 当前读取位置(以字节为单位)。
    • m_capacity: 缓冲区的容量(以字节为单位)。
  • 它提供以下方法:
    • InputBitStream(const char* data, size_t n): 构造函数,使用指定的数据和大小初始化流。
    • is_end(): 检查是否已到达流的末尾。
    • bypass(size_t bytes): 将读取位置移动指定数量的字节。
    • tellp(): 返回当前读取位置。
    • seekp(size_t pos): 将读取位置移动到指定位置。
    • get_buffer(): 返回指向缓冲区的指针。
    • check_overflow(size_t bytes): 检查读取是否会超出流的末尾。
    • read(void* dest, size_t bytes): 从流中读取指定数量的字节。
    • read(std::string& str): 从流中读取字符串。
    • read_string(): 从流中读取字符串。
    • read() (NetSerializable): 使用 net_serialize 方法从流中读取可网络序列化的对象。
    • read(T& ref) (NetSerializable): 使用 net_serialize 方法从流中读取可网络序列化的对象。
    • read() (arithmetic): 从流中读取算术类型的数据。
    • read(T& ref) (arithmetic): 从流中读取算术类型的数据。
    • read(std::vector& v): 从流中读取 std::vector。首先读取向量的大小,然后读取每个元素。
    • read(std::array& v): 从流中读取 std::array
This post is licensed under CC BY 4.0 by the author.