企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
定时任务使用spring-task,分布式是基于redis实现,可保证同一任务同时只有一个节点在执行。 ***** **使用技术:** * 分布式锁,基于redis实现 * 队列,基于redis实现 * 发布/订阅,基于redis实现 * 分布式缓存,基于redis实现 ***** 实现原理: 1. 每个启动的节点服务都执行一个内置任务,每秒执行一次,基于分布式锁尝试锁定自己为master节点,集群环境中只有一个master节点。 2. master节点的每个定时任务执行时向队列推入一个令牌,同时向集群所有节点广播一条消息,各节点收到消息后从队列中抢令牌,谁抢到谁执行真正的任务 ``` /** * 尝试锁定本服务作为master * * @param timeoutMillis */ @SuppressWarnings("unchecked") public void tryLockMaster(final long timeoutMillis) { final ICache healthCache = cacheManager.getICache(TaskConstants.TASK_HEALTH_CACHE_NAME); if (healthCache.getNativeCache() instanceof RedisOperations) { RedisOperations redisOperations = (RedisOperations) healthCache.getNativeCache(); //利用redis全局锁 RedisLock lock = new RedisLock(redisOperations, TaskConstants.TASK_HEALTH_CACHE_NAME); lock.execute(new LockCallback<Boolean>() { @Override public Boolean doInLock(RedisConnection connection) { if (!existMaster() || isMaster()) { //将全局唯一ID置入缓存 healthCache.put(TaskConstants.TASK_HEALTH_KEY_NAME, TaskConstants.UU_ID); //设置过期时间 healthCache.expire(TaskConstants.TASK_HEALTH_KEY_NAME, timeoutMillis / 1000); log.info("I'm master! UUID[{}]", TaskConstants.UU_ID); } return true; } }); } } ``` ``` /** * 集群中本任务只有一个服务(master)在执行 * * @return */ @SuppressWarnings("unchecked") public void doTask() { MasterFactory masterFactory = getMasterFactory(); //如果使用集群模式则发送消息 if (masterFactory.isCluster()) { if (masterFactory.isMaster()) { //令牌入队。使用队列保证集群中同时只有一个服务执行任务,即集群中只有拿到令牌的服务才会执行任务 masterFactory.getQueueManager().getQueue(CURRENT_QUEUE).offer(1); //发送执行任务命令 masterFactory.getPubSubManager().publish(1, CURRENT_CHANNEL); } } //否则直接执行任务 else { doExecute(); } } ``` ``` /** * 接收消息并处理 * * @param channel * @param messageBody * @param pattern */ @SuppressWarnings("unchecked") @Override public void onMessage(String channel, Object messageBody, String pattern) { if (CURRENT_CHANNEL.equals(channel)) { //使用队列保证集群中同时只有一个服务执行任务 IQueueManager queueManager = masterFactory.getQueueManager(); IQueue queue = queueManager.getQueue(CURRENT_QUEUE); if (queue.poll() != null) { //同步执行任务,即同一个任务直到上一次执行完成时再执行下次。单机运行时,spring task内部已经实现,但集群任务下需要另外实现,此处基于redis全局锁实现 if (queueManager instanceof RedisQueueManager) { //基于redis全局锁实现 RedisOperations redisOperations = ((RedisQueueManager) queueManager).getRedisOperations(); RedisLock lock = new RedisLock(redisOperations, CURRENT_TASK_LOCK_NAME); lock.execute(new LockCallback<Boolean>() { @Override public Boolean doInLock(RedisConnection connection) { //执行具体的任务 doExecute(); return true; } }); } //其他方式待实现... else { doExecute(); } } } } ``` 要想启用分布式任务模式需设置以下参数 --cache.cacheDriver=rediscache --mq.pubsub.import=true --mq.queue.import=true --redis.host=xxx.xxx.xxx.xxx