## 清洗脚本
[TOC]
在写入设置页面,支持清洗脚本功能。默认提供以下模版:
| 模版 | 描述 |
| --- | --- |
| DML.java | 代表DML字段标识脚本,源表数据发生变化时,在目的地表中增加相应的 DML 标识字段,包括 insert、update、delete |
| CollectTime.java | 代表读取时间脚本,根据数据读取时间,在目的地表中增加相应的时间字段 |
| UpdateTime.java | 代表写入时间脚本:根据数据写入时间,在目的地表中增加相应的时间字段 |
| TimestampProcess.java | 将 Timestamp 类型数据转换为日期类型 |
| JSONProcess.java | 将 JSON 数据解析,并指定到目标表表结构(含有 JSON 数据解析样例) |
<p style='font-size:16px'> 具体操作如下:
![](https://img.kancloud.cn/58/e1/58e1e1b27d43cfe0313f782dafc83a98_1918x998.png)
* 点击「清洗脚本」开关可以打开或关闭清洗脚本功能。
* 默认为关闭状态,点击后将高亮开关,并进入新窗口进行清洗脚本编辑;
* 上方输入框支持用户输入或粘贴清洗逻辑,默认已经提供了基本的代码结构;
* 目前支持Java语言进行清洗逻辑的编写;
* 支持用户在脚本库调用脚本,不用每次都重新输入
* 左下角显示「脚本库」按钮,点击按钮,显示脚本库的脚本列表
* 点击选择需要使用的脚本,选择后脚本显示在编辑界面上,可修改
* 脚本库提供一些默认脚本模版,如DML.java,CollectTime.java,UpdateTime.java、TimestampProcess.java、JSONProcess.java
* DML.java:代表DML字段标识脚本,源表数据发生变化时,在目的地表中增加相应的 DML 标识字段,包括 insert、update、delete
* CollectTime.java:代表读取时间脚本,根据数据读取时间,在目的地表中增加相应的时间字段
* UpdateTime.java:代表写入时间脚本:根据数据写入时间,在目的地表中增加相应的时间字段
* *使用默认的DML字段标识脚本、读取时间脚本以及写入时间脚本时,需要用户在目的地添加相应的字段,否则该脚本无法生效。*
* TimestampProcess.java:将 Timestamp 类型数据转换为日期类型
* JSONProcess.java:将 JSON 数据解析,并指定到目标表表结构(含有 JSON 数据解析样例)
* *使用默认的TimestampProcess.java和JSONProcess.java时,需要用户按实际情况修改模板中的字段信息。*
* 点击“使用此脚本”,当前脚本对表进行配置
* 支持查看样例数据,默认情况下系统将自动获取随机1个数据源的样例数据,您可选择获取随机10个或随机100个样例,点击按钮即可刷新获取。
* 若样例数据为空,系统将返回一对中括号。
* 支持对清洗逻辑进行试运行:
* 试运行的结果将实时显示,有变化的内容将以如图形式呈现。
* 若输入的清洗逻辑有误,运行结果窗口将报错,请您查看错误详细内容重试。
![](https://img.kancloud.cn/2d/7b/2d7bbeec485d31bb925c7222ab3b5a32_1917x999.png)
* 脚本编辑界面右上角显示关闭按钮,用户设置完毕后点击保存按钮退出
* 用户退出清洗脚本后,可以通过编辑按钮再次进入。
* 点击编辑按钮,进入清洗脚本界面,点击返回返回到配置页,与当前逻辑一致。
* 关闭清洗脚本,编辑脚本入口按钮不可编辑,鼠标放在上面浮层提示:请开启清洗脚本
* 激活前后,清洗脚本开启和编辑按钮均可编辑
* 点击保存就能够回到写入设置页面。
* 点击存入脚本库,用户可以将写好的脚本保存到脚本库以供后续使用。
![](https://img.kancloud.cn/01/18/011801ad40ca58c17668542f4857fc88_646x445.png)
* 用户可以自定义脚本名称用以区分不同的脚本。
* 用户可选择脚本列表已有脚本,点击确认脚本会被重新覆盖(只有管理员和自己创建的脚本才允许覆盖)。
* 用户输入新的名称,则会保存一个新的脚本。
* <p style='font-weight:bold;font-size:16px'> 当目的地为HBase时,要求必须在高级清洗中为rowkey赋值,否则目的地只会保留最新的一条记录。
如:
![](https://img.kancloud.cn/ad/de/addec0f005281a14a213703cbfe9cff0_1917x995.png)
<br/>
*****
## 使用案例
为了让用户更好地理解,故列出一些常见的使用脚本。
#### 一、DML字段标识脚本
脚本描述:源表数据发生变化时,在目的地表中增加相应的 DML 标识字段,包括 insert、update、delete
```
import com.datapipeline.clients.connector.record.DpRecordMeta;
import java.util.Map;
public class AddDMLField {
public static Map process(Map record, DpRecordMeta meta) {
// 系统会向 dml 字段写入标识:insert、update、delete。
// 如果是定时读取模式,所有 dml 字段会表示为 insert,只有通过实时模式读取数据时才会识别 update 和 delete。
record.put("dml", meta.getType());
// 保存该脚本后,请在目的地表结构添加新字段:dml,并把字段类型改为字符串类型(例如:varchar)
return record;
}
}
```
#### 二、读取时间脚本
脚本描述:根据数据读取时间,在目的地表中增加相应的时间字段
```
import com.datapipeline.clients.connector.record.DpRecordMeta;
import java.util.Map;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
public class AddCollectTimeField {
private static DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
public Map process(Map record, DpRecordMeta meta) {
// 系统会向 collect_time 字段写入读取该数据的时间,格式为:yyyy-MM-dd HH:mm:ss。
record.put("collect_time", Instant.ofEpochMilli(meta.getSourceRecordTimestamp()).atZone(ZoneId.of("UTC")).format(DATE_TIME_FORMATTER));
// 保存脚本后,在目的地表结构中添加字段:collect_time,并把字段类型改为时间类型(或字符串类型)。
return record;
}
}
```
#### 三、写入时间脚本
脚本描述:根据数据写入时间,在目的地表中增加相应的时间字段
```
import java.util.Map;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
public class AddUpdateTimeField {
private static DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
public Map process(Map record) {
// 系统会向 update_time 字段写入写入目的地的时间,格式为:yyyy-MM-dd HH:mm:ss。
record.put("update_time", LocalDateTime.now(ZoneId.of("Asia/Shanghai")).format(DATE_TIME_FORMATTER));
// 保存脚本后,在目的地表结构中添加字段:update_time,并把字段类型改为时间类型(或字符串类型)。
return record;
}
}
```
#### 四、日期类型转换脚本
脚本描述:将 Timestamp 类型数据转换为日期类型
```
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
public class TimestampProcess {
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");
private static final DateTimeFormatter TIMESTAMP_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
public Map process(Map record) {
// timestamp_type 类型为字符串,格式为:2019-07-24 17:06:54.000
final String timestampStr = (String) record.get("timestamp_type");
if (timestampStr != null) {
try {
// 将字符串转换为日期对象
final LocalDateTime localDateTime = LocalDateTime.parse(timestampStr, TIMESTAMP_FORMATTER);
// 将时间对象转换为 DATE_FORMATTER 格式的字符串,转化之后的字符串为:2019-07-24
record.put("formatedTime", DATE_FORMATTER.format(localDateTime));
} catch (Exception e) {
e.printStackTrace();
}
}
return record;
}
}
```
#### 五、Timestamp 转换脚本
脚本描述:将日期类型转换为Timestamp 类型数据
```
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
public class Date2TimeStampSample {
public Map<String, Object> process(Map<String, Object> record) throws Exception{
// date = "20180101 12:0:22.123"
String date = record.get("date");
long timeStamp = ZonedDateTime.parse(date, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.of("Asia/Shanghai"))).toInstant().toEpochMilli();
record.put("ts", timeStamp);
return record;
}
}
```
#### 六、JSON解析脚本
脚本描述:将 JSON 数据解析,并指定到目标表表结构
```
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class Test_api_url_nameTransformEngine {
public List<Map<String, Object>> process(Map<String, Object> record) {
List<Map<String, Object>> records = new ArrayList<>();
if (!Objects.isNull(record)) {
String jsonStr = (String) record.get("content");
JSONObject jsonNode = JSON.parseObject(jsonStr);
JSONArray items = jsonNode.getJSONObject("reponseData").getJSONArray("data");
for (Object iterm : items) {
JSONObject eachIterm = (JSONObject) iterm;
String title = String.valueOf(eachIterm.get("title"));
if (title != null && title.length() > 100) {
title = title.substring(0, 100);
}
Map<String, Object> eachRowData = new HashMap<>();
eachRowData.put("_id", String.valueOf(eachIterm.get("id")));
eachRowData.put("tab", String.valueOf(eachIterm.get("tab")));
eachRowData.put("title", title);
records.add(eachRowData);
}
}
return records;
}
}
```
效果如下:
在DataPipeline的「高级清洗」界面的显示如下:
![](https://img.kancloud.cn/fd/2b/fd2b3097fa46514d6d6b3906990279dd_1633x876.png)
#### 七、数据脱敏
场景描述:用户想对表格中的信息进行脱敏,比如手机号或者身份证号。
例:身份证号脱敏,保留前四位和后四位,手机号码脱敏,保留前三位和后四位。
具体代码如下:
```
import java.util.Map;
public class Sid_2382_05b0f75d2382f96a23824d7023829c612382ef7ac208785d {
public Map process(Map record) {
record.put("id",((String)record.get("id")).replaceAll("(\\d{4})\\d{10}(\\d{4})","$1**********$2"));
record.put("phone",((String)record.get("phone")).replaceAll("(\\d{3})\\d{4}(\\d{4})","$1****$2"));
return record;
}
}
```
效果如下:
数据源的原表为:
![](https://img.kancloud.cn/80/ff/80ffdd698561dd62873defa87107af7f_612x142.png)
经过「高级清洗」后,数据目的地的表内容为:
![](https://img.kancloud.cn/ea/a9/eaa9d64220b6deba07822cdeb28a0d99_269x67.png)
在DataPipeline的「高级清洗」界面的显示如下:
![](https://img.kancloud.cn/ac/60/ac60528eae2735816d89951e234ffa4e_1905x943.png)
#### 八、枚举类型转换
场景描述:用户想对表格中的类型进行转换。
例:把原表中表示性别的‘0’和‘1‘,到目的地表中,转换为对应的‘男’和‘女’。
具体代码如下:
```
import java.util.Map;
public class Sid_2383_a8984a582383944223834f332383b7e52383aa115c629f93 {
public Map process(Map record) {
if (record.get("sex").equals("0"))
{record.put("sex","男");}
else if(record.get("sex").equals("1"))
{record.put("sex","女");}
else
{record.put("sex","");}
return record;
}
}
```
效果如下:
数据源的原表为:
![](https://img.kancloud.cn/28/a2/28a29b86cdb92a77445788642f1fd21e_336x65.png)
经过「高级清洗」后,数据目的地的表内容为:
![](https://img.kancloud.cn/31/14/3114483e6928a3a928407d62c41747cd_331x67.png)
在DataPipeline的「高级清洗」界面的显示如下:
![](https://img.kancloud.cn/16/29/1629a9e2cfcaa2ae63c22d48c35f1fa7_1905x950.png)
#### 九、字段求和
场景描述:用户想对表格中的几个字段的内的数据进行求和。
例:把原表中的两个字段内的薪水,到目的地表中,生成一个新的字段,其值为前两个字段的和。
具体代码如下:
```
import java.util.Map;
public class Sid_2384_f049d91a2384daea23844a3a23848fd923847cc3bc23dc6e {
public Map<String, Object>process(Map<String, Object>record) {
record.put("sum",(Double) record.get("salary1")+(Double) record.get("salary2"));
return record;
}
}
```
效果如下:
数据源的原表为:
![](https://img.kancloud.cn/fc/de/fcdefe026824ca8e842d9ba4e00adf5f_1360x130.png)
经过「高级清洗」后,数据目的地的表内容为:
![](https://img.kancloud.cn/5f/52/5f52fb834ace93771b9ae0bee88d1e82_1042x142.png)
在DataPipeline的「高级清洗」界面的显示如下:
![](https://img.kancloud.cn/a7/5d/a75d65de441051252a8f67eba51e1666_2836x1530.png)
#### 十、时间类型格式统一
场景描述:用户想对表格中字段的日期格式统一。(2019115、20190115、2019.01.15)转换成 YYYY-MM-DD(2019-01-15)。
具体代码如下:
```
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
public class ConventDateSample {
public Map<String, Object> process(Map<String, Object> record) throws Exception{
try{
SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd");
SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd");
Date datetime=df1.parse(record.get("datetime").toString());
String newDatetime = df2.format(datetime);
record.put("datetime",newDatetime);
}catch(Exception e){
}
return record;
}
}
```
效果如下:
数据源的原表为:
![](https://img.kancloud.cn/da/ab/daabc801d04442faf65d2655d8f67e7a_421x72.png)
经过「高级清洗」后,数据目的地的表内容为:
![](https://img.kancloud.cn/2b/25/2b257de2dc7deccd159bd48dbdf6445d_424x63.png)
在DataPipeline的「高级清洗」界面的显示如下:
![](https://img.kancloud.cn/b6/8f/b68f5195b221dde9676f6af6c8da328a_2848x1542.png)
#### 十一、库存日期结合
场景描述:用户想对表格中日期和时间,结合到一个字段内。
具体代码如下:
```
import java.util.Map;
import java.util.Date;
import java.text.ParseException;
import java.text.SimpleDateFormat;
public class Sid_2516_8ffcc7ca2516201f25164ee7251690f3251641b41d3bbd5c {
public Map<String, Object>process(Map<String, Object>record) {
record.put("datetime2", record.get("date")+" "+ record.get("time"));
return record;
}
}
```
效果如下:
数据源的原表为:
![](https://img.kancloud.cn/31/7b/317bb1f86a5b7a9218e24df66f8ed41d_137x74.png)
经过「高级清洗」后,数据目的地的表内容为:
![](https://img.kancloud.cn/30/c3/30c3e2901e4c95f0a2d0635879db7ba6_262x74.png)
在DataPipeline的「高级清洗」界面的显示如下:
![](https://img.kancloud.cn/f7/0c/f70c4d5c76c970f0cb88e6b3fe440ba2_1894x951.png)
#### 十二、日期间隔计算
场景描述:用户想对表格中两个日期之间相差的天数,进行计算,再写入到一个字段内。
具体代码如下:
```
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
public class Sid_2519_5017952225194a2425194ff5251980f3251985af080e3b28 {
public Map<String, Object>process(Map<String, Object> record) throws Exception{
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date start=df2.parse(record.get("startTime").toString());
Date end=df2.parse(record.get("endTine").toString());
try{
int days= (int) ((df.parse(df.format(end)).getTime() -df.parse(df.format(start)).getTime()) / (24 * 60 * 60 * 1000));
record.put("days",days);
}catch(Exception e){
}
return record;
}
}
```
效果如下:
数据源的原表为:
![](https://img.kancloud.cn/db/81/db814cf662aa06ad1e57183148c524dd_311x64.png)
经过「高级清洗」后,数据目的地的表内容为:
![](https://img.kancloud.cn/9d/e9/9de98c7b7500a6d556f96527c13b92cf_379x68.png)
在DataPipeline的「高级清洗」界面的显示如下:
![](https://img.kancloud.cn/51/e9/51e94176523528f67273b9c9630108a8_1892x955.png)
#### 十三、非空字段判断
场景描述:判断一个字段是否为空
```
import java.util.Map;
import java.util.Objects;
import com.datapipeline.clients.utils.CodeEngineUtils;
public class Example13 {
public Map process(Map record)
final Object orderno = record.get("orderno");
boolean ordernoIsNull = CodeEngineUtils.isNull(orderno);
}
```
效果如下:
判断表字段「name」是否为空字段,在DataPipeline的「高级清洗」界面的显示如下:
:-: ![](https://img.kancloud.cn/a4/ae/a4aed1ec26c3b2756af454ae751a4318_2876x1644.png =480x)
从返回结果来看,该字段为空。
#### 十四、指定数据进错误队列
场景描述:指定某一些数据进入错误队列
```
import java.util.Map;
import java.util.Objects;
import java.lang.String;
import java.lang.Throwable;
import com.datapipeline.clients.codeengine.CustomizedCodeEngineException;
public class Example14 {
public Map process(Map record) throws CustomizedCodeEngineException {
if(Integer.parseInt(record.get("age").toString()) <= 20){
throw new CustomizedCodeEngineException(new Exception("Illegal data!"));
}
return record;
}
}
```
#### 十五、时区转换
场景描述:将上海时间转化为标准时间
```
import java.util.Map;
import java.time.*;
import java.time.format.DateTimeFormatter;
public class localTime2standardTime {
public Map<String, Object> process(Map<String, Object> record) {
String crt = (String)record.get("create_time");
try {
long ncrt = ZonedDateTime.parse(crt,DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.of("Asia/Shanghai"))).toInstant().toEpochMilli();
record.put("timestamp",ncrt);
}
catch (Exception e) {
e.printStackTrace();
}
return record;
}
}
```
#### 十六、字符集编码转换
场景描述:当数据源的数据为乱码时,可通过高级清洗转换字符集编码,处理乱码数据。
```
package com.dp.exltcsv;
import java.io.UnsupportedEncodingException;
import java.util.Map;
public class utf2gbk {
public Map<String, Object> process(Map<String, Object> record) throws UnsupportedEncodingException {
// 针对单个字段的格式
// String key = (String)record.get("key");
// record.put(key,(Object)new String(key.getBytes("utf-8"),"gbk"));
// return record;
// 针对所有字段
for (String key : record.keySet()) {
String value = (String)record.get(key);
record.put(key,(Object)new String( value.getBytes("utf-8"), "gbk"));
}
return record;
}
}
```
- DataPipeline产品手册
- 产品更新日志
- v2.7.0 版本介绍
- v2.6.5 版本介绍
- v2.6.0 版本介绍
- v2.5.5 版本介绍
- v2.5.0 版本介绍
- v2.4.5 版本介绍
- v2.4.1 版本介绍
- v2.4.0 版本介绍
- v2.3 版本介绍
- v2.2.5 版本介绍
- v2.2 版本介绍
- v2.1 版本介绍
- v2.0.5 版本介绍
- v2.0 版本介绍
- v2.0 以前版本介绍
- 环境和数据库的部署要求
- Mysql - BINLOG配置方法
- Oracle - LOGMINER配置方法
- SQL Server - Change Tracking配置方法
- Postgre SQL-decoderbufs配置方法
- Postgre SQL-wal2json配置方法
- 常见场景操作
- 场景一:实时同步异构数据库数据(例:MySQL到Oracle)
- 场景二:批量同步异构数据库数据(例:SQL Server到MySQL)
- 场景三:API数据同步到关系型数据库(例:API到MySQL)
- 场景四:Hive数据同步到关系型数据库(例:Hive到SQLServer)
- 场景五:关系型数据库数据同步到Hive(例:Oracle到Hive为例)
- 场景六:Kafka数据同步到关系型数据(例:Kafka到MySQL为例)
- 场景七:一对多场景介绍
- 产品入门
- 数据同步任务
- 创建数据同步
- 配置数据源&数据目的地
- 配置数据源
- 配置MySQL数据源
- 配置Oracle数据源
- 配置SQL Server数据源
- 配置PostgreSQL数据源
- 配置FTP数据源
- 配置S3数据源
- 配置API数据源
- 配置Kafka数据源
- 配置Hive数据源
- 配置阿里云 OSS数据源
- 配置腾讯云TDSQL数据源
- 配置自定义数据源
- 配置数据目的地
- 配置MySQL数据目的地
- 配置Oracle数据目的地
- 配置SQL Server数据目的地
- 配置Greenplum数据目的地
- 配置Redshift数据目的地
- 配置TIDB数据目的地
- 配置FTP数据目的地
- 配置HBase数据目的地
- 配置HDFS数据目的地
- 配置Hive数据目的地
- 配置AnalyticDB for PostgreSQL数据目的地
- 配置Kafka数据目的地
- 数据同步的任务设置
- 读取设置
- 数据源资源组设置
- 批量功能
- SQL类型数据源读取条件设置
- 分数据源读取设置
- MySQL读取设置
- Oracle读取设置
- SQLServer读取设置
- PostgreSQL读取设置
- FTP文件系统读取设置
- S3文件系统读取设置
- Hive读取设置
- Kafka读取设置
- 阿里云OSS读取设置
- API读取设置
- 腾讯云TDSQL读取设置
- Hive数据源读取分区设置
- 其他设置
- 错误队列设置
- 邮件通知设置
- 任务分组设置
- 写入设置
- 批量功能
- 设置清洗脚本
- 数据目的地资源组设置
- 数据目的地设置
- 子任务设置
- 数据源变化设置
- 写入端数据一致性
- 批量读取后,先写入到临时表,再转存到实际表
- 高级设置
- 子任务设置
- Hive分区设置
- Column family 设置
- 数据同步任务管理
- 数据任务监控
- 重要任务
- 故障任务
- 非激活状态
- 性能关注
- 数据任务分组
- 管理数据同步
- 复制功能
- 回滚功能
- 重新同步功能
- 错误队列
- 消息列表
- 文件同步任务
- 创建文件同步
- 配置文件源
- 配置S3文件源
- 配置FTP文件源
- 配置文件目的地
- 配置HDFS文件目的地
- 文件同步的任务设置
- 任务流
- 核心功能介绍
- 新建任务流
- 配置核心组件
- 配置开始任务组件
- 配置数据任务组件
- 配置远程命令执行组件
- 配置延时器组件
- 配置权限设置
- 激活任务流
- 元数据管理
- 查看总览
- 搜索页
- 详情页
- 系统设置
- 数据任务
- 元数据管理
- 用户管理
- 常见问题
- 部署要求
- Docker安装的集群部署方式?
- DataPipeline的并发任务是线程还是进程?
- 分布式架构指的是什么样的框架?
- 生产环境配置推荐及回答?
- DataPipeline的服务是统一管理还是私有化部署?若是私有化部署若要升级怎么操作?
- DataPipeline的Kafka如果与客户目前使用的Kafka版本不一样,是否需要适配?
- 请说明产品的HA和容灾方案 ?
- DataPipeline有多少独立的服务?各容器的作用是什么?
- 在从节点上装mysql,对单表导入1000万数据对任务有影响吗?
- 数据传输
- 数据源/数据目的地
- 基本要求
- 数据源或目的地可以重复使用吗?
- 数据源多个表是否可以写到目的地一张表?
- 数据源或目的地连接失败怎么办?
- 数据源
- MySQL
- DataPieline如何应对Mysql数据库表和字段名称大小写不敏感问题?
- DataPipeline Mysql数据源的实时处理模式下,暂时无法读取哪些字段类型?
- Mysql数据源实时处理模式下,暂不支持那些语句操作的同步?
- Oracle
- Oracle实时模式为LogMiner时,为什么还需要设置读取频率?
- SQL Server
- SQL Server数据源读取方式选择Change Tracking时需要注意什么?
- SQL Server实时模式为Change Tracking时,为什么还需要设置读取频率?
- PostgreSQL
- Hive
- Hive数据源支持哪些文件格式?
- Kafka
- FTP文件系统
- FTP数据源CSV静态表结构时,用户为什么需要确认首行是否为字段名称?
- FTP数据源静态表结构和动态表结构的区别是什么?
- FTP源的文件在不断写入的情况下,DataPipeline的读取与写入的模式是怎样的?
- FTP数据源支持哪些编码方式?
- S3文件系统
- 阿里云OSS文件系统
- API
- 腾讯云TDSQL
- 数据目的地
- MySQL目的地常见问题
- 时区问题需要注意什么?
- SQL Server目的地常见问题
- 行级的物理删除,使用Change Tracking的方式,是否获取的到?DataPipeline会如何处理这类的数据?
- 数据源实时模式是否可以同步视图?
- Oracle目的地常见问题
- TiDB目的地常见问题
- 目的地TIDB同步表时,需要注意什么?
- Redshift目的地常见问题
- Redshift 并发数设置是50,DataPipeline对100个表并发插入的方案?对Redshift 性能的影响?DataPipeline对大数据量并发插入Redshift 的处理方式?
- Hive目的地常见问题
- 如何避免Hive目的地出现小文件问题?
- DataPipeline同步数据到Hive目的地表时,数据源发生变化会怎么样?
- 我们目前对已做好Hive分区逻辑的目的地,是不是不支持继续往里写?只能写新表?
- 配置Hive目的地是需要注意哪些?
- Hive目的地时字段转换需要注意哪些问题?
- GreenPlum目的地常见问题
- Kafka目的地常见问题
- kafka目的地支持设置新的分区吗?
- 多个表结构不一致的表,可以同步至kafka的同一个topic吗?
- HDFS目的地常见问题
- FTP文件系统目的地常见问题
- 目的地FTP时,我们现在是按什么逻辑创建文件的?
- AnalyticDB for PostgreSQL目的地常见问题
- Hbase目的地常见问题
- 目的地常见问题
- 各个数据目的地的写入方式分别是采用什么形式?
- DataPipeline支持的数据库的目标端的连接方式是什么?
- DataPipeline支持的目标端冲突数据处理机制是什么?
- 任务设置
- 读取模式相关问题
- 任务设置中读取频率的实现原理是什么样的?
- 采用实时同步的情况,新建同步任务时,源端的数据表有大量的存量数据,如何通过产品实现数据同步的一致性的?
- 定时批量清目标表数据的逻辑是什么样的?
- 数据源端基于日志的实时模式,是源库推送还是我们做捕获?
- 关系型数据如MySQL,如果出现大量的数据修改,BinLog日志如何抓取,如何实现及时的消费?
- 读取与写入的速率限制是按照任务还是按照表?
- 我们的无侵入性是如何实现的?是完全无侵入性,还是侵入性很小?是否无侵入性就意味着源端服务器没有访问请求的压力,那目的端写入是否还存在压力?
- 动态限速的策略是什么?
- 读写一致性的逻辑是什么?
- v2.6版本增量的逻辑如何实现?
- 重新同步策略问题
- 如果任务激活后进行重新同步,目的地数据会清空吗?
- 读取设置
- 如何设置数据读取条件where语句?有哪些注意事项?
- 用户选择实时模式时,选表时发现有一些表置灰不能同步要想同步这些表该怎么办?
- 同步完成后暂停,取消表后又新加入此表,DataPipeline对于此表的处理策略是什么样的?
- 写入设置
- 表和字段问题
- 目的地表名称和字段名称最长字符长度有什么特定限制吗?以及表名称和字段名称的输入规则要求是什么?
- 同步数据到异构数据库,字段类型会有变化吗?
- 表结构中的精度和标度是什么意思?
- DataPipeline所支持的不同数据类型有哪些?kafka schema的数据类型和不同库间的数据库转换规则?
- 数据源端支持哪些字符集类型?
- Hive作为目的地表需要注意什么?
- Hive作为数据源且格式为parquet时需要注意什么?
- 如何新增一个字段?
- 主键相关问题
- 无主键的表的同步逻辑是怎样的?
- 选择增量识别为主键,如何保证源端和目标的数据一致性呢,如果该记录有修改,系统是怎么处理的?
- 数据目的地ODS有大量无主键表,同步时DataPipeline是如何处理的?
- 表结构变化问题
- 任务激活前后,数据源变化表结构变化有什么不同?
- 当数据源表结构更新时DataPipeline是如何处理的?
- 如果目的地端已经存在了数据库表,但表结构不相同,我们能否将数据写入到该表?
- DataPipeline是否支持将不同的数据表(在不同的数据库中,但是表结构一致,同时有主键和唯一性识别的字段),导入同一个目的端表?
- 管理数据同步
- 基本概念
- 错误通知是什么?
- 错误队列是什么?
- 哪些数据错误会进入错误队列?
- 请简述产品支持的目标端冲突数据处理机制?
- 错误队列里的原始数据是指源端读取的原始数据还是说经过清洗规则后的数据?
- 激活任务后,有哪些参数可以修改?
- 同步状态
- 部分表已读取已写入等都为0,但完成进度为100%?
- 任务详情页中的数据读写量具体含义是?为什么有时候还会减少?
- 如何去查看FTP源和FTP文件中的文件有没有同步完成?
- 数据任务激活后是不允许修改任何设置吗?
- 激活任务后,数据百分比为什么会往回条,如:从99% 跳到30%
- 同步逻辑
- 自动重启逻辑是怎样的?
- 目前数据同步的暂停重启策略是什么样的?暂停和重启后是如何读取和写入数据的?
- 目前进行数据任务的时候,读取速率远大于写入速率,其中,已读取且还未写入的数据会暂时存储在Kafka上,但是由于Kafka存储空间有限,超出后容易造成数据的丢失,这怎么办?
- 如果一条数据多次、频繁变化,在DataPipeline产品侧如何保证数据的并行和保序是如何保证的?
- 用户导入数据后,hdfs认证机制,数据哪些用户可以使用,用户数据安全如何确保?
- 请简述目标端性能可管理性(可提供的性能监控、分析、调优手段等)
- DataPipeline是否支持远程抽取数据?
- 如果一条数据多次、频繁变化,在DataPipeline产品侧如何保证数据的并行和保序是如何保证的?
- 产品到期问题
- 产品使用期限到期所有任务都会被暂停任务,那么如何提前获知产品使用期限是否到期以避免任务被暂停?
- 实际场景中,目的地服务器每周可能会有aws升级,需要暂停服务器,DataPipeline有没有对应的方案能够满足?
- 任务报错
- redis连接异常任务暂停了怎么办?
- 文件传输
- FTP文件源同步整个文件时是如何同步的?
- 任务流
- 如何使用远程命令执行脚本来调取另外一个任务流?
- 任务流开启状态下,任务此时关闭掉任务流,正在同步的组件任务的处理逻辑是什么样的?
- 任务流中上游组件有多个组件任务时,上游任务全部完成才能开启下游任务吗?
- 任务既连了开始键,又配置了依赖,执行逻辑会是什么样的?
- 任务流中新建任务为何只有读取方式为增量识别字段,没有binlog?
- 任务监控
- 什么样的实时传输任务会在性能关注中显示?