0%

聊天项目(三) ChatServer

1. Main函数初始化

ChatServer 同时兼具两种服务器的功能,一种是 TCP 服务器,用于聊天,一种是 gRPC 服务器,用于消息的转发。

所以初始化的时候就需要两种初始化,分别跑在不同的线程里。之前是让 ioc.run() 单独跑在一个线程里的,这次让 gRPC 的监听单独跑在一个线程里:

1
2
3
4
5
6
7
8
9
10
11
ChatServiceImpl service;
grpc::ServiceBuilder builder;
// 监听端口和添加服务
builder.AddListeningPort(server_address);
builder.RegisterService(service);
// 构建并启动gRPC服务器
unique_ptr<grpc::Server> server(builder.BuildAndStart());
// 单独启动一个线程处理grpc服务
std::thread grpc_server_thread([&server](){
server.Wait();
});

然后再初始化 boost::io_context:

1
2
3
4
5
6
7
8
9
10
boost::asio::io_context ioc;
boost::signal_set signals(ioc, SIGINT, SIGTERM);
signals.async_wait([&ioc, pool, &server](){
ioc.stop();
pool->stop;
server->Shutdown;
});
// 只需要传入 port 地址,任意 IP 都可以监听
CServer s(ioc, port_str);
ioc.run();

由于 StatusServer 需要从 Redis 中读取 TCP 服务器的连接数,所以在最开始时要往 Redis 中写入连接数:

1
2
//将登录数设置为0
RedisMgr::GetInstance()->HSet(LOGIN_COUNT, server_name,"0");

服务器关闭后要从 Redis 中删除:

1
2
RedisMgr::GetInstance()->HDel(LOGIN_COUNT, server_name);
RedisMgr::GetInstance()->Close();

2. 数据的接收和逻辑处理

记住,一定是 CSession 中解析出 data(通过 tlv 协议解析的),然后把 recv_node 节点交给逻辑类去处理(与 GateServer 不同,ChatServer 的逻辑类单独开了一个线程)。先来看一下 ChatServer 中 CSession 类的成员函数和成员变量,这和 GateServer 中存在较大差别:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class CSession: public std::enable_shared_from_this<CSession>
{
public:
CSession(boost::asio::io_context& io_context, CServer* server);
~CSession();
tcp::socket& GetSocket();
std::string& GetSessionId();
void SetUserId(int uid);
int GetUserId();
void Start();
void Send(char* msg, short max_length, short msgid);
void Send(std::string msg, short msgid);
void Close();
std::shared_ptr<CSession> SharedSelf();
void AsyncReadBody(int length);
void AsyncReadHead(int total_len);
private:
void asyncReadFull(std::size_t maxLength, std::function<void(const boost::system::error_code& , std::size_t)> handler);
void asyncReadLen(std::size_t read_len, std::size_t total_len,std::function<void(const boost::system::error_code&, std::size_t)> handler);
void HandleWrite(const boost::system::error_code& error, std::shared_ptr<CSession> shared_self);
tcp::socket _socket;
std::string _session_id;
char _data[MAX_LENGTH]; //存储读取的数据
CServer* _server;
bool _b_close;
std::queue<shared_ptr<SendNode> > _send_que;
std::mutex _send_lock;

//收到的消息结构
std::shared_ptr<RecvNode> _recv_msg_node;
bool _b_head_parse;
//收到的头部结构
std::shared_ptr<MsgNode> _recv_head_node;
int _user_uid;
};

CSession 是怎么读数据的?

首先来看消息节点:

解析的数据放在 msg_node 中,msg_node 包含以下成员变量:

1
2
3
short _cur_len;
short _total_len;
char* _data;

而为了区分接收节点和发送节点,还派生了 recv_node 和 send_node,两个子类都添加了 msg_id 这个成员变量。除此之外,还封装了用于逻辑线程的逻辑节点,这个节点由 recv_node 和 解析这个 recv_node 的 csession 组成:

1
2
3
4
5
6
7
8
class LogicNode {
friend class LogicSystem;
public:
LogicNode(shared_ptr<CSession>, shared_ptr<RecvNode>);
private:
shared_ptr<CSession> _session;
shared_ptr<RecvNode> _recvnode;
};

主要有以下几个函数:

  • asyncReadLen 读指定长度的字节数,主要通过 _socket.async_read_some 这个函数,通过 read_len 和 total_len 来控制读取,数据是读到 CSession 自己的 _data 里面的,后续要把这个数据复制到 CSession 的成员节点中。注意,参数里面有个回调函数,这个回调函数其实是在 AsyncReadHead 和 AsyncReadBody 中定义的 lambda 表达式。
  • asyncReadFull 读之前把 buffer 清空,相当于开始一次新的读,然后调用 asyncReadLen
  • AsyncReadHead 用于读取头部的 ID 和 LEN,数据存放在_recv_head_node,这里涉及到字节序的转换:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
