Oracle GoldenGate 实时同步Oracle数据到Kafka安装及配置

简介

  • MySQL实时同步到Kafka可以采用canal, Oracle实时同步到Kafka可以采用OGG, 配置过程比canal略复杂
  • OGG即Oracle GoldenGate是Oracle的同步工具, 能够实现大量交易数据的实时捕捉、变换和投递,实现源数据库与目标数据库的数据同步,保持亚秒级的数据延迟。

相关进程

  • GoldenGate主要包含Manager进程、Extract进程、Pump进程、Replicat进程

OGG相关目录

  • dirchk:检查点文件,记录了该进程的检查点信息
  • dirdat:trail日志文件,用来存放接收到的trail日志文件
  • dirdef:用来存放通过DEFGEN工具生成的源或目标端数据定义文件
  • dirpcs:用来存放进程状态文件
  • dirprm:用来存放参数文件,该进程所配置的参数(edit param 进程组名 就是配置该文件)
  • dirrpt:用来存放进程报告(report)文件,可以查看该进程运行时的报错信息等(view report 进程组名 就是看该文件)
  • dirsql:用来存放SQL脚本文件
  • dirtmp:当事务所需要的内存超过已分配内存时,缺省存在此目录

源端安装配置

  1. 下载解压
# 查询所需要的版本: https://edelivery.oracle.com/osdc/faces/SoftwareDelivery
# 下载 Oracle GoldenGate 11.2.1.0.3 for Oracle on Linux x86-64

tar xf fbo_ggs_Linux_x64_ora11g_64bit.tar -C /opt/ogg
# 使oracle用户有ogg的权限,后面有些需要在oracle用户下执行才能成功
chown -R oracle:oinstall /opt/ogg
  1. 配置
# /home/oracle/.bash_profile
export OGG_HOME=/opt/ogg
export LD_LIBRARY_PATH=$ORACLE_HOME/lib:/usr/lib
export PATH=$OGG_HOME:$PATH

source .bash_profile

# 测试
ggsci
  1. oracle打开归档模式
sqlplus / as sysdba

# 查看当前是否为归档模式, Disabled为关闭
archive log list

# Database log mode	       No Archive Mode
# Automatic archival	       Disabled
# Archive destination	       USE_DB_RECOVERY_FILE_DEST
# Oldest online log sequence     18
# Current log sequence	       20

# 执行
conn / as sysdba  # 以DBA身份连接数据库
shutdown immediate # 立即关闭数据库
startup mount # 启动实例并加载数据库,但不打开
alter database archivelog; # 更改数据库为归档模式
alter database open; # 打开数据库
alter system archive log start; # 启用自动归档

# 查看
archive log list

# Database log mode	       Archive Mode
# Automatic archival	       Enabled
# Archive destination	       USE_DB_RECOVERY_FILE_DEST
# Oldest online log sequence     18
# Next log sequence to archive   20
# Current log sequence	       20
  1. Oracle打开日志相关: OGG基于辅助日志等进行实时传输,故需要打开相关日志确保可获取事务内容,通过下面的命令查看该状态
select force_logging, supplemental_log_data_min from v$database;

-- FORCE_ SUPPLEMENTAL_LOG
-- ------ ----------------
-- NO     NO

-- 修改命令
alter database force logging;
alter database add supplemental log data;
-- 再次查看
select force_logging, supplemental_log_data_min from v$database;
  1. Oracle 创建复制用户
# 首先root用户建立相关文件夹,并赋予权限
cd $ORACLE_BASE
mkdir -p oggdata/orcl
-- 执行sql
create tablespace oggtbs datafile '/ora/oracle/oggdata/orcl/oggtbs01.dbf' size 1000M autoextend on;
create user ogg identified by ogg default tablespace oggtbs;
grant dba to ogg;
  1. OGG初始化
ggsci
# 在当前目录创建OGG所需的各个子目录
create subdirs
  1. Oracle创建测试表
create user test_ogg  identified by test_ogg default tablespace users;
grant dba to test_ogg;
conn test_ogg/test_ogg;
create table test_ogg(id int ,name varchar(20),primary key(id));

目标端 (kafka) 配置

  1. 下载解压
# 查询所需要的版本: https://edelivery.oracle.com/osdc/faces/SoftwareDelivery
# Oracle GoldenGate for Big Data 12.3.0.1.0 on Linux x86-64
tar -xf ggs_Adapters_Linux_x64.tar -C /opt/ogg/
  1. 配置
# vim /etc/profile

export OGG_HOME=/opt/ogg
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so:$OGG_HOME/lib
export PATH=$OGG_HOME:$PATH

# 测试
source /etc/profile
cd $OGG_HOME
ggsci
  1. 初始化目录
cd $OGG_HOME
create subdirs

OGG源端配置

  1. 配置OGG的全局变量
su oracle
cd $OGG_HOME
ggsci

# 配置用户
dblogin userid ogg password ogg
# 编辑
edit param ./globals
# 添加
oggschema ogg
  1. 配置管理器mgr
