# 数据清洗
## 1.什么是「数据清洗」?
数据工程师会根据业务需求,在执行数据任务过程中进行数据处理。DataPipeline提供可视化操作界面,帮助用户零开发完成自定义表名、字段名称,过滤、替换数据等「数据清洗」功能。
## 2.数据清洗支持哪些功能?
* 自定义修改目的地表名和字段名、字段类型、标度、精度、主键、NotNull;
* 支持设置数据读取条件;
* 支持针对每个字段设置:字段过滤、过滤规则、替换规则;
* 支持自定义清洗脚本规则。
## 3.如何设置数据清洗规则?
勾选表(集合)后,点击「保存」立即显示配置规则页面。
* 显示内容:数据任务名称、数据源和数据目的地信息、Table列表、数据源表结构详情、目的地表结构详情、清洗脚本开关。
* Table 列表显示将同步到目的地的数据源表名称,可点击列表顶部「编辑」图标修改需要同步的数据表。
* 在列表中选择数据源表,右侧会显示该数据源表和目的地表的映射关系详情。
* 用户可浏览该数据表的内的所有字段和类型在数据源和目的地之间的映射关系:
* 系统将实时检测各表名称是否符合目的地表名称要求;
* 若存在同名表将会提示用户「目的地已存在同名的表」。

* 主文案:任务激活后,系统会根据用户设置的表结构进行数据同步。
* 若发现表结构不一致,系统会修改目的地已存在的表结构后继续完成数据同步。
* 若发现字段类型不一致,系统不会对此进行修改。
* 目的地为Kafka的任务,将检验目的地是否已经存在的Topic名称,若不存在将提示「Topic名称不存在」;