_recv_head_node->Clear();
//拷贝数据到这个头部节点里面
memcpy(_recv_head_node->_data, _data, bytes_transfered);

//获取头部MSGID数据
short msg_id = 0;
memcpy(&msg_id, _recv_head_node->_data, HEAD_ID_LEN);
//网络字节序转化为本地字节序
msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);

short msg_len = 0;
//这样 msg_len 就是消息的长度
memcpy(&msg_len, _recv_head_node->_data + HEAD_ID_LEN, HEAD_DATA_LEN);
//网络字节序转化为本地字节序
msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);

//开启消息体的读取
//!!!注意这里传入了 msg_id 来构造接收节点!!!
_recv_msg_node = make_shared<RecvNode>(msg_len, msg_id);
AsyncReadBody(msg_len);
  • AsyncReadBody 用于读取消息体,数据存放在 _recv_msg_node 中:
1
2
3
4
5
6
7
8
memcpy(_recv_msg_node->_data , _data , bytes_transfered);
_recv_msg_node->_cur_len += bytes_transfered;
_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
cout << "receive data is " << _recv_msg_node->_data << endl;
//此处将消息投递到逻辑队列中
LogicSystem::GetInstance()->PostMsgToQue(make_shared<LogicNode>(shared_from_this(), _recv_msg_node));
//继续监听头部接受事件
AsyncReadHead(HEAD_TOTAL_LEN);

AsyncReadBody 读完数据之后,会把 _recv_msg_node 这个节点投放到消息队列当中,注意:并不是直接 push,这个消息队列是一个 LogicNode 组成的队列,LogicNode 包含两个成员变量,一个是 CSession,一个是 _recvnode,而这个 _recvnode 才是 AsyncReadBody 中读到的 _recv_msg_node。消息队列会取出最原始的 msg_node 和 msg_id。

逻辑线程有一个 DealMsg 的成员函数,这个函数会一直循环地从队列中取出 recv_node。这个 DealMsg 函数会进行两次判断,首先判断消息队列是否为空,如果消息队列为空那么就使用条件变量阻塞,当 PostMsgToQue 执行后会唤醒一个条件变量;然后会判断服务器是否要关闭,如果要关闭就把队列里面的消息节点依次取出执行完成。

如果没有关服并且服务器里面有消息节点,那么就要开始取出消息节点并且解析,解析出消息 ID 后根据 ID 调用注册函数。注册函数及其对应的 ID 存放在一个 map 里,map 的键是 ID,map 的值是 function 对象。由于 function 对象是通过 ChatServer 的成员函数绑定的,所以需要设置占位符来传递参数到实际的成员函数处:

1
2
3
4
5
6
7
8
9
10
11
12
//如果没有停服,且说明队列中有数据
auto msg_node = _msg_que.front();
cout << "recv_msg id is " << msg_node->_recvnode->_msg_id << endl;
auto call_back_iter = _fun_callbacks.find(msg_node->_recvnode->_msg_id);
if (call_back_iter == _fun_callbacks.end()) {
_msg_que.pop();
std::cout << "msg id [" << msg_node->_recvnode->_msg_id << "] handler not found" << std::endl;
continue;
}
call_back_iter->second(msg_node->_session, msg_node->_recvnode->_msg_id,
std::string(msg_node->_recvnode->_data, msg_node->_recvnode->_cur_len));
_msg_que.pop();

逻辑线程是怎么处理数据的?

逻辑线程有5个注册函数:

  • LoginHandler:登录的时候要更新服务器的连接数和在 Redis 中添加 uid-server ip
  • SearchInfo:
  • AddFriendApply:用于申请好友,首先看本机有没有,没有再通过 Redis 查找 server,然后通过 gRPC 转发
  • AuthFriendApply:用于认证好友,在认证的时候首先要更新数据库,然后路由服务器
  • DealChatTextMsg:由于消息已经在接受的时候解析了,所以直接转发 Json 对象就可以了。依然是先查询本机,然后路由服务器

3. gRPC接口的实现

首先查看 proto 文件中有哪些参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
service ChatService {
rpc NotifyAddFriend(AddFriendReq) returns (AddFriendRsp) {}
rpc RplyAddFriend(RplyFriendReq) returns (RplyFriendRsp) {}
rpc SendChatMsg(SendChatMsgReq) returns (SendChatMsgRsp) {}
rpc NotifyAuthFriend(AuthFriendReq) returns (AuthFriendRsp) {}
rpc NotifyTextChatMsg(TextChatMsgReq) returns (TextChatMsgRsp){}
}

