最具影响力的数字化技术在线社区

168大数据

 找回密码
 立即注册

QQ登录

只需一步,快速开始

1 2 3 4 5
打印 上一主题 下一主题
开启左侧

基于OGG 实现Oracle到Kafka增量数据实时同步

[复制链接]
跳转到指定楼层
楼主
发表于 2021-12-23 16:10:28 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式

马上注册,结交更多数据大咖,获取更多知识干货,轻松玩转大数据

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
本帖最后由 168主编 于 2021-12-24 12:11 编辑

简介: 在大数据时代,存在大量基于数据的业务。数据需要在不同的系统之间流动、整合。通常,核心业务系统的数据存在OLTP数据库系统中,其它业务系统需要获取OLTP系统中的数据。传统的数仓通过批量数据同步的方式,定期从OLTP系统中抽取数据。
背景
在大数据时代,存在大量基于数据的业务。数据需要在不同的系统之间流动、整合。通常,核心业务系统的数据存在OLTP数据库系统中,其它业务系统需要获取OLTP系统中的数据。传统的数仓通过批量数据同步的方式,定期从OLTP系统中抽取数据。但是随着业务需求的升级,批量同步无论从实时性,还是对在线OLTP系统的抽取压力,都无法满足要求。需要实时从OLTP系统中获取数据变更,实时同步到下游业务系统。
本文基于Oracle OGG,介绍一种将Oracle数据库的数据实时同步到Kafka消息队列的方法。
Kafka是一种高效的消息队列实现,通过订阅kafka的消息队列,下游系统可以实时获取在线Oracle系统的数据变更情况,实现业务系统。
环境介绍
组件版本

整体架构图

名词解释
1.OGG Manager
OGG Manager用于配置和管理其它OGG组件,配置数据抽取、数据推送、数据复制,启动和停止相关组件,查看相关组件的运行情况。
2.数据抽取(Extract)
抽取源端数据库的变更(DML, DDL)。数据抽取主要分如下几种类型:
本地抽取
从本地数据库捕获增量变更数据,写入到本地Trail文件
数据推送(Data Pump)
从本地Trail文件读取数据,推送到目标端。
初始数据抽取
从数据库表中导出全量数据,用于初次数据加载
3.数据推送(Data Pump)
Data Pump是一种特殊的数据抽取(Extract)类型,从本地Trail文件中读取数据,并通过网络将数据发送到目标端OGG
4.Trail文件
数据抽取从源端数据库抓取到的事物变更信息会写入到Trail文件。
5.数据接收(Collector)
数据接收程序运行在目标端机器,用于接收Data Pump发送过来的Trail日志,并将数据写入到本地Trail文件。
6.数据复制(Replicat)
数据复制运行在目标端机器,从Trail文件读取数据变更,并将变更数据应用到目标端数据存储系统。本案例中,数据复制将数据推送到kafka消息队列。
7.检查点(Checkpoint)
检查点用于记录数据库事物变更。

操作步骤
源端Oracle配置
1.检查归档
使用OGG,需要在源端开启归档日志
[AppleScript] 纯文本查看 复制代码
QL> archive log list;
 
    Database log mode              Archive Mode
 
    Automatic archival             Enabled
 
    Archive destination            /u01/app/oracle/product/12.2.0/db_1/dbs/arch
 
    Oldest online log sequence     2576
 
    Next log sequence to archive   2577
 
    Current log sequence           2577

2.检查数据库配置
[AppleScript] 纯文本查看 复制代码
SQL> select force_logging, supplemental_log_data_min from v$database;
  
FORCE_LOGG SUPPLEMENTAL_LOG_DATA_MI
  
---------- ------------------------
  
YES        YES

