最具影响力的数字化技术在线社区

168大数据

 找回密码
 立即注册

QQ登录

只需一步,快速开始

1 2 3 4 5
打印 上一主题 下一主题
开启左侧

[Kafka] flume+kafka+slipstream实现黑名单用户访问实时监测

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-21 09:59:15 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式

马上注册,结交更多数据大咖,获取更多知识干货,轻松玩转大数据

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
本帖最后由 168主编 于 2018-12-21 17:54 编辑

说明
之前说过,关于flume和kafka的实践操作就不单独拿出来讲了,打算用一个案例将两者结合,同流式计算一起在具体案例中讲述。

本篇博文案例背景:用户访问系统会留下一条条访问记录(除了姓名和身份证号外,还包括ip地址、登陆地点、设备等一系列详情信息),在数据库中我们有一份黑名单用户数据(姓名和身份证号),现在业务需求是我们需要实时监测访问系统的黑名单用户,将访问信息实时写入数据库中,并在前台展示。

此项目可用于公安系统通缉,比如犯罪嫌疑人在网吧登录、在宾馆登记等等都会留下访问记录(对接公安系统),只要检测到黑名单人员相关信息,便会实时反馈到公安系统,以便实时处理。


技术实现方案
flume的sink类型指定为kafka类型,采集指定日志目录送至kafka,slipstream的流表对接kafka消费数据。在stream job业务逻辑中编写黑名单比对业务流程,此后流任务启动起来后,每出现一条黑名单人员的访问记录,它便会被实时监测到,并写入结果表中记录下来。


Flume对接Kafka配置
解压flume,编写 flume2kafka.conf 配置文件:
[AppleScript] 纯文本查看 复制代码
tar -zxvf apache-flume-1.7.0-bin.tar.gz 
cd apache-flume-1.7.0-bin/conf/
vi flume2kafka.conf


添加如下配置:
[AppleScript] 纯文本查看 复制代码
flume2kafka.sources=r3
flume2kafka.sinks=k3
flume2kafka.channels=c3
 
# 配置source
flume2kafka.sources.r3.type = spooldir
flume2kafka.sources.r3.spoolDir = /home/jbw/log/
flume2kafka.sources.r3.fileHeader = true
 
# 配置channel,将buffer事件放在内存中
flume2kafka.channels.c3.type = memory
flume2kafka.channels.c3.capacity = 10000
flume2kafka.channels.c3.transactionCapacity = 1000
 
# 配置sink
flume2kafka.sinks.k3.type=org.apache.flume.sink.kafka.KafkaSink
flume2kafka.sinks.k3.brokerList=172.18.xxx.xxx:9092,172.18.xxx.xxx:9092,172.18.xxx.xxx:9092
flume2kafka.sinks.k3.topic=peopleVisitTopic
flume2kafka.sinks.k3.serializer.class=kafka.serializer.StringEncoder
flume2kafka.sinks.k3.requiredAcks = 1
flume2kafka.sinks.k3.batchSize = 20
 
# 把source和sink绑定在channel上
flume2kafka.sources.r3.channels=c3
flume2kafka.sinks.k3.channel=c3

创建Kafka Topic
[AppleScript] 纯文本查看 复制代码
cd kafka/bin
# 开启一个kafka的Topic
./kafka-topics.sh --create --topic people_visit_topic --zookeeper 172.18.xxx.xxx:2181, 172.18.xxx.xxx:2181, 172.18.xxx.xxx:2181 --partitions 1 --replication-factor 2



可以看到名为peopleVisitTopic的topic创建成功,此topic即为flume端将采集到的数据送入的topic,也是stream消费的topic。

(需要注意,kafka的topic命名规范指出,名称中不能含有下划线"_"或者点".",以避免名称冲突)

./kafka-topics.sh --list --zookeeper 172.18.xxx.xxx:2181, 172.18.xxx.xxx:2181, 172.18.xxx.xxx:2181

黑名单数据导入数据库,以便后续比对

将黑名单数据上传至HDFS,用于后续导入至HBase中进行后续比对。

[AppleScript] 纯文本查看 复制代码
hadoop fs -mkdir -p /tmp/data_bowen/blacklist
hadoop fs -put people_visit.log /tmp/data_bowen/blacklist

看一下blacklist黑名单数据格式:

根据blacklist黑名单数据格式创建外表,指定HDFS上数据文件路径:

[AppleScript] 纯文本查看 复制代码
CREATE EXTERNAL TABLE blacklists (
   name string,
   sex string,
   nation string,
   id string)