message RplyFriendReq {
int32 rplyuid = 1;
bool agree = 2;
int32 touid = 3;
}
message RplyFriendRsp {
int32 error = 1;
int32 rplyuid = 2;
int32 touid = 3;
}

message SendChatMsgReq{
int32 fromuid = 1;
int32 touid = 2;
string message = 3;
}
message SendChatMsgRsp{
int32 error = 1;
int32 fromuid = 2;
int32 touid = 3;
}

message AuthFriendReq{
int32 fromuid = 1;
int32 touid = 2;
}
message AuthFriendRsp{
int32 error = 1;
int32 fromuid = 2;
int32 touid = 3;
}

message TextChatMsgReq {
int32 fromuid = 1;
int32 touid = 2;
repeated TextChatData textmsgs = 3;
}
message TextChatData{
string msgid = 1;
string msgcontent = 2;
}

message TextChatMsgRsp {
int32 error = 1;
int32 fromuid = 2;
int32 touid = 3;
repeated TextChatData textmsgs = 4;
}

总结为以下5个服务:

  • 通知添加好友
  • 通知验证好友
  • 通知聊天消息
  • 回复添加好友
  • 聊天消息发送

3.1 LoginHandler

在解析登录消息的时候,客户端会把自己的 uid 存放到 json 对象中,CSession 也会随机生成一个 session_id。注意:这个 uid 是存放在 Mysql 中的,回顾一下哪些信息要存放在 Mysql 中?是在注册的时候会存放在 Mysql 中的,请求注册的时候会在 HTTP 的 request.body.data 中携带这些信息

  1. name
  2. email
  3. pwd
  4. icon

然后会调用 Mysql 封装的函数进行注册,注册过程中会先从一张 user_id 的表中获取 id 值,并且把这个 id 值进行 +1 操作,然后把这个 id 值连同用户的name, email, pwd, nickname, icon 一同写入到 user 表中。注册完之后会返回 uid,然后写入到 json 对象中:

1
2
3
4
5
6
7
8
9
10
root["error"] = 0;
root["uid"] = uid;
root["email"] = email;
root ["user"]= name;
root["passwd"] = pwd;
root["confirm"] = confirm;
root["icon"] = icon;
root["varifycode"] = src_root["varifycode"].asString();
std::string jsonstr = root.toStyledString();
beast::ostream(connection->_response.body()) << jsonstr;

这样注册完之后用户就知道自己的 uid 了。然后连接的时候会携带这个 uid 发送给 ChatServer。

ChatServer 在接收到 LoginHandler 的 msg_id 的时候,会从数据库中通过 uid 来获取客户端的完全信息(先从 Redis 中获取,如果 Redis 中没有,则去 Mysql 查询,查询到后还要写入到 Redis 中)。然后再通过 uid 来从数据库获取申请列表,以及获取好友列表。这个时候 ChatServer 会从 Redis 中获取自己的连接数,然后把这个连接数 +1 并再次存到 Redis 中:

1
2
3
4
5
6
7
8
auto rd_res = RedisMgr::GetInstance()->HGet(LOGIN_COUNT, server_name);
int count = 0;
if (!rd_res.empty()) {
count = std::stoi(rd_res);
}
count++;
auto count_str = std::to_string(count);
RedisMgr::GetInstance()->HSet(LOGIN_COUNT, server_name, count_str);

最后还需要把用户的 id 和 session 绑定起来,并且存储起来,这样后续可以直接根据 uid 在 Redis 中查询其对应的服务器:

1
2
3
4
5
//session绑定用户uid
session->SetUserId(uid);
//为用户设置登录ip server的名字
std::string ipkey = USERIPPREFIX + uid_str;
RedisMgr::GetInstance()->Set(ipkey, server_name);

3.1 NotifyAddFriend

首先根据 proto 文件查看请求和响应各有哪些参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
message AddFriendReq {
int32 applyuid = 1;
string name = 2;
string desc = 3;
string icon = 4;
string nick = 5;
int32 sex = 6;
int32 touid = 7;
}
message AddFriendRsp {
int32 error = 1;
int32 applyuid = 2;
int32 touid = 3;
}

然后是逻辑类中的成员函数:

  1. 把 msg_node 中的 data 字符串解析成 json 对象
  2. 解析完了之后要先更新数据库,为什么要更新数据库?
  3. 根据客户端的 uid 去 Redis 中查找对应的 IP,什么时候存储了 uid 和 IP?