
Posted by 168主编 on 2018-8-16 15:57:28

Kafka and Flume in practice (hands-on)

Versions: RedHat 6.5, JDK 1.8, flume-1.6.0, kafka_2.11-0.8.2.1

1. Install Flume: standalone Flume 1.6 on RedHat 6.5 (see http://blog.leanote.com/post/2630794313@qq.com/26781d33b435)

2. Install Kafka: Kafka cluster on RedHat 6.5 (see http://blog.leanote.com/post/2630794313@qq.com/0230848f841a)
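If automatic topic creation is disabled on the broker, create the test topic used throughout this walkthrough before wiring Flume to it (a sketch; with Kafka's default auto.create.topics.enable=true the broker creates it on first use, and the ZooKeeper address should match your cluster):

[*]kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test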
3. Integrate Flume with Kafka: create a new flume-kafka.conf file in the conf directory:

[*]touch /usr/local/flume/apache-flume-1.6.0-bin/conf/flume-kafka.conf
[*]sudo gedit /usr/local/flume/apache-flume-1.6.0-bin/conf/flume-kafka.conf
Enter the following content:

[*]# Name the agent's components
[*]agent1.sources = source1
[*]agent1.sinks = sink1
[*]agent1.channels = channel1
[*]
[*]# Flume source (the directory to watch)
[*]agent1.sources.source1.type = spooldir
[*]agent1.sources.source1.spoolDir = /usr/local/flume/logtest
[*]
[*]# Flume sink
[*]#agent1.sinks.sink1.type = logger
[*]agent1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
[*]agent1.sinks.sink1.topic = test
[*]agent1.sinks.sink1.brokerList = 192.168.168.200:9092
[*]agent1.sinks.sink1.requiredAcks = 1
[*]agent1.sinks.sink1.batchSize = 100   
[*]
[*]# Flume channel
[*]agent1.channels.channel1.type = memory
[*]agent1.channels.channel1.capacity = 1000
[*]agent1.channels.channel1.transactionCapacity = 100
[*]
[*]# Bind the source and sink to the channel
[*]agent1.sources.source1.channels = channel1
[*]agent1.sinks.sink1.channel = channel1
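One thing to check before starting the agent: the spooling directory /usr/local/flume/logtest that source1 points at must already exist, otherwise the spooldir source fails on startup. A minimal preparation step, assuming the same path as in the config above:

[*]mkdir -p /usr/local/flume/logtest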
The line agent1.sinks.sink1.topic = test means that whenever something changes under the watched directory, Flume sends the new messages to the test topic on the Kafka broker configured in brokerList. Start the agent with flume-kafka.conf:

[*]cd /usr/local/flume/apache-flume-1.6.0-bin
[*]bin/flume-ng agent --conf conf --conf-file conf/flume-kafka.conf --name agent1 -Dflume.root.logger=INFO,console
A successful start ends with log lines like the following:
[*]2017-07-07 22:22:02,270 (lifecycleSupervisor-1-2) Monitored counter group for type: SINK, name: sink1: Successfully registered new MBean.
[*]2017-07-07 22:22:02,270 (lifecycleSupervisor-1-2) Component type: SINK, name: sink1 started
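To keep the agent running after the terminal closes, the same command can be launched in the background from the same installation directory (an optional variation; the console output then goes to a file such as flume.log instead):

[*]nohup bin/flume-ng agent --conf conf --conf-file conf/flume-kafka.conf --name agent1 > flume.log 2>&1 &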
Start a Kafka console consumer to listen on the test topic:
[*]kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
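Before involving Flume at all, you can sanity-check the topic end to end with the console producer in another terminal (a sketch, reusing the broker address from the sink configuration; anything typed here should appear in the consumer):

[*]kafka-console-producer.sh --broker-list 192.168.168.200:9092 --topic test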
testKafka.log: create a testKafka.log file under /usr/local/flume and write Flume connect Kafka success! into it as the test content:

[*]touch /usr/local/flume/testKafka.log
[*]sudo gedit /usr/local/flume/testKafka.log
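The same test line can also be written without an editor, equivalent to the gedit step above:

[*]echo "Flume connect Kafka success!" > /usr/local/flume/testKafka.log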

Then copy testKafka.log into the directory Flume is watching, /usr/local/flume/logtest:

[*]cp /usr/local/flume/testKafka.log /usr/local/flume/logtest
The content just collected then shows up in the consumer terminal opened earlier, as follows:
---------------------------------kafka------------------------------

[*]# kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
[*] INFO : Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)
[*]Flume connect Kafka success!
---------------------------------flume------------------------------

[*]2017-07-07 22:41:32,602 (pool-3-thread-1) Preparing to move file /usr/local/flume/logtest/testKafka.log to /usr/local/flume/logtest/testKafka.log.COMPLETED
[*]2017-07-07 22:41:35,669 (SinkRunner-PollingRunner-DefaultSinkProcessor) Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 0 for 1 topic(s) Set(test)
[*]2017-07-07 22:41:35,728 (SinkRunner-PollingRunner-DefaultSinkProcessor) Connected to localhost:9092 for producing
[*]2017-07-07 22:41:35,757 (SinkRunner-PollingRunner-DefaultSinkProcessor) Disconnecting from localhost:9092
[*]2017-07-07 22:41:35,791 (SinkRunner-PollingRunner-DefaultSinkProcessor) Connected to slave2:9092 for producing
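On the Flume side you can also confirm that the file was picked up: as the log above shows, the spooldir source renames handled files with a .COMPLETED suffix, so listing the watched directory should now show testKafka.log.COMPLETED:

[*]ls /usr/local/flume/logtest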
