企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
## 8) 消息业务路由分发机制 ​ 现在我们发送的消息都是message结构的,有个message头里面其中有两个关键的字段,`msgid`和`msglen`,其中加入`msgid`的意义就是我们可以甄别是哪个消息,从而对这类消息做出不同的业务处理。但是现在我们无论是服务端还是客户端都是写死的两个业务,就是"回显业务",显然这并不满足我们作为服务器框架的需求。我们需要开发者可以注册自己的回调业务。所以我们需要提供一个注册业务的入口,然后在后端根据不同的`msgid`来激活不同的回调业务函数。 ### 8.1 添加消息分发路由类msg_router ​ 下面我们提供这样一个中转的router模块,在include/message.h添加 > lars_reactor/include/message.h ```c #pragma once #include <ext/hash_map> //解决tcp粘包问题的消息头 struct msg_head { int msgid; int msglen; }; //消息头的二进制长度,固定数 #define MESSAGE_HEAD_LEN 8 //消息头+消息体的最大长度限制 #define MESSAGE_LENGTH_LIMIT (65535 - MESSAGE_HEAD_LEN) //msg 业务回调函数原型 //===================== 消息分发路由机制 ================== class tcp_client; typedef void msg_callback(const char *data, uint32_t len, int msgid, tcp_client *client, void *user_data); //消息路由分发机制 class msg_router { public: msg_router():_router(),_args() {} //给一个消息ID注册一个对应的回调业务函数 int register_msg_router(int msgid, msg_callback *msg_cb, void *user_data) { if(_router.find(msgid) != _router.end()) { //该msgID的回调业务已经存在 return -1; } _router[msgid] = msg_cb; _args[msgid] = user_data; return 0; } //调用注册的对应的回调业务函数 void call(int msgid, uint32_t msglen, const char *data, tcp_client *client) { //判断msgid对应的回调是否存在 if (_router.find(msgid) == _router.end()) { fprintf(stderr, "msgid %d is not register!\n", msgid); return; } //直接取出回调函数,执行 msg_callback *callback = _router[msgid]; void *user_data = _args[msgid]; callback(data, msglen, msgid, client, user_data); } private: //针对消息的路由分发,key为msgID, value为注册的回调业务函数 __gnu_cxx::hash_map<int, msg_callback *> _router; //回调业务函数对应的参数,key为msgID, value为对应的参数 __gnu_cxx::hash_map<int, void *> _args; }; //===================== 消息分发路由机制 ================== ``` ​ 开发者需要注册一个`msg_callback`类型的函数,通过`msg_router`类的`register_msg_router()`方法来注册,同时通过`call()`方法来调用。 ​ 全部回调业务函数和msgid的对应关系保存在一个hash_map类型的`_router`map中,`_args`保存对应的参数。 ​ 但是这里有个小细节需要注意一下,`msg_callback`的函数类型声明是这样的。 ```c typedef void msg_callback(const char *data, uint32_t len, int msgid, tcp_client *client, void *user_data); ``` ​ 其中这里面第4个参数,只能是tcp_client类型的参数,也就是我们之前的设计的msg_callback只支持tcp_client的消息回调机制,但是很明显我们的需求是不仅是`tcp_client`要用,tcp_server中的`tcp_conn`也要用到这个机制,那么很显然这个参数在这就不是很合适,那么如果设定一个形参既能指向`tcp_client`又能能指向`tcp_conn`两个类型呢,当然答案就只能是将这两个类抽象出来一层,用父类指针指向子类然后通过多态特性来调用就可以了,所以我们需要先定义一个抽象类。 ### 8.2 链接抽象类创建 ​ 经过分析,我们定义如下的抽象类,并提供一些接口。 > lars_reactor/include/net_connection.h ```c #pragma once /* * * 网络通信的抽象类,任何需要进行收发消息的模块,都可以实现该类 * * */ class net_connection { public: //发送消息的接口 virtual int send_message(const char *data, int datalen, int msgid) = 0; }; ``` ​ 然后让我们tcp_server端的`tcp_conn`类继承`net_connecton`, 客户端的`tcp_client` 继承`net_connection` > lars_reactor/include/tcp_conn.h ```c class tcp_conn : public net_connection { //... }; ``` > lars_reactor/include/tcp_client.h ```c class tcp_client : public net_connection { //... } ``` 这样,我们就可以用一个net_connection指针指向这两种不同的对象实例了。 ​ 接下来我们将`msg_callback`回调业务函数类型改成 ```c typedef void msg_callback(const char *data, uint32_t len, int msgid, net_connection *net_conn, void *user_data); ``` ​ 这样这个业务函数就可以支持tcp_conn和tcp_client了。 所以修改之后,我们的`msg_router`类定义如下: > lars_reactor/include/message.h ```c //消息路由分发机制 class msg_router { public: msg_router(): { printf("msg router init ...\n"); } //给一个消息ID注册一个对应的回调业务函数 int register_msg_router(int msgid, msg_callback *msg_cb, void *user_data) { if(_router.find(msgid) != _router.end()) { //该msgID的回调业务已经存在 return -1; } printf("add msg cb msgid = %d\n", msgid); _router[msgid] = msg_cb; _args[msgid] = user_data; return 0; } //调用注册的对应的回调业务函数 void call(int msgid, uint32_t msglen, const char *data, net_connection *net_conn) { printf("call msgid = %d\n", msgid); //判断msgid对应的回调是否存在 if (_router.find(msgid) == _router.end()) { fprintf(stderr, "msgid %d is not register!\n", msgid); return; } //直接取出回调函数,执行 msg_callback *callback = _router[msgid]; void *user_data = _args[msgid]; callback(data, msglen, msgid, net_conn, user_data); printf("=======\n"); } private: //针对消息的路由分发,key为msgID, value为注册的回调业务函数 __gnu_cxx::hash_map<int, msg_callback*> _router; //回调业务函数对应的参数,key为msgID, value为对应的参数 __gnu_cxx::hash_map<int, void*> _args; }; ``` ### 8.3 msg_router集成到tcp_server中 #### A. tcp_server添加msg_router静态成员变量 > lars_reactor/include/tcp_server.h ```c class tcp_server { public: // ... //---- 消息分发路由 ---- static msg_router router; // ... }; ``` 同时定义及初始化 > lars_reactor/src/tcp_server.cpp ```c //... // ==== 消息分发路由 === msg_router tcp_server::router; //... ``` #### B. tcp_server提供注册路由方法 > lars_reactor/include/tcp_server.c ```c class tcp_server { public: //... //注册消息路由回调函数 void add_msg_router(int msgid, msg_callback *cb, void *user_data = NULL) { router.register_msg_router(msgid, cb, user_data); } //... public: //全部已经在线的连接信息 //---- 消息分发路由 ---- static msg_router router; //... }; ``` #### C. 修正tcp_conn的do_read改成消息分发 > lars_reactor/src/tcp_conn.cpp ```c //... //处理读业务 void tcp_conn::do_read() { //1. 从套接字读取数据 int ret = ibuf.read_data(_connfd); if (ret == -1) { fprintf(stderr, "read data from socket\n"); this->clean_conn(); return ; } else if ( ret == 0) { //对端正常关闭 printf("connection closed by peer\n"); clean_conn(); return ; } //2. 解析msg_head数据 msg_head head; //[这里用while,可能一次性读取多个完整包过来] while (ibuf.length() >= MESSAGE_HEAD_LEN) { //2.1 读取msg_head头部,固定长度MESSAGE_HEAD_LEN memcpy(&head, ibuf.data(), MESSAGE_HEAD_LEN); if(head.msglen > MESSAGE_LENGTH_LIMIT || head.msglen < 0) { fprintf(stderr, "data format error, need close, msglen = %d\n", head.msglen); this->clean_conn(); break; } if (ibuf.length() < MESSAGE_HEAD_LEN + head.msglen) { //缓存buf中剩余的数据,小于实际上应该接受的数据 //说明是一个不完整的包,应该抛弃 break; } //2.2 再根据头长度读取数据体,然后针对数据体处理 业务 //头部处理完了,往后偏移MESSAGE_HEAD_LEN长度 ibuf.pop(MESSAGE_HEAD_LEN); //处理ibuf.data()业务数据 printf("read data: %s\n", ibuf.data()); //消息包路由模式 tcp_server::router.call(head.msgid, head.msglen, ibuf.data(), this); ////回显业务 //callback_busi(ibuf.data(), head.msglen, head.msgid, NULL, this); //消息体处理完了,往后便宜msglen长度 ibuf.pop(head.msglen); } ibuf.adjust(); return ; } //... ``` ### 8.4 msg_router集成到tcp_client中 > lars_reactor/include/tcp_client.h ```c class tcp_client : public net_connection { public: // ... //设置业务处理回调函数 //void set_msg_callback(msg_callback *msg_cb) //{ //this->_msg_callback = msg_cb; //} //注册消息路由回调函数 void add_msg_router(int msgid, msg_callback *cb, void *user_data = NULL) { _router.register_msg_router(msgid, cb, user_data); } private: //处理消息的分发路由 msg_router _router; //msg_callback *_msg_callback; //单路由模式去掉 // ... // ... }; ``` ​ 然后在修正`tcp_client`的`do_read()`方法。 > lars_reactor/src/tcp_client.cpp ```c //处理读业务 int tcp_client::do_read() { //确定已经成功建立连接 assert(connected == true); // 1. 一次性全部读取出来 //得到缓冲区里有多少字节要被读取,然后将字节数放入b里面。 int need_read = 0; if (ioctl(_sockfd, FIONREAD, &need_read) == -1) { fprintf(stderr, "ioctl FIONREAD error"); return -1; } //确保_buf可以容纳可读数据 assert(need_read <= _ibuf.capacity - _ibuf.length); int ret; do { ret = read(_sockfd, _ibuf.data + _ibuf.length, need_read); } while(ret == -1 && errno == EINTR); if (ret == 0) { //对端关闭 if (_name != NULL) { printf("%s client: connection close by peer!\n", _name); } else { printf("client: connection close by peer!\n"); } clean_conn(); return -1; } else if (ret == -1) { fprintf(stderr, "client: do_read() , error\n"); clean_conn(); return -1; } assert(ret == need_read); _ibuf.length += ret; //2. 解包 msg_head head; int msgid, length; while (_ibuf.length >= MESSAGE_HEAD_LEN) { memcpy(&head, _ibuf.data + _ibuf.head, MESSAGE_HEAD_LEN); msgid = head.msgid; length = head.msglen; /* if (length + MESSAGE_HEAD_LEN < _ibuf.length) { break; } */ //头部读取完毕 _ibuf.pop(MESSAGE_HEAD_LEN); // =================================== //3. 交给业务函数处理 //if (_msg_callback != NULL) { //this->_msg_callback(_ibuf.data + _ibuf.head, length, msgid, this, NULL); //} // 消息路由分发 this->_router.call(msgid, length, _ibuf.data + _ibuf.head, this); // =================================== //数据区域处理完毕 _ibuf.pop(length); } //重置head指针 _ibuf.adjust(); return 0; } ``` ### 8.5 完成Lars Reactor V0.6开发 我们现在重新写一下 server.cpp 和client.cpp的两个应用程序 > lars_reacor/example/lars_reactor_0.6/server.cpp ```c #include "tcp_server.h" //回显业务的回调函数 void callback_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data) { printf("callback_busi ...\n"); //直接回显 conn->send_message(data, len, msgid); } //打印信息回调函数 void print_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data) { printf("recv client: [%s]\n", data); printf("msgid: [%d]\n", msgid); printf("len: [%d]\n", len); } int main() { event_loop loop; tcp_server server(&loop, "127.0.0.1", 7777); //注册消息业务路由 server.add_msg_router(1, callback_busi); server.add_msg_router(2, print_busi); loop.event_process(); return 0; } ``` > lars_reacor/example/lars_reactor_0.6/client.cpp ```c #include "tcp_client.h" #include <stdio.h> #include <string.h> //客户端业务 void busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data) { //得到服务端回执的数据 printf("recv server: [%s]\n", data); printf("msgid: [%d]\n", msgid); printf("len: [%d]\n", len); } int main() { event_loop loop; //创建tcp客户端 tcp_client client(&loop, "127.0.0.1", 7777, "clientv0.6"); //注册消息路由业务 client.add_msg_router(1, busi); //开启事件监听 loop.event_process(); return 0; } ``` > lars_reactor/src/tcp_client.cpp ```c //判断链接是否是创建链接,主要是针对非阻塞socket 返回EINPROGRESS错误 static void connection_delay(event_loop *loop, int fd, void *args) { tcp_client *cli = (tcp_client*)args; loop->del_io_event(fd); int result = 0; socklen_t result_len = sizeof(result); getsockopt(fd, SOL_SOCKET, SO_ERROR, &result, &result_len); if (result == 0) { //链接是建立成功的 cli->connected = true; printf("connect %s:%d succ!\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port)); // ================ 发送msgid:1 ===== //建立连接成功之后,主动发送send_message const char *msg = "hello lars!"; int msgid = 1; cli->send_message(msg, strlen(msg), msgid); // ================ 发送msgid:2 ===== const char *msg2 = "hello Aceld!"; msgid = 2; cli->send_message(msg2, strlen(msg2), msgid); // ================ loop->add_io_event(fd, read_callback, EPOLLIN, cli); if (cli->_obuf.length != 0) { //输出缓冲有数据可写 loop->add_io_event(fd, write_callback, EPOLLOUT, cli); } } else { //链接创建失败 fprintf(stderr, "connection %s:%d error\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port)); } } ``` 运行结果: 服务端 ```c $ ./server msg_router init... add msg cb msgid = 1 add msg cb msgid = 2 begin accept get new connection succ! read data: hello lars! call msgid = 1 callback_busi ... server send_message: hello lars!:11, msgid = 1 ======= read data: hello Aceld! call msgid = 2 recv client: [hello Aceld!] msgid: [2] len: [12] ``` 客户端 ```c $ ./client msg_router init... do_connect EINPROGRESS add msg cb msgid = 1 connect 127.0.0.1:7777 succ! do write over, del EPOLLOUT call msgid = 1 recv server: [hello lars!] msgid: [1] len: [11] ======= ``` --- ### 关于作者: 作者:`Aceld(刘丹冰)` mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com) github: [https://github.com/aceld](https://github.com/aceld) 原创书籍: [https://www.kancloud.cn/@aceld](https://www.kancloud.cn/@aceld) ![](https://img.kancloud.cn/b0/d1/b0d11a21ba62e96aef1c11d5bfff2cf8_227x227.jpg) >**原创声明:未经作者允许请勿转载, 如果转载请注明出处**