马上注册,结交更多数据大咖,获取更多知识干货,轻松玩转大数据
您需要 登录 才可以下载或查看,没有帐号?立即注册
x
本帖最后由 168主编 于 2018-12-21 17:54 编辑
说明
之前说过,关于flume和kafka的实践操作就不单独拿出来讲了,打算用一个案例将两者结合,同流式计算一起在具体案例中讲述。
本篇博文案例背景:用户访问系统会留下一条条访问记录(除了姓名和身份证号外,还包括ip地址、登陆地点、设备等一系列详情信息),在数据库中我们有一份黑名单用户数据(姓名和身份证号),现在业务需求是我们需要实时监测访问系统的黑名单用户,将访问信息实时写入数据库中,并在前台展示。
此项目可用于公安系统通缉,比如犯罪嫌疑人在网吧登录、在宾馆登记等等都会留下访问记录(对接公安系统),只要检测到黑名单人员相关信息,便会实时反馈到公安系统,以便实时处理。
技术实现方案
flume的sink类型指定为kafka类型,采集指定日志目录送至kafka,slipstream的流表对接kafka消费数据。在stream job业务逻辑中编写黑名单比对业务流程,此后流任务启动起来后,每出现一条黑名单人员的访问记录,它便会被实时监测到,并写入结果表中记录下来。
Flume对接Kafka配置
解压flume,编写 flume2kafka.conf 配置文件:
[AppleScript] 纯文本查看 复制代码 tar -zxvf apache-flume-1.7.0-bin.tar.gz
cd apache-flume-1.7.0-bin/conf/
vi flume2kafka.conf
添加如下配置:
[AppleScript] 纯文本查看 复制代码 flume2kafka.sources=r3
flume2kafka.sinks=k3
flume2kafka.channels=c3
# 配置source
flume2kafka.sources.r3.type = spooldir
flume2kafka.sources.r3.spoolDir = /home/jbw/log/
flume2kafka.sources.r3.fileHeader = true
# 配置channel,将buffer事件放在内存中
flume2kafka.channels.c3.type = memory
flume2kafka.channels.c3.capacity = 10000
flume2kafka.channels.c3.transactionCapacity = 1000
# 配置sink
flume2kafka.sinks.k3.type=org.apache.flume.sink.kafka.KafkaSink
flume2kafka.sinks.k3.brokerList=172.18.xxx.xxx:9092,172.18.xxx.xxx:9092,172.18.xxx.xxx:9092
flume2kafka.sinks.k3.topic=peopleVisitTopic
flume2kafka.sinks.k3.serializer.class=kafka.serializer.StringEncoder
flume2kafka.sinks.k3.requiredAcks = 1
flume2kafka.sinks.k3.batchSize = 20
# 把source和sink绑定在channel上
flume2kafka.sources.r3.channels=c3
flume2kafka.sinks.k3.channel=c3
创建Kafka Topic[AppleScript] 纯文本查看 复制代码 cd kafka/bin
# 开启一个kafka的Topic
./kafka-topics.sh --create --topic people_visit_topic --zookeeper 172.18.xxx.xxx:2181, 172.18.xxx.xxx:2181, 172.18.xxx.xxx:2181 --partitions 1 --replication-factor 2
可以看到名为peopleVisitTopic的topic创建成功,此topic即为flume端将采集到的数据送入的topic,也是stream消费的topic。 (需要注意,kafka的topic命名规范指出,名称中不能含有下划线"_"或者点".",以避免名称冲突) ./kafka-topics.sh --list --zookeeper 172.18.xxx.xxx:2181, 172.18.xxx.xxx:2181, 172.18.xxx.xxx:2181 黑名单数据导入数据库,以便后续比对
将黑名单数据上传至HDFS,用于后续导入至HBase中进行后续比对。 [AppleScript] 纯文本查看 复制代码 hadoop fs -mkdir -p /tmp/data_bowen/blacklist
hadoop fs -put people_visit.log /tmp/data_bowen/blacklist
看一下blacklist黑名单数据格式: 根据blacklist黑名单数据格式创建外表,指定HDFS上数据文件路径: [AppleScript] 纯文本查看 复制代码 CREATE EXTERNAL TABLE blacklists (
name string,
sex string,
nation string,
id string)
row format delimited fields terminated by ','
LOCATION '/tmp/data_bowen/blacklist';
创建HBase表,并从外表中导入黑名单人员数据。(此黑名单表用于与实时流表作比对,若发现当前访问的人员在黑名单列表里,则会将其插入到最终结果表中) [AppleScript] 纯文本查看 复制代码 CREATE TABLE hyper_blacklist(
name string,
sex string,
nation string,
id string)
STORED AS HYPERDRIVE;
INSERT INTO hyper_blacklist SELECT * FROM blacklists;
创建流表(从kafka中实时获取用户访问数据)在此之前,我们先看下用户访问日志记录数据格式,可以发现黑名单用户表只包含了访问日志记录中的部分信息:
创建流表与kafka对接,flume采集访问记录送到kafka中,在创建并开启流任务后,kafka中来的数据会被传入SlipStream中预先定义好的流表中。如下即为流表的定义。创建并开启流任务的操作在下一步讲述: [AppleScript] 纯文本查看 复制代码 CREATE STREAM people_visit_stream
(name string
, sex string
, nation string
, id string
, birthdate string
, phone string
, email string
, address string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
TBLPROPERTIES("topic" = "peopleVisitTopic"
, "kafka.zookeeper" = "172.18.xxx.xxx:2181, 172.18.xxx.xxx:2181, 172.18.xxx.xxx:2181"
, "kafka.broker.list" = "172.18.xxx.xxx:9092,172.18.xxx.xxx:9092, 172.18.xxx.xxx:9092");
创建结果表结果表用于存放实时监测到的黑名单人员访问记录: [AppleScript] 纯文本查看 复制代码 CREATE TABLE blacklst_visit_result (
name string,
sex string,
nation string,
id string,
phone string,
address string)
STORED AS HYPERDRIVE; 创建并开启流任务[AppleScript] 纯文本查看 复制代码 CREATE STREAMJOB monitor_blacklist AS
("INSERT INTO blacklst_visit_result SELECT a.name,a.sex,a.nation,a.id,a.phone,a.address FROM people_visit_stream a JOIN hyper_blacklist b ON a.id == b.id");
开启流任务Job,查看运行中的StreamJob,如下:
START STREAMJOB monitor_blacklist;
LIST STREAMJOBS;
此StreamJob所做的任务即将从kafka中得到的访问数据,与黑名单数据进行比对,若发其存在于黑名单中,则将此条访问记录插入到结果表中。 准备日志文件,模拟访问数据(待采集的数据)开启flume客户端,开始监控指定日志目录 / [AppleScript] 纯文本查看 复制代码 # 进入flume目录
cd /home/apache-flume-1.7.0-bin
# 运行flume
bin/flume-ng agent -n flume2kafka -c conf -f /home/apache-flume-1.7.0-bin/conf/flume2kafka.conf
-Dflume.root.logger=INFO,console
往日志目录放people_visit.log文件,模拟真实访问数据。 mkdir -p /home/jbw/log cp people_visit.log jbw/log/
可以看到瞬间采集完毕: 查询结果表中数据如下:
以上显示的便是黑名单人员访问的记录了,当然我们还可以加上时间字段等其他信息,更精准地描述黑名单人员的访问记录。
|