最具影响力的数字化技术在线社区

168大数据

 找回密码
 立即注册

QQ登录

只需一步,快速开始

1 2 3 4 5
打印 上一主题 下一主题
开启左侧

数据平台实践①—Flume+Kafka+SparkStreaming(pyspark)

[复制链接]
跳转到指定楼层
楼主
发表于 2017-1-20 11:47:47 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式

马上注册,结交更多数据大咖,获取更多知识干货,轻松玩转大数据

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
蜻蜓点水
Flume——数据采集
如果说,爬虫是采集外部数据的常用手段的话,那么,Flume就是采集内部数据的常用手段之一(logstash也是这方面的佼佼者)。
下面介绍一下Flume的基本构造。

  • Agent:包含Source、Channel和Sink的主体,它是这3个组件的载体,是组成Flume的数据节点。
  • Event:Flume 数据传输的基本单元。
  • Source: 用来接收Event,并将Event批量传给Channel。
  • Channel:Source和Sink之间的Event缓冲通道,它有个type属性,一般为memory,可以提高传输速度。
  • Sink:负责将数据沉淀到最终存储区,或沉淀给下一个source,形成数据流。

Flume


在大致了解了以上要素之后,通过上图,我们就可以有一个大概的认识。一句话讲,Source接收数据,并转成Event单元,然后导入Channel缓冲通道,最后,经由Sink进行数据沉淀。当然这里的沉淀,有多种选择,除了上图中的HDFS外,还包括HBase、File,或者作为另一个Source的源。在一系列过程,一条有序的数据流就诞生了。
Kafka——数据的发布/订阅
Kafka,作为基于发布/订阅的消息系统,以其分布式性而受到大家的喜爱。
下面介绍一下Kafka的基本构造。

  • Broker(代理): Kafka集群可由一个或多个服务器组成,其中的每个服务节点称作这个集群的一个Broker。
  • Topic(主题): 一个Topic对应一类消息,Topic用作为消息划分类别。
  • Partition(分区): 一个Topic一般含有多个分区。
  • Producer(生产者):消息生产者,负责生产Topic消息。
  • Consumer(消费者): 消息消费者,负责消费Topic消息。

    Kafka

Zookeeper——服务器间协调
这里需要提一下Zookeeper,对于Kafka这样的分布式服务,大多需要多台服务器相互协调工作,且保持一致性。任意一台服务器出现问题,如果不及时处理,都有可能导致整个服务的崩溃,其后果是不堪设想的。ZooKeeper的分布式设计,可用于领导人选举、群组协同工作和配置服务等,保证了服务的一致性和可用性。

Zookeeper

Spark Streaming——Spark核心API
Spark Streaming属于Spark的核心api,它支持高吞吐量、支持容错的实时流数据处理。它可以通过Kafka、HDFS、Flume等多种渠道获取数据,转换数据后利用Spark Engine进行数据处理。现在,包括Python、Java等多种高级语言都对Spark进行支持。本文使用pyspark进行编程。

Spark Streaming

 实践出真知
要做什么
nginx日志分析,简单统计了下PV和UV,并做了H5图表实时展示。使用的是我开发的基于ace-admin和react的管理端LogAdmin对数据进行展示。这里提供github,感兴趣的朋友可以看一下。

LogAdmin

下面是我的主要步骤。
①Flume实时读入nginx日志,并将数据导入Kafka中。
②pyspark从Kafka读入数据,做实时处理,并将处理后的数据导出到redis队列中。
③编写脚本从redis中取出数据,存入mysql。
④H5展示。
【版本:Logstash1.7.0,Kafka 2.11(该版本中已集成了Zookeeper),Spark(2.0.2)】
①Flume实时读入nginx日志,并将数据导入Kafka中。
这一步中,只需配置flume.conf,并依次启动flume、zookeeper、kafka即可
flume.conf(配置中命名已较为明确,hdfs部分被注释了)
[AppleScript] 纯文本查看 复制代码
# agent-my80

# Finally, now that we've defined all of our components, tell
# agent-my80 which ones we want to activate.
#agent-my80.channels = ch1
#agent-my80.sources = avro-source1
#agent-my80.sinks = hdfs-sink1

agent-my80.channels = ch2
agent-my80.sources = exec-source1
agent-my80.sinks = kafka-sink1

# Define a memory channel called ch1 on agent-my80
#agent-my80.channels.ch1.type = memory
agent-my80.channels.ch2.type = memory


