企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
#### 8.4 Zinx-V0.8代码实现 好了,现在需要将消息队列和多任务worker机制集成到我们Zinx的中了。我们在Server的`Start()`方法中,在服务端Accept之前,启动Worker工作池。 > zinx/znet/server.go ```go //开启网络服务 func (s *Server) Start() { //... //开启一个go去做服务端Linster业务 go func() { //0 启动worker工作池机制 s.msgHandler.StartWorkerPool() //1 获取一个TCP的Addr addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port)) if err != nil { fmt.Println("resolve tcp addr err: ", err) return } //... //... } }() } ``` 其次,当我们已经得到客户端的连接请求过来数据的时候,我们应该将数据发送给Worker工作池进行处理。 所以应该在Connection的`StartReader()`方法中修改: > zinx/znet/connection.go ```go /* 读消息Goroutine,用于从客户端中读取数据 */ func (c *Connection) StartReader() { fmt.Println("Reader Goroutine is running") defer fmt.Println(c.RemoteAddr().String(), " conn reader exit!") defer c.Stop() for { // 创建拆包解包的对象... //读取客户端的Msg head... //拆包,得到msgid 和 datalen 放在msg中... //根据 dataLen 读取 data,放在msg.Data中... //得到当前客户端请求的Request数据 req := Request{ conn:c, msg:msg, } if utils.GlobalObject.WorkerPoolSize > 0 { //已经启动工作池机制,将消息交给Worker处理 c.MsgHandler.SendMsgToTaskQueue(&req) } else { //从绑定好的消息和对应的处理方法中执行对应的Handle方法 go c.MsgHandler.DoMsgHandler(&req) } } } ``` 这里并没有强制使用多任务Worker机制,而是判断用户配置`WorkerPoolSize`的个数,如果大于0,那么我就启动多任务机制处理链接请求消息,如果=0或者<0那么,我们依然只是之前的开启一个临时的Goroutine处理客户端请求消息。