💎一站式轻松地调用各大LLM模型接口,支持GPT4、智谱、星火、月之暗面及文生图 广告
[TOC] ***** # **Oracle 数据库表备份** 这次我们将一个 Oracle 数据库实例上的 LOGIN 表的数据导入到一个 Kafka 主题上,再将它同步到另一个 Oracle 数据库实例上的 LOGIN 表。 <br > ## **准备 Oracle 数据库实例** 1. 设置环境变量: ``` $ export CLASSPATH=.:/usr/local/share/java/kafka-connect-jdbc/ojdbc6.jar ``` 2. 使用 Docker 快速安装并运行 3 个 Oracle 11g 的实例,用户名/初始密码:`SYSTEM`/`oracle`。 ``` $ docker run --name oracle11g -d -p 1521:1521 -p 8080:8080 -e ORACLE_ALLOW_REMOTE=true oracleinanutshell/oracle-xe-11g $ docker run --name oracle11g-1 -d -p 11521:1521 -p 18080:8080 -e ORACLE_ALLOW_REMOTE=true oracleinanutshell/oracle-xe-11g ``` > 修改密码: > $ docker exec -it oracle11g bash > $ sqlplus > Enter user-name: system > Enter password: oracle > SQL> alter user system identified by admin123; > SQL> exit; > $ exit 在每个实例下创建 TEST 用户和 LOGIN 表: ``` -- USER SQL CREATE USER C##TEST IDENTIFIED BY "test1234" DEFAULT TABLESPACE "USERS" TEMPORARY TABLESPACE "TEMP"; GRANT "DBA" TO C##TEST ; GRANT "CONNECT" TO C##TEST ; GRANT "RESOURCE" TO C##TEST ; -- TABLES CREATE TABLE C##TEST.LOGIN ( ID NUMBER NOT NULL , USERNAME VARCHAR2(30) NOT NULL , LOGIN_TIME DATE NOT NULL , CONSTRAINT LOGIN_PK PRIMARY KEY (ID) ENABLE ); CREATE SEQUENCE C##TEST.SEQ_LOGIN CACHE 20; CREATE INDEX C##TEST.IDX_LOGIN_LT ON C##TEST.LOGIN (LOGIN_TIME DESC); ``` <br > ## **安装 JDBC Connector 插件** 参见 [JDBC Connector](JDBCConnector.md) <br > ## **创建 Source/Sink Connector** 1. 创建 JDBC Source Connector: ``` $ echo '{"name":"oracle-jdbc-source","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector","tasks.max":"1","dialect.name":"OracleDatabaseDialect","connection.url":"jdbc:oracle:thin:@localhost:1521:xe","connection.user":"TEST","connection.password":"test1234","mode":"timestamp","table.whitelist":"LOGIN","validate.non.null":false,"timestamp.column.name":"LOGIN_TIME","topic.prefix":"oracle.test."}}' | curl -X POST -d @- http://localhost:8083/connectors --header "Content-Type:application/json" ``` > **mode**:表加载模式,支持的模式有 `bulk`,每次轮询时都要对整个表进行批量加载;`incrementing`,在每个表上使用递增列以仅检测新增的行。请注意,这不会检测到对现有行的修改或删除;`timestamp`,使用时间戳(或类似时间戳的列)来检测新行和修改过的行。 假设每次写入都会更新该列,并且值是单调递增的,不需要保证唯一。`timestamp+incrementing`,使用两列,用于检测新行和已修改行的时间戳列,以及用于为更新提供全局唯一ID的递增列,以便可以为每一行分配唯一的偏移量。 > **validate.non.null**:默认情况下,JDBC 连接器将验证所有增量表和时间戳表是否将 ID 和时间戳列设置为 NOT NULL。 如果没有,则 JDBC 连接器将无法启动。 将此设置为 false 将禁用这些检查。 > [**incrementing.column.name**](http://incrementing.column.name):递增的列的名称,用于检测新行。 空值表示自动侦测递增列。 该列不能为空。 > [**timestamp.column.name**](http://timestamp.column.name):用一个或多个时间戳列的逗号分隔列表,使用 COALESCE SQL 函数检测新行或修改过的行。每次轮询都会发现第一个非空时间戳值大于看到的最大先前时间戳值的行。 至少一列不能为空。 ``` ``` 2. 创建 JDBC Sink Connector: ``` $ echo '{"name":"oracle-jdbc-sink","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","topics":"oracle.test.LOGIN","tasks.max":"1","connection.url":"jdbc:oracle:thin:@localhost:11521:xe","connection.user":"TEST","connection.password":"test1234","auto.create":"false","insert.mode":"insert","table.name.format":"LOGIN"}}' | curl -X POST -d @- http://localhost:8083/connectors --header "Content-Type:application/json" ``` > **insert.mode**:要使用的插入模式,支持的模式有,`insert`、`upsert`、`update`。 > **pk.mode**:主键模式, 支持的模式有 `none`、`kafka`、`record_key`、`record_value`。 ``` ``` 3. 列出所有活动的 Connector: ``` $ curl localhost:8083/connectors ["oracle-jdbc-source","oracle-jdbc-sink"] ``` <br > ## **测试** 1. 通过 SQL 脚本在数据库实例 1 上新增测试数据: ``` DECLARE i NUMBER; BEGIN i:=0; LOOP i:=i+1; INSERT INTO TEST.LOGIN(ID, USERNAME, LOGIN_TIME) VALUES (TEST.SEQ_LOGIN.NEXTVAL,to_char(systimestamp, 'yyyymmddhh24miss'), SYSDATE); IF i > 100000 THEN EXIT; END IF; END LOOP; END; ``` <br > 1. 在数据库实例 2 上检查数据是否已同步 ``` select count(1) from test.login; select * from test.login order by login_time desc,id desc; ``` <br > 1. SQL 监控分析: ``` SELECT ADDRESS,HASH_VALUE,SQL_ID, SQL_FULLTEXT,PARSE_CALLS,EXECUTIONS, APPLICATION_WAIT_TIME,CONCURRENCY_WAIT_TIME/1000000 CONCURRENCY_WAIT_TIME,CLUSTER_WAIT_TIME,USER_IO_WAIT_TIME, OPTIMIZER_COST, CPU_TIME/1000000 CPU_TIME, ELAPSED_TIME/1000000 ELAPSED_TIME, DECODE(EXECUTIONS,0,0,ROUND(CPU_TIME/1000000/EXECUTIONS,5)) AV_CPU_TIME, DECODE(EXECUTIONS,0,0,ROUND(ELAPSED_TIME/1000000/EXECUTIONS,5)) AV_ELAPSED_TIME, SHARABLE_MEM,PERSISTENT_MEM,RUNTIME_MEM, DISK_READS,DIRECT_WRITES,BUFFER_GETS, ROWS_PROCESSED, FIRST_LOAD_TIME, TO_CHAR(LAST_ACTIVE_TIME,'YYYY-MM-DD/HH24:MI:SS') LAST_ACTIVE_TIME FROM V$SQLAREA WHERE PARSING_SCHEMA_NAME='TEST' AND LAST_ACTIVE_TIME >=SYSDATE-1/24 AND SQL_FULLTEXT NOT LIKE'%sys.%' ORDER BY EXECUTIONS DESC; ``` <br > ## **清理测试环境** 1. 删除连接器: ``` $ curl -X DELETE http://localhost:8083/connectors/oracle-jdbc-source $ curl -X DELETE http://localhost:8083/connectors/oracle-jdbc-sink ``` <br > 1. 清空数据表: ``` TRUNCATE TABLE LOGIN; ``` <br > 1. 删除主题 oracle.test.LOGIN