# Define an Avro source called avro-source1 on agent-my80 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
#agent-my80.sources.avro-source1.channels = ch1
#agent-my80.sources.avro-source1.type = avro
#agent-my80.sources.avro-source1.bind = 0.0.0.0
#agent-my80.sources.avro-source1.port = 44444
#agent-my80.sources.avro-source1.basenameHeader = true


agent-my80.sources.exec-source1.channels = ch2
agent-my80.sources.exec-source1.type = exec
agent-my80.sources.exec-source1.command = tail -f /home/www/logs/access.log

# # Define a logger sink that simply logs all events it receives
# # and connect it to the other end of the same channel.
#agent-my80.sinks.hdfs-sink1.channel = ch1
#agent-my80.sinks.hdfs-sink1.type = hdfs
#agent-my80.sinks.hdfs-sink1.hdfs.path = hdfs://my80:9000/flume-test
#agent-my80.sinks.hdfs-sink1.hdfs.filePrefix = event-
#agent-my80.sinks.hdfs-sink1.hdfs.filePrefix = %{basename}
#agent-my80.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
#agent-my80.sinks.hdfs-sink1.hdfs.round = true
#agent-my80.sinks.hdfs-sink1.hdfs.roundValue = 10
#agent-my80.sinks.hdfs-sink1.hdfs.fileType = DataStream

agent-my80.sinks.kafka-sink1.channel = ch2
agent-my80.sinks.kafka-sink1.type = org.apache.flume.sink.kafka.KafkaSink 
agent-my80.sinks.kafka-sink1.topic = my80-log
agent-my80.sinks.kafka-sink1.brokerList = localhost:9092
agent-my80.sinks.kafka-sink1.batchSize = 20


flume启动命令

[AppleScript] 纯文本查看 复制代码
flume-ng agent --conf /usr/local/apache-flume-1.7.0-bin/conf --conf-file /usr/local/apache-flume-1.7.0-bin/conf/flume.conf --name agent-my80 -Dflume.root.logger=INFO,console



zookeeper启动命令
[AppleScript] 纯文本查看 复制代码
/usr/local/kafka_2.11-0.10.1.0/bin/zookeeper-server-start.sh /usr/local/kafka_2.11-0.10.1.0/config/zookeeper.properties

kafka启动命令
[AppleScript] 纯文本查看 复制代码
/usr/local/kafka_2.11-0.10.1.0/bin/kafka-server-start.sh /usr/local/kafka_2.11-0.10.1.0/config/server.properties


注意:有些朋友,是用自己的个人服务器做demo,那么会遇到内存不足的问题,这时候一般通过,修改Java堆大小来解决。比如我是修改的kafka的server.properties来解决这个问题。

[AppleScript] 纯文本查看 复制代码
#export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" 
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"

入数据,做实时处理,并将处理后的数据导出到redis队列中。
这部分为方便demo解析,我对nginx日志格式做了修改。
该步骤主要是做正则解析+MapReduce+数据导入redis,并分别将请求内容和请求ip放入redis的list和set,这样主要是方便我统计每天的PV和UV。
还要注意一点,nginx日志中包括静态文件,显然这个不能算UV和PV,所以要过滤。

calculate.py

[AppleScript] 纯文本查看 复制代码
#coding=utf8
from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import re
import redis
import datetime

# 解析日志
def parse(logstring):
    #使用正则解析日志
    # regex = '(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}).*ip=\/(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}).*tbl=([a-zA-Z0-9_]+)'
    regex = 'ip:(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}).*?time:\[(.*?)\].*?request:\"(.*?)\".*?status_code:(\d{1,3}).*?agent:\"(.*?)\"'
    pattern = re.compile(regex)
    m1 = pattern.search(str(logstring))
    if m1 is not None:
        m = m1.groups()
        # print(m
        if len(m)!=5 or not m[2]:
            m= None
        else:
            hd_list=[u".js",u".css",u".jpg",u".png",u".jpeg",u".gif",u".bmp",u".woff"];
            if doStrContainAnyWords(m[2],hd_list):
                m= None
    else:
        m = None
    return m

def doStrContainAnyWords(str,words=[]):
    for word in words:
        if word in str:
            return True;
    return False;