* 若用户Kafka目的地支持自动创建Topic,则该任务激活后将自动创建该Topic;
* 若用户Kafka目的地不支持自动创建Topic,则要求用户输入已存在的Topic后才可点击保存。
* 支持对目的地表名称的自定义修改;
* 支持对目的地表的字段名称的自定义修改;
* 修改规则请参考下表:
* 若不符合规则,则会提示用户具体错误信息。
<table border="1"> <tr> <th>数据目的地类型</th> <th>自定义表名称限制</th> <th>自定义字段名称限制</th> </tr> <tr> <td >RDS、Oracle、Redshift 、Hive、TiDB、Greenplum</td><td >只允许英文、下划线和数字。</td> <td rowspan="2">只允许英文、下划线和数字;</br> 不支持同名字段</td> </tr>
<tr> <td >FTP、Kakfa</td><td >只允许英文、下划线、数字、点和横杠。</td> </tr></table>
* 数据源表结构:
* 数据源为SQL类型数据库
* 显示数据源表名称、字段名称、字段类型、标度、精度、Not Null、主键
* 所有内容不可修改,只能浏览
* 数据源为FTP、S3
* 显示字段名称,只能浏览,不能修改。
* 数据源为Couchbase
* 目的地显示:Bucket名称、Bucket密码、Topic名称
* 数据源为API
* 目的地显示:字段名称、字段类型、标度、精度、Not Null 、主键、同步、过滤规则、清洗规则。
* 数据源为Kafka
* 数据格式为JSON、STRING时,目的地显示字段名称、字段类型、标度、精度、Not Null 、主键、同步、过滤规则、清洗规则
* 数据目的地表结构:
* 数据目的地类型:SQL类型数据库、Greenplum、TiDB或Redshift
* 显示目的地表名称、字段名称、字段类型、标度、精度、Not Null 、主键
* 目的地表结构默认显示:数据源表结构经过Schema Mapping后的表结构。
* 数据目的地类型:FTP
* 显示目的地文件名、字段名称、唯一键
* 数据目的地类型:kafka
* 显示:Topic名称、字段名称、字段类型、唯一键
* 数据目的地类型:Hive
* 显示目的地表名称、字段名称、字段类型、唯一键
* 目的地表结构默认显示:数据源表结构经过Schema Mapping后的表结构。
* 支持对目的地的字段类型、标度、精度、主键以及Not Null的编辑与修改。
:-: 
* 精度、标度允许为空,空值代表为目的地默认值。
* 精度、标度只允许输入整数,否则提示:请输入正整数(Oracle数据库Number字段类型的标度允许输入负整数)
* 主键字段不允许为空值,允许设置多个主键字段去重,Not Null的要求一直打钩,不允许被勾掉。
* 若用户经设置后目的地表没有主键,提示:请设置主键字段,主键字段可去重目的地重复数据。
* 目的地存在主键后,该提示消失
* 未添加主键的表,不允许用户保存
* 无法保存,同步列表显示「未完成」
* 提示: 请设置主键字段,主键可去除目的地重复数据
* 唯一键(目的地为hive、kafka、FTP数据目的地时)
* 点击每个字段对应的“唯一键”部分,可点亮与抹除小钥匙,可点亮多个,1个或0个
* 点亮多个,则按默认规则组合唯一键去重;点亮1个,选中字段直接用于唯一键去重
* 指定多个唯一键时,后端自动生成唯一键虚拟值用以去重筛选。
* 当目的地未勾选任意字段为唯一键:
* 无法保存,同步列表显示「未完成」
* 提示:请设置唯一键字段或添加\_id作为唯一键,可去除目的地重复数据
:-: 
* 支持修改目的地字段类型(包括主键)
* 要求根据目的地数据库类型显示该表在目的地的真实字段类型。当目的地为文件系统(FTP、Kafka)时,目的地的字段类型显示为String。
* 字段类型可编辑标度、精度和Not Null(不需要设置标度或精度的字段若用户填写了信息时,可作为无效信息处理)、主键。Not Null默认状态要求数据源和目的地是一致的。
* 点击字段类型,显示下拉框并显示该数据目的地的所有字段类型。
* 当更换字段类型时,要求标度、精度、Not Null、主键信息不要被清空,保持原有状态。
* 主键列表。
* 点击主键列表区域则可以设置为主键,再次点击可取消主键选择。
* 目的表可设置多个主键,但不允许没有主键。
* 数据任务激活后不允许手动修改表结构。
* 支持删除目的地字段
* 鼠标移动到目的地任意字段,字段最右侧显示删除按钮,鼠标移动到删除按钮,要求浮层提示【删除】。
* 点击【删除】字段,该字段一行为空,并显示【恢复字段】按钮。
* 删除字段后,用户激活数据任务,系统在创建目的地表时,不会创建该字段。
* 同步该表数据时,系统会忽略该字段数据,不会同步到目的地。
* 数据任务激活后,已删除字段区域为空,并隐藏【恢复字段】按钮。
* 数据源新增字段与用户在激活任务前删除的字段同名,要求在该表底部新增一个同名字段,不覆盖当前的已被操作删除的字段。
* 点击【恢复字段】,要求重新显示该字段
* 支持新增目的地字段
* 点击【添加字段】,要求在目的地表最后一行新增一个字段。
* 字段名称为空
* 字段类型为string
* 长度、精度为空
* Not Null 未勾选
* 新增字段的数据来源要求用户在【高级清洗】逻辑中添加。比如:
* 增加字段【InsertTime】,该逻辑可在高级清洗逻辑中实现。
* 合并字段A+字段B值,该逻辑可在高级清洗逻辑中实现。
* 新增字段不会与数据源表结构有关联标识
* 关于自增_id需求变化
* 当数据源表没有主键字段,或数据源为文件系统,目的地为SQL类型(MySQL、Oracle、SQL Server、Redshift)
* 目的地表结构首行默认添加_id字段作为主键。
* 若用户经过设置后目的地表不存在主键字段,提示:请设置主键字段,或添加_id作为主键。_id作为主键可去重目的地重复数据。
* 点击【添加_id】,在目的地表结构首行添加【_id】字段,并标记为主键,如果目的地已有【_id】字段,则添加为【 _ _ id】,依次类推。
* 允许修改字段名称、字段类型、长度、精度,允许删除
* 主键字段要求不允许勾掉Not Null,并且不允许关闭【同步】按钮。
:-: 
* 点击「收起、展开数据源表结构」快速查看数据源表和目的地表详情。
* 收起数据源表结构后,可以针对每个字段进行清洗过滤:
* 提供清洗功能:修改字段类型、标度、精度、Not Null、主键、同步、字段过滤、数据过滤、数据替换。
* 用户可以设置一个甚至多个主键以达到数据清洗效果。
* 关闭字段同步后,该字段将不会被同步到数据目的地:
* 将保留数据源表结构,但该字段传空值。
* 过滤规则支持对数值的比较,
* 输入规则为「 字段名 > 值, 字段名 < 值, 字段名 == 值 」
* 替换规则支持四则运算的数值替换或者对字符串的替换;
* 数值替换的输入规则为「 字段名 +/-/×/÷ 值 」,
* 字符串替换的输入规则为「 s|需要替换的字符|替换后的字符|, s|需要替换的字符|替换后的字符| 」, 若有多条替换规则请用逗号分隔。
* 替换规则支持输入正则表达式。
* 设置清洗规则后,平台只会向数据目的地同步符合用户设置的数据内容。
* 若同时输入过滤规则和替换规则,两个规则都将被系统应用于过滤。
* 规则的不同应用顺序的可能会导致最终同步结果与期望值不一致。
* 主键不允许进行数据清洗。
* 支持设置读取条件
* 支持对每个数据表设置读取条件;
* 输入 where 语句来添加数据读取条件,请按照各数据源 SQL 语法要求输入,此读取条件暂不适用于实时模式下增量数据:
`例「MySQL」:where price > 100 and sku <> 0 `
* 设置后任务将以该条件从数据源读取数据。
* where条件句只作用于一个任务,各个任务之间相互独立。
:-: 
* 当数据任务的数据源的读取模式为定时模式时,将出现「增量识别」选项,用户可勾选是否把字段作为识别增量字段。
* 增量识别字段用于同步增量数据。
* 每次轮询以大于等于上一次轮询结果的最后一行此字段值作为查询条件,查询结果判断为增量数据。
* 选择的字段必须为可排序,例如数字或时间,推荐的字段一般为随数据更新而自增的字段,如:
* 更新序列号 (例:SequenceID)
* 更新时间戳 (例: UpdatedAt)
* 若所选字段不符合以上推荐条件,可能会造成数据源更新的数据无法被识别为增量数据或导致任务报错。
* 增量数据同步时,必须要求勾选增量识别字段。
:-: 
* 如仍没有勾选增量识别字段,点击保存按钮,提示未完成界面。
:-: 
## 4.如何使用清洗脚本?
* 点击「清洗脚本」可以打开或关闭清洗脚本功能。
* 默认为关闭状态,点击后将高亮开关,并进入新窗口进行清洗脚本编辑;
* 上方输入框支持用户输入或粘贴清洗逻辑,默认已经提供了基本的代码结构;
* 目前支持Java语言进行清洗逻辑的编写;
* 支持用户在脚本库调用脚本,不用每次都重新输入
* 左下角显示「脚本库」按钮,点击按钮,显示脚本库的脚本列表
* 点击选择需要使用的脚本,选择后脚本显示在编辑界面上,可修改
* 脚本库显示默认三个脚本
* DML.java
* 提示内容: DML字段标识脚本:源表数据发生变化时,在目的地表中增加相应的 DML 标识字段,包括 insert、update、delete
* CollectTime.java
* 提示内容: 读取时间脚本:根据数据读取时间,在目的地表中增加相应的时间字段
* UpdateTime.java
* 提示内容: 写入时间脚本:根据数据写入时间,在目的地表中增加相应的时间字段
* 点击“使用此脚本”,当前脚本对表进行配置
* 支持查看样例数据,默认情况下系统将自动获取随机1个数据源的样例数据,您可选择获取随机10个或随机100个样例,点击按钮即可刷新获取。
* 若样例数据为空,系统将返回一对中括号。
* 支持对清洗逻辑进行试运行:
* 试运行的结果将实时显示,有变化的内容将以如图形式呈现。
* 若输入的清洗逻辑有误,运行结果窗口将报错,请您查看错误详细内容重试。
:-: 
* 脚本编辑界面右上角显示关闭按钮,用户设置完毕后点击关闭按钮退出
* 用户退出清洗脚本后,可以通过编辑按钮再次进入。
* 点击编辑按钮,进入清洗脚本界面,点击返回返回到配置页,与当前逻辑一致。
:-: 
* 关闭清洗脚本,编辑脚本入口按钮不可编辑,鼠标放在上面浮层提示:请开启清洗脚本
* 激活前后,清洗脚本开启和编辑按钮均可编辑
* 以下为当数据源为MySQL、Oracle、SQL Server时的高级清洗脚本的例子:
MySQL
~~~
import java.util.Map;
public class Zl_clean_test_multiTransformEngine {
public Map<String, Object> process(Map<String, Object> record) {
//字段sex 源端类型为char(6)
//如果字段sex值为null或者为empty(即长度为0),则被过滤掉
if(record.get("sex").equals(null) || ((String)record.get("sex")).length()==0)
return null;
String sex = (String)record.get("sex");
//字段sex的值不为male的被过滤掉
if(!sex.equals("male"))
return null;
//字段desc 源端类型为text
//如果字段desc值为null或者为empty(即长度为0),则被过滤掉
if(record.get("desc").equals(null) || ((String)record.get("desc")).length()==0)
return null;
String desc = (String)record.get("desc");
//字段desc的值不以s开头的被过滤掉
if(desc.charAt(0)!='s')
return null;
//字段tall 源端类型为float
float tall = (float)record.get("tall");
//字段tall的值小于100的被过滤掉
if(tall < 100)
return null;
//字段age 源端类型为int
int age = (int)record.get("age");
//字段age的值小于20的,进行+2操作
if(age < 20) {
record.put("age", age+2);
}
//字段salary 源端类型为double(10, 2)
double salary = (double)record.get("salary");
//字段salary的值大于10000的,进行/100操作
if(salary > 10000) {
record.put("salary", salary/100d);
}
//字段married 源端类型为bit(1)类型
boolean married = (boolean)record.get("married");
//字段married的值如果为1,将salary进行+500的操作;否则进行*2的操作
if(married == true) {
record.put("salary", salary+500);
} else {
record.put("salary", salary*2);
}
//目的地新增字段city,类型为varchar,为字段附值为“beijing”
record.put("city", "beijing");
return record;
}
}
~~~
Oracle
~~~
import java.util.Map;
import java.math.BigDecimal;
public class DP_STAG_testCleanTransformEngine {
public Map<String, Object> process(Map<String, Object> record) {
//字段testBody 源端为varchar2类型
if(record.get("testBody").equals(null) || ((String)record.get("testBody")).length()==0)
return null;
String testBody = (String)record.get("testBody");
//字段testBody,若不以s开头,则被过滤掉
if(testBody.charAt(0)!='s') {
return null;
}
//字段sex 源端为varchar2类型
if(record.get("sex").equals(null) || ((String)record.get("sex")).length()==0)
return null;
String sex = (String)record.get("sex");
// //只保留字段sex的值为male的
if(!sex.equals("male"))
return null;
//字段salary 源端为Number
//测试发现,若源端为Number类型,且precision为1~36之间,则此处获取的类型均为BigDecimal
long salary = ((BigDecimal)record.get("salary")).longValue();
//若字段salary的值大于100,则过滤掉
if(salary > 100) {
return null;
}
//将字段salary的值乘以100
BigDecimal newSalary = BigDecimal.valueOf(salary * 100);
record.put("salary", newSalary);
//字段circle 源端类型为Float
//测试发现源端为Float(4)和Float(8),此处获取的类型均为Double
double circle = (double)record.get("circle");
//若字段circle的值大于200,则减去100
if(circle > 200d) {
record.put("circle", circle-100);
}
return record;
}
}
~~~
SqlServer
~~~
import java.util.Map;
import java.math.BigDecimal;
public class Zl_clean_test_multiTransformEngine {
public Map<String, Object> process(Map<String, Object> record) {
//字段sex 源端类型为char(6)
if(record.get("sex").equals(null) || ((String)record.get("sex")).length()==0)
return null;
String sex = ((String)record.get("sex")).trim();
//字段sex的值不为male的被过滤掉
if(sex.equals("male"))
return null;
//字段desc 源端类型为text
if(record.get("desc").equals(null) || ((String)record.get("desc")).length()==0)
return null;
String desc = (String)record.get("desc");
//字段desc的值不以s开头的被过滤掉
if(desc.charAt(0)=='s')
return null;
//字段tall 源端类型为float
Double tall = (Double)record.get("tall");
//字段tall的值小于100的被过滤掉
if(tall < 100d)
return null;
//字段age 源端类型为int
int age = (int)record.get("age");
//字段age的值小于20的,进行+2操作
if(age < 20) {
record.put("age", age+2);
}
//字段salary 源端类型为money(10, 2)
double salary = ((BigDecimal)record.get("salary")).doubleValue();
//字段salary的值大于10000的,进行/100操作
if(salary > 10000) {
BigDecimal b1 = BigDecimal.valueOf(salary/100);
record.put("salary", b1);
}
//字段married 源端类型为bit(1)类型
boolean married = (boolean)record.get("married");
//字段married的值如果为1,将age进行+5的操作;否则进行-2的操作
if(married == true) {
record.put("age", age+5);
} else {
record.put("age", age-2);
}
return record;
}
}
~~~
Postgres
~~~
import java.util.Map;
import java.math.BigDecimal;
public class Public_zl_clean_testTransformEngine {
public Map<String, Object> process(Map<String, Object> record) {
//字段sex 源端类型为char(6)
//如果字段sex值为null或者为empty(即长度为0),则被过滤掉
if(record.get("sex").equals(null) || ((String)record.get("sex")).length()==0)
return null;
String sex = ((String)record.get("sex")).trim();
//字段sex的值不为male的被过滤掉
if(!sex.equals("male"))
return null;
//字段desc 源端类型为text
//如果字段desc值为null或者为empty(即长度为0),则被过滤掉
if(record.get("des").equals(null) || ((String)record.get("des")).length()==0)
return null;
String desc = (String)record.get("des");
//字段desc的值不以s开头的被过滤掉
if(desc.charAt(0)!='s')
return null;
//字段tall 源端类型为float4
float tall = (float)record.get("tall");
//字段tall的值小于100的被过滤掉
if(tall < 100f)
return null;
if(tall > 170f && tall < 180f) {
record.put("tall", tall+2);
}
//字段age 源端类型为int
int age = (int)record.get("age");
//字段age的值小于20的,进行+2操作
if(age < 20) {
record.put("age", age+2);
}
//字段salary 源端类型为float8
double salary = (double)record.get("salary");
//字段salary的值大于10000的,进行+100操作
if(salary > 10000) {
record.put("salary", salary+100);
}
//字段married 源端类型为bit(1)类型
boolean married = (boolean)record.get("married");
//字段married的值如果为1,将age进行+5的操作;否则进行-2的操作
if(married == true) {
record.put("age", age+5);
} else {
record.put("age", age-2);
}
return record;
}
}
~~~
* 点击返回就能够回到配置规则页面。
:-: 
`注:目前基本清洗功能无法和清洗脚本同步使用,开启清洗脚本后,基本清洗功能将被禁用(覆盖)。`
## 5.高级设置
* batch设置【批量写入】
* 开启和关闭按钮
* 关闭时,表示按照子任务设置batch进行批量写入
* 点击开启时
* 设置开启后对当前界面的表或文件生效,读取数据达到设置条件后,进行批量同步写入。优先级高于全局子任务设置,启用时优先采用。
* 条数和时间为必填项,不允许为空
* 大小为选填项,允许为空
* 条数类型显示“读取满____条时批量写入”,输入框数值默认为5000,允许用户修改,输入可以为空。
* 大小类型显示,“读取满____KB/MB时批量写入”,KB和MB单位可在下拉栏切换,用户可选填,允许修改,可以为空。
* 时间类型显示“读取满____秒/分时批量写入”,默认为30秒,秒和分可在下拉栏切换,允许修改,输入可以为空。
:-: 
* 当目的地是Hive时,增加是时间分区设置,来进行数据清洗。
* 使用新增或指定已有字段来设置extractor,让数据在HIVE目的地分区写入,通过定义字段格式format和分区时间间隔来自定义分区标准。
* 用户点击开关按钮可以设置开启或者关闭分区
* 时间分区timestamp.extractor为三个选项:写入时间分区(默认)、读取时间分区、自定义时间分区
* 提示文案:
* 写入时间分区:系统写入数据到目的地的时间作为分区值
* 读取时间分区:系统读取数据的时间作为分区值
* 自定义时间分区:系统根据用户指定字段(要求字段类型为Timestamp)作为分区值
* 选择写入时间分区
* path.format。
* 提示文案:指写入目的地时间分区格式,如'year'=YYYY,表示目的地按年份分区进行写入
* 默认显示一个字段名称输入框和格式输入框(例:'year' =YYYY),右侧一致显示【添加】
* 输入框默认最小长度为4个字符,要求根据文案伸缩输入框,最大字符长度为20个字符。
* 最多添加5个时间分区字段名称。
* 所有输入框不能为空
* 时间间隔。
* 默认为1天(单位可选择:年、月、天、小时、分钟)
* 不能为空
:-: 
* 选择读取时间分区
* path.format。
* 默认显示一个字段名称输入框和格式输入框(例:'year' =YYYY),右侧一致显示【添加】
* 输入框默认最小长度为4个字符,要求根据文案伸缩输入框,最大字符长度为20个字符。
* 最多添加5个时间分区字段名称。
* 所有输入框不能为空
* 时间间隔。
* 默认为1天(单位可选择:年、月、天、小时、分钟)
* 不能为空,否则【保存】按钮置灰,并且提示:不能为空
:-: 
* 选择自定义时间分区
* 分区字段名称。
* 输入框提示内容:请输入分区字段名称
* 不能为空
* path.format。
* 默认显示一个字段名称输入框和格式输入框(例:'year' =YYYY),右侧一致显示【添加】
* 输入框默认最小长度为4个字符,要求根据文案伸缩输入框,最大字符长度为20个字符。
* 最多添加5个时间分区字段名称。
* 所有输入框不能为空
* 时间间隔。
* 默认为1天(单位可选择:年、月、天、小时、分钟)
* 不能为空
:-: 
* 只允许添加一个时间分区。
* 保存成功后停留在该弹窗,右上角提示保存成功。
* 任务激活后,不允许高级设置不允许修改
**常见问题:**
### Q1:如果自定义目的地表名时创建了多个同名表会发生什么?
当您激活任务后,系统将尝试把数据源多张表的数据传输到同一个表中,此时:
* 若这些表的表结构一致,则多张表能够成功合并为同一张表;
* 若该表与目的地同名表的表结构不一致,根据目的地表结构设置修改目的地原有的同步表。
</br></br></br>
* * * * *
</br></br></br>
### Q2:关于字段名称修改需要注意什么?
Oracle中数据表的字段名称可能同时含有大小写,当选择的目的地为其他不支持同时含大小写字段名数据表的数据库(例如 Mysql )时,若您使用默认的映射关系,不修改目的地表字段名称,数据目的地将把表字段名称中的大小写视作一致;
* 例如:当您选择了 Oracle 中的一个含字段 Aa 、 aA 的数据表作为需要同步的表,数据目的地选择了 Mysql ,如果未修改对应的目的地字段名称,则 Mysql 目的地将把这两个字段视作同名字段进行传输,可能会导致问题。
- DataPipeline产品手册
- 产品主要界面介绍
- 创建-数据任务
- 数据源
- 配置MySQL数据源
- BINLOG配置手册
- 配置Oracle数据源
- LOGMINER配置手册
- 配置SQL Server数据源
- Change Tracking配置手册
- 配置FTP数据源
- 配置Couchbase数据源
- 配置PostgreSQL数据源
- 配置S3数据源
- 配置API数据源
- 配置Kafka数据源
- 数据目的地
- 配置Redshift数据目的地
- 配置Oracle数据目的地
- 配置MySQL数据目的地
- 配置Kafka数据目的地
- 配置SQL Server数据目的地
- 配置FTP数据目的地
- 配置TIDB数据目的地
- 配置Greenplum数据目的地
- 配置Hive数据目的地
- Hive技术实现
- Hive目前支持情况
- 配置HybridDB for PostgreSQL
- 任务设置
- 选择同步内容
- 数据清洗
- 激活数据任务
- 管理-数据任务
- 数据任务列表
- 数据任务详情页
- 基本信息
- 概览信息
- 任务设置
- 错误队列
- 配置规则
- 消息列表
- 文件同步
- 文件源
- 配置S3文件源
- 配置FTP文件源
- 文件目的地
- 配置hdfs文件目的地
- 任务设置
- 激活文件任务
- 管理-文件任务
- 文件任务详情页
- 基本信息
- 概览信息
- 任务设置
- 消息列表
- 元数据管理
- 用户权限管理
- 系统设置
- 产品更新日志
- 常见问题
