
Posted by 168主编 on 2022-8-31 10:02:25

Real-Time Incremental Data Synchronization from Oracle to Kafka with OGG


Background

In the big-data era, a great deal of business is driven by data, and data must flow and be integrated across different systems. Typically, the data of core business systems lives in an OLTP database, and other business systems need access to it. A traditional data warehouse pulls data from the OLTP system periodically via batch synchronization. As business requirements grow, however, batch synchronization can no longer keep up, either in latency or in the extraction load it places on the online OLTP system. Instead, data changes need to be captured from the OLTP system and synchronized to downstream systems in real time. This article describes a method, based on Oracle OGG (GoldenGate), for synchronizing data from an Oracle database to a Kafka message queue in real time. Kafka is an efficient message-queue implementation; by subscribing to a Kafka topic, downstream systems can observe data changes in the online Oracle system in real time and build business applications on top of them.

Environment

Component versions: see https://pic3.zhimg.com/80/v2-00a874708519549c378a2668bf05a366_720w.jpg

Overall architecture: see https://pic2.zhimg.com/80/v2-6012af88ed6ba495f8ad3cb3a934345d_720w.jpg

Terminology

1. OGG Manager
The Manager configures and manages the other OGG components: it configures data extraction, data pumping, and data replication; starts and stops the related components; and reports on their status.

2. Extract
Extract captures changes (DML, DDL) from the source database. The main types are:
Local extract: captures incremental change data from the local database and writes it to local trail files.
Data pump: reads data from local trail files and pushes it to the target side.
Initial-load extract: exports the full data set from database tables, used for the initial data load.

3. Data Pump
A data pump is a special type of Extract that reads data from local trail files and sends it over the network to the OGG instance on the target side.

4. Trail files
Transaction changes captured from the source database by an Extract are written to trail files.

5. Collector
The collector runs on the target machine; it receives the trail data sent over by the data pump and writes it to local trail files.

6. Replicat
Replicat runs on the target machine; it reads data changes from trail files and applies them to the target data store. In this case, Replicat pushes the data to a Kafka message queue.

7. Checkpoint
Checkpoints record how far each OGG process has read and written, so that after an interruption processing can resume from the correct position.

Procedure

Source Oracle Configuration

1. Check archiving
OGG requires archive logging to be enabled on the source:
SQL> archive log list;
Database log mode Archive Mode
Automatic archival Enabled
Archive destination /u01/app/oracle/product/12.2.0/db_1/dbs/arch
Oldest online log sequence 2576
Next log sequence to archive 2577
Current log sequence 2577

2. Check the database configuration
SQL> select force_logging, supplemental_log_data_min from v$database;
FORCE_LOGG SUPPLEMENTAL_LOG_DATA_MI
---------- ------------------------
YES YES
If force logging and supplemental logging are not enabled, enable them:
SQL> alter database force logging;
SQL> alter database add supplemental log data;

3. Enable the GoldenGate replication parameter
SQL> alter system set enable_goldengate_replication = true;

4. Create the source Oracle account
SQL> create tablespace tbs_ogg datafile '/oradata/dtstack/tbs_ogg.dbf' size 1024M autoextend on;
SQL> create user ggsadmin identified by oracle default tablespace tbs_ogg;
SQL> grant dba to ggsadmin;

5. Create a test table
SQL> create table baiyang.ora_to_kfk as select OWNER, OBJECT_NAME, SUBOBJECT_NAME, OBJECT_ID, DATA_OBJECT_ID, OBJECT_TYPE from all_objects where object_id < 500;
SQL> alter table baiyang.ora_to_kfk add constraint pk_kfk_obj primary key(object_id);
SQL> select count(*) from baiyang.ora_to_kfk;
COUNT(*)
----------
436

Source OGG Configuration

1. Check the source OGG environment
cd /oradata/oggorcl/ogg
./ggsci
GGSCI (dtproxy) 1> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER STOPPED

2. Create the working directories
GGSCI (dtproxy) 2> create subdirs
Creating subdirectories under current directory /oradata/oggorcl/ogg
Parameter file /oradata/oggorcl/ogg/dirprm: created.
Report file /oradata/oggorcl/ogg/dirrpt: created.
Checkpoint file /oradata/oggorcl/ogg/dirchk: created.
Process status files /oradata/oggorcl/ogg/dirpcs: created.
SQL script files /oradata/oggorcl/ogg/dirsql: created.
Database definitions files /oradata/oggorcl/ogg/dirdef: created.
Extract data files /oradata/oggorcl/ogg/dirdat: created.
Temporary files /oradata/oggorcl/ogg/dirtmp: created.
Credential store files /oradata/oggorcl/ogg/dircrd: created.
Masterkey wallet files /oradata/oggorcl/ogg/dirwlt: created.
Dump files /oradata/oggorcl/ogg/dirdmp: created.

3. Configure the source Manager
GGSCI (dtproxy) 4> dblogin userid ggsadmin password oracle
Successfully logged into database.
GGSCI (dtproxy as ggsadmin@dtstack) 5> edit param ./globals
-- add
GGSCHEMA ggsadmin
GGSCI (dtproxy as ggsadmin@dtstack) 6> edit param mgr
-- add
PORT 7810 -- default listening port
DYNAMICPORTLIST 7811-7820 -- dynamic port list
AUTORESTART EXTRACT *, RETRIES 5, WAITMINUTES 3 -- if a process fails, restart it every 3 minutes, up to 5 times
PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 7 -- purge trail files no longer needed, keeping at least 7 days
LAGREPORTHOURS 1 -- check replication lag every hour
LAGINFOMINUTES 30 -- report lag over 30 minutes as an informational message
LAGCRITICALMINUTES 45 -- report lag over 45 minutes as a critical message
PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7 -- periodically purge marker history
ACCESSRULE, PROG *, IPADDR 172.*.*.*, ALLOW -- allow connections from the 172 network segment
-- Register the table to be synchronized
GGSCI (dtproxy as ggsadmin@dtstack) 9> add trandata baiyang.ora_to_kfk
-- Oracle Goldengate marked following column as key columns on table BAIYANG.ORA_TO_KFK: OBJECT_ID.
GGSCI (dtproxy as ggsadmin@dtstack) 10> info trandata baiyang.ora_to_kfk
-- Prepared CSN for table BAIYANG.ORA_TO_KFK: 192881239

Target OGG Configuration

1. Check the target environment
GGSCI (172-16-101-242) 1> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER STOPPED

2. Create the working directories
GGSCI (172-16-101-242) 2> create subdirs
Creating subdirectories under current directory /app/ogg
Parameter file /app/ogg/dirprm: created.
Report file /app/ogg/dirrpt: created.
Checkpoint file /app/ogg/dirchk: created.
Process status files /app/ogg/dirpcs: created.
SQL script files /app/ogg/dirsql: created.
Database definitions files /app/ogg/dirdef: created.
Extract data files /app/ogg/dirdat: created.
Temporary files /app/ogg/dirtmp: created.
Credential store files /app/ogg/dircrd: created.
Masterkey wallet files /app/ogg/dirwlt: created.
Dump files /app/ogg/dirdmp: created.

3. Configure the target Manager
GGSCI (172-16-101-242) 3> edit params mgr
-- add
PORT 7810
DYNAMICPORTLIST 7811-7820
AUTORESTART EXTRACT *, RETRIES 5, WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*, usecheckpoints, minkeepdays 3
GGSCI (172-16-101-242) 4> edit param ./GLOBALS
-- add
CHECKPOINTTABLE ggsadmin.checkpoint
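With both Manager processes configured, it can be worth re-verifying the source-side database prerequisites programmatically before starting the load. Below is a minimal sketch using the cx_Oracle package (an assumption; any Oracle client library would do). The host, port, and service name are assumptions based on this environment and would need to be adapted.

import cx_Oracle

# Connect as the OGG administration user created earlier.
# DSN "dtproxy:1521/dtstack" is an assumption for this environment.
conn = cx_Oracle.connect("ggsadmin", "oracle", "dtproxy:1521/dtstack")
cur = conn.cursor()

# Archive logging, force logging, and minimal supplemental logging
cur.execute("select log_mode, force_logging, supplemental_log_data_min from v$database")
log_mode, force_logging, supp_min = cur.fetchone()
assert log_mode == "ARCHIVELOG", "archive logging must be enabled"
assert force_logging == "YES", "run: alter database force logging"
assert supp_min == "YES", "run: alter database add supplemental log data"

# GoldenGate replication parameter
cur.execute("select value from v$parameter where name = 'enable_goldengate_replication'")
assert cur.fetchone()[0].upper() == "TRUE", "set enable_goldengate_replication = true"

print("source prerequisites OK")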
Full Data Synchronization

1. Configure the source-side initial-load extract
-- Add the source-side initial-load extract
GGSCI (dtproxy as ggsadmin@dtstack) 15> add extract initkfk,sourceistable
-- Configure its parameters
GGSCI (dtproxy as ggsadmin@dtstack) 16> edit params initkfk
-- add
EXTRACT initkfk
SETENV (NLS_LANG=AMERICAN_AMERICA.AL32UTF8)
USERID ggsadmin,PASSWORD oracle
RMTHOST 172.16.101.242, MGRPORT 7810
RMTFILE ./dirdat/ekfk,maxfiles 999, megabytes 500
table baiyang.ora_to_kfk;
2. Generate the table-definition file on the source
GGSCI (dtproxy as ggsadmin@dtstack) 17> edit param define_kfk
-- add
defsfile /oradata/oggorcl/ogg/dirdef/define_kfk.txt
userid ggsadmin,password oracle
table baiyang.ora_to_kfk;
-- run
$ ./defgen paramfile dirprm/define_kfk.prm
-- Definitions generated for 1 table in /oradata/oggorcl/ogg/dirdef/define_kfk.txt
-- Copy the file to the dirdef directory on the target
scp /oradata/oggorcl/ogg/dirdef/define_kfk.txt 172.16.101.242:/app/ogg/dirdef/define_kfk.txt
3. Configure the target-side initial-load replicat
GGSCI (172-16-101-242) 3> ADD replicat initkfk,specialrun
GGSCI (172-16-101-242) 6> edit params initkfk
-- add
SPECIALRUN
end runtime
setenv(NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
targetdb libfile libggjava.so set property=./dirprm/kafka.props
SOURCEDEFS ./dirdef/define_kfk.txt
EXTFILE ./dirdat/ekfk000000
reportcount every 1 minutes, rate
grouptransops 10000
map baiyang.ora_to_kfk,target baiyang.ora_to_kfk;
4. Configure the Kafka parameters
vi ./dirprm/kafka.props
-- add
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=test_ogg
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.mode=op
gg.classpath=dirprm/:/data/kafka_2.12-2.2.0/libs/*:/app/ogg/:/app/ogg/lib/*

vi custom_kafka_producer.properties
-- add
bootstrap.servers=172.16.101.242:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000
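kafka.props maps all changes to the topic test_ogg. If the broker is not configured to auto-create topics, the topic must exist before the handler first publishes. Here is a minimal sketch using the kafka-python package (an assumption; the kafka-topics.sh CLI or any other client works just as well); the partition and replication counts are assumptions for a single-broker test setup.

from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

# Broker address taken from custom_kafka_producer.properties
admin = KafkaAdminClient(bootstrap_servers="172.16.101.242:9092")

try:
    admin.create_topics([NewTopic(name="test_ogg", num_partitions=1, replication_factor=1)])
    print("topic test_ogg created")
except TopicAlreadyExistsError:
    print("topic test_ogg already exists")

# A successful call here also confirms broker connectivity from this host.
print("known topics:", admin.list_topics())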
5. Start the full data extract on the source
-- source side
GGSCI (dtproxy) 20> start mgr
GGSCI (dtproxy) 21> start initkfk
6. Apply the full data set on the target
GGSCI (172-16-101-242) 13> start mgr
./replicat paramfile ./dirprm/initkfk.prm reportfile ./dirrpt/init01.rpt -p INITIALDATALOAD
7. Verify the data in Kafka
Use the Kafka console consumer to inspect the topic:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_ogg --from-beginning
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 20:23:19.703779","current_ts":"2019-11-11T20:48:55.946000","pos":"-0000000000000000001","after":{"OWNER":"SYS","OBJECT_NAME":"C_OBJ#","SUBOBJECT_NAME":null,"OBJECT_ID":2,"DATA_OBJECT_ID":2,"OBJECT_TYPE":"CLUSTER"}}
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 20:23:19.703779","current_ts":"2019-11-11T20:48:56.289000","pos":"-0000000000000000001","after":{"OWNER":"SYS","OBJECT_NAME":"I_OBJ#","SUBOBJECT_NAME":null,"OBJECT_ID":3,"DATA_OBJECT_ID":3,"OBJECT_TYPE":"INDEX"}}
The full data set has now been synchronized to the target Kafka topic.
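Downstream code can deserialize the handler's JSON envelope directly instead of using the console consumer. A minimal kafka-python sketch (an assumption; any Kafka client works), matching the json format and op mode configured in kafka.props above:

import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "test_ogg",
    bootstrap_servers="172.16.101.242:9092",
    auto_offset_reset="earliest",  # read the topic from the beginning
)

for msg in consumer:
    record = json.loads(msg.value)
    # With mode=op there is one message per operation: I=insert, U=update, D=delete.
    op = record["op_type"]
    # Inserts and updates carry the new row in "after"; deletes carry "before".
    row = record.get("after") or record.get("before")
    print(record["table"], op, row["OBJECT_ID"], row["OBJECT_NAME"])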
Incremental Data Synchronization

1. Configure the source-side extract process
GGSCI (dtproxy) 9> edit param extkfk
-- add
dynamicresolution
SETENV (ORACLE_SID = "dtstack")
SETENV (NLS_LANG = "american_america.AL32UTF8")
userid ggsadmin,password oracle
exttrail ./dirdat/to
table baiyang.ora_to_kfk;
-- add the extract process
GGSCI (dtproxy) 10> add extract extkfk,tranlog,begin now
-- define the trail file and bind it to the extract process
GGSCI (dtproxy) 11> add exttrail ./dirdat/to,extract extkfk
2. Configure the source-side data pump process
-- configure the pump
GGSCI (dtproxy) 12> edit param pupkfk
-- add
extract pupkfk
passthru
dynamicresolution
userid ggsadmin,password oracle
rmthost 172.16.101.242 mgrport 7810
rmttrail ./dirdat/to
table baiyang.ora_to_kfk;
-- add the pump (extract) process
GGSCI (dtproxy) 13> add extract pupkfk,exttrailsource /oradata/oggorcl/ogg/dirdat/to
-- define the remote trail file and bind it to the pump process
GGSCI (dtproxy) 14> add rmttrail ./dirdat/to,extract pupkfk
3. Configure the target-side replicat process
-- configure the replicat
edit param repkfk
-- add
REPLICAT repkfk
SOURCEDEFS ./dirdef/define_kfk.txt
targetdb libfile libggjava.so set property=./dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP baiyang.ora_to_kfk, TARGET baiyang.ora_to_kfk;
-- bind the trail file to the replicat process
add replicat repkfk exttrail ./dirdat/to,checkpointtable ggsadmin.checkpoint
4. Start real-time data capture on the source
./ggsci
GGSCI (dtproxy) 5> start extkfk
Sending START request to MANAGER ...
EXTRACT EXTKFK starting
GGSCI (dtproxy) 6> start pupkfk
Sending START request to MANAGER ...
EXTRACT PUPKFK starting
GGSCI (dtproxy) 7> status all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
EXTRACT RUNNING EXTKFK 00:00:00 00:00:10
EXTRACT RUNNING PUPKFK 00:00:00 00:00:00
5. Start real-time data synchronization on the target
./ggsci
GGSCI (172-16-101-242) 7> start replicat repkfk
Sending START request to MANAGER ...
REPLICAT REPKFK starting
GGSCI (172-16-101-242) 8> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
REPLICAT RUNNING REPKFK 00:00:00 00:00:00
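With extract, pump, and replicat all running, changes now flow continuously into Kafka. On the consumer side, one way to check that a downstream application is keeping up is to compare its committed offsets with the log end offsets. A sketch with kafka-python; the consumer group name "ora_to_kfk_app" is hypothetical:

from kafka import KafkaConsumer, TopicPartition

# "ora_to_kfk_app" is a hypothetical downstream consumer group.
consumer = KafkaConsumer(bootstrap_servers="172.16.101.242:9092",
                         group_id="ora_to_kfk_app")

tp = TopicPartition("test_ogg", 0)       # single-partition topic in this setup
end = consumer.end_offsets([tp])[tp]     # log end offset
committed = consumer.committed(tp) or 0  # last offset committed by this group
print("consumer lag on test_ogg[0]:", end - committed)
consumer.close()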
6. Test incremental synchronization
Insert incremental data into Oracle:
SQL> insert into baiyang.ora_to_kfk select OWNER, OBJECT_NAME, SUBOBJECT_NAME, OBJECT_ID, DATA_OBJECT_ID, OBJECT_TYPE from all_objects where object_id >500 and object_id < 1000;
SQL> commit;
SQL> select count(*) from baiyang.ora_to_kfk;
COUNT(*)
----------
905
Check the messages consumed from the Kafka queue:
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:04:11.158786","current_ts":"2019-11-11T21:10:54.042000","pos":"00000000000000075298","after":{"OWNER":"SYS","OBJECT_NAME":"APPLY$_READER_STATS","SUBOBJECT_NAME":null,"OBJECT_ID":998,"DATA_OBJECT_ID":998,"OBJECT_TYPE":"TABLE"}}
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:04:11.158786","current_ts":"2019-11-11T21:10:54.042001","pos":"00000000000000075459","after":{"OWNER":"SYS","OBJECT_NAME":"APPLY$_READER_STATS_I","SUBOBJECT_NAME":null,"OBJECT_ID":999,"DATA_OBJECT_ID":999,"OBJECT_TYPE":"INDEX"}}
Delete data from the source Oracle:
SQL> delete from baiyang.ora_to_kfk;
906 rows deleted.
SQL> commit;
Check the messages consumed from the Kafka queue:
{"table":"BAIYANG.ORA_TO_KFK","op_type":"D","op_ts":"2019-11-11 21:13:11.166184","current_ts":"2019-11-11T21:13:17.449007","pos":"00000000000000216645","before":{"OWNER":"x1","OBJECT_NAME":"SSSSS","SUBOBJECT_NAME":"z1","OBJECT_ID":111000,"DATA_OBJECT_ID":2000,"OBJECT_TYPE":"x1"}}
Insert a row on the source:
SQL> insert into baiyang.ora_to_kfk values('汉字', 'y1', 'z1', 111000,2000,'x1');
1 row created.
SQL> commit;
Check the messages consumed from the Kafka queue:
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:14:21.167454","current_ts":"2019-11-11T21:14:26.497000","pos":"00000000000000216794","after":{"OWNER":"汉字","OBJECT_NAME":"y1","SUBOBJECT_NAME":"z1","OBJECT_ID":111000,"DATA_OBJECT_ID":2000,"OBJECT_TYPE":"x1"}}

Summary

With OGG, data changes in Oracle can be conveniently synchronized to a Kafka message queue in real time. By subscribing to the Kafka topic, downstream business systems can easily build all kinds of real-time data applications.
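As a sketch of the subscription pattern the summary describes, the consumer below routes each operation type to its own handler; the handler bodies are placeholders standing in for real business logic, and the consumer group name is again hypothetical:

import json
from kafka import KafkaConsumer

def on_insert(row): print("insert", row)  # placeholder business logic
def on_update(row): print("update", row)
def on_delete(row): print("delete", row)

handlers = {"I": on_insert, "U": on_update, "D": on_delete}

consumer = KafkaConsumer("test_ogg",
                         bootstrap_servers="172.16.101.242:9092",
                         group_id="ora_to_kfk_app",  # hypothetical group
                         enable_auto_commit=True)

for msg in consumer:
    record = json.loads(msg.value)
    row = record.get("after") or record.get("before")
    handlers[record["op_type"]](row)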