如果没有开启辅助日志,需要开启:
[AppleScript] 纯文本查看 复制代码
SQL> alter database force logging;[/size][/font][/color][/align][color=#000][font=微软雅黑][size=2]SQL> alter database add supplemental log data;


3.开启goldengate复制参数
[AppleScript] 纯文本查看 复制代码
 alter system set enable_goldengate_replication = true;
4.创建源端Oracle账号
[AppleScript] 纯文本查看 复制代码
SQL> create tablespace tbs_ogg datafile '/oradata/dtstack/tbs_ogg.dbf' size 1024M autoextend on;
 
SQL> create user ggsadmin identified by oracle default tablespace tbs_ogg;
 
SQL> grant dba to ggsadmin;
5.创建测试表   
[AppleScript] 纯文本查看 复制代码
SQL> create table baiyang.ora_to_kfk as select OWNER, OBJECT_NAME, SUBOBJECT_NAME, OBJECT_ID, DATA_OBJECT_ID, OBJECT_TYPE from all_objects where object_id < 500;[/align] 
SQL> alter table baiyang.ora_to_kfk add constraint pk_kfk_obj primary key(object_id);
 
SQL> select count(*) from baiyang.ora_to_kfk;
    COUNT(*)
 
----------
 
        436
源端OGG配置
1.检查源端OGG环境
[AppleScript] 纯文本查看 复制代码
cd /oradata/oggorcl/ogg
 
./ggsci
 
GGSCI (dtproxy) 1> info all
 
Program     Status      Group       Lag at Chkpt  Time Since Chkpt

MANAGER     STOPPED
2.创建相关文件夹   
[AppleScript] 纯文本查看 复制代码
GGSCI (dtproxy) 2> create subdirs
 
    Creating subdirectories under current directory /oradata/oggorcl/ogg
 
   
    Parameter file                 /oradata/oggorcl/ogg/dirprm: created.
 
    Report file                    /oradata/oggorcl/ogg/dirrpt: created.
 
    Checkpoint file                /oradata/oggorcl/ogg/dirchk: created.
 
    Process status files           /oradata/oggorcl/ogg/dirpcs: created.
 
    SQL script files               /oradata/oggorcl/ogg/dirsql: created.
 
    Database definitions files     /oradata/oggorcl/ogg/dirdef: created.
 
    Extract data files             /oradata/oggorcl/ogg/dirdat: created.
 
    Temporary files                /oradata/oggorcl/ogg/dirtmp: created.
 
    Credential store files         /oradata/oggorcl/ogg/dircrd: created.
 
    Masterkey wallet files         /oradata/oggorcl/ogg/dirwlt: created.
 
    Dump files                     /oradata/oggorcl/ogg/dirdmp: created
3.配置源端Manager
[AppleScript] 纯文本查看 复制代码
GGSCI (dtproxy) 4> dblogin userid ggsadmin password oracle
 
    Successfully logged into database.
 
GGSCI (dtproxy as ggsadmin@dtstack) 5> edit param ./globals

添加
[AppleScript] 纯文本查看 复制代码
oggschema ggsadmin
 
GGSCI (dtproxy as ggsadmin@dtstack) 6> edit param mgr

添加
[AppleScript] 纯文本查看 复制代码
PORT 7810 --默认监听端口
 
DYNAMICPORTLIST  7811-7820 --动态端口列表
 
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 --进程有问题,每3分钟重启一次,一共重启五次
 
PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 7  --*/
 
LAGREPORTHOURS 1 --每隔一小时检查一次传输延迟情况
 
LAGINFOMINUTES 30 --传输延时超过30分钟将写入错误日志
 
LAGCRITICALMINUTES 45 --传输延时超过45分钟将写入警告日志
 
PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7 --定期清理trail文件
 
ACCESSRULE, PROG *, IPADDR 172.*.*.*, ALLOW --设定172网段可连接
添加同步的表
[AppleScript] 纯文本查看 复制代码
GGSCI (dtproxy as ggsadmin@dtstack) 9> add trandata baiyang.ora_to_kfk
 
Oracle Goldengate marked following column as key columns on table BAIYANG.ORA_TO_KFK: OBJECT_ID.
 
GGSCI (dtproxy as ggsadmin@dtstack) 10> info trandata baiyang.ora_to_kfk
 
Prepared CSN for table BAIYANG.ORA_TO_KFK: 192881239
目标端OGG配置
1.目标端检查环境
[AppleScript] 纯文本查看 复制代码
GGSCI (172-16-101-242) 1> info all[/align] 
    Program     Status      Group       Lag at Chkpt  Time Since Chkpt
 
    MANAGER     STOPPED 
2.创建目录   
[AppleScript] 纯文本查看 复制代码
GGSCI (172-16-101-242) 2> create subdirs
 
    Creating subdirectories under current directory /app/ogg
 
    Parameter file                 /app/ogg/dirprm: created.
 
    Report file                    /app/ogg/dirrpt: created.
 
    Checkpoint file                /app/ogg/dirchk: created.
 
    Process status files           /app/ogg/dirpcs: created.
 
    SQL script files               /app/ogg/dirsql: created.
 
    Database definitions files     /app/ogg/dirdef: created.
 
    Extract data files             /app/ogg/dirdat: created.
 
    Temporary files                /app/ogg/dirtmp: created.
 
    Credential store files         /app/ogg/dircrd: created.
 
    Masterkey wallet files         /app/ogg/dirwlt: created.
 
Dump files                     /app/ogg/dirdmp: created.
3.目标端Manager配置
GGSCI (172-16-101-242) 3> edit params mgr

添加
[AppleScript] 纯文本查看 复制代码
PORT 7810
 
    DYNAMICPORTLIST 7811-7820
 
    AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
 
    PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3
    
    GGSCI (172-16-101-242) 4> edit  param  ./GLOBALS
CHECKPOINTTABLE ggsadmin.checkpoint
全量数据同步
1.配置源端数据初始化
配置源端初始化进程
GGSCI (dtproxy as ggsadmin@dtstack) 15> add extract initkfk,sourceistable

配置源端初始化参数
GGSCI (dtproxy as ggsadmin@dtstack) 16> edit params initkfk

添加
[AppleScript] 纯文本查看 复制代码
EXTRACT initkfk
    SETENV (NLS_LANG=AMERICAN_AMERICA.AL32UTF8)
    USERID ggsadmin,PASSWORD oracle
    RMTHOST 172.16.101.242, MGRPORT 7810
    RMTFILE ./dirdat/ekfk,maxfiles 999, megabytes 500
table baiyang.ora_to_kfk;
  
2.源端生成表结构define文件
GGSCI (dtproxy as ggsadmin@dtstack) 17> edit param define_kfk

添加
[AppleScript] 纯文本查看 复制代码
defsfile /oradata/oggorcl/ogg/dirdef/define_kfk.txt
 
    userid ggsadmin,password oracle
 
    table baiyang.ora_to_kfk;

执行
[AppleScript] 纯文本查看 复制代码
$./defgen paramfile dirprm/define_kfk.prm
 
-- Definitions generated for 1 table in /oradata/oggorcl/ogg/dirdef/define_kfk.txt

将此文件传输到目标段dirdef文件夹
scp /oradata/oggorcl/ogg/dirdef/define_kfk.txt 172.16.101.242:/app/ogg/dirdef/define_kfk.txt

3.配置目标端数据初始化进程
配置目标端初始化进程
GGSCI (172-16-101-242) 3> ADD replicat initkfk,specialrun GGSCI (172-16-101-242) 6> edit params initkfk

添加
[AppleScript] 纯文本查看 复制代码
SPECIALRUN
 
    end runtime
 
    setenv(NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
 
    targetdb libfile libggjava.so set property=./dirprm/kafka.props
 
    SOURCEDEFS ./dirdef/define_kfk.txt
 
    EXTFILE ./dirdat/ekfk000000
 
    reportcount every 1 minutes, rate
 
    grouptransops 10000
 
map baiyang.ora_to_kfk,target baiyang.ora_to_kfk;
4.配置kafka相关参数
vi ./dirprm/kafka.props

添加
[AppleScript] 纯文本查看 复制代码
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=test_ogg
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.mode=op
gg.classpath=dirprm/:/data/kafka_2.12-2.2.0/libs/*:/app/ogg/:/app/ogg/lib/*  --*/
vi custom_kafka_producer.properties
添加
[AppleScript] 纯文本查看 复制代码
bootstrap.servers=172.16.101.242:9092
 
acks=1
 
compression.type=gzip
 
reconnect.backoff.ms=1000
 
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
 
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
 
batch.size=102400
 
linger.ms=10000


   
5.源端开启全量数据抽取
源端
GGSCI (dtproxy) 20>  start mgr GGSCI (dtproxy) 21>  start initkfk

6.目标端全量数据应用
GGSCI (172-16-101-242) 13> start mgr ./replicat paramfile ./dirprm/initkfk.prm reportfile ./dirrpt/init01.rpt -p INITIALDATALOAD

7.kafka数据验证
使用kafka客户端工具查看topic的数据
[AppleScript] 纯文本查看 复制代码
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_ogg --from-beginning
 
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 20:23:19.703779","current_ts":"2019-11-11T20:48:55.946000","pos":"-0000000000000000001","after":{"OWNER":"SYS","OBJECT_NAME":"C_OBJ#","SUBOBJECT_NAME":null,"OBJECT_ID":2,"DATA_OBJECT_ID":2,"OBJECT_TYPE":"CLUSTER"}}
 
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 20:23:19.703779","current_ts":"2019-11-11T20:48:56.289000","pos":"-0000000000000000001","after":{"OWNER":"SYS","OBJECT_NAME":"I_OBJ#","SUBOBJECT_NAME":null,"OBJECT_ID":3,"DATA_OBJECT_ID":3,"OBJECT_TYPE":"INDEX"}}



全量数据已经同步到目标kafka topic
增量数据同步
1.源端抽取进程配置
GGSCI (dtproxy) 9> edit param extkfk

添加
[AppleScript] 纯文本查看 复制代码
dynamicresolution
 
SETENV (ORACLE_SID = "dtstack")
 
SETENV (NLS_LANG = "american_america.AL32UTF8")
 
userid ggsadmin,password oracle
 
exttrail ./dirdat/to
 
table baiyang.ora_to_kfk;



添加extract进程
GGSCI (dtproxy) 10> add extract extkfk,tranlog,begin now

添加trail文件的定义与extract进程绑定
GGSCI (dtproxy) 11> add exttrail ./dirdat/to,extract extkfk

2.源端数据推送进程配置
配置源端推送进程
GGSCI (dtproxy) 12> edit param pupkfk

添加
[AppleScript] 纯文本查看 复制代码
extract pupkfk
 
passthru
 
dynamicresolution
 
userid ggsadmin,password oracle
 
rmthost 172.16.101.242 mgrport 7810
 
rmttrail ./dirdat/to
 
table baiyang.ora_to_kfk;



添加extract进程
GGSCI (dtproxy) 13>  add extract pupkfk,exttrailsource /oradata/oggorcl/ogg/dirdat/to

添加trail文件的定义与extract进程绑定
GGSCI (dtproxy) 14>  add rmttrail ./dirdat/to,extract pupkfk

3.配置目标端恢复进程
配置目标端恢复进程
edit param repkfk

添加
[AppleScript] 纯文本查看 复制代码
REPLICAT repkfk
 
SOURCEDEFS ./dirdef/define_kfk.txt
 
targetdb libfile libggjava.so set property=./dirprm/kafka.props
 
REPORTCOUNT EVERY 1 MINUTES, RATE
 
GROUPTRANSOPS 10000
 
MAP baiyang.ora_to_kfk, TARGET baiyang.ora_to_kfk;



添加trail文件到replicate进程
add replicat repkfk exttrail ./dirdat/to,checkpointtable ggsadmin.checkpoint

4.源端开启实时数据抓取
[AppleScript] 纯文本查看 复制代码
./ggsci
 
GGSCI (dtproxy) 5> start extkfk
 
Sending START request to MANAGER ...
 
EXTRACT EXTKFK starting

GGSCI (dtproxy) 6> start pupkfk
 
Sending START request to MANAGER ...
 
EXTRACT PUPKFK starting
  
GGSCI (dtproxy) 7> status all
 
Program     Status      Group       Lag at Chkpt  Time Since Chkpt
 
MANAGER     RUNNING
 
EXTRACT     RUNNING     EXTKFK      00:00:00      00:00:10
 
EXTRACT     RUNNING     PUPKFK      00:00:00      00:00:00



5.目标端开启实时数据同步
[AppleScript] 纯文本查看 复制代码
./ggsci
 
GGSCI (172-16-101-242) 7> start replicat repkfk
 
Sending START request to MANAGER ...
 
REPLICAT REPKFK starting
  
GGSCI (172-16-101-242) 8> info all
 
Program     Status      Group       Lag at Chkpt  Time Since Chkpt
  
MANAGER     RUNNING
 
REPLICAT    RUNNING     REPKFK      00:00:00      00:00:00



6.测试增量数据同步
Oracle插入增量数据
[AppleScript] 纯文本查看 复制代码
SQL> insert into baiyang.ora_to_kfk  select OWNER, OBJECT_NAME, SUBOBJECT_NAME, OBJECT_ID, DATA_OBJECT_ID, OBJECT_TYPE from all_objects where object_id >500 and  object_id < 1000;
 
SQL> commit;
 
SQL> select count(*) from baiyang.ora_to_kfk;
COUNT(*)
 
----------
 
    905



查看Kafka消息队列消费数据
[AppleScript] 纯文本查看 复制代码
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:04:11.158786","current_ts":"2019-11-11T21:10:54.042000","pos":"00000000000000075298","after":{"OWNER":"SYS","OBJECT_NAME":"APPLY$_READER_STATS","SUBOBJECT_NAME":null,"OBJECT_ID":998,"DATA_OBJECT_ID":998,"OBJECT_TYPE":"TABLE"}}
 
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:04:11.158786","current_ts":"2019-11-11T21:10:54.042001","pos":"00000000000000075459","after":{"OWNER":"SYS","OBJECT_NAME":"APPLY$_READER_STATS_I","SUBOBJECT_NAME":null,"OBJECT_ID":999,"DATA_OBJECT_ID":999,"OBJECT_TYPE":"INDEX"}}


源端Oracle删除数据
[AppleScript] 纯文本查看 复制代码
SQL> delete from baiyang.ora_to_kfk ;
 
906 rows deleted.
 
SQL> commit;


查看kafka消息队列消费数据
[AppleScript] 纯文本查看 复制代码
{"table":"BAIYANG.ORA_TO_KFK","op_type":"D","op_ts":"2019-11-11 21:13:11.166184","current_ts":"2019-11-11T21:13:17.449007","pos":"00000000000000216645","before":{"OWNER":"x1","OBJECT_NAME":"SSSSS","SUBOBJECT_NAME":"z1","OBJECT_ID":111000,"DATA_OBJECT_ID":2000,"OBJECT_TYPE":"x1"}}


源端插入数据
SQL> insert into  baiyang.ora_to_kfk values('汉字', 'y1', 'z1', 111000,2000,'x1'); 1 row created. SQL> commit;

查看kafka消息队列消费数据
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:14:21.167454","current_ts":"2019-11-11T21:14:26.497000","pos":"00000000000000216794","after":{"OWNER":"汉字","OBJECT_NAME":"y1","SUBOBJECT_NAME":"z1","OBJECT_ID":111000,"DATA_OBJECT_ID":2000,"OBJECT_TYPE":"x1"}}

总结
使用OGG可以方便地将Oracle的数据变更情况实时同步到Kafka消息队列。下游业务系统通过订阅kafka的消息队列,能方便地实现各类实时数据的应用。



楼主热帖
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 转播转播 分享分享 分享淘帖 赞 踩

168大数据 - 论坛版权1.本主题所有言论和图片纯属网友个人见解,与本站立场无关
2.本站所有主题由网友自行投稿发布。若为首发或独家,该帖子作者与168大数据享有帖子相关版权。
3.其他单位或个人使用、转载或引用本文时必须同时征得该帖子作者和168大数据的同意,并添加本文出处。
4.本站所收集的部分公开资料来源于网络,转载目的在于传递价值及用于交流学习,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。
5.任何通过此网页连接而得到的资讯、产品及服务,本站概不负责,亦不负任何法律责任。
6.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源,若标注有误或遗漏而侵犯到任何版权问题,请尽快告知,本站将及时删除。
7.168大数据管理员和版主有权不事先通知发贴者而删除本文。

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

站长推荐上一条 /1 下一条

关于我们|小黑屋|Archiver|168大数据 ( 京ICP备14035423号|申请友情链接

GMT+8, 2024-5-20 06:14

Powered by BI168大数据社区

© 2012-2014 168大数据

快速回复 返回顶部 返回列表