class RedisClient:
    pool = None
    def __init__(self):
        self.getRedisPool()

    def getRedisPool(self):
        redisIp='localhost'
        redisPort=6379
        redisDB=0
        self.pool = redis.ConnectionPool(host=redisIp, port=redisPort, db=redisDB)
        return self.pool

    def addToHashSet(self, key, value):
        if self.pool is None:
            self.pool = self.getRedisPool()
        r = redis.Redis(connection_pool=self.pool)
        hashSetName="my80-log-iphash-"+datetime.datetime.now().strftime("%Y-%m-%d");

        flag=False;
        if r.exists(hashSetName) is False:
            flag=True

        if r.hexists(hashSetName,str(key)):
            r.hincrby(hashSetName, str(key), value)
        else:
            r.hset(hashSetName, str(key), value)

        if flag is True:
            r.expire(hashSetName,3600*24+300);


    def addToList(self,value):
        if self.pool is None:
            self.pool = self.getRedisPool()
        r = redis.Redis(connection_pool=self.pool)
        r.lpush('my80-log-list', value)

if __name__ == '__main__':
    zkQuorum = 'localhost:2181'
    topic = 'my80-log'
    sc = SparkContext("local[2]", appName="kafka_pyspark_redis")
    ssc = StreamingContext(sc, 10)
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "kafka-streaming-redis", {topic: 1})
    #kafka读取返回的数据为tuple,长度为2,tuple[1]为实际的数据,tuple[0]的编码为Unicode


    res = kvs.map(lambda x: x[1]).map(lambda x:parse(x)).filter(lambda x:True if x is not None else False)
    items = res.map(lambda item:{"ip":item[0],"time":item[1],"request":item[2],"status_code":item[3],"agent":item[4] } )
    # items = res.map(lambda item:{"ip":item[0],"time":item[1] } )
    # ipcount = res.map(lambda item:(item[0],1)).reduceByKey(lambda a, b: a+b).map(lambda x:{ x[0]:str(x[1]) } )
    ipcount = res.map(lambda item:(item[0],1)).reduceByKey(lambda a, b: a+b)

    r = RedisClient()

    def handleItem(time,rdd):
        if rdd.isEmpty() is False:
            for element in rdd.collect():
                r.addToList(element)
    items.foreachRDD(handleItem)


    def ipHandle(time,rdd):
        if rdd.isEmpty() is False:
            # rddstr = "{"+','.join(rdd.collect())+"}"
            for element in rdd.collect():
                r.addToHashSet(element[0], element[1] )
    ipcount.foreachRDD(ipHandle)


    ssc.start()
    ssc.awaitTermination()

安装好spark-2.0.2-bin-hadoop2.7,脚本测试ok,最后就需要通过spark streaming提交任务(即提交calculate.py)。任务正常执行的话,数据就会从Kafka导出,经处理后,导入redis。

[AppleScript] 纯文本查看 复制代码
/usr/local/spark-2.0.2-bin-hadoop2.7/bin/spark-submit --jars /usr/local/spark-2.0.2-bin-hadoop2.7/spark-streaming-kafka-0-8-assembly_2.11-2.0.2.jar /home/dcb/python/pyspark/calculate.py

③编写脚本从redis中取出数据,存入mysql。
这一步相信大家没问题。
④H5图表展示
H5图表实时展示+github,感兴趣的朋友可以看一下。
小结
Flume+Kafka+Spark,是一个相对比较流行且可行的实时计算组合,可定制性较高,如果项目需求比较复杂,建议深入了解后进行定制开发。
来自简书 作者playwolf719


楼主热帖
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 转播转播 分享分享 分享淘帖 赞 踩

168大数据 - 论坛版权1.本主题所有言论和图片纯属网友个人见解,与本站立场无关
2.本站所有主题由网友自行投稿发布。若为首发或独家,该帖子作者与168大数据享有帖子相关版权。
3.其他单位或个人使用、转载或引用本文时必须同时征得该帖子作者和168大数据的同意,并添加本文出处。
4.本站所收集的部分公开资料来源于网络,转载目的在于传递价值及用于交流学习,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。
5.任何通过此网页连接而得到的资讯、产品及服务,本站概不负责,亦不负任何法律责任。
6.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源,若标注有误或遗漏而侵犯到任何版权问题,请尽快告知,本站将及时删除。
7.168大数据管理员和版主有权不事先通知发贴者而删除本文。

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

站长推荐上一条 /1 下一条

关于我们|小黑屋|Archiver|168大数据 ( 京ICP备14035423号|申请友情链接

GMT+8, 2024-5-26 20:37

Powered by BI168大数据社区

© 2012-2014 168大数据

快速回复 返回顶部 返回列表