0%

聊天项目(一) GateServer

1. Main函数的初始化

GateServer 既作为服务器,又作为客户端:

  • 作为聊天客户端的网关服务器
  • 作为gRPC的客户端,gRPC的客户端比较简单,不需要在 Main 函数中进行初始化
    • 邮箱验证服务
    • 状态查询服务
  • 作为验证服务的客户端

Main 函数中需要连接 Reids 服务器和 MySQL 服务器:

1
2
MysqlMgr::GetInstance();
RedisMgr::GetInstance();

Main 函数中需要实现上下文的初始化和服务器的优雅关闭:

  • 需要一个上下文服务来轮询事件,只负责关闭信号和 acceptor(注意并没有从服务池里面取): net::io_context ioc
  • 需要设置信号来优雅关闭服务器:boost::asio::signals_set signals(ioc, SIGINT, SIGTERM) 以及 signals.async_wait()

Main 函数中开启服务器:

1
2
3
4
make_shared<CServer>(ioc, gate_port)->Start();
//在 Start 里面由于会执行异步的监听函数,所以会注册一个事件循环
//所以 ioc 不会退出
ioc.run();

2. CServer

CServer 一共就只有两个函数,一个是构造函数用于初始化 acceptor 这个 socket一个是 Start 函数用于监听并把连接丢给一个新的 CSession的成员变量 socket,在一个新的线程中进行通信。

CServer 在初始化的时候会初始化一个 acceptor 来监听连接。 acceptor 会在指定端口监听连接,要通过绑定端点来实现,任意 IP 地址的连接都能够监听:

1
CServer::CServer(boost::asio::io_context &ioc, unsigned short& port):_ioc(ioc), _acceptor(ioc, endpoint(ip::v4, port)){}

要在 Start 函数中监听连接,并通过 acceptor 把新来的连接派发给不同的上下文,这些都交给 CSession 类来管理(包含 socket),因此需要在 CServer 的函数中创建一个 CSession 对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
CServer::Start(){
//从池中取一个上下文,这是一个服务池,本质上是一个线程池,每一个线程都跑一个服务
auto& ioc = AsioIOServicePool::GetInstance()->GetIOService();
//创建一个连接对象管理连接
shared_ptr<CSession> new_con = make_shared<CSession>(ioc);
//通过 lambda 表达式注册回调函数
//self 很关键,在值捕获的时候能够使智能指针的引用技术加 1,所以能够实现伪闭包的操作
auto self = shared_from_this(); // 需要在定义类的时候继承 enabled_shared_from_this
_acceptor.async_wait(new_con->socket, [self, new_con](ec){
//这个 new_con 运行在一个单独的线程中,因为它的上下文 ioc 是在一个线程池中创建的
new_con->Start();
//继续监听连接
self->Start();
})
}

注意:std::make_shared 直接创建并返回一个 std::shared_ptr 对象,相较于 new ,其拥有更高的效率,只涉及到一次分配(分配 T 类型的内存),而 new 需要分配两次,(一次分配 T 类型的内存,一次分配 shared_ptr 的内存)。

3. HttpConnection

CServer 接收到连接之后,网关服务器通过 HttpConnection 和客户端建立连接。是在 CSession 中和客户端进行 HTTP 通信的,而在建立 CSession 连接之前是通过 TCP 进行通信的。HttpConnection 的成员函数:

  • Start
  • PreParseGetParam
  • HandReq
  • WriteResponse
  • CheckDeadline

3.1 HTTP 接收

因为客户端发送的是数据是基于 HTTP 协议的,所以不需要像后面对 TCP 进行手动的切包操作,而是直接使用 beast 库进行读取就可以了。beast 库提供了两个类,一个类是 http::request<heetp::dynamic_body>, 一个类是http::response<http::dynamic_body> ,分别是请求报文和响应报文。而数据存放在哪呢?HTTP 协议有请求行/状态行、消息头/消息体,而数据就包含在消息体中。如何获取到数据呢?通过成员变量的方式直接获取 requst.body().data()。data的数据是什么格式呢?data的数据是一块 buffer ,转成字符串之后是一个 json 对象序列化后的字符串。

1
2
3
4
5
6
7
8
9
10
11
12
void CSession::Start()
{
auto self = shared_from_this();
http::async_read(_socket, _buffer, _request, [self](beast::error_code ec,
std::size_t bytes_transferred) {
//处理读到的数据
boost::ignore_unused(bytes_transferred);
self->HandleReq();
self->CheckDeadline();
}
);
}

