[TOC] # 不同数据源的 DataX 读写插件示例 DataX 官网支持绝大部分主流数据源的读写插件,并且有详细的使用文档。 ## CSV 文件的读写插件 csv 文件就是文本文件,用 txtreader 和 txtwriter 读写。配置文件详细语法请参见[DataX官网说明](https://github.com/alibaba/DataX)。 **txtreader配置示例:** ~~~ "reader":{ "name":"txtfilereader", "parameter":{ "path":["文件全路径"], "encoding":"UTF-8", "column":[ { "index":0, "type":"long" } ,{ "index":1, "type":"long" } ,{ "index":2, "type":"string" } ,{ "index":3, "type":"double" } ,{ "index":4, "type":"string" } ], "fieldDelimiter":"||", "fileFormat":"text" } } ~~~ **txtwriter****配置示例:** ~~~ "writer":{ "name":"txtfilewriter", "parameter":{ "path":"文件全路径", "fileName":"文件名", "writeMode":"truncate", "dateFormat":"yyyy-MM-dd", "charset":"UTF-8", "nullFormat":"", "fileDelimiter":"||" } } ~~~ ## MySQL 数据库的读写插件 针对 MySQL 数据库,用 mysqlreader 和 mysqlwriter 插件读写。 **mysqlreader****配置示例:** ~~~ "reader": { "name": "mysqlreader", "parameter": { "username": "root", "password": "root", "column": [ "id", "name" ], "splitPk": "db_id", "connection": [ { "table": [ "table" ], "jdbcUrl": [ "jdbc:mysql://127.0.0.1:3306/database" ] } ] } } ~~~ **mysqlwriter****配置示例:** ~~~ "writer": { "name": "mysqlwriter", "parameter": { "writeMode": "insert", "username": "root", "password": "root", "column": [ "id", "name" ], "session": [ "set session sql_mode='ANSI'" ], "preSql": [ "delete from test" ], "connection": [ { "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax?useUnicode=true&characterEncoding=gbk", "table": [ "test" ] } ] } } ~~~ ## Oracle 数据库的读写插件 针对 Oracle 数据库,用 oraclereader 和 oraclewriter 插件来读写。 **oraclereader****配置示例:** ~~~ "reader": { "name": "oraclereader", "parameter": { // 数据库连接用户名 "username": "root", // 数据库连接密码 "password": "root", "column": [ "id","name" ], //切分主键 "splitPk": "db_id", "connection": [ { "table": [ "table" ], "jdbcUrl": [ "jdbc:oracle:thin:@[HOST_NAME]:PORT:[DATABASE_NAME]" ] } ] } } ~~~ **oraclewriter****配置示例:** ~~~ "writer": { "name": "oraclewriter", "parameter": { "username": "root", "password": "root", "column": [ "id", "name" ], "preSql": [ "delete from test" ], "connection": [ { "jdbcUrl": "jdbc:oracle:thin:@[HOST_NAME]:PORT:[DATABASE_NAME]", "table": [ "test" ] } ] } } ~~~ ## DB2 数据库的读写插件 **db2reader****配置示例:** ~~~ "reader":{ "name":"db2reader", "parameter":{ "username":"SRC_DB_UESRNAME", "password":"SRC_DB_PASSWORD", "column":[ "SRC_COLUMN_LIST" ], "connection":[ { "table":[ "SRC_TABLE_NAME" ], "jdbcUrl":[ "jdbc:db2://SRC_DB_IP:SRC_DB_PORT/SRC_DB_NAME" ] } ] } } ~~~ **db2writer****配置示例:** ## OceanBase 数据库的读写插件 OceanBase 数据库使用插件 oceanbasev10reader 和 oceanbasev10writer 来读写。该插件由 OceanBase 产品团队单独提供。 * **oceanbasev10reader****配置示例** ~~~ "reader":{ "name":"oceanbasev10reader", "parameter":{ "where":"", "timeout":10000, "readBatchSize":100000, "readByPartition":"true", "column": [ “列名1”,”列名2” ], "connection":[ { "jdbcUrl":["||_dsc_ob10_dsc_||集群名:租户名||_dsc_ob10_dsc_||jdbc:oceanbase://连接IP:连接端口/模式名或数据库名"], "table":["表名"] } ], "username":"租户内用户名", "password":"密码" } } ~~~ 示例:OceanBase 表 ware 导出到 csv 文件 ~~~ [admin@*** /home/admin/datax3] $cat job/ob_tpcc_ware_2_csv.json { "job":{ "setting":{ "speed":{ "channel":10 }, "errorLimit":{ "record":0, "percentage": 0.02 } }, "content":[ { "reader":{ "name":"oceanbasev10reader", "parameter":{ "where":"", "timeout":10000, "readBatchSize":100000, "readByPartition":"true", "column": [ "W_ID","W_YTD","W_TAX","W_NAME","W_STREET_1","W_STREET_2","W_CITY","W_STATE","W_ZIP" ], "connection":[ { "jdbcUrl":["||_dsc_ob10_dsc_||obdemo:obbmsql||_dsc_ob10_dsc_||jdbc:oceanbase://127.1:2883/tpcc"], "table":["ware"] } ], "username":"tpcc", "password":"123456" } }, "writer":{ "name":"txtfilewriter", "parameter":{ "path":"/home/admin/csvdata/", "fileName":"ware", "writeMode":"truncate", "dateFormat":"yyyy-MM-dd", "charset":"UTF-8", "nullFormat":"", "fileDelimiter":"||" } } } ] } } [admin@*** /home/admin/datax3] $bin/datax.py job/ob_tpcc_ware_2_csv.json ~~~ * **oceanbasev10writer****配置示例** 使用 DataX 向 OceanBase 里写入时,要避免写入速度过快导致 OceanBase 的增量内存耗尽。通常建议 DataX 配置文件里针对写入做一个写入限速设置。关键字是 memstoreThreshold : ~~~ "writer": { "name": "oceanbasev10writer", "parameter": { "username": "租户内的用户名", "password": "密码", "writeMode": "insert", "column": [ "列名1",“列名2” ], "preSql": [ "" ], "connection": [ { "jdbcUrl": "||_dsc_ob10_dsc_||集群名:租户名||_dsc_ob10_dsc_||jdbc:oceanbase://连接IP:连接端口(默认2883)/模式名或数据库名", "table": [ "表名" ] } ], "batchSize": 1024, "memstoreThreshold": "90" } } ~~~ 示例:从 csv 文件导入到 OceanBase 表中 ~~~ [admin@*** /home/admin/datax3] $cat job/csv_2_ob_tpcc_ware2.json { "job":{ "setting":{ "speed":{ "channel":32 }, "errorLimit":{ "record":0, "percentage": 0.02 } }, "content":[ { "reader":{ "name":"txtfilereader", "parameter":{ "path":["/home/admin/csvdata/ware*"], "encoding":"UTF-8", "column":[ { "index":0, "type":"long" } ,{ "index":1, "type":"long" } ,{ "index":2, "type":"long" } ,{ "index":3, "type":"string" } ,{ "index":4, "type":"string" } ,{ "index":5, "type":"string" } ,{ "index":6, "type":"string" } ,{ "index":7, "type":"string" } ,{ "index":8, "type":"string" } ], "fieldDelimiter":",", "fileFormat":"text" } }, "writer":{ "name":"oceanbasev10writer", "parameter":{ "writeMode":"insert", "column":[ "W_ID","W_YTD","W_TAX","W_NAME","W_STREET_1","W_STREET_2","W_CITY","W_STATE","W_ZIP" ], "connection":[ { "jdbcUrl":"||_dsc_ob10_dsc_||obdemo:obbmsql||_dsc_ob10_dsc_||jdbc:oceanbase://127.1:2883/tpcc", "table":["WARE2"] } ], "username":"tpcc", "password":"123456", "batchSize":256, " memstoreThreshold":"90" } } } ] } } [admin@*** /home/admin/datax3] $bin/datax.py job/csv_2_ob_tpcc_ware2.json ~~~