最具影响力的数字化技术在线社区

168大数据

 找回密码
 立即注册

QQ登录

只需一步,快速开始

1 2 3 4 5
打印 上一主题 下一主题
开启左侧

flume + kafka + sparkStreaming + HDFS 构建实时日志分析系统

[复制链接]
跳转到指定楼层
楼主
发表于 2017-7-4 15:13:46 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式

马上注册,结交更多数据大咖,获取更多知识干货,轻松玩转大数据

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
搭建前提:
hadoop2.6、Spark1.6-hadoop-2.6集群都是正确搭建并可运行


一 、需求描述
日志文件预处理:运营商数据  kafka做队列缓冲  flume分发  streaming计算  HDFS存储


二、 系统搭建
No.1  flume-ng 1.6集群

1.下载安装并配置好flume的运行环境

2.编写配置文件
# ------------------- 定义数据流----------------------
agent.sources = kafkaSource
agent.channels = memoryChannel
agent.sinks = hdfsSink
agent.sources.kafkaSource.channels = memoryChannel
agent.sinks.hdfsSink.channel = memoryChannel


#-------- kafkaSource相关配置-----------------
agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafkaSource.zookeeperConnect = h2:2181,h3:2181,h4:2181
agent.sources.kafkaSource.topic = T20161031
#agent.sources.kafkaSource.groupId = flume
agent.sources.kafkaSource.kafka.consumer.timeout.ms = 1000






#------- memoryChannel相关配置-------------------------
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity=10000
agent.channels.memoryChannel.transactionCapacity=1000


#---------hdfsSink 相关配置------------------
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.hdfs.path = hdfs://h4:9000/user/test/kafka2HdfsByFlume
agent.sinks.hdfsSink.hdfs.writeFormat = Text
agent.sinks.hdfsSink.hdfs.fileType = DataStream


3.启动Flume
bin/flume-ng agent --conf conf --conf-file conf/kafka2HdfsByFlume.conf --name agent -Dflume.root.logger=INFO,console


No.2 kafka 2.10-0.10


1.下载安装并配置kafka集群


2.创建topic
bin/kafka-topics.sh --create --zookeeper h2:2181,h3:2181,h4:2181 --replication-factor 5 --partition 5 --topic T20161031


3.开启生产者线程并向T20161031写入数据
bin/kafka-console-producer.sh --broker-list h2:9092 --topic T20161031


4.开启消费者线程查看消息是否写入当前topic
bin/kafka-console-consumer.sh --zookeeper h2:2181,h3:2181,h4:2181 --topic T20161031 --from-beginning


5.关闭当前消费者线程


No.3 sparkStreaming 1.6.2


1. 下载jar包 (这里注意当前Hadoop集群的版本,我的是2.6.X)
scala-library.jar
spark-assembly-1.6.2-hadoop2.6.0.jar


2.编写程序
package com.unisk.spark.sparkStreaming;


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
import java.util.Arrays;


public class StreamingOnHDFS {


public static void main(String[] args){
final SparkConf conf = new
SparkConf().setMaster("spark://h4:7077").setAppName("SparkStreamingOnHDFS") ;
Durations.seconds(5);//Durations.seconds(5) 设置每隔 5 秒
final String checkpointDirectory = "hdfs://h4:9000/library/SparkStreaming/Checkpoint_Data";


JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
@Override
public JavaStreamingContext create() {
return createContext(checkpointDirectory, conf) ;
}
} ;
JavaStreamingContext jsc = JavaStreamingContext. getOrCreate(checkpointDirectory, factory) ;
JavaDStream lines = jsc.textFileStream("hdfs://h4:9000/user/test/kafka2HdfsByFlume") ;
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
public Iterable<String> call(String line) throws Exception {
return Arrays. asList(line.split(" ")) ;
}
}) ;
JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String,
Integer>() {
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1) ;
}
}) ;
JavaPairDStream<String, Integer> wordscount = pairs.reduceByKey(new Function2<Integer, Integer,
Integer>() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
}) ;
wordscount.print();

jsc.start() ;
jsc.awaitTermination() ;
jsc.close() ;
}
private static JavaStreamingContext createContext(String checkpointDirectory, SparkConf conf){
System. out.println("==========Creating new context==============") ;
JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations. seconds(5)) ;
ssc.checkpoint(checkpointDirectory) ;
return ssc;
}
}

3. 打包(打成runnable jar)并上传


4. 开启spark1.6集群


5. 进入spark安装目录
bin/spark-submit --class com.unisk.spark.sparkStreaming.StreamingOnHDFS --master spark://h4:7077 /home/hadoop/spark/Apps4Spark/StreamingOnHDFS.jar


6. 简单的wordcount程序,print方法将结果打印出来,若需要将结果存储至其他位置如redis等,可使用foreachRDD方法


楼主热帖
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 转播转播 分享分享 分享淘帖 赞 踩

168大数据 - 论坛版权1.本主题所有言论和图片纯属网友个人见解,与本站立场无关
2.本站所有主题由网友自行投稿发布。若为首发或独家,该帖子作者与168大数据享有帖子相关版权。
3.其他单位或个人使用、转载或引用本文时必须同时征得该帖子作者和168大数据的同意,并添加本文出处。
4.本站所收集的部分公开资料来源于网络,转载目的在于传递价值及用于交流学习,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。
5.任何通过此网页连接而得到的资讯、产品及服务,本站概不负责,亦不负任何法律责任。
6.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源,若标注有误或遗漏而侵犯到任何版权问题,请尽快告知,本站将及时删除。
7.168大数据管理员和版主有权不事先通知发贴者而删除本文。

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

站长推荐上一条 /1 下一条

关于我们|小黑屋|Archiver|168大数据 ( 京ICP备14035423号|申请友情链接

GMT+8, 2024-5-16 07:52

Powered by BI168大数据社区

© 2012-2014 168大数据

快速回复 返回顶部 返回列表