3.2 HTTP 的解析和注册函数调用

读到数据之后需要在 HandleReq 中初步处理 HTTP 请求,然后会解析出请求类别,并在回调函数中进行数据的处理。HandleReq 的解析过程中,需要一边解析一边设置响应报文的 状态行消息头, 和 消息体 。要如何处理呢?主要是根据 HTTP 的 method 来进行解析,解析完之后交给逻辑类进行处理(这里不需要再为逻辑类单独创建线程了,因为本来服务就是跑在多个线程的):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void CSession::HandleReq(){
//设置版本
_response.version(_request.version());
//设置为短链接
_response.keep_alive(false);
if(_requeset.method() == http::verb::get){
//!!!! HTTP URL 的解析 !!!
PreParseGetParam();
//交给逻辑类中的 HandleGet 处理
bool success = LogicSystem::GetInstance()->GetInstance()->HandleGet(_get_url, shared_from_this());
//设置响应报文的状态
_response.result(http::status::ok); // 设置状态码为200 OK
_response.set(http::field::server, "GateServer"); // 设置服务器头为GateServer
//!!! HTTP !!!的接收和响应一定是都在 CSession 中完成的
//不要在逻辑类中取调用这个 WriteResponse
WriteResponse();
}
else if(_requeset.method() == http::verb::post){
//直接从逻辑类取函数就行了,不需要解析 URL
bool success = LogicSystem::GetInstance()->HandlePost(_request.target(), shared_from_this());
WriteResponse();
}
}

这里需要设置响应报文的几个状态,记住:

  • 设置版本
  • 设置为短链接
  • 设置状态码:200 ok
  • 设置服务器头,也就是 Host
  • 设置错误码:如 gRPC是否调用成功
  • 设置内容长度(在调用 http::async_write 之前才统计长度)

注意:这里只有当 get 请求的时候才会调用 URL 解析函数,而 post 请求的时候直接调用回调函数就可以了。 这个聊天项目中的 request.target() 就是 URI ,并且没有设置参数,返回?前的字符串就得到了 URL 。HTTP 的路由解析函数:

什么是URI?什么又是URL?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1. 提取URI:
- 从HTTP请求中获取目标URI(_request.target())。
2. 查找查询字符串起始位置:
- 使用find方法找到查询字符串开始的位置(即?的位置)。
3. 分离路径与查询字符串:
- 如果没有查询字符串(即没有?),则整个URI作为路径存储到_get_url中。
- 如果存在查询字符串,则分割URI为路径部分和查询字符串部分,并分别存储到_get_url和query_string变量中。
4. 解析查询字符串:
- 使用find方法查找&字符的位置,这标志着一个键值对的结束。
- 每次循环中,从当前查询字符串中截取出一个键值对。
- 使用find方法找到等号(=)的位置,将键值对分割成键和值两部分。
- 对键和值进行URL解码(假设有一个UrlDecode函数)。
- 将解码后的键值对存储到_get_params哈希表中。
- 移除已处理过的键值对及其后面的&字符,继续处理剩余的查询字符串。
5. 处理最后一个键值对:
- 如果查询字符串中没有&字符,则认为这是最后一个键值对,对其进行相同的处理。

3.3 HTTP 注册的回调函数

需要根据 HTTP 的路由来判断客户端的请求类别,这些请求的执行函数需要提前在逻辑类的初始化函数中注册(怎么注册呢?通过哈希表来存储这些{请求,函数}键值对),这样才能够调用。包含以下请求:

  • “/get_varifycode”
  • “/user_register”
  • “/reset_pwd”
  • “/user_login”

以下是这4个回调函数的具体代码逻辑。

3.3.1 get_varifycode

GateServer 作为客户端向 gRPC 服务端发送获取验证码的请求,包含且仅包含 GetVerifyCode 这个函数。

首先是解析 json 字符串,需要把请求的 json 对象解析到新的 json 对象,这需要用到 reader 这个类:

1
2
3
4
5
6
7
8
//设置content type字段
connecion->_response.set(http::field::content_type, "text/json");
//发序列化
Json::Value root;
Json::Reader reader;
Json::Value src_root;
auto body_str = boost::beast::buffers_to_string(connection->_request.body().data());
reader.parse(body_str, src_root);

如何把 json 序列化后的字符串写进 response 中?