# 编辑
edit param mgr
# 添加
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3

# PORT 即mgr的默认监听端口;
# DYNAMICPORTLIST 动态端口列表,当指定的mgr端口不可用时,会在这个端口列表中选择一个,最大指定范围为256个;
# AUTORESTART 重启参数设置表示重启所有EXTRACT进程,最多5次,每次间隔3分钟;
# PURGEOLDEXTRACTS 即TRAIL文件的定期清理
  1. 添加复制表
add trandata test_ogg.test_ogg
info trandata test_ogg.test_ogg
  1. 配置extract进程
# 编辑
edit param extkafka
# 添加
extract extkafka
dynamicresolution
SETENV (ORACLE_SID = "orcl")
SETENV (NLS_LANG = "american_america.AL32UTF8")
GETUPDATEBEFORES
NOCOMPRESSDELETES
NOCOMPRESSUPDATES
userid ogg,password ogg
exttrail /opt/ogg/dirdat/to
table test_ogg.test_ogg;

# 第一行指定extract进程名称;
# dynamicresolution动态解析;
# SETENV设置环境变量,这里分别设置了Oracle数据库以及字符集;
# userid ogg,password ogg即OGG连接Oracle数据库的帐号密码,这里使用2.5中特意创建的复制帐号;
# exttrail定义trail文件的保存位置以及文件名,注意这里文件名只能是2个字母,其余部分OGG会补齐;table即复制表的表名,支持*通配,必须以 ";" 结尾

# 添加extract进程
add extract extkafka,tranlog,begin now

# 添加trail文件的定义与extract进程绑定
add exttrail /opt/ogg/dirdat/to,extract extkafka

# 启停命令
# stop extkafka
# start extkafka
  1. 配置pump进程: pump进程本质上来说也是一个extract,只不过他的作用仅仅是把trail文件传递到目标端,配置过程和extract进程类似,只是逻辑上称之为pump进程
# 编辑
edit param pukafka
# 添加
extract pukafka
passthru
dynamicresolution
userid ogg,password ogg
rmthost cdh01 mgrport 7809
rmttrail /opt/ogg/dirdat/to
table test_ogg.test_ogg;

# 第一行指定extract进程名称;
# passthru即禁止OGG与Oracle交互,我们这里使用pump逻辑传输,故禁止即可;
# dynamicresolution动态解析;
# userid ogg,password ogg即OGG连接Oracle数据库的帐号密码
# rmthost和mgrport即目标端(kafka)OGG的mgr服务的地址以及监听端口;
# rmttrail即目标端trail文件存储位置以及名称。

# 分别将本地trail文件和目标端的trail文件绑定到extract进程
add extract pukafka,exttrailsource /opt/ogg/dirdat/to
add rmttrail /opt/ogg/dirdat/to,extract pukafka
  1. 配置define文件: Oracle与MySQL,Hadoop集群(HDFS,Hive,kafka等)等之间数据传输可以定义为异构数据类型的传输,故需要定义表之间的关系映射
# 编辑
edit param test_ogg
# 添加
defsfile /opt/ogg/dirdef/test_ogg.test_ogg
userid ogg,password ogg
table test_ogg.test_ogg;

# 退出
quit

# 进入OGG主目录下执行以下命令
./defgen paramfile dirprm/test_ogg.prm

# 将生成的/opt/ogg/dirdef/test_ogg.test_ogg发送到目标端ogg目录下的dirdef里
scp -r /opt/ogg/dirdef/test_ogg.test_ogg root@cdh01:/opt/ogg/dirdef/

OGG目标端配置

  1. 开启kafka服务
  2. 配置管理器mgr
cd $OGG_HOME
ggsci
# 编辑
edit param mgr
# 添加
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3
  1. 配置checkpoint: checkpoint即复制可追溯的一个偏移量记录,在全局配置里添加checkpoint表即可
# 编辑
edit param ./GLOBALS
# 配置
CHECKPOINTTABLE test_ogg.checkpoint
  1. 配置replicate进程
# 编辑
edit param rekafka
# 添加
REPLICAT rekafka
sourcedefs /opt/ogg/dirdef/test_ogg.test_ogg
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE 
GROUPTRANSOPS 10000
MAP test_ogg.test_ogg, TARGET test_ogg.test_ogg;

# REPLICAT rekafka定义rep进程名称;
# sourcedefs即在4.6中在源服务器上做的表映射文件;
# TARGETDB LIBFILE即定义kafka一些适配性的库文件以及配置文件,配置文件位于OGG主目录下的dirprm/kafka.props;
# REPORTCOUNT即复制任务的报告生成频率;
# GROUPTRANSOPS为以事务传输时,事务合并的单位,减少IO操作;MAP即源端与目标端的映射关系
  1. 配置kafka.props
# cd /opt/ogg/dirprm/ && vim kafka.props

