## 场景六:Kafka数据同步到关系型数据
### 一、核心流程
1\. 创建或选择数据源 & 目的地
2\. 任务设置
3\. 读取设置
4\. 写入设置
5\. 管理数据任务
### 二、核心操作步骤
1. 创建或选择数据源 & 目的地,填写数据任务名称
* 若已存在想要选择的数据源 & 目的地,则直接选择,可选择一个数据源和多个数据目的地。
* 若还未创建数据源 & 目的地,可参照[配置Kafka数据源](chapter1/shu-ju-yuan/pei-zhi-kafka.md) & [配置数据目的地](如何配置数据目的地.md) 配置方法。
![](https://img.kancloud.cn/6b/a0/6ba01531b46ced36338d7f6f7ddf846a_1215x660.png)
2. 点击下一步,进入任务设置
任务设置主要包括了以下几个部分:
* 重要任务设置:如该任务为重要数据任务,您可在任务设置页面设置重要任务星标。
* 参与人:拥有该任务的浏览和编辑权限,可以查看数据任务的所有通知
* 数据源设置:包括选择数据源的读取方式,读取并发数量,读取速率限制,高级设置等,可根据数据的特性和用户的需求场景对数据读取进行相应设置
* 数据目的地设置:包括写入并发数量,处理速率限制,表和字段名称,高级设置等,可对数据写入进行设置
* 其他设置:包括错误队列设置和邮件通知设置
下面主要介绍本实例的相关步骤。
#### 「数据源设置」
![](https://img.kancloud.cn/df/e3/dfe3ed66358b67f1ef6e3520d56b2a74_1964x764.png)
* 资源配置
* 设置当前任务数据源使用资源组,通过合理分配任务与资源组,可使重要任务计算资源得到保障。
* 读取方式
当数据源为Kafka时,无该选项
* 读取频率
只有定时读取一项,并且需要设置定时时长,默认为60s,可切换为Cron 表达式
* 读取并发数量
* 读取并发数量是指该任务从数据源中并行读取表的数量,默认为5。
* 具体读取并发数量要根据数据源可接受请求量决定。
* 详细策略:并发数量代表着并行读取topic的数量,当topic读取完毕时系统会释放掉该线程,其他的topic可继续抢占该线程进程数据的读取。任务暂停后又重启所有的topic会重新抢占线程,系统会按照断点续传的方式继续同步新增数据。
* 读取速率限制
* 当用户设置读取速率限制,系统平均读取速率将不会超过该数值。
* 可按流量或行数作为限制条件。
* 用户能够不选、单选或多选,若同时勾选,两种限制将同时应用。
* 勾选后即可激活对应的速率限制,请输入正整数;
* 数据源高级设置
请参见[Kafka读取设置](Kafka读取设置.md)
#### 「数据目的地设置」
![](https://img.kancloud.cn/8a/50/8a5089717d800f7983e3a845f78d7e9c_1952x684.png)
* 设置当前任务数据目的地使用资源组,通过合理分配任务与资源组,可使重要任务计算资源得到保障。
* 写入并发数量
* 基本逻辑与「读取并发数量」一致。写入并发数量是指该任务从数据源中并行写入表的数量,默认为5。
* 具体写入并发数量要根据数据源可接受请求量决定。
* 写入速率限制
* 当用户设置写入速率限制,系统平均写入速率将不会超过该数值。
* 可按流量或行数作为限制条件。
* 用户能够不选、单选或多选,若同时勾选,两种限制将同时应用。
* 勾选后即可激活对应的速率限制,请输入正整数;
* 表和字段名称大小写
* 支持用户设置目的地表名称和字段名称大小写
* 选择【自定义】,任务执行后,系统会根据用户自定义的表和字段名称写入到目的地。
* 选择【全部大写】,任务执行后,系统会将用户设置的所有表和字段名称转换为大写(只包含英文)后写入到目的地。
* 选择【全部小写】,任务执行后,系统会将用户设置的所有表和字段名称转换为小写(只包含英文)后写入到目的地。
* 目的地高级设置
请参见[子任务设置](子任务设置.md)、[数据源变化设置](数据源变化设置.md)、[写入一致性](写入一致性.md)
#### 「其他设置」
请参见[错误队列设置](错误队列设置.md)、[邮件通知设置](邮件通知设置.md)
3. 点击下一步,选择要同步的Topic
![](https://img.kancloud.cn/67/9b/679ba6c2834ffd40899b61d2a14c8b4e_982x723.png)
勾选需要的同步的表,点击保存。
4. 读取设置
![](https://img.kancloud.cn/f6/f4/f6f4d03f9c333ef5274364eb831cd69a_1247x561.png)
读取设置主要包括几个部分:
* 同步列表:用户选择的要同步的Topic,批量功能见[批量设置](批量功能.md)
* 表结构:选中某个表后,会展示该表的字段名称、类型、精度、标度、Not Null等信息
* 传输队列设置:可对每个表单独进行设置,优先级高于任务设置中的传输队列设置
本实例的要注意的部分:
* 表结构:
* 当Kafka数据源的数据格式为Avro时,展现表结构
* 当数据格式为JSON或String时,不展现表结构
* 读取条件
Kafka数据源时不支持设置读取条件
5. 点击下一步,进入写入设置页面
![](https://img.kancloud.cn/f7/90/f7905b1aa6277d88ceb5eee882e87e78_1244x556.png)
该页面主要包括:
* 同步列表:用户选择的要同步的Topic,可点击进行切换,对每个Topic的写入规则进行设置,详情见[批量设置](批量功能1.md)
* 映射列表:从数据源到目的地的映射详情,可对目的地表名、字段名、类型、精度、标度、Not Null、主键、是否同步字段等进行修改,可以添加和删除字段
* 高级设置:可以进行子任务Batch 设置,是单表级别的设置,优先级高于任务设置中的子任务设置,详情见[子任务设置](子任务设置.md)
* 清洗脚本:可以开启后进行编辑,自定义对数据完成一些替换或清洗操作
本实例的要注意的部分:
* 当数据格式为avro时,展示数据源到目的地的表结构映射关系,可以直接点击保存设置,激活任务,会按照映射关系同步数据。用户也可根据自己的需求自定义清洗脚本清洗数据。详情见[设置清洗脚本](pzgz/qxjb.md)
* 当数据格式为JSON或String时,无表结构的映射。
![](https://img.kancloud.cn/67/f0/67f0ef8490f8d64068d2b898bd7e1945_1244x423.png)
用户需要自定义表结构,必须在目的写入设置界面手动添加目的地字段和设置清洗脚本解析数据,写出映射关系,才能保证数据的正常同步。
**以一具体数据为例**:
JSON 样例如下:
```
{
"_id": "topic_json_par_1-0-21",
"key": null,
"value": "{\"cust-id\":7979797,\"month\":11,\"expenses\":18.72}"
}
```
* 假设我想写入目的地的字段是「_id」和value中的「cust-id」、「month」、「expenses」
* 首先需要在写入设置中目的地表增加这些字段,可以自己设定字段名,字段类型需要与脚本中的类型相同
![](https://img.kancloud.cn/0d/e5/0de5153f3838ebc06304d0663ef5d695_931x382.png)
* 再编辑清洗脚本,对这些字段的数据来源进行说明
```
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class JsonProcess {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public List<Map<String, Object>> process(Map<String, Object> record) throws Exception {
List<Map<String, Object>> records = new ArrayList<>();
if (!Objects.isNull(record)) {
//取出想要的字段"_id",存为字符串类型
String id = (String) record.get("_id");
//先将value取出, value 类型为字符串,其中存储的是 JSON 结构的字符
String jsonTypeStr = (String) record.get("value");
if (jsonTypeStr != null && jsonTypeStr.length() > 0) {
// 将 value字符串转换成 JSON 对象
final JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonTypeStr);
// 从 value中获取"cust-id"元素的值,原数据为数值类型,存为数值类型
final Integer cust = jsonNode.get("cust-id").asInt();
// 从value中获取month元素
final Integer month = jsonNode.get("month").asInt();
// 从value中获取expenses元素
final Integer expenses = jsonNode.get("expenses").asInt();
//将取出的元素写入
Map<String, Object> eachRowData = new HashMap<>();
//put函数第一个参数需与目的地表名的字段名一致,第二个参数为上面定义的变量名
eachRowData.put("_id",id);
eachRowData.put("cust", cust);
eachRowData.put("month", month);
eachRowData.put("expense", expenses);
records.add(eachRowData);
}
}
return records;
}
}
```
* 点击试运行,可查看代码清洗效果。若出现脚本编译失败,可点击查看详情检查具体报错位置,进行代码修改。
![](https://img.kancloud.cn/12/7f/127fc78d27eaa2fbdd227a0e5155c727_1258x356.png)
* 清洗脚本编辑完成后,点击保存。
<br/>
写入设置完成后,点击「保存」 ,可激活任务。
6. 管理数据任务
任务激活后,可看到任务详情页。
![](https://img.kancloud.cn/21/7b/217bc0f082f79671f472c330fd701417_1257x674.png)
#### 「数据任务统计」
* 已读取数据量:
* 指 DataPipeline 从数据源已读取的数据量。
* 当 DataPipeline 系统重启,会根据断点续传机制从上一次读取记录点开始重新读取数据,重复读取的数据量不会记录到已读取数据量里。
* 无主键的定时同步时全量替换数据,故每次定时读取全量数据,数据源没有更新的数据会再次读取。
* 已处理数据量:
* 指 DataPipeline 已处理的数据量,这里包括:同步到数据目的的数据量和进入到错误队列的数据量。
* 当 DataPipeline 系统重启,会根据断点续传机制从上一个读取记录点开始重新写入部分数据,但这部分数据不会记录到已写入数据量里。
* 错误队列:指已读取的数据中系统判断无法写入到数据目的地,而异步放到错误队列中的数据量。:
* 读取速率:指DataPipeline从数据源读取数据的速率。
* 处理速率:指DataPipeline处理数据的速率
#### 「复制任务」
详情见:[复制任务](复制功能.md)
#### 「错误队列」
详情见:[错误队列](yun-wei-guan-li/shu-ju-ren-wu-xiang-qing-ye/cuo-wu-dui-lie.md)
#### 「消息列表」
详情见:[消息列表](yun-wei-guan-li/shu-ju-ren-wu-xiang-qing-ye/xiao-xi-lie-biao.md)
- DataPipeline产品手册
- 产品更新日志
- v2.7.0 版本介绍
- v2.6.5 版本介绍
- v2.6.0 版本介绍
- v2.5.5 版本介绍
- v2.5.0 版本介绍
- v2.4.5 版本介绍
- v2.4.1 版本介绍
- v2.4.0 版本介绍
- v2.3 版本介绍
- v2.2.5 版本介绍
- v2.2 版本介绍
- v2.1 版本介绍
- v2.0.5 版本介绍
- v2.0 版本介绍
- v2.0 以前版本介绍
- 环境和数据库的部署要求
- Mysql - BINLOG配置方法
- Oracle - LOGMINER配置方法
- SQL Server - Change Tracking配置方法
- Postgre SQL-decoderbufs配置方法
- Postgre SQL-wal2json配置方法
- 常见场景操作
- 场景一:实时同步异构数据库数据(例:MySQL到Oracle)
- 场景二:批量同步异构数据库数据(例:SQL Server到MySQL)
- 场景三:API数据同步到关系型数据库(例:API到MySQL)
- 场景四:Hive数据同步到关系型数据库(例:Hive到SQLServer)
- 场景五:关系型数据库数据同步到Hive(例:Oracle到Hive为例)
- 场景六:Kafka数据同步到关系型数据(例:Kafka到MySQL为例)
- 场景七:一对多场景介绍
- 产品入门
- 数据同步任务
- 创建数据同步
- 配置数据源&数据目的地
- 配置数据源
- 配置MySQL数据源
- 配置Oracle数据源
- 配置SQL Server数据源
- 配置PostgreSQL数据源
- 配置FTP数据源
- 配置S3数据源
- 配置API数据源
- 配置Kafka数据源
- 配置Hive数据源
- 配置阿里云 OSS数据源
- 配置腾讯云TDSQL数据源
- 配置自定义数据源
- 配置数据目的地
- 配置MySQL数据目的地
- 配置Oracle数据目的地
- 配置SQL Server数据目的地
- 配置Greenplum数据目的地
- 配置Redshift数据目的地
- 配置TIDB数据目的地
- 配置FTP数据目的地
- 配置HBase数据目的地
- 配置HDFS数据目的地
- 配置Hive数据目的地
- 配置AnalyticDB for PostgreSQL数据目的地
- 配置Kafka数据目的地
- 数据同步的任务设置
- 读取设置
- 数据源资源组设置
- 批量功能
- SQL类型数据源读取条件设置
- 分数据源读取设置
- MySQL读取设置
- Oracle读取设置
- SQLServer读取设置
- PostgreSQL读取设置
- FTP文件系统读取设置
- S3文件系统读取设置
- Hive读取设置
- Kafka读取设置
- 阿里云OSS读取设置
- API读取设置
- 腾讯云TDSQL读取设置
- Hive数据源读取分区设置
- 其他设置
- 错误队列设置
- 邮件通知设置
- 任务分组设置
- 写入设置
- 批量功能
- 设置清洗脚本
- 数据目的地资源组设置
- 数据目的地设置
- 子任务设置
- 数据源变化设置
- 写入端数据一致性
- 批量读取后,先写入到临时表,再转存到实际表
- 高级设置
- 子任务设置
- Hive分区设置
- Column family 设置
- 数据同步任务管理
- 数据任务监控
- 重要任务
- 故障任务
- 非激活状态
- 性能关注
- 数据任务分组
- 管理数据同步
- 复制功能
- 回滚功能
- 重新同步功能
- 错误队列
- 消息列表
- 文件同步任务
- 创建文件同步
- 配置文件源
- 配置S3文件源
- 配置FTP文件源
- 配置文件目的地
- 配置HDFS文件目的地
- 文件同步的任务设置
- 任务流
- 核心功能介绍
- 新建任务流
- 配置核心组件
- 配置开始任务组件
- 配置数据任务组件
- 配置远程命令执行组件
- 配置延时器组件
- 配置权限设置
- 激活任务流
- 元数据管理
- 查看总览
- 搜索页
- 详情页
- 系统设置
- 数据任务
- 元数据管理
- 用户管理
- 常见问题
- 部署要求
- Docker安装的集群部署方式?
- DataPipeline的并发任务是线程还是进程?
- 分布式架构指的是什么样的框架?
- 生产环境配置推荐及回答?
- DataPipeline的服务是统一管理还是私有化部署?若是私有化部署若要升级怎么操作?
- DataPipeline的Kafka如果与客户目前使用的Kafka版本不一样,是否需要适配?
- 请说明产品的HA和容灾方案 ?
- DataPipeline有多少独立的服务?各容器的作用是什么?
- 在从节点上装mysql,对单表导入1000万数据对任务有影响吗?
- 数据传输
- 数据源/数据目的地
- 基本要求
- 数据源或目的地可以重复使用吗?
- 数据源多个表是否可以写到目的地一张表?
- 数据源或目的地连接失败怎么办?
- 数据源
- MySQL
- DataPieline如何应对Mysql数据库表和字段名称大小写不敏感问题?
- DataPipeline Mysql数据源的实时处理模式下,暂时无法读取哪些字段类型?
- Mysql数据源实时处理模式下,暂不支持那些语句操作的同步?
- Oracle
- Oracle实时模式为LogMiner时,为什么还需要设置读取频率?
- SQL Server
- SQL Server数据源读取方式选择Change Tracking时需要注意什么?
- SQL Server实时模式为Change Tracking时,为什么还需要设置读取频率?
- PostgreSQL
- Hive
- Hive数据源支持哪些文件格式?
- Kafka
- FTP文件系统
- FTP数据源CSV静态表结构时,用户为什么需要确认首行是否为字段名称?
- FTP数据源静态表结构和动态表结构的区别是什么?
- FTP源的文件在不断写入的情况下,DataPipeline的读取与写入的模式是怎样的?
- FTP数据源支持哪些编码方式?
- S3文件系统
- 阿里云OSS文件系统
- API
- 腾讯云TDSQL
- 数据目的地
- MySQL目的地常见问题
- 时区问题需要注意什么?
- SQL Server目的地常见问题
- 行级的物理删除,使用Change Tracking的方式,是否获取的到?DataPipeline会如何处理这类的数据?
- 数据源实时模式是否可以同步视图?
- Oracle目的地常见问题
- TiDB目的地常见问题
- 目的地TIDB同步表时,需要注意什么?
- Redshift目的地常见问题
- Redshift 并发数设置是50,DataPipeline对100个表并发插入的方案?对Redshift 性能的影响?DataPipeline对大数据量并发插入Redshift 的处理方式?
- Hive目的地常见问题
- 如何避免Hive目的地出现小文件问题?
- DataPipeline同步数据到Hive目的地表时,数据源发生变化会怎么样?
- 我们目前对已做好Hive分区逻辑的目的地,是不是不支持继续往里写?只能写新表?
- 配置Hive目的地是需要注意哪些?
- Hive目的地时字段转换需要注意哪些问题?
- GreenPlum目的地常见问题
- Kafka目的地常见问题
- kafka目的地支持设置新的分区吗?
- 多个表结构不一致的表,可以同步至kafka的同一个topic吗?
- HDFS目的地常见问题
- FTP文件系统目的地常见问题
- 目的地FTP时,我们现在是按什么逻辑创建文件的?
- AnalyticDB for PostgreSQL目的地常见问题
- Hbase目的地常见问题
- 目的地常见问题
- 各个数据目的地的写入方式分别是采用什么形式?
- DataPipeline支持的数据库的目标端的连接方式是什么?
- DataPipeline支持的目标端冲突数据处理机制是什么?
- 任务设置
- 读取模式相关问题
- 任务设置中读取频率的实现原理是什么样的?
- 采用实时同步的情况,新建同步任务时,源端的数据表有大量的存量数据,如何通过产品实现数据同步的一致性的?
- 定时批量清目标表数据的逻辑是什么样的?
- 数据源端基于日志的实时模式,是源库推送还是我们做捕获?
- 关系型数据如MySQL,如果出现大量的数据修改,BinLog日志如何抓取,如何实现及时的消费?
- 读取与写入的速率限制是按照任务还是按照表?
- 我们的无侵入性是如何实现的?是完全无侵入性,还是侵入性很小?是否无侵入性就意味着源端服务器没有访问请求的压力,那目的端写入是否还存在压力?
- 动态限速的策略是什么?
- 读写一致性的逻辑是什么?
- v2.6版本增量的逻辑如何实现?
- 重新同步策略问题
- 如果任务激活后进行重新同步,目的地数据会清空吗?
- 读取设置
- 如何设置数据读取条件where语句?有哪些注意事项?
- 用户选择实时模式时,选表时发现有一些表置灰不能同步要想同步这些表该怎么办?
- 同步完成后暂停,取消表后又新加入此表,DataPipeline对于此表的处理策略是什么样的?
- 写入设置
- 表和字段问题
- 目的地表名称和字段名称最长字符长度有什么特定限制吗?以及表名称和字段名称的输入规则要求是什么?
- 同步数据到异构数据库,字段类型会有变化吗?
- 表结构中的精度和标度是什么意思?
- DataPipeline所支持的不同数据类型有哪些?kafka schema的数据类型和不同库间的数据库转换规则?
- 数据源端支持哪些字符集类型?
- Hive作为目的地表需要注意什么?
- Hive作为数据源且格式为parquet时需要注意什么?
- 如何新增一个字段?
- 主键相关问题
- 无主键的表的同步逻辑是怎样的?
- 选择增量识别为主键,如何保证源端和目标的数据一致性呢,如果该记录有修改,系统是怎么处理的?
- 数据目的地ODS有大量无主键表,同步时DataPipeline是如何处理的?
- 表结构变化问题
- 任务激活前后,数据源变化表结构变化有什么不同?
- 当数据源表结构更新时DataPipeline是如何处理的?
- 如果目的地端已经存在了数据库表,但表结构不相同,我们能否将数据写入到该表?
- DataPipeline是否支持将不同的数据表(在不同的数据库中,但是表结构一致,同时有主键和唯一性识别的字段),导入同一个目的端表?
- 管理数据同步
- 基本概念
- 错误通知是什么?
- 错误队列是什么?
- 哪些数据错误会进入错误队列?
- 请简述产品支持的目标端冲突数据处理机制?
- 错误队列里的原始数据是指源端读取的原始数据还是说经过清洗规则后的数据?
- 激活任务后,有哪些参数可以修改?
- 同步状态
- 部分表已读取已写入等都为0,但完成进度为100%?
- 任务详情页中的数据读写量具体含义是?为什么有时候还会减少?
- 如何去查看FTP源和FTP文件中的文件有没有同步完成?
- 数据任务激活后是不允许修改任何设置吗?
- 激活任务后,数据百分比为什么会往回条,如:从99% 跳到30%
- 同步逻辑
- 自动重启逻辑是怎样的?
- 目前数据同步的暂停重启策略是什么样的?暂停和重启后是如何读取和写入数据的?
- 目前进行数据任务的时候,读取速率远大于写入速率,其中,已读取且还未写入的数据会暂时存储在Kafka上,但是由于Kafka存储空间有限,超出后容易造成数据的丢失,这怎么办?
- 如果一条数据多次、频繁变化,在DataPipeline产品侧如何保证数据的并行和保序是如何保证的?
- 用户导入数据后,hdfs认证机制,数据哪些用户可以使用,用户数据安全如何确保?
- 请简述目标端性能可管理性(可提供的性能监控、分析、调优手段等)
- DataPipeline是否支持远程抽取数据?
- 如果一条数据多次、频繁变化,在DataPipeline产品侧如何保证数据的并行和保序是如何保证的?
- 产品到期问题
- 产品使用期限到期所有任务都会被暂停任务,那么如何提前获知产品使用期限是否到期以避免任务被暂停?
- 实际场景中,目的地服务器每周可能会有aws升级,需要暂停服务器,DataPipeline有没有对应的方案能够满足?
- 任务报错
- redis连接异常任务暂停了怎么办?
- 文件传输
- FTP文件源同步整个文件时是如何同步的?
- 任务流
- 如何使用远程命令执行脚本来调取另外一个任务流?
- 任务流开启状态下,任务此时关闭掉任务流,正在同步的组件任务的处理逻辑是什么样的?
- 任务流中上游组件有多个组件任务时,上游任务全部完成才能开启下游任务吗?
- 任务既连了开始键,又配置了依赖,执行逻辑会是什么样的?
- 任务流中新建任务为何只有读取方式为增量识别字段,没有binlog?
- 任务监控
- 什么样的实时传输任务会在性能关注中显示?