0%

聊天项目(二) StatusServer

1. Main函数的初始化

StatusServer 主要用作 GateServer 的 gRPC 服务器以及 ChatServer 的 gRPC 的客户端。

主函数是如何初始化 gRPC 服务器的?

在主函数里面,只需要启动 gRPC 的服务端就可以了。作为服务端,肯定是要一直处于监听状态,那么就需要一个上下文和一个监听端点:

  • 这里的端点就是服务器自己设定的端口号,在 init 文件初始化的 50052,而 IP 是所有地址的 IP 都可以监听,所以是 0.0.0.0,这样 IP 和 Port 组成了一个地址(端点)传给这个 builder。

  • 这里的上下文和客户端的 ClientContext 有区别,用的是 builder 来开始和监听服务的。而服务的具体实现需要通过一个类来实例化,这个实例包含了 proto 文件中定义的 rpc 接口。

首先来看如何实现这个服务接口的。StatusServer 定义了两个 gRPC 接口:

  • GetChatServer
  • Login

它们在 proto 文件中的定义如下所示,每一个都会定义一个请求消息和响应消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
service StatusService {
rpc GetChatServer (GetChatServerReq) returns (GetChatServerRsp) {}
rpc Login(LoginReq) returns(LoginRsp);
}

message GetChatServerReq {
int32 uid = 1;
}
message GetChatServerRsp {
int32 error = 1;
string host = 2;
string port = 3;
string token = 4;
}
message LoginReq{
int32 uid = 1;
string token= 2;
}
message LoginRsp {
int32 error = 1;
int32 uid = 2;
string token = 3;
}

知道上述两个接口之后,就要定义一个类 StatusServiceImpl 来从 gRPC 生成的 StatusService::Service 中继承,实现两个接口函数:

  • GetChatServer
  • Login
1
2
3
4
5
6
7
8
9
10
11
12
13
class StatusServiceImpl final : public StatusService::Service
{
public:
StatusServiceImpl();
Status GetChatServer(ServerContext* context, const GetChatServerReq* request, GetChatServerRsp* reply) override;
Status Login(ServerContext* context, const LoginReq* request, LoginRsp* reply) override;
private:
void insertToken(int uid, std::string token);
ChatServer getChatServer();
std::unordered_map<std::string, ChatServer> _servers;
std::mutex _server_mtx;

};

回到主函数,我们用 grpc::Server 定义了一个 Server,并且通过 builder 将其绑定到指定的端点以及注册相应的服务(一个 StatusServiceImpl 对象),这样启动了 gRPC 服务器之后客户端就能够远程调用了:

1
2
3
4
5
6
7
8
9
10
11
std::string server_address(cfg["StatusServer"]["Host"]+":"+ cfg["StatusServer"]["Port"]);
StatusServiceImpl service;

grpc::ServerBuilder builder;
// 监听端口和添加服务
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(&service);

// 构建并启动gRPC服务器
std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
std::cout << "Server listening on " << server_address << std::endl;

主函数是如何优雅退出的?

状态服务器的优雅退出和网关服务器是一样的,都需要通过 asio 库定义一个上下文和一个信号组,信号组需要处于一个异步非阻塞监听的状态,而上下文用于轮询信号是否产生:

1
2
3
4
5
6
boost::asio::io_context ioc;
boost::asio::signal_set signals(ioc, SIGINT, SIGTERM);
signals.async_wait([&server, &io_context](){
server->shutdown();
ioc.stop();
});

注意:如果没有调用 ioc.stop(),那么主函数会一直阻塞在 ioc.run() 中,而调用了 ioc.stop() 之后,就不会再去轮询事件了,由于使用了伪闭包的技术,那么在各个线程的程序执行完之后,主线程的智能指针引用计数为 0,那么服务器可以优雅退出。

StatusServer 的 ioc.run() 和 GateServer 中的不太一样,GateServer 中的 ioc.run() 是在主线程中执行的,而 StatusServer 中把主线程的轮询分配给了 gRPC 服务器,所以单独启了一个线程给 ioc 来跑监听 signals:

1
2
3
4
5
// 在单独的线程中运行io_context
std::thread([&io_context]() { io_context.run(); }).detach();

// 等待服务器关闭
server->Wait();

2. gRPC服务的重写

StatusServer 有两个 gRPC 服务:

  • GetChatServer
  • Login

记住,gRPC 请求的参数和返回的参数都是定义在 proto 文件中的,所有的 response 消息都有一个 error:

  • GetChatServer 的 request 参数:
    • uid
  • GetChatServer 的 response 是一个 ChatServer 对象,包含以下参数:
    • host
    • port
    • token
    • error
  • Login 的 request 参数:
    • uid
    • token
  • Login 的 response 参数:
    • uid
    • token
    • error

如何来访问这些变量呢?通过成员函数的方式来访问,比如,GetChatServerRsp.host()。

2.1 GetChatServer

1
2
3
4
5
6
7
8
9
10
11
Status StatusServiceImpl::GetChatServer(ServerContext* context, const GetChatServerReq* request, GetChatServerRsp* reply)
{
std::string prefix("llfc status server has received : ");
const auto& server = getChatServer();
reply->set_host(server.host);
reply->set_port(server.port);
reply->set_error(ErrorCodes::Success);
reply->set_token(generate_unique_string());
insertToken(request->uid(), reply->token());
return Status::OK;
}

注意:GetChatServer 是 gRPC 定义的接口函数,这个接口函数调用了自定义的一个 getChatServer 函数,用于获取可用的 server 的信息,并且把这些信息写入了 gRPC 的 response 中,这个 response 包含了 proto 文件中定义的4个参数,分别是host, port, token, error

那么这个 getChatServer 如何实现呢?

首先,在 StatusServiceImpl 的构造函数中,会从 config.ini 中解析出两台 TCP 服务器的 IP 和 Port 以及服务器 name:

1
2
3
4
5
6
7
8
9
10
11
12
13
for (auto& word : words) {
if (cfg[word]["Name"].empty()) {
continue;
}
//定义了一个 ChatServer 对象,里面包含了 host name port con_count
//然后把这个 server 和名字作为键值对存储到 ServiceImpl 这个对象的哈希表 _servers 中
//server 名字作为键,server 对象作为值
ChatServer server;
server.port = cfg[word]["Port"];
server.host = cfg[word]["Host"];
server.name = cfg[word]["Name"];
_servers[server.name] = server;
}

然后 getChatServer 就是要获取到连接数最小的 server。server 的连接数都是存在 Redis 中的,为什么存在 Redis 中?因为 ChatServer 的连接数只有自己才能更新,而 StatusServer 不知道当前 ChatServer 中有多少个连接数,所以 ChatServer 需要向 Redis 中写入自己连接数,这样 StatusServer 直接就能够通过访问 Redis 基于服务器的名字来获取 连接数。使用 for 循环遍历 _servers 哈希表,去最小的一个 server:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 使用范围基于for循环
for ( auto& server : _servers) {

if (server.second.name == minServer.name) {
continue;
}
//从 Redis 中获取服务器的连接数
auto count_str = RedisMgr::GetInstance()->HGet(LOGIN_COUNT, server.second.name);
if (count_str.empty()) {
server.second.con_count = INT_MAX;
}
else {
server.second.con_count = std::stoi(count_str);
}

if (server.second.con_count < minServer.con_count) {
minServer = server.second;
}
}

return minServer;

2.2 Login

Login 这个 gRPC 接口函数是用于登录的 token 验证的。这里先暂时不管。