row format delimited fields terminated by ','
LOCATION '/tmp/data_bowen/blacklist';

创建HBase表,并从外表中导入黑名单人员数据。(此黑名单表用于与实时流表作比对,若发现当前访问的人员在黑名单列表里,则会将其插入到最终结果表中)

[AppleScript] 纯文本查看 复制代码
CREATE TABLE hyper_blacklist(
   name string,
   sex string,
   nation string,
   id string)
STORED AS HYPERDRIVE;
 
INSERT INTO hyper_blacklist SELECT * FROM blacklists;


创建流表(从kafka中实时获取用户访问数据)

在此之前,我们先看下用户访问日志记录数据格式,可以发现黑名单用户表只包含了访问日志记录中的部分信息:


创建流表与kafka对接,flume采集访问记录送到kafka中,在创建并开启流任务后,kafka中来的数据会被传入SlipStream中预先定义好的流表中。如下即为流表的定义。创建并开启流任务的操作在下一步讲述:

[AppleScript] 纯文本查看 复制代码
CREATE STREAM people_visit_stream
(name string
  , sex string
  , nation string
  , id string
  , birthdate string
  , phone string
  , email string
  , address string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
TBLPROPERTIES("topic" = "peopleVisitTopic"
              , "kafka.zookeeper" = "172.18.xxx.xxx:2181, 172.18.xxx.xxx:2181, 172.18.xxx.xxx:2181"
              , "kafka.broker.list" = "172.18.xxx.xxx:9092,172.18.xxx.xxx:9092, 172.18.xxx.xxx:9092");



创建结果表

结果表用于存放实时监测到的黑名单人员访问记录:

[AppleScript] 纯文本查看 复制代码
CREATE TABLE blacklst_visit_result (
   name string,
   sex string,
   nation string,
   id string,
   phone string,
   address string)
STORED AS HYPERDRIVE;

创建并开启流任务
[AppleScript] 纯文本查看 复制代码
CREATE STREAMJOB monitor_blacklist AS
("INSERT INTO blacklst_visit_result SELECT a.name,a.sex,a.nation,a.id,a.phone,a.address FROM people_visit_stream a JOIN hyper_blacklist b ON a.id == b.id");



开启流任务Job,查看运行中的StreamJob,如下:
START STREAMJOB monitor_blacklist;
LIST STREAMJOBS;




此StreamJob所做的任务即将从kafka中得到的访问数据,与黑名单数据进行比对,若发其存在于黑名单中,则将此条访问记录插入到结果表中。

准备日志文件,模拟访问数据(待采集的数据)

开启flume客户端,开始监控指定日志目录 /

[AppleScript] 纯文本查看 复制代码
# 进入flume目录
cd /home/apache-flume-1.7.0-bin
# 运行flume
bin/flume-ng agent -n flume2kafka -c conf -f /home/apache-flume-1.7.0-bin/conf/flume2kafka.conf 
-Dflume.root.logger=INFO,console

往日志目录放people_visit.log文件,模拟真实访问数据。

mkdir -p /home/jbw/log

cp people_visit.log jbw/log/



可以看到瞬间采集完毕:

查询结果表中数据如下:


以上显示的便是黑名单人员访问的记录了,当然我们还可以加上时间字段等其他信息,更精准地描述黑名单人员的访问记录。
楼主热帖
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 转播转播 分享分享 分享淘帖 赞 踩

168大数据 - 论坛版权1.本主题所有言论和图片纯属网友个人见解,与本站立场无关
2.本站所有主题由网友自行投稿发布。若为首发或独家,该帖子作者与168大数据享有帖子相关版权。
3.其他单位或个人使用、转载或引用本文时必须同时征得该帖子作者和168大数据的同意,并添加本文出处。
4.本站所收集的部分公开资料来源于网络,转载目的在于传递价值及用于交流学习,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。
5.任何通过此网页连接而得到的资讯、产品及服务,本站概不负责,亦不负任何法律责任。
6.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源,若标注有误或遗漏而侵犯到任何版权问题,请尽快告知,本站将及时删除。
7.168大数据管理员和版主有权不事先通知发贴者而删除本文。

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

站长推荐上一条 /1 下一条

关于我们|小黑屋|Archiver|168大数据 ( 京ICP备14035423号|申请友情链接

GMT+8, 2024-4-29 23:49

Powered by BI168大数据社区

© 2012-2014 168大数据

快速回复 返回顶部 返回列表