🔥码云GVP开源项目 12k star Uniapp+ElementUI 功能强大 支持多语言、二开方便! 广告
## 场景六:Kafka数据同步到关系型数据 ### 一、核心流程 1\. 创建或选择数据源 & 目的地 2\. 任务设置 3\. 读取设置 4\. 写入设置 5\. 管理数据任务 ### 二、核心操作步骤 1. 创建或选择数据源 & 目的地,填写数据任务名称 * 若已存在想要选择的数据源 & 目的地,则直接选择,可选择一个数据源和多个数据目的地。 * 若还未创建数据源 & 目的地,可参照[配置Kafka数据源](chapter1/shu-ju-yuan/pei-zhi-kafka.md) & [配置数据目的地](如何配置数据目的地.md) 配置方法。 ![](https://img.kancloud.cn/6b/a0/6ba01531b46ced36338d7f6f7ddf846a_1215x660.png) 2. 点击下一步,进入任务设置 任务设置主要包括了以下几个部分: * 重要任务设置:如该任务为重要数据任务,您可在任务设置页面设置重要任务星标。 * 参与人:拥有该任务的浏览和编辑权限,可以查看数据任务的所有通知 * 数据源设置:包括选择数据源的读取方式,读取并发数量,读取速率限制,高级设置等,可根据数据的特性和用户的需求场景对数据读取进行相应设置 * 数据目的地设置:包括写入并发数量,处理速率限制,表和字段名称,高级设置等,可对数据写入进行设置 * 其他设置:包括错误队列设置和邮件通知设置 下面主要介绍本实例的相关步骤。 #### 「数据源设置」 ![](https://img.kancloud.cn/df/e3/dfe3ed66358b67f1ef6e3520d56b2a74_1964x764.png) * 资源配置 * 设置当前任务数据源使用资源组,通过合理分配任务与资源组,可使重要任务计算资源得到保障。 * 读取方式 当数据源为Kafka时,无该选项 * 读取频率 只有定时读取一项,并且需要设置定时时长,默认为60s,可切换为Cron 表达式 * 读取并发数量 * 读取并发数量是指该任务从数据源中并行读取表的数量,默认为5。 * 具体读取并发数量要根据数据源可接受请求量决定。 * 详细策略:并发数量代表着并行读取topic的数量,当topic读取完毕时系统会释放掉该线程,其他的topic可继续抢占该线程进程数据的读取。任务暂停后又重启所有的topic会重新抢占线程,系统会按照断点续传的方式继续同步新增数据。 * 读取速率限制 * 当用户设置读取速率限制,系统平均读取速率将不会超过该数值。 * 可按流量或行数作为限制条件。 * 用户能够不选、单选或多选,若同时勾选,两种限制将同时应用。 * 勾选后即可激活对应的速率限制,请输入正整数; * 数据源高级设置 请参见[Kafka读取设置](Kafka读取设置.md) #### 「数据目的地设置」 ![](https://img.kancloud.cn/8a/50/8a5089717d800f7983e3a845f78d7e9c_1952x684.png) * 设置当前任务数据目的地使用资源组,通过合理分配任务与资源组,可使重要任务计算资源得到保障。 * 写入并发数量 * 基本逻辑与「读取并发数量」一致。写入并发数量是指该任务从数据源中并行写入表的数量,默认为5。 * 具体写入并发数量要根据数据源可接受请求量决定。 * 写入速率限制 * 当用户设置写入速率限制,系统平均写入速率将不会超过该数值。 * 可按流量或行数作为限制条件。 * 用户能够不选、单选或多选,若同时勾选,两种限制将同时应用。 * 勾选后即可激活对应的速率限制,请输入正整数; * 表和字段名称大小写 * 支持用户设置目的地表名称和字段名称大小写 * 选择【自定义】,任务执行后,系统会根据用户自定义的表和字段名称写入到目的地。 * 选择【全部大写】,任务执行后,系统会将用户设置的所有表和字段名称转换为大写(只包含英文)后写入到目的地。 * 选择【全部小写】,任务执行后,系统会将用户设置的所有表和字段名称转换为小写(只包含英文)后写入到目的地。 * 目的地高级设置 请参见[子任务设置](子任务设置.md)、[数据源变化设置](数据源变化设置.md)、[写入一致性](写入一致性.md) #### 「其他设置」 请参见[错误队列设置](错误队列设置.md)、[邮件通知设置](邮件通知设置.md) 3. 点击下一步,选择要同步的Topic ![](https://img.kancloud.cn/67/9b/679ba6c2834ffd40899b61d2a14c8b4e_982x723.png) 勾选需要的同步的表,点击保存。 4. 读取设置 ![](https://img.kancloud.cn/f6/f4/f6f4d03f9c333ef5274364eb831cd69a_1247x561.png) 读取设置主要包括几个部分: * 同步列表:用户选择的要同步的Topic,批量功能见[批量设置](批量功能.md) * 表结构:选中某个表后,会展示该表的字段名称、类型、精度、标度、Not Null等信息 * 传输队列设置:可对每个表单独进行设置,优先级高于任务设置中的传输队列设置 本实例的要注意的部分: * 表结构: * 当Kafka数据源的数据格式为Avro时,展现表结构 * 当数据格式为JSON或String时,不展现表结构 * 读取条件 Kafka数据源时不支持设置读取条件 5. 点击下一步,进入写入设置页面 ![](https://img.kancloud.cn/f7/90/f7905b1aa6277d88ceb5eee882e87e78_1244x556.png) 该页面主要包括: * 同步列表:用户选择的要同步的Topic,可点击进行切换,对每个Topic的写入规则进行设置,详情见[批量设置](批量功能1.md) * 映射列表:从数据源到目的地的映射详情,可对目的地表名、字段名、类型、精度、标度、Not Null、主键、是否同步字段等进行修改,可以添加和删除字段 * 高级设置:可以进行子任务Batch 设置,是单表级别的设置,优先级高于任务设置中的子任务设置,详情见[子任务设置](子任务设置.md) * 清洗脚本:可以开启后进行编辑,自定义对数据完成一些替换或清洗操作 本实例的要注意的部分: * 当数据格式为avro时,展示数据源到目的地的表结构映射关系,可以直接点击保存设置,激活任务,会按照映射关系同步数据。用户也可根据自己的需求自定义清洗脚本清洗数据。详情见[设置清洗脚本](pzgz/qxjb.md) * 当数据格式为JSON或String时,无表结构的映射。 ![](https://img.kancloud.cn/67/f0/67f0ef8490f8d64068d2b898bd7e1945_1244x423.png) 用户需要自定义表结构,必须在目的写入设置界面手动添加目的地字段和设置清洗脚本解析数据,写出映射关系,才能保证数据的正常同步。 **以一具体数据为例**: JSON 样例如下: ``` { "_id": "topic_json_par_1-0-21", "key": null, "value": "{\"cust-id\":7979797,\"month\":11,\"expenses\":18.72}" } ``` * 假设我想写入目的地的字段是「_id」和value中的「cust-id」、「month」、「expenses」 * 首先需要在写入设置中目的地表增加这些字段,可以自己设定字段名,字段类型需要与脚本中的类型相同 ![](https://img.kancloud.cn/0d/e5/0de5153f3838ebc06304d0663ef5d695_931x382.png) * 再编辑清洗脚本,对这些字段的数据来源进行说明 ``` import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; public class JsonProcess { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public List<Map<String, Object>> process(Map<String, Object> record) throws Exception { List<Map<String, Object>> records = new ArrayList<>(); if (!Objects.isNull(record)) { //取出想要的字段"_id",存为字符串类型 String id = (String) record.get("_id"); //先将value取出, value 类型为字符串,其中存储的是 JSON 结构的字符 String jsonTypeStr = (String) record.get("value"); if (jsonTypeStr != null && jsonTypeStr.length() > 0) { // 将 value字符串转换成 JSON 对象 final JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonTypeStr); // 从 value中获取"cust-id"元素的值,原数据为数值类型,存为数值类型 final Integer cust = jsonNode.get("cust-id").asInt(); // 从value中获取month元素 final Integer month = jsonNode.get("month").asInt(); // 从value中获取expenses元素 final Integer expenses = jsonNode.get("expenses").asInt(); //将取出的元素写入 Map<String, Object> eachRowData = new HashMap<>(); //put函数第一个参数需与目的地表名的字段名一致,第二个参数为上面定义的变量名 eachRowData.put("_id",id); eachRowData.put("cust", cust); eachRowData.put("month", month); eachRowData.put("expense", expenses); records.add(eachRowData); } } return records; } } ``` * 点击试运行,可查看代码清洗效果。若出现脚本编译失败,可点击查看详情检查具体报错位置,进行代码修改。 ![](https://img.kancloud.cn/12/7f/127fc78d27eaa2fbdd227a0e5155c727_1258x356.png) * 清洗脚本编辑完成后,点击保存。 <br/> 写入设置完成后,点击「保存」 ,可激活任务。 6. 管理数据任务 任务激活后,可看到任务详情页。 ![](https://img.kancloud.cn/21/7b/217bc0f082f79671f472c330fd701417_1257x674.png) #### 「数据任务统计」 * 已读取数据量: * 指 DataPipeline 从数据源已读取的数据量。 * 当 DataPipeline 系统重启,会根据断点续传机制从上一次读取记录点开始重新读取数据,重复读取的数据量不会记录到已读取数据量里。 * 无主键的定时同步时全量替换数据,故每次定时读取全量数据,数据源没有更新的数据会再次读取。 * 已处理数据量: * 指 DataPipeline 已处理的数据量,这里包括:同步到数据目的的数据量和进入到错误队列的数据量。 * 当 DataPipeline 系统重启,会根据断点续传机制从上一个读取记录点开始重新写入部分数据,但这部分数据不会记录到已写入数据量里。 * 错误队列:指已读取的数据中系统判断无法写入到数据目的地,而异步放到错误队列中的数据量。: * 读取速率:指DataPipeline从数据源读取数据的速率。 * 处理速率:指DataPipeline处理数据的速率 #### 「复制任务」 详情见:[复制任务](复制功能.md) #### 「错误队列」 详情见:[错误队列](yun-wei-guan-li/shu-ju-ren-wu-xiang-qing-ye/cuo-wu-dui-lie.md) #### 「消息列表」 详情见:[消息列表](yun-wei-guan-li/shu-ju-ren-wu-xiang-qing-ye/xiao-xi-lie-biao.md)