🔥码云GVP开源项目 12k star Uniapp+ElementUI 功能强大 支持多语言、二开方便! 广告
6.2.25 对象操作sharding分库分表 * 演示参考范例地址 [https://github.com/sagframe/sqltoy-showcase/tree/master/trunk/sqltoy-sharding](https://github.com/sagframe/sqltoy-showcase/tree/master/trunk/sqltoy-sharding) * 4.x版本开始支持对象分库分表,参见sqltoy-showcase中的StaffInfoVO。 * **针对增加、修改、删除等操作需要整合JTA实现跨数据库事务管理,参考范例中的配置** * **如果只是查询分库分表,请参见5.3.1.3和5.3.1.4 章节的说明,跟JTA无关,因为没有事务概念。** ![](https://img.kancloud.cn/76/86/76860cd4b460d033e7de272b61e34571_978x640.png) 参见sharding配置(sqltoy-showcase中的spring-sqltoy.xml): ![](https://img.kancloud.cn/f8/ae/f8ae256c94d28f751945f3023f950c6d_754x463.png) ShardingStrategy使用sqltoy默认提供的单字段hash取模算法,具体实现参见org.sagacity.sqltoy.plugins.sharding.HashShardingStrategy,默认实现如果不满足实际业务需要,可以自行实现ShardingStrategy。 ![](https://img.kancloud.cn/da/1f/da1f35a64ee5aac454a6aa465af78a82_1047x740.png) 核心配置:@Sharding配置: * 单分库的配置: @Sharding(db = @Strategy(name = "hashDataSourceSharding", fields = {"staffId" }), maxConcurrents = 10, maxWaitSeconds = 3600) * 单分表的配置: @Sharding(table = @Strategy(name = "hashDataSourceSharding", fields = { "staffId" }), maxConcurrents = 10, maxWaitSeconds = 3600) * 简化配置: @Sharding(db = @Strategy(name = "hashDataSourceSharding", fields = {"staffId" })) * maxConcurrents:最大并行数量:分库分表在批量插入或修改、加载时会同时将数据分组到多个库,sqltoy底层是一个并行执行过程,如分20个库,并行数是10,则会有一个线程池进行并行控制。 * maxWaitSeconds:最大并行计算等待时长(单位秒),防止并行执行时间过长,设置的并行等待时长。 参见:org.sagacity.sqltoy.parallel.ParallelUtils /\*\* \* @todo 将集合进行根据sharding字段的值提取sharding策略并按照策略将集合分组,然后并行执行 \* @param sqlToyContext \* @param entities \* @param dataSource \* @param handler \* @return \* @throws Exception \*/ public static List execute(final SqlToyContext sqlToyContext, List entities, boolean wrapIdValue, DataSource dataSource, ParallelCallbackHandler handler) throws Exception { // 获取对象的媒体信息 EntityMeta entityMeta = sqlToyContext.getEntityMeta(entities.get(0).getClass()); // 主键值需要提前按照主键策略赋予(sequence 和assign模式的不会实际执行赋值) if (wrapIdValue) { ShardingUtils.assignPKs(sqlToyContext, entityMeta, entities); } // 将批量集合数据按sharding策略处理后的库和表组成的key进行分组 Collection<ShardingGroupModel> shardingGroups = ShardingUtils.groupShardings(sqlToyContext, entities, entityMeta, dataSource); // 单分组直接执行并返回结果 if (shardingGroups.size() == 1) { return handler.execute(sqlToyContext, shardingGroups.iterator().next()); } // 开始多线程并行执行 ShardingConfig shardingConfig = entityMeta.getShardingConfig(); List results = new ArrayList(); // 并行线程数量 int threads = shardingGroups.size(); // 并行线程数量大于指定的最大并行数,则按实际指定的并行数执行 if (threads > shardingConfig.getMaxConcurrents() && shardingConfig.getMaxConcurrents() > 1) threads = shardingConfig.getMaxConcurrents(); //开启固定线程池 ExecutorService pool = Executors.newFixedThreadPool(threads); List<Future<List>\> futureResult = new ArrayList<Future<List>\>(); for (final ShardingGroupModel group : shardingGroups) { Future<List> future = pool.submit(new DialectExecutor(sqlToyContext, group, handler)); futureResult.add(future); } pool.shutdown(); // 设置最大等待时长 if (shardingConfig.getMaxWaitSeconds() > 0) pool.awaitTermination(shardingConfig.getMaxWaitSeconds(), TimeUnit.SECONDS); // 提取各个线程返回的结果进行合并 try { for (Future<List> future : futureResult) { List item = future.get(); if (item != null && !item.isEmpty()) ``` results.addAll(item); ``` } } catch (Exception e) { e.printStackTrace(); throw e; } finally { pool.shutdownNow(); } return results; } * 测试代码: ![](https://img.kancloud.cn/bf/8e/bf8ecd77548162c755c130a173be34a2_1010x760.png) 结果查询:(相同库sqltoy不同表:sys\_staff\_info 和sys\_staff\_info\_1) ![](https://img.kancloud.cn/a5/98/a5989e19987d18b268d99cbd6fa66df1_692x558.png)![](https://img.kancloud.cn/9d/21/9d210fe70183ebbe2cf1744cee084009_667x623.png) 不同库: ![](https://img.kancloud.cn/b3/8b/b38b7d12e7ce77506afa756ae55d5009_908x609.png)