最具影响力的数字化技术在线社区

168大数据

 找回密码
 立即注册

QQ登录

只需一步,快速开始

1 2 3 4 5
打印 上一主题 下一主题
开启左侧

[Kafka] kafka与flume 的应用(实战)

[复制链接]
跳转到指定楼层
楼主
发表于 2018-8-16 15:57:28 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式

马上注册,结交更多数据大咖,获取更多知识干货,轻松玩转大数据

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
版本号:RedHat6.5   JDK1.8    flume-1.6.0   kafka_2.11-0.8.2.1
1.flume安装RedHat6.5安装单机flume1.6:http://blog.leanote.com/post/2630794313@qq.com/26781d33b435

2.kafka安装RedHat6.5安装kafka集群 : http://blog.leanote.com/post/2630794313@qq.com/0230848f841a
3.Flume和Kafka整合在conf目录新建flume-kafka.conf文件:
  • touch /usr/local/flume/apache-flume-1.6.0-bin/conf/flume-kafka.conf
  • sudo gedit /usr/local/flume/apache-flume-1.6.0-bin/conf/flume-kafka.conf
输入以下内容:
  • # 指定Agent的组件名称  
  • agent1.sources = source1  
  • agent1.sinks = sink1  
  • agent1.channels = channel1  
  • # 指定Flume source(要监听的路径)  
  • agent1.sources.source1.type = spooldir  
  • agent1.sources.source1.spoolDir = /usr/local/flume/logtest
  • # 指定Flume sink  
  • #agent1.sinks.sink1.type = logger  
  • agent1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink  
  • agent1.sinks.sink1.topic = test  
  • agent1.sinks.sink1.brokerList = 192.168.168.200:9092  
  • agent1.sinks.sink1.requiredAcks = 1  
  • agent1.sinks.sink1.batchSize = 100   
  • # 指定Flume channel  
  • agent1.channels.channel1.type = memory  
  • agent1.channels.channel1.capacity = 1000  
  • agent1.channels.channel1.transactionCapacity = 100  
  • # 绑定source和sink到channel上  
  • agent1.sources.source1.channels = channel1  
  • agent1.sinks.sink1.channel = channel1  

agent1.sinks.sink1.topic = test   代表flume监听路径下发生变化时,会把消息发送到localhost机器上的test主题。

启动flume-kafka.conf:

  • cd /usr/local/flume/apache-flume-1.6.0-bin
  • bin/flume-ng agent --conf conf --conf-file conf/flume-kafka.conf --name agent1 -Dflume.root.logger=INFO,console

运行成功日志如下:

  • 2017-07-07 22:22:02,270 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: SINK, name: sink1: Successfully registered new MBean.
  • 2017-07-07 22:22:02,270 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SINK, name: sink1 started

启动kafka的消费者,监听topic主题:

  • kafka-console-consumer.sh --zookeeper localhost:2181 --topic test

testKafka.log :

在/usr/local/flume目录下面新建一个testKafka.log日志文件,写入Flume connect Kafka success! 作为测试内容:
  • touch /usr/local/flume/testKafka.log
  • sudo gedit /usr/local/flume/testKafka.log


然后拷贝testKafka.log到flume监听路径/usr/local/flume/logtest下:
  • cp /usr/local/flume/testKafka.log /usr/local/flume/logtest
接着就可以在前一个终端看到刚刚采集的内容了,如下:
---------------------------------kafka------------------------------
  • [root@master kafka_2.11-0.9.0.0]# kafka-console-consumer.sh --zookeeper localhost:2181 --topic test  
  • [2017-07-07 22:36:38,687] INFO [Group Metadata Manager on Broker 200]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)
  • Flume connect Kafka success!

---------------------------------flume------------------------------

  • 2017-07-07 22:41:32,602 (pool-3-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:348)] Preparing to move file /usr/local/flume/logtest/testKafka.log to /usr/local/flume/logtest/testKafka.log.COMPLETED
  • 2017-07-07 22:41:35,669 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 0 for 1 topic(s) Set(test)
  • 2017-07-07 22:41:35,728 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Connected to localhost:9092 for producing
  • 2017-07-07 22:41:35,757 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Disconnecting from localhost:9092
  • 2017-07-07 22:41:35,791 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Connected to slave2:9092 for producing

楼主热帖
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 转播转播 分享分享 分享淘帖 赞 踩

168大数据 - 论坛版权1.本主题所有言论和图片纯属网友个人见解,与本站立场无关
2.本站所有主题由网友自行投稿发布。若为首发或独家,该帖子作者与168大数据享有帖子相关版权。
3.其他单位或个人使用、转载或引用本文时必须同时征得该帖子作者和168大数据的同意,并添加本文出处。
4.本站所收集的部分公开资料来源于网络,转载目的在于传递价值及用于交流学习,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。
5.任何通过此网页连接而得到的资讯、产品及服务,本站概不负责,亦不负任何法律责任。
6.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源,若标注有误或遗漏而侵犯到任何版权问题,请尽快告知,本站将及时删除。
7.168大数据管理员和版主有权不事先通知发贴者而删除本文。

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

站长推荐上一条 /1 下一条

关于我们|小黑屋|Archiver|168大数据 ( 京ICP备14035423号|申请友情链接

GMT+8, 2024-5-17 05:56

Powered by BI168大数据社区

© 2012-2014 168大数据

快速回复 返回顶部 返回列表