# handler类型
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
# Kafka生产者配置文件
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
# kafka的topic名称,无需手动创建
# gg.handler.kafkahandler.topicMappingTemplate=test_ogg(新版topicName属性的设置方式)
gg.handler.kafkahandler.topicName=test_ogg
# 传输文件的格式,支持json,xml等
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.format.insertOpKey = I  
gg.handler.kafkahandler.format.updateOpKey = U  
gg.handler.kafkahandler.format.deleteOpKey = D
gg.handler.kafkahandler.format.truncateOpKey=T
gg.handler.kafkahandler.format.includePrimaryKeys=true
# OGG for Big Data中传输模式,即op为一次SQL传输一次,tx为一次事务传输一次
gg.handler.kafkahandler.mode=op
# 类路径
gg.classpath=dirprm/:/opt/cloudera/parcels/KAFKA/lib/kafka/libs/*:/opt/ogg/:/opt/ogg/lib/*


# vim custom_kafka_producer.properties
# kafkabroker的地址
bootstrap.servers=cdh01:9092,cdh02:9092,cdh03:9092
acks=1
# 压缩类型
compression.type=gzip
# 重连延时
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000
  1. 添加trail文件到replicate进程
cd $OGG_HOME
ggsci
add replicat rekafka exttrail /opt/ogg/dirdat/to,checkpointtable test_ogg.checkpoint

测试

  1. 创建topic
kafka-topics --create --zookeeper cdh01:2181 --replication-factor 3 --partitions 3 --topic test_ogg
  1. 启动所有进程
# 在源端和目标端的OGG命令行下使用start [进程名]的形式启动所有进程。
# 启动顺序按照源mgr——目标mgr——源extract——源pump——目标replicate来完成。
# 全部需要在ogg目录下执行ggsci命令进入ogg命令行。
# 源端
start mgr
start extkafka
start pukafka
# 目标端
start mgr
start rekafka
  1. 查看: 通过info all 或者info [进程名] 查看状态,所有的进程都为RUNNING才算成功
# 源端
info all
# 目标端
info all

# 日志查看
less ggser.log
ggsci > view report rekafka
  1. 测试同步更新效果
    • 源端执行sql
    -- 源端执行sql
    conn test_ogg/test_ogg
    insert into test_ogg values(1,'test');
    commit;
    update test_ogg set name='zhangsan' where id=1;
    commit;
    delete test_ogg where id=1;
    commit;
    
    • 查看文件状态
    # 源端
    ls -l /opt/ogg/dirdat/to*
    # 目标端
    ls -l /opt/ogg/dirdat/to*
    
    • 查看kafka中数据
    kafka-console-consumer --bootstrap-server cdh01:9092,cdh02:9092,cdh03:9092 --topic test_ogg --from-beginning
    
    # {"table":"TEST_OGG.TEST_OGG","op_type":"I","op_ts":"2020-10-30 17:43:30.247674","current_ts":"2020-10-30T17:43:35.632000","pos":"00000000000000001061","primary_keys":["ID"],"after":{"ID":1,"NAME":"test"}}
    # {"table":"TEST_OGG.TEST_OGG","op_type":"I","op_ts":"2020-10-30 17:44:05.247770","current_ts":"2020-10-30T17:44:10.707000","pos":"00000000000000001201","primary_keys":["ID"],"after":{"ID":2,"NAME":"orcal"}}
    # {"table":"TEST_OGG.TEST_OGG","op_type":"I","op_ts":"2020-10-30 17:44:31.247814","current_ts":"2020-10-30T17:44:35.728000","pos":"00000000000000001343","primary_keys":["ID"],"after":{"ID":3,"NAME":"db"}}
    # {"table":"TEST_OGG.TEST_OGG","op_type":"I","op_ts":"2020-10-30 17:45:06.247722","current_ts":"2020-10-30T17:45:11.755000","pos":"00000000000000001480","primary_keys":["ID"],"after":{"ID":4,"NAME":"ok"}}
    # {"table":"TEST_OGG.TEST_OGG","op_type":"I","op_ts":"2020-10-30 17:45:06.247722","current_ts":"2020-10-30T17:45:11.757000","pos":"00000000000000001617","primary_keys":["ID"],"after":{"ID":5,"NAME":"sd"}}
    # {"table":"TEST_OGG.TEST_OGG","op_type":"U","op_ts":"2020-10-30 17:45:54.247822","current_ts":"2020-10-30T17:45:59.790000","pos":"00000000000000001871","primary_keys":["ID"],"before":{"ID":1,"NAME":"test"},"after":{"ID":1,"NAME":"oracle"}}
    # {"table":"TEST_OGG.TEST_OGG","op_type":"D","op_ts":"2020-10-30 17:50:37.247804","current_ts":"2020-10-30T17:50:42.948000","pos":"00000000000000001990","primary_keys":["ID"],"before":{"ID":4,"NAME":"ok"}}
    # {"table":"TEST_OGG.TEST_OGG","op_type":"D","op_ts":"2020-10-30 17:50:37.247804","current_ts":"2020-10-30T17:50:42.949000","pos":"00000000000000002127","primary_keys":["ID"],"before":{"ID":2,"NAME":"orcal"}}
    

参考