1
2
std::string jsonstr = root.toStyledString();
beast::ostream(connection->_response.body()) << jsonstr;

当解析出了 src_root 之后,src_root 中会包含 email 键,那么就通过调用 gRPC 服务去向这个邮箱发送验证码。gRPC客户端远程调用的步骤是什么?首先,调用的过程是很简单的,直接通过客户端的单例去调用:

1
Getresponse rsp = VerifyGrpcClient->GetInstance()->GetVerifyCode(email);

要如何实现这个gRPC的单例呢?

gRPC 的底层本质是基于 TCP 的传输协议,类似于 HTTP。因此,发送的时候也需要一些必要的对象,以下是服务端需要的一些对象:

  • 通过 builder 来构建和开始 gRPC 服务——>io_context
  • 通过 channel 来封装连接——>socket
  • 通过 stub 来建立连接——>endpoint

由于前面存在多个线程在跑 io_context ,因此可能存在多个客户端注册的情况,为了满足高并发的要求,客户端就需要通过连接池来取 stub ,从而向服务端发起连接。整个这个 VerifyGrpcClient 只有一个函数,这个函数中要初始化 gRPC 的各种变量,来远程调用 GetVerifyCode 服务,比如,肯定要把 request 的相关参数进行设置,这样服务端才知道是什么请求。除此之外,在 VerifyGrpcClient 的构造函数中,还需要 new 一个连接池。只需要记住,gRPC 客户端只要两个变量来构建:

  • ClientContext:用来查询调用是否完成,异步非阻塞
  • Stub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
GetVarifyRsp GetVarifyCode(std::string email) {
ClientContext context;
GetVarifyRsp reply;
GetVarifyReq request;
request.set_email(email);
auto stub = pool_->getConnection();
Status status = stub->GetVarifyCode(&context, request, &reply);

if (status.ok()) {
//一定要还回去连接,采用了移动构造
pool_->returnConnection(std::move(stub));
return reply;
}
else {
pool_->returnConnection(std::move(stub));
reply.set_error(ErrorCodes::RPCFailed);
return reply;
}
}

返回了这个 GetVarifyRsp 中包含了一些变量,这些变量是在 proto 文件中定义的:

1
2
3
4
5
6
7
8
9
10
11
12
13
service VarifyService {
rpc GetVarifyCode (GetVarifyReq) returns (GetVarifyRsp) {}
}

message GetVarifyReq {
string email = 1;
}

message GetVarifyRsp {
int32 error = 1;
string email = 2;
string code = 3;
}

注意:返回的 rsp 包含了3种信息,分别是 error,email,code。但是只有 error 写进了 HTTP 的响应报文中,在 CSession 中通过 WriteResponse 来发送回去。

连接池如何构造呢?

在 VerifyGrpcClient 的构造函数中,需要 new 一个连接池,初始化连接池需要3个参数,包括:pool_size, host, port 。host 和 port 是 gRPC 服务器的 IP 和 端口。这样就能在 gRPC 的构造函数中创建多个连向 server 的 channel,从而能够满足多个 stub 连接的需求。

连接池的主要有3个函数:

  • 构造函数
  • std::unique_ptr< VarifyService::Stub > getConnection():从队列里取 stub ,如果队列为空就条件变量阻塞
  • void returnConnection(std::unique_ptr< VarifyService::Stub > context):往队列里返回 stub,同时唤醒一个条件变量

注意:stub 都是放在 _connections 这个队列中来管理的,用一个的时候就移动出去使用,用完之后再移动回来。所以有可能这个队列是空的,需要使用条件变量阻塞,而一旦有一个 stub 还回去了,就要唤醒一个条件变量。

3.3.2 user_register

注册用户的逻辑是什么?

首先回顾注册界面包含哪些变量:

  • 用户名:name
  • 邮箱:email
  • 密码:pwd
  • 确认密码:confirm
  • 验证码:verify_code

所以首先是先从 HTTP 的 body_str 中解析出这些相关信息,然后开始判断:

  1. 如果 pwd != confirm,返回一个 ERROR 的响应报文。
  2. 从 Redis 中查找验证码是否合理。这个函数的参数包括2个:email, verify_code ,返回一个 bool 值,代表发送成功或者失败,并且这个 email 对应的验证码赋值给了 verify_code 这个变量。然后需要判断这个 verify_code 和用户填的验证码 src_root["verifycode"] 是否相等。
  3. 从 Mysql 中查找用户是否存在。这里直接调用了 MysqlMgr 的 RegUser 函数,这个函数的参数包括4个:name, email, pwd, icon。返回一个 int 型的 uid 值,如果为0或者-1就是存在,返回一个带 ERROR 的 HTTP 响应报文;如果不存在则会在 RegUser 函数中进行注册,并返回其 uid。

