1. Main函数初始化
ChatServer 同时兼具两种服务器的功能,一种是 TCP
服务器,用于聊天,一种是 gRPC 服务器,用于消息的转发。
所以初始化的时候就需要两种初始化,分别跑在不同的线程里。之前是让
ioc.run() 单独跑在一个线程里的,这次让 gRPC
的监听单独跑在一个线程里:
1 2 3 4 5 6 7 8 9 10 11 ChatServiceImpl service; grpc::ServiceBuilder builder; // 监听端口和添加服务 builder.AddListeningPort(server_address); builder.RegisterService(service); // 构建并启动gRPC服务器 unique_ptr<grpc::Server> server(builder.BuildAndStart()); // 单独启动一个线程处理grpc服务 std::thread grpc_server_thread([&server](){ server.Wait(); });
然后再初始化 boost::io_context:
1 2 3 4 5 6 7 8 9 10 boost::asio::io_context ioc; boost::signal_set signals (ioc, SIGINT, SIGTERM) ;signals.async_wait ([&ioc, pool, &server](){ ioc.stop (); pool->stop; server->Shutdown; }); CServer s (ioc, port_str) ;ioc.run ();
由于 StatusServer 需要从 Redis 中读取 TCP
服务器的连接数,所以在最开始时要往 Redis 中写入连接数:
1 2 //将登录数设置为0 RedisMgr::GetInstance()->HSet(LOGIN_COUNT, server_name,"0");
服务器关闭后要从 Redis 中删除:
1 2 RedisMgr::GetInstance()->HDel(LOGIN_COUNT, server_name); RedisMgr::GetInstance()->Close();
2. 数据的接收和逻辑处理
记住,一定是 CSession 中解析出 data(通过 tlv 协议解析的),然后把
recv_node 节点交给逻辑类去处理(与 GateServer 不同,ChatServer
的逻辑类单独开了一个线程)。先来看一下 ChatServer 中 CSession
类的成员函数和成员变量,这和 GateServer 中存在较大差别:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 class CSession : public std::enable_shared_from_this<CSession>{ public : CSession (boost::asio::io_context& io_context, CServer* server); ~CSession (); tcp::socket& GetSocket () ; std::string& GetSessionId () ; void SetUserId (int uid) ; int GetUserId () ; void Start () ; void Send (char * msg, short max_length, short msgid) ; void Send (std::string msg, short msgid) ; void Close () ; std::shared_ptr<CSession> SharedSelf () ; void AsyncReadBody (int length) ; void AsyncReadHead (int total_len) ; private : void asyncReadFull (std::size_t maxLength, std::function<void (const boost::system::error_code& , std::size_t )> handler) ; void asyncReadLen (std::size_t read_len, std::size_t total_len,std::function<void (const boost::system::error_code&, std::size_t )> handler) ; void HandleWrite (const boost::system::error_code& error, std::shared_ptr<CSession> shared_self) ; tcp::socket _socket; std::string _session_id; char _data[MAX_LENGTH]; CServer* _server; bool _b_close; std::queue<shared_ptr<SendNode> > _send_que; std::mutex _send_lock; std::shared_ptr<RecvNode> _recv_msg_node; bool _b_head_parse; std::shared_ptr<MsgNode> _recv_head_node; int _user_uid; };
CSession 是怎么读数据的?
首先来看消息节点:
解析的数据放在 msg_node 中,msg_node 包含以下成员变量:
1 2 3 short _cur_len;short _total_len;char * _data;
而为了区分接收节点和发送节点,还派生了 recv_node 和
send_node,两个子类都添加了 msg_id
这个成员变量。除此之外,还封装了用于逻辑线程的逻辑节点,这个节点由
recv_node 和 解析这个 recv_node 的 csession 组成:
1 2 3 4 5 6 7 8 class LogicNode { friend class LogicSystem; public: LogicNode(shared_ptr<CSession>, shared_ptr<RecvNode>); private: shared_ptr<CSession> _session; shared_ptr<RecvNode> _recvnode; };
主要有以下几个函数:
asyncReadLen 读指定长度的字节数,主要通过 _socket.async_read_some
这个函数,通过 read_len 和 total_len 来控制读取,数据是读到 CSession
自己的 _data 里面的,后续要把这个数据复制到 CSession
的成员节点中。注意,参数里面有个回调函数,这个回调函数其实是在
AsyncReadHead 和 AsyncReadBody 中定义的 lambda 表达式。
asyncReadFull 读之前把 buffer 清空,相当于开始一次新的读,然后调用
asyncReadLen
AsyncReadHead 用于读取头部的 ID 和
LEN,数据存放在_recv_head_node,这里涉及到字节序的转换:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 _recv_head_node->Clear (); memcpy (_recv_head_node->_data, _data, bytes_transfered);short msg_id = 0 ;memcpy (&msg_id, _recv_head_node->_data, HEAD_ID_LEN);msg_id = boost::asio::detail::socket_ops::network_to_host_short (msg_id); short msg_len = 0 ;memcpy (&msg_len, _recv_head_node->_data + HEAD_ID_LEN, HEAD_DATA_LEN);msg_len = boost::asio::detail::socket_ops::network_to_host_short (msg_len); _recv_msg_node = make_shared <RecvNode>(msg_len, msg_id); AsyncReadBody (msg_len);
AsyncReadBody 用于读取消息体,数据存放在 _recv_msg_node 中:
1 2 3 4 5 6 7 8 memcpy (_recv_msg_node->_data , _data , bytes_transfered);_recv_msg_node->_cur_len += bytes_transfered; _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0' ; cout << "receive data is " << _recv_msg_node->_data << endl; LogicSystem::GetInstance ()->PostMsgToQue (make_shared <LogicNode>(shared_from_this (), _recv_msg_node)); AsyncReadHead (HEAD_TOTAL_LEN);
AsyncReadBody 读完数据之后,会把 _recv_msg_node
这个节点投放到消息队列当中,注意:并不是直接 push,这个消息队列是一个
LogicNode 组成的队列,LogicNode 包含两个成员变量,一个是
CSession,一个是 _recvnode,而这个 _recvnode 才是 AsyncReadBody 中读到的
_recv_msg_node。消息队列会取出最原始的 msg_node 和 msg_id。
逻辑线程有一个 DealMsg 的成员函数,这个函数会一直循环地从队列中取出
recv_node。这个 DealMsg
函数会进行两次判断,首先判断消息队列是否为空,如果消息队列为空那么就使用条件变量阻塞,当
PostMsgToQue
执行后会唤醒一个条件变量;然后会判断服务器是否要关闭,如果要关闭就把队列里面的消息节点依次取出执行完成。
如果没有关服并且服务器里面有消息节点,那么就要开始取出消息节点并且解析,解析出消息
ID 后根据 ID 调用注册函数。注册函数及其对应的 ID 存放在一个 map 里,map
的键是 ID,map 的值是 function 对象。由于 function 对象是通过 ChatServer
的成员函数绑定的,所以需要设置占位符来传递参数到实际的成员函数处:
1 2 3 4 5 6 7 8 9 10 11 12 auto msg_node = _msg_que.front ();cout << "recv_msg id is " << msg_node->_recvnode->_msg_id << endl; auto call_back_iter = _fun_callbacks.find (msg_node->_recvnode->_msg_id);if (call_back_iter == _fun_callbacks.end ()) { _msg_que.pop (); std::cout << "msg id [" << msg_node->_recvnode->_msg_id << "] handler not found" << std::endl; continue ; } call_back_iter->second (msg_node->_session, msg_node->_recvnode->_msg_id, std::string (msg_node->_recvnode->_data, msg_node->_recvnode->_cur_len)); _msg_que.pop ();
逻辑线程是怎么处理数据的?
逻辑线程有5个注册函数:
LoginHandler:登录的时候要更新服务器的连接数和在 Redis 中添加
uid-server ip
SearchInfo:
AddFriendApply:用于申请好友,首先看本机有没有,没有再通过 Redis
查找 server,然后通过 gRPC 转发
AuthFriendApply:用于认证好友,在认证的时候首先要更新数据库,然后路由服务器
DealChatTextMsg:由于消息已经在接受的时候解析了,所以直接转发 Json
对象就可以了。依然是先查询本机,然后路由服务器
3. gRPC接口的实现
首先查看 proto 文件中有哪些参数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 service ChatService { rpc NotifyAddFriend(AddFriendReq) returns (AddFriendRsp) {} rpc RplyAddFriend(RplyFriendReq) returns (RplyFriendRsp) {} rpc SendChatMsg(SendChatMsgReq) returns (SendChatMsgRsp) {} rpc NotifyAuthFriend(AuthFriendReq) returns (AuthFriendRsp) {} rpc NotifyTextChatMsg(TextChatMsgReq) returns (TextChatMsgRsp){} } message RplyFriendReq { int32 rplyuid = 1; bool agree = 2; int32 touid = 3; } message RplyFriendRsp { int32 error = 1; int32 rplyuid = 2; int32 touid = 3; } message SendChatMsgReq{ int32 fromuid = 1; int32 touid = 2; string message = 3; } message SendChatMsgRsp{ int32 error = 1; int32 fromuid = 2; int32 touid = 3; } message AuthFriendReq{ int32 fromuid = 1; int32 touid = 2; } message AuthFriendRsp{ int32 error = 1; int32 fromuid = 2; int32 touid = 3; } message TextChatMsgReq { int32 fromuid = 1; int32 touid = 2; repeated TextChatData textmsgs = 3; } message TextChatData{ string msgid = 1; string msgcontent = 2; } message TextChatMsgRsp { int32 error = 1; int32 fromuid = 2; int32 touid = 3; repeated TextChatData textmsgs = 4; }
总结为以下5个服务:
通知添加好友
通知验证好友
通知聊天消息
回复添加好友
聊天消息发送
3.1 LoginHandler
在解析登录消息的时候,客户端会把自己的 uid 存放到 json
对象中,CSession 也会随机生成一个 session_id。注意:这个 uid 是存放在
Mysql 中的,回顾一下哪些信息要存放在 Mysql 中?是在注册的时候会存放在
Mysql 中的,请求注册的时候会在 HTTP 的 request.body.data
中携带这些信息
name
email
pwd
icon
然后会调用 Mysql 封装的函数进行注册,注册过程中会先从一张 user_id
的表中获取 id 值,并且把这个 id 值进行 +1 操作,然后把这个 id
值连同用户的name
, email
, pwd
,
nickname
, icon
一同写入到 user
表中。注册完之后会返回 uid,然后写入到 json 对象中:
1 2 3 4 5 6 7 8 9 10 root["error" ] = 0 ; root["uid" ] = uid; root["email" ] = email; root ["user" ]= name; root["passwd" ] = pwd; root["confirm" ] = confirm; root["icon" ] = icon; root["varifycode" ] = src_root["varifycode" ].asString (); std::string jsonstr = root.toStyledString (); beast::ostream (connection->_response.body ()) << jsonstr;
这样注册完之后用户就知道自己的 uid 了。然后连接的时候会携带这个 uid
发送给 ChatServer。
ChatServer 在接收到 LoginHandler 的 msg_id 的时候,会从数据库中通过
uid 来获取客户端的完全信息(先从 Redis 中获取,如果 Redis 中没有,则去
Mysql 查询,查询到后还要写入到 Redis 中)。然后再通过 uid
来从数据库获取申请列表,以及获取好友列表。这个时候 ChatServer 会从 Redis
中获取自己的连接数,然后把这个连接数 +1 并再次存到 Redis 中:
1 2 3 4 5 6 7 8 auto rd_res = RedisMgr::GetInstance ()->HGet (LOGIN_COUNT, server_name);int count = 0 ;if (!rd_res.empty ()) { count = std::stoi (rd_res); } count++; auto count_str = std::to_string (count);RedisMgr::GetInstance ()->HSet (LOGIN_COUNT, server_name, count_str);
最后还需要把用户的 id 和 session
绑定起来,并且存储起来,这样后续可以直接根据 uid 在 Redis
中查询其对应的服务器:
1 2 3 4 5 session->SetUserId (uid); std::string ipkey = USERIPPREFIX + uid_str; RedisMgr::GetInstance ()->Set (ipkey, server_name);
3.1 NotifyAddFriend
首先根据 proto 文件查看请求和响应各有哪些参数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 message AddFriendReq { int32 applyuid = 1; string name = 2; string desc = 3; string icon = 4; string nick = 5; int32 sex = 6; int32 touid = 7; } message AddFriendRsp { int32 error = 1; int32 applyuid = 2; int32 touid = 3; }
然后是逻辑类中的成员函数:
把 msg_node 中的 data 字符串解析成 json 对象
解析完了之后要先更新数据库,为什么要更新数据库?
根据客户端的 uid 去 Redis 中查找对应的 IP,什么时候存储了
uid 和 IP?