# Flink CDC: Syncing Full + Incremental Data from Oracle
Component versions:

| Dependency | Version |
|---|---|
| Oracle | Oracle 11g |
| Flink CDC | 2.2.1 |
| Flink | 1.13.0 |
| JDK | 8 |
# Installing Oracle 11g with Docker
Preparation: I only have one machine for this demo and its resources are limited, so I installed Oracle 11g with Docker; installing it directly on Linux or Windows is too involved. Disk space was also tight, so I freed some up first (Oracle needs about 6 GB).
I cleaned out Docker's overlay2 and container directories. The container directories are mostly logs and take up a lot of space; each directory's hash tells you which container it belongs to, and if that container is no longer needed you can simply stop and remove it.
1. Pull the Oracle 11g image; this takes somewhere between a few minutes and ten minutes.
docker pull registry.cn-hangzhou.aliyuncs.com/helowin/oracle_11g
2. Start Oracle 11g
docker run -d -p 1521:1521 --name oracle11g registry.cn-hangzhou.aliyuncs.com/helowin/oracle_11g
3. Enter the Oracle container
docker exec -it oracle11g bash
# Oracle Setup
1. After entering the Oracle container, switch to the root user
su root
# enter the password: helowin
2. Edit the profile file to configure the Oracle environment variables
vi /etc/profile
# append the following at the end of the file
export ORACLE_HOME=/home/oracle/app/oracle/product/11.2.0/dbhome_2
export ORACLE_SID=helowin
export PATH=$ORACLE_HOME/bin:$PATH
# after saving, run: source /etc/profile
3. Create a symlink to sqlplus
ln -s $ORACLE_HOME/bin/sqlplus /usr/bin
4. Switch back to the oracle user
su - oracle
# create a data directory
mkdir /home/oracle/oracle-data
# log in to sqlplus
sqlplus /nolog
# connect with DBA privileges
SQL> conn /as sysdba
5. Oracle configuration: enable archiving, add supplemental logging, create a user, and so on
# enable archive logging
SQL> alter system set db_recovery_file_dest_size = 5G;
SQL> alter system set db_recovery_file_dest = '/home/oracle/oracle-data' scope=spfile;
SQL> shutdown immediate;
SQL> startup mount;
SQL> alter database archivelog;
SQL> alter database open;
# check whether archive logging is enabled
SQL> archive log list;
# enable supplemental logging
SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
# check whether supplemental logging is enabled
SQL> select supplemental_log_data_min min from v$database;
# create a tablespace
CREATE TABLESPACE sinfor DATAFILE '/home/oracle/sinfor.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
# create a user
CREATE USER sinfor IDENTIFIED BY sinfor DEFAULT TABLESPACE SINFOR;
# grant privileges to the user
grant connect,resource,dba to sinfor;
6. Connect to the database with DBeaver and create the table. The service name/SID is helowin; log in with the user and password just created.
CREATE TABLE "SINFOR"."sinfor_user"
(
"ID" VARCHAR2(20),
"NAME" VARCHAR2(100),
"AGE" NUMBER(3,0),
"STATUS" NUMBER(1,0),
"CREATE_TIME" DATE,
"UPDATE_TIME" DATE,
PRIMARY KEY ("ID")
)
7. Enable supplemental logging on the table to be synced as well
alter table "sinfor_user" ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
# view all supplemental log groups
select * from dba_log_groups;
# Application Development
Maven dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sinfor</groupId>
<artifactId>sinfor-flink</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<java.version>1.8</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<flink.version>1.13.0</flink.version>
<scala.version>2.12</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-oracle-cdc</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.13.6</version>
</dependency>
<dependency>
<groupId>com.oracle</groupId>
<artifactId>ojdbc6</artifactId>
<version>11.2.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
<!-- Flink logs through slf4j, which is only a logging facade; here we use log4j as the
concrete logging implementation -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
First, the single-table approach with the Table API. It has a limitation: every table has to run as its own job and occupies slot resources, and I did not really find a sink capability in the Table API itself.
package com.sinfor.flink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* @author fengwen
* @date 2023/4/12
* @description Read the Oracle table with the Flink CDC Table API (Flink SQL) and print the changelog
**/
public class OracleToKafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
env.disableOperatorChaining();
tableEnv.executeSql("CREATE TABLE sinfor_user (\n" +
" `ID` STRING NOT NULL,\n" + // column names must match the uppercase Oracle names; ID is VARCHAR2 in Oracle, so STRING here
" `NAME` STRING,\n" +
" `AGE` INT,\n" +
" `STATUS` INT,\n" +
" `CREATE_TIME` BIGINT,\n" + // Oracle DATE arrives as an epoch-millisecond value, so receive it as BIGINT
" `UPDATE_TIME` BIGINT,\n" +
" PRIMARY KEY(`ID`) NOT ENFORCED\n" + // NOT ENFORCED is required
" ) WITH (\n" +
" 'connector' = 'oracle-cdc',\n" +
" 'hostname' = 'xx.xx.xx.xx',\n" +
" 'port' = '1521',\n" +
" 'username' = 'sinfor',\n" +
" 'password' = 'sinfor',\n" +
" 'database-name' = 'helowin',\n" + // service name / SID
" 'schema-name' = 'SINFOR',\n" + // note: the schema name must be uppercase
" 'table-name' = 'SINFOR_USER',\n" +
" 'debezium.log.mining.continuous.mine'='true',\n"+
" 'debezium.log.mining.strategy'='online_catalog',\n" +
" 'debezium.database.tablename.case.insensitive'='false',\n"+
" 'scan.startup.mode' = 'initial')");
TableResult tableResult = tableEnv.executeSql("select * from sinfor_user");
tableResult.print(); // executeSql already submits the job and print() blocks while streaming results, so env.execute() is not needed
}
}
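As an aside, since I did not find a direct sink in the Table API, one possible pure-SQL alternative is to declare an upsert-kafka sink table and INSERT INTO it (the flink-connector-kafka dependency in the pom provides that connector). This is only a hedged sketch that I have not run here; it belongs in the same main method after the source table is created, and the topic name and broker address are placeholders:
tableEnv.executeSql("CREATE TABLE sinfor_user_kafka (\n" +
        " `ID` STRING NOT NULL,\n" +
        " `NAME` STRING,\n" +
        " `AGE` INT,\n" +
        " `STATUS` INT,\n" +
        " `CREATE_TIME` BIGINT,\n" +
        " `UPDATE_TIME` BIGINT,\n" +
        " PRIMARY KEY(`ID`) NOT ENFORCED\n" + // upsert-kafka requires a primary key
        " ) WITH (\n" +
        " 'connector' = 'upsert-kafka',\n" +
        " 'topic' = 'sinfor',\n" + // placeholder topic
        " 'properties.bootstrap.servers' = 'xxx.xx.xx.xxx:9191',\n" + // placeholder broker address
        " 'key.format' = 'json',\n" +
        " 'value.format' = 'json')");
// writes the changelog produced by the oracle-cdc table into Kafka as upsert/tombstone messages
tableEnv.executeSql("INSERT INTO sinfor_user_kafka SELECT * FROM sinfor_user");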
Next, the DataStream approach. It has quite a few pitfalls and took some effort, but the test succeeded and it can also sink to Kafka.
package com.sinfor.flink;
import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.connectors.oracle.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Properties;
/**
* @author fengwen
* @date 2023/4/17
* @description Read the Oracle table with the Flink CDC DataStream API and sink the change events to Kafka
**/
public class Oracle2Kafka {
public static void main(String[] args) throws Exception {
Properties properties = new Properties();
properties.setProperty("debezium.database.tablename.case.insensitive", "false");
properties.setProperty("debezium.log.mining.strategy", "online_catalog");
properties.setProperty("debezium.log.mining.continuous.mine", "true");
SourceFunction<String> sourceFunction = OracleSource.<String>builder()
.hostname("xxx.xx.xx.xxx")
.port(1521)
.database("helowin") // the service name / SID to monitor
.schemaList("SINFOR") // the schema to monitor
.tableList("SINFOR.SINFOR_USER") // the table to monitor, as SCHEMA.TABLE
.username("sinfor")
.password("sinfor")
.startupOptions(StartupOptions.initial())
.debeziumProperties(properties)
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> sourceStream = env.addSource(sourceFunction).setParallelism(1);// use parallelism 1 for sink to keep message ordering
SingleOutputStreamOperator<String> streamOperator = sourceStream
.filter(StringUtils::isNotEmpty);
streamOperator.setParallelism(1).print();
// sink to Kafka
Properties kafkaProp = new Properties();
kafkaProp.setProperty("bootstrap.servers", "xxx.xx.xx.xxx:9191");
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("sinfor",
new SimpleStringSchema(),
kafkaProp);
streamOperator.addSink(producer);
env.execute();
}
}
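Each message the job writes to Kafka is a Debezium change-event envelope serialized as JSON, with before, after, source, op and ts_ms fields. Below is a minimal sketch of picking the operation type and the row payload out of that string on the consumer side, using the fastjson dependency already in the pom; this helper is illustrative only and not part of the job above.
package com.sinfor.flink;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
public class ChangeEventParser {
    // op codes: c = insert, u = update, d = delete, r = snapshot read
    public static String summarize(String changeEventJson) {
        JSONObject event = JSON.parseObject(changeEventJson);
        String op = event.getString("op");
        // the row image lives in "before" for deletes and in "after" for everything else
        JSONObject row = "d".equals(op) ? event.getJSONObject("before") : event.getJSONObject("after");
        return op + " -> " + (row == null ? "{}" : row.toJSONString());
    }
}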
In testing, the full snapshot and the subsequent updates and deletes all came through correctly.
WARNING
1. There are a few pitfalls here: the Flink CDC version must be later than 2.1.0, and the Flink CDC and Flink versions have a required correspondence. If a dependency cannot be downloaded you will need to install it into the local Maven repository manually, though the dependencies listed above should all resolve normally.
2. Oracle's DATE type has to be received as BIGINT on the Flink side (see the conversion sketch after this list).
3. Supplemental logging must be enabled on every table being synced, otherwise you will get an error telling you to run something like ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS.
4. An error about not finding some XXFactory means the dependencies are wrong.
5. If a table is named user, which is a keyword, it has to be wrapped in backticks (``); hopefully nobody actually names a table that...
6. For data update latency, set these two options: 'debezium.log.mining.strategy'='online_catalog' and 'debezium.log.mining.continuous.mine'='true'.
7. The parallelism can only be 1.
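For point 2, the value that lands in the BIGINT column is the epoch-millisecond timestamp Debezium produces for an Oracle DATE. A small sketch of turning it back into a readable time on the consumer side; it assumes the values are effectively UTC, so adjust the offset to your database time zone:
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
public class DateFieldConverter {
    // converts the BIGINT received for an Oracle DATE column back into a LocalDateTime
    public static LocalDateTime fromEpochMillis(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).atOffset(ZoneOffset.UTC).toLocalDateTime();
    }
}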
Command for installing a jar into the local Maven repository:
mvn install:install-file -Dfile=.\flink-sql-connector-oracle-cdc-2.1.1.jar -DgroupId=com.ververica -DartifactId=flink-connector-oracle-cdc -Dversion=2.1.1 -Dpackaging=jar
The Oracle archive log explosion problem
docker exec -it oracle11g bash
source /etc/profile
sqlplus /nolog
connect /as sysdba
# check flash recovery area / archive log usage
select * from V$FLASH_RECOVERY_AREA_USAGE;
# check where the archive logs are stored
show parameter recover;
exit
rman target sys/pass
# crosscheck for obsolete archive logs
crosscheck archivelog all;
Delete all archive logs up to one day ago, or up to the current time:
delete archivelog until time 'sysdate-1';
delete archivelog until time 'sysdate';