# Saga Saga是用来在微服务中的长事务管理,具备ACID中的ACD,不具备I,隔离性。在一定业务条件下,可以使用Saga非常简单和方便的管理微服务事务。同理,也可以用于管理多库事务 Saga要求微服务提供回滚操作,然后如果需要回滚,有Saga编排调度各个微服务对应的回滚服务。BeetlSQL提供了SagaMapper,是的内置的操作都有对应的回滚操作,也提供@SagaSql,用户提供正向SQl,也提供回滚SQL。这样,在多库环境下,BeetlSQL能正确回滚数据而不依赖于数据库提供的事务 ## SagaMapper SagaMapper同BaseMapper,但有自己的实现方式 ```java public interface SagaMapper<T> { /** sega 改造的接口**/ @AutoMapper(SagaInsertAMI.class) void insert(T entity); @AutoMapper(SagaUpdateByIdAMI.class) int updateById(T entity); @AutoMapper(SagaDeleteByIdAMI.class) int deleteById(Object key); /** 正常接口 **/ @AutoMapper(SingleAMI.class) T single(Object key); @AutoMapper(UniqueAMI.class) T unique(Object key); @AutoMapper(SelectByIdsAMI.class) List<T> selectByIds(List<?> key); } ``` 可以看到,insert方法的实现是SagaInsertAMI而不是InsertAMI,SagaInsertAMI代码如下 ```java @Override public Object call(SQLManager sm, Class entityClass, Method m, Object[] args) { int ret = sm.insert(args[0]); SagaContext sagaContext = SagaContext.sagaContextFactory.current(); Class target = args[0].getClass(); String idAttr = sm.getClassDesc(target).getIdAttr(); Object key = BeanKit.getBeanProperty(args[0],idAttr); sagaContext.getTransaction().addTask(new InsertSagaRollbackTask(sm.getName(),target,key) ); return ret; } @Data public static class InsertSagaRollbackTask implements SagaRollbackTask { //todo } ``` SagaInsertAMI除了调用SQLManager完成insert操作,BeetlSQL也会记录一个反向操作到sagaContext里,如果在Saga事务环境里要求回滚,则会执行这个方向操作 ```java /** * 参考代码org.beetlsql.sql.saga.test.SimpleTest */ @Test public void simple(){ SagaContext sagaContext = SagaContext.sagaContextFactory.current(); UserMapper userMapper = sqlManager.getMapper(UserMapper.class); long count = sqlManager.allCount(User.class); try{ User user = new User(); user.setName("abc"); userMapper.insert(user); User user2 = new User(); user2.setName("abc"); userMapper.insert(user2); throw new RuntimeException("模拟异常"); }catch(RuntimeException ex){ sagaContext.rollback(); } long afterCount = sqlManager.allCount(User.class); Assert.assertEquals(count,afterCount); } ``` 代码第一行是获得SagaContext,BeetlSQL内置了LocalSagaContext和KakaSagaContext,前者是执行回一次,后者则是会多次执行,在每次执行失败后,丢给Kafka队列。如果执行超过某个阈值,则放到丢弃队列。 开发者可以根据自己需要实现SagaContext,比如使用其他消息机制,或者把回滚任务保存到reids里。 除了内置操作提供回滚外,SagaMapper支持提供反向sql的注解 @SagaUpdateSql ```java @SagaUpdateSql( sql="update stock set count=count+1 where id=?", rollback = "update stock set count=count-1 where id=? and count!=0" ) void addStock(String id); ``` @SagaUpdateSql同@Sql,但提供了反向SQL,一旦要求saga回滚,则执行rollback表示 > 对于更新操作,Saga认为如果更新api返回0,则说明回滚失败。比如上面库存,因为id不存在或者count=0情况下,更新失败。 ## Spring Kafka 实现 可能有多种原因导致回滚失败,比如数据库宕机,或者备库还暂时未连上,及时备库连上,可能数据还暂时为同步到倍库,默认的LocalSagaContext只执行一次回滚,如果失败,则抛出异常。KafkaSagaContext则会按照配置,执行多次,且如果最终还是失败的话,会吧回滚任务放入丢弃队列,等待手工干预 启用Kafak,首选需要导入依赖 ```xml <groupId>com.ibeetl</groupId> <artifactId>sql-saga-springkafa</artifactId> <version>3.1.0-RELEASE</version> ``` > 完整实例参考源码org.beetl.sql.springboot.dynamic.DynamicTest 配置重试队列和丢弃队列,以及kafka的配置 ```properties beetlsql-saga.kafka.retry-topic=retryTopic001 beetlsql-saga.kafka.fail-topic=failTopic001 spring.kafka.bootstrapServers=127.0.0.1:9092 spring.kafka.consumer.group-id=saga spring.kafka.consumer.auto-offset-reset=latest spring.kafka.listener.type=single spring.kafka.listener.ack-mode=record spring.kafka.consumer.value-deserializer=org.beetl.sql.saga.kafka.JacksonDeserializer spring.kafka.producer.value-serializer=org.beetl.sql.saga.kafka.JacksonSerializer ``` 编写重试逻辑和丢弃逻辑,重试逻辑会尝试再次回滚,丢弃逻辑则依据业务系统来定,比如保存到数据库,等待人工来处理,或者定时后再处理 ```java @Configuration @Slf4j @ImportAutoConfiguration(KafkaSagaConfig.class) public class FailSagaConfig { @Autowired ObjectMapper objectMapper; @Autowired KafkaSagaConfig kafkaSagaConfig; /** * 重试回滚 * @param record * @throws Exception */ @KafkaListener( topics = "#{'${beetlsql-saga.kafka.retry-topic}'}") public void retry(ConsumerRecord<?, KafkaSagaTransaction> record) throws Exception { try{ KafkaSagaTransaction kafkaSegaTransaction = record.value(); KafkaSagaContext kafkaSegaContext = new KafkaSagaContext(kafkaSegaTransaction,kafkaSagaConfig); kafkaSegaContext.rollback(); }catch(Exception ex){ log.info(ex.getMessage()); } } @KafkaListener( topics = "#{'${beetlsql-saga.kafka.fail-topic}'}") public void fail(ConsumerRecord<?, KafkaSagaTransaction> record) throws Exception { try{ KafkaSagaTransaction kafkaSegaTransaction = record.value(); log.error("save to db:"+objectMapper.writeValueAsString(kafkaSegaTransaction)); }catch(Exception ex){ log.info(ex.getMessage()); } } } ``` @ImportAutoConfiguration(KafkaSagaConfig.class) 必须有,KafkaSagaConfig是BeetlSQL提供的内置类。这里会初始化配置信息 ```java public class KafkaSagaConfig { // 重试次数 @Value("${beetlsql-saga.max-try:2}") protected int maxTry; //重试队列 @Value("${beetlsql-saga.kafka.retry-topic:retrySagaTopic}") protected String retrySegaTopic; //重试也失败后的发送的队列,通常人工处理 @Value("${beetlsql-saga.kafka.fail-topic:failSagaTopic}") protected String failSegaTopic; @Autowired protected KafkaTemplate template; @PostConstruct public void initSaga() { //必须设置事务实现方式 SagaContext.sagaContextFactory = new KafkaSagaContextFactory(this); } } ``` 在完成上述配置后,可以在业务系统中使用Saga (单元测试源码:org.beetl.sql.springboot.dynamic.DynamicService) ```java @Transactional(propagation = Propagation.NEVER) public boolean normal(){ SagaContext sagaContext = SagaContext.sagaContextFactory.current(); try{ UserInfoInDs1 ds1 = new UserInfoInDs1(); ds1.setId(100); ds1.setName("ces"); UserInfoInDs2 ds2 = new UserInfoInDs2(); ds2.setId(100); ds2.setName("abs"); //俩个数据库 userInfoDs1Mapper.insert(ds1); userInfoDs2Mapper.insert(ds2); //模拟一个错误 int a = 1/0; }catch(Exception ex){ sagaContext.rollback(); return false; } return true; } ``` 如上错误,通过rollback即可恢复数据 也有数据库宕机无法回滚的操作 ```java @Transactional(propagation = Propagation.NEVER) public boolean dbDown(){ SagaContext sagaContext = SagaContext.sagaContextFactory.current(); try{ UserInfoInDs1 ds1 = new UserInfoInDs1(); ds1.setId(100); ds1.setName("ces"); UserInfoInDs2 ds2 = new UserInfoInDs2(); ds2.setId(100); ds2.setName("abs"); //俩个数据库 userInfoDs1Mapper.insert(ds1); //删除表ds1和ds2,导致ds2插入失败,且回滚失败 rmTable(); userInfoDs2Mapper.insert(ds2); }catch(Exception ex){ sagaContext.rollback(); return false; } return true; } ``` `rmTable()`;会删除数据库表,导致操作无法回滚,最后会调用FailSagaConfig.fail方法