企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
## 9) 定期拉取最新路由信息(V0.6) 1. 当agent首次连接到dns service时,将全部的`load_balance`均设置为`NEW`状态。如果dns service重新启动,或者断开链接重连,都会将之前的拉去中或者没拉取的`load_balance`状态都设置为`NEW`状态,只有`NEW`状态的`load_balance`才能定期自动拉取. > lars_loadbalance_agent/src/dns_client.cpp ``` //======================================================== static void conn_init(net_connection *conn, void *args) { for (int i = 0; i < 3; i++) { r_lb[i]->reset_lb_status(); } } //======================================================== void *dns_client_thread(void* args) { printf("dns client thread start\n"); event_loop loop; //1 加载配置文件得到dns service ip + port std::string ip = config_file::instance()->GetString("dnsserver", "ip", ""); short port = config_file::instance()->GetNumber("dnsserver", "port", 0); //2 创建客户端 tcp_client client(&loop, ip.c_str(), port, "dns client"); //3 将thread_queue消息回调事件,绑定到loop中 dns_queue->set_loop(&loop); dns_queue->set_callback(new_dns_request, &client); //4 设置当收到dns service回执的消息ID_GetRouteResponse处理函数 client.add_msg_router(lars::ID_GetRouteResponse, deal_recv_route); //======================================================== //5.设置链接成功/链接断开重连成功之后,通过conn_init来清理之前的任务 client.set_conn_start(conn_init); //======================================================== //启动事件监听 loop.event_process(); return NULL; } ``` > lars_loadbalance_agent/src/route_lb.cpp ```c //将全部的load_balance都重置为NEW状态 void route_lb::reset_lb_status() { pthread_mutex_lock(&_mutex); for (route_map_it it = _route_lb_map.begin(); it != _route_lb_map.end(); it++) { load_balance *lb = it->second; if (lb->status == load_balance::PULLING) { lb->status = load_balance::NEW; } } pthread_mutex_unlock(&_mutex); } ``` 2. 增加配置文件参数update_timeout, 表示一个NEW状态的load_balance下的modid/cmdid节点应该经历多长时间进行一次刷新拉取。 > lars_loadbalance_agent/conf/lars_lb_agent.conf ```ini ;对于每个NEW状态的modid/cmdid,多久更新一下本地路由,秒 update_timeout=15 ``` > lars_loadbalance_agent/include/main_server.h ```c struct load_balance_config { //... //对于每个NEW状态的modid/cmdid,多久更新一下本地路由,秒 long update_timeout; }; ``` > lars_loadbalance_agent/src/main_server.cpp ```c static void init_lb_agent() { //1. 加载配置文件 config_file::setPath("./conf/lars_lb_agent.conf"); //... //... lb_config.update_timeout = config_file::instance()->GetNumber("loadbalance", "update_timeout", 15); //... } ``` 3. 给`load_balance`设置最后update时间参数,及最后一次从dns service拉取下来更新`host_map`的时间.然后在`route_lb`每次执行`get_host`的时候,对每个已经存在的host节点做最后时间超时检查,如果超时,则重新从 dns service中拉取。 > lars_loadbalance_agent/include/load_balance.h ```c /* * 负载均衡算法核心模块 * 针对一组(modid/cmdid)下的全部host节点的负载规则 */ class load_balance { public: load_balance(int modid, int cmdid): status(PULLING), last_update_time(0), _modid(modid), _cmdid(cmdid) { //load_balance 初始化构造 } // ... long last_update_time; //最后更新host_map时间戳 private: // ... }; ``` > lars_loadbalance_agent/src/load_balance.cpp ```c //根据dns service远程返回的结果,更新_host_map void load_balance::update(lars::GetRouteResponse &rsp) { long current_time = time(NULL); //... //... //更新最后update时间 last_update_time = current_time; //重置状态为NEW status = NEW; } ``` ​ `load_balance`每次调用`update()`都记录一次最后的更新时间,并标记为`NEW`表示当前`modid/cmdid`没有在PULLING,可以再更新。 > lars_loadbalance_agent/src/route_lb.cpp ```c //agent获取一个host主机,将返回的主机结果存放在rsp中 int route_lb::get_host(int modid, int cmdid, lars::GetHostResponse &rsp) { int ret = lars::RET_SUCC; //1. 得到key uint64_t key = ((uint64_t)modid << 32) + cmdid; pthread_mutex_lock(&_mutex); //2. 当前key已经存在_route_lb_map中 if (_route_lb_map.find(key) != _route_lb_map.end()) { //2.1 取出对应的load_balance load_balance *lb = _route_lb_map[key]; if (lb->empty() == true) { //存在lb 里面的host为空,说明正在pull()中,还没有从dns_service返回来,所以直接回复不存在 assert(lb->status == load_balance::PULLING); rsp.set_retcode(lars::RET_NOEXIST); } else { ret = lb->choice_one_host(rsp); rsp.set_retcode(ret); // ================================================= //超时重拉路由 //检查是否要重新拉路由信息 //若路由并没有处于PULLING状态,且有效期已经超时,则重新拉取 if (lb->status == load_balance::NEW && time(NULL) - lb->last_update_time > lb_config.update_timeout) { lb->pull(); } // ================================================= } } //3. ... // ... pthread_mutex_unlock(&_mutex); return ret; } ``` --- ### 关于作者: 作者:`Aceld(刘丹冰)` mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com) github: [https://github.com/aceld](https://github.com/aceld) 原创书籍: [https://www.kancloud.cn/@aceld](https://www.kancloud.cn/@aceld) ![](https://img.kancloud.cn/b0/d1/b0d11a21ba62e96aef1c11d5bfff2cf8_227x227.jpg) >**原创声明:未经作者允许请勿转载, 如果转载请注明出处**