上述检查都通过后,返回一个错误码为 0 的 HTTP 响应报文给客户端,其他的信息可添加可不添加。所有的信息都是在 root 这个 json 对象中的,序列化之后写入到 HTTP 的 body 中,然后发送给客户端。

这中间涉及到 Redis 数据库和 Mysql 数据库的操作。下面对这两个数据库的操作进行分析。首先,它们都是通过一个管理类来进行数据的操作的,在管理类中,封装了上层服务器调用的函数,把面向过程的操作封装到了一个管理类里,这种叫做数据访问对象设计模式。

  • Redis操作:

  • Mysql操作:

3.3.3 reset_pwd

重置密码的逻辑是什么?

首先回顾重置界面包含哪些变量:

  • 用户名:name
  • 邮箱:email
  • 更新的密码:pwd

所以首先是先从 HTTP 的 body_str 中解析出这些相关信息,然后开始判断:

  1. 从 Redis 中查找验证码是否合理。这个函数的参数包括2个: email, verify_code ,返回一个 bool 值,代表发送成功或者失败,并且这个 email 对应的验证码赋值给了 verify_code 这个变量。然后需要判断这个 verify_code 和用户填的验证码 src_root["verifycode"] 是否相等。
  2. 从 Mysql 中查找用户名和邮箱是否匹配。这个函数的参数包括2个: email, name ,返回一个 bool 值,代表是否匹配。
  3. 更新 Mysql 中的密码为最新的密码。这个函数的参数包括2个: name, pwd

上述检查都通过后,返回一个错误码为 0 的 HTTP 响应报文给客户端,其他的信息可添加可不添加。所有的信息都是在 root 这个 json 对象中的,序列化之后写入到 HTTP 的 body 中,然后发送给客户端。

3.3.4 user_login

重置密码的逻辑是什么?

首先回顾用户登录包含哪些变量:

  • 邮箱
  • 密码

所以首先是先从 HTTP 的 body_str 中解析出这些相关信息,然后开始判断:

  1. 用户名和密码是否匹配,在 Mysql 中进行查询,并且把查询到的其他信息赋值给 UserInfo。这里用到了一个管理用户信息的数据结构 UserInfo,包含 name, pwd, email, uid
  2. 调用 gRPC 服务获取 TPC 服务器。然后返回的 rsp 中包含了在 proto 文件中定义的消息,包括 host, port, token, error

4. 逻辑类的函数

逻辑类的构造函数中会注册上述四个请求的回调函数,除此之外还包含4个成员函数:

  • RegGet:通过 insert 操作把路由和相对应的回调函数插入到 Get 哈希表中
  • RegPost:通过 insert 操作把路由和相对应的回调函数插入到 Post 哈希表中
  • HandleGet:直接通过 key 从 Get 哈希表中取出回调函数来执行
  • HandlePost:直接通过 key 从 Post 哈希表中取出回调函数来执行

5. 总结

各有哪些地方用到了 Redis 和 Mysql,存储的数据是什么?

首先注意,一定是在请求的调用中才会使用到数据库,而回顾有哪几个请求?

  1. 获取验证码:通过 gRPC 服务调用 Node.js 来完成验证码的发送,不需要从数据库中读取数据
  2. 用户注册:需要去通过邮箱查询验证码对不对,因为在请求发送验证码的时候只用到了 email,所以只需要通过邮箱去匹配,这里用到了 Redis。验证码正确之后还需要判断用户是否存在,避免重复注册,这里就需要用到 name、email、pwd、icon 四个变量,这个是存储在 Mysql 中的。
  3. 用户登录:用户靠邮箱和密码登录,只需要从 Mysql 中判断邮箱、密码是否匹配就可以了。
  4. 密码重置:密码重置需要接收验证码,所以这里会用到 Redis 查询验证码是否有效。然后回检查用户名和邮箱是否匹配,这个是通过 Mysql 进行查询的,然后再在 Mysql 中更新密码。

总结来说,在验证码的收发方面,用到了 Redis,主要存储 email 和 verifycode。而在用户信息方面使用了 Mysql,主要存储了 name、email、pwd、uid。