最具影响力的数字化技术在线社区

168大数据

 找回密码
 立即注册

QQ登录

只需一步,快速开始

1 2 3 4 5
打印 上一主题 下一主题
开启左侧

[综合] flume+kafka+spark stream+hbase做日志收集

[复制链接]
跳转到指定楼层
楼主
发表于 2019-10-25 12:24:11 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式

马上注册,结交更多数据大咖,获取更多知识干货,轻松玩转大数据

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
本帖最后由 168主编 于 2019-10-25 12:27 编辑

前言
flume+kafka+Spark stream 是目前比较常用的一套大数据消息日志收集管理框架,至于最后是入到Hive或者者Hbase需看不同业务场景,下面以HBase为场景简述下整个配置与搭建流程以及这些框架如此搭配的优点。


前言
1. flume 配置
1.1 flume 简介
1.2 flume 配置
2. kafka 配置
2.1 kafka简介
2.2 kafka配置
3. spark stream 消费者
3.1 spark stream 简介
3.2 spark stream 写 Kafka 消费者
4. HBase 存储
4.1 HBase 简介
4.2 spark stream 写入 HBase (以HBase 1.2.0 为例)
5. Hive 外部表关联 HBase, impala 映射查询
5.1 Hive 外部银映射 HBase:
总结
1. flume 配置
1.1 flume 简介
从官网文档 https://flume.apache.org 可以知道Flume的定位是很清晰的,它提供了一个分布式的,高可用的桥梁链接,可以收集、聚合和移动大量的日志数据,从许多不同的数据源到集中式数据存储,大概的结构如下图,流程大致为 从源端(source)接收数据,经过管道(channel)的缓存等等,发送到目标(sink)端。:





其中source的定义flume提供了很多方式,常用的有以下几种:


Http source ,这种方式可以通过监听接口方式来收集log;
Exec source ,这种方式可以通过执行一些shell命令来收集log,例如通过 tail -f 文件 来监听文件追加的日志;
Spooling source,这种方式可以监听某个目录的log,当有新的log产生时即会被发送出去;
还有很多其他的方式,例如可以以kafka作为source,这样flume就充当了kafka的消费者,当然还有很多如 Avro source,Thrift source,TCP类的等等,具体参考官网文档更加相应场景配置即可。
channel同样flume提供了很多方式,memory channel这种方式已经不太建议了,原因也很明显,不够安全,当出现任何机器问题时数据就会丢失,file channel和kafka channel是比较推荐的做法,特别是当需要比较高的并发时,kafka channel是一个不错的选择。


sink同样flume提供了很多方式,常用的有以下几种:


HDFS/Hive/Hbase/ElasticSearch sink,直接写入hdfs/Hive/Hbase/ElasticSearch,这种方式适合那些比较无需做ETL的场景。
kafka sink,直接充当kafka的生产者,可以看到kafka可以在整个flume生命周期里可以自由穿插。
Http sink,直接通过post方法将数据发送到目标api。
其他的一些详细见官网文档即可。
1.2 flume 配置
下面以Spooling Directory Source -> file channel -> kafka sink为例:


一份样例配置参数:
[AppleScript] 纯文本查看 复制代码
# Name the components on this agent
agent.sources = dir-src
agent.sinks = kafka-sink
agent.channels = file-channel

# Describe/configure the source
agent.sources.dir-src.type = spooldir
agent.sources.dir-src.spoolDir = #监听目录
agent.sources.dir-src.fileHeader = true
agent.sources.dir-src.deserializer.maxLineLength=1000000

# Describe the sink
agent.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafka-sink.kafka.topic = test
agent.sinks.kafka-sink.kafka.bootstrap.servers = #kafka boostrapServer

# Use a channel which buffers events in file
agent.channels.file-channel.type = file
agent.channels.file-channel.checkpointDir = # checkpoint目录
agent.channels.file-channel.dataDirs = # 缓存的数据目录

# Bind the source and sink to the channel
agent.sources.dir-src.channels = file-channel
agent.sinks.kafka-sink.channel = file-channel 

配置详解:


首先每个flume配置表可以存在多个agent,多个source,多个channel,多个sink,所以可以根据相应业务场景进行组合。
对于每个agent,必须配置相应的source/channel/sink,通过agent.source = ???,agent.channel = ???,agent.sink = ???来指定。
对于具体的source/channel/sink,通过 agent.{source/channel/sink}.???.属性 = ... 来具体配置 source/channel/sink 的属性。
配置完source/channel/sink相应的属性后,需把相应的组件串联一起,如: agent.sources.dir-src.channels = file-channel其中dir-src这个source指定了其channel为我们定义好的file-channel.
一些Tips:

flume在收集log的时候经常会出现Line length exceeds max (2048), truncating line!,这个一般情况对于一些log的存储没影响,但是遇到需要解析log的情况就有问题了,有时一个json或者其他格式的log被截断了,解析也会出问题,所以在source的属性配置里可以通过参数deserializer.maxLineLength调高默认的2048。
flume在监听相应的目录时,如果有重名的文件,或者直接在监听目录下修改相应正在读取的文件时,都会报错,而且flume-ng目前没有这种容错机制,报错只能重启了,还有一个比较大的问题,flume-ng没有提供相应的kill脚本,只能通过shell直接ps -aux | grep flume找到相应的PID,然后手动kill。
flume在监听相应目录时,如果目录下的文件是通过HTTP或者scp传输过来的,小文件的话没问题,但是当文件大小超过网络传输速率,就会造成flume读取文件时报错直接显示文件大小正在变化,这点也是比较麻烦的,所以建议是现有个临时目录先存放文件,等文件传输完成后再通过shell的mv命令直接发送到监听目录。
有时候我们的log文件是以压缩的方式传输过来,但是如果我们想解析后才发送出去的话,可以将当前的Spooling Directory Source的改为Exec Source,可以指定改source的command参数里写shell解析命令。
flume的启动:


flume-ng agnet --conf "配置文件文件目录" --conf-file "配置文件" --name "配置文件里agent的名字"


2. kafka 配置
2.1 kafka简介
kafka的官网 https://kafka.apache.org 同样对kafka的定位做了一个清晰阐述,分布式的消息流平台,与传统的MQ架构类似,kafka解耦了生产者,中间层与消费者三个组件,乍一听似乎与其他的MQ框架没有太大的区别,于是对比了很久,各个框架间并没有表现出显著性的区别以致某一方是不可替代的,但是其中仍有一些值得细细推敲的地方,具体可见下表(以rabbit MQ 为例):


属性        rabbit MQ        Kafka
多语言支持        支持,语言无关        支持,语言无关
消息延迟        微妙级        毫秒级
负载均衡        miror queue        多broker,多replication
协议问题        遵从AMQP协议, broker由Exchange,Binding,queue组成,客户端Producer通过连接channel和server进行通信,Consumer从queue获取消息进行消费, rabbitMQ以broker为中心,有消息的确认机制。        遵从一般的MQ结构,producer,broker,consumer,consumer从broker上批量pull数据,通过提交offset来做相应消息拉取管控。
集群扩展        支持        原生支持
事务支持        原生支持        支持
除上述所列各点外,还有几点需单独拿出讨论的:


kafka利用zookeeper做均衡管理,最新的kafka版本在消费者消费完信息后会将offset保存在kafka本身服务上,而不是zookeeper上,这在很大程度保证了消息队列被消费不会出现缺失与重复,但是要保证0重复0丢失,对于consumer提交offset的设计仍有比较大的考验。
kafka在创建topic时一般都是分区存储,如此带来的问题是每个分区间的消息顺序是很难保证全局性,只能在单个分区下保证,因此kafka在日志这个领域会更加的吻合和焕发光芒。
2.2 kafka配置
同样,下面是一份单broker的kafka配置方案:
[AppleScript] 纯文本查看 复制代码
############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

delete.topic.enable = true 



有几点配置需要注意:

broker.id 必须是全局唯一的,多个broker尽量部署在不同的集群上,通过指定相同的zookeeper.connect 进行统一管理。
listeners 是监听相应的IPort,如果kafka已经部署在集群上,会通过java.net.InetAddress.getCanonicalHostName()自动获取到相应的地址。
num.partitions 是为每个topic保留的默认分区,如果创建topic时不指定即采用默认1。
其他的一些配置参数看注释既可以,delete.topic.enable = true可以让topic的删除什么更加方便。
kafka的启动:

kafka-server-start server.properties

创建无备份,分区为1的topic:


kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic test

删除topic:

kafka-topics  --delete --zookeeper localhost:2181 --topic test
zookeeper-client
rmr /brokers/topics/test

3. spark stream 消费者
3.1 spark stream 简介
Spart Stream 是 Spark 框架下一个流处理的子项目,其基础数据DStream封装的是spark的RDD,通过轮询不断地从源端拉取数据,spark stream支持多种源端数据的拉取,同时基于spark的核心计算模块,使得其在实时性和大数据方面有着很强的优势,其流程结构大概如下图所示:


3.2 spark stream 写 Kafka 消费者
spark stream 写 kafka 消费者,官方提供了相应的示例,这里再稍微简述下:

首先sbt引入spark stream/Kafka相关依赖
[AppleScript] 纯文本查看 复制代码
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.1.0" % Provided[/color][/size][/font]
[font=微软雅黑][size=2][color=#000000]libraryDependencies += "org.apache.spark" % "spark-mllib_2.11" % "2.1.0" % Provided
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.1.0" % Provided
libraryDependencies += "org.apache.spark" % "spark-hive_2.11" % "2.1.0" % Provided
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0"  % Provided
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.1.0"  
其次定义好kafka参数:
[AppleScript] 纯文本查看 复制代码
val kafkaParams = Map[String, Object]([/color][/size][/font]
[font=微软雅黑][size=2][color=#000000]        "bootstrap.servers" -> "192.168.1.23:9093,gz19:9092,gz21:9092,gz24:9092,gz18:9092,gz89:9092,bigdata.zuzuche.cn:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "kafka_consumer_tantu",
        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
    ) 
订阅相应的topic:
[AppleScript] 纯文本查看 复制代码
val stream = KafkaUtils.createDirectStream[String, String]([/color][/size][/font]
[font=微软雅黑][size=2][color=#000000]            ssc,
            PreferConsistent,
            Subscribe[String, String](topics, kafkaParams)
        ) 
接下来就可以对stream做进一步的处理,跟spark rdd的处理类似。


同样在写spark stream的时候有一些细节需要注意:


spark stream的轮询时间最小可以达到500ms, 但是如此带来的集群资源消耗也会更大,轮询的时间间隔应根据具体的场景设定。
spark stream本质上仍为spark的任务,只是添加了轮询机制使其一直挂在后台,当spark-submit提交spark stream的时候若设定的excutor大于kafka topic创建时设定的分区,多出来的部分会处于空闲,所以两者的配置要互相参考。
4. HBase 存储
4.1 HBase 简介
HBase是NoSql中的一个代表,是一个面向列的数据库,支持亿级别的行*百万级别的列,若要定位到某个字段的值,通常需要限定如下:表名 -> rowid -> column family:column name -> timestamp,其中rowid为全局唯一的行键,行键的设计会影响到列的同个列下的排序,column family为列簇,其含义接近于HIve中的分区,通过column family的限定,其下相应的column会被集中存放,不同column family的column会分开存放,这样当需要索引少量的列时,无需遍历全部字段,当然,column family也不是越多越好,而且官方文档似乎也不支持过多的列簇,关于HBase的表结构,参考如下图:



4.2 spark stream 写入 HBase (以HBase 1.2.0 为例)
引入HBase相关依赖:


libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.0"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.0"

将数据存储为HBase对应的格式:

// 随机产生某个uuid为行键
val put = new Put(Bytes.toBytes(UUID.randomUUID().toString))
// 将列簇,列明,列值添加进对应结构
put.addColumn(Bytes.toBytes("column_family"), Bytes.toBytes("column_name"), Bytes.toBytes("column_value"))


插入HBase:

// 表名
val tablename = "table_name"
// 创建初始配置
val hbaseconf = HBaseConfiguration.create()
// 创建链接
val conn = ConnectionFactory.createConnection(hbaseconf)
// 指定表
val table: HTable = new HTable(hbaseconf, Bytes.toBytes(tablename))
// 提交事务,插入数据
table.put(put)

5. Hive 外部表关联 HBase, impala 映射查询
Hive做HBase的外部关联,需提前定义好列字段,而通常HBase的列都是无限扩展的,所以通过Hive外部映射HBase,只能处理一些日常的查询需求。


5.1 Hive 外部银映射 HBase:
CREATE EXTERNAL TABLE hive_external_HBase(
key string,
time string,
`_track_id` string,
)   
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'   
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,search:time,search:_track_id")   
TBLPROPERTIES("hbase.table.name" = "HBase_table_name");

语法与创建Hive基本一致,需要注意的是hive字段不支持特殊字符如$_*&等开头,需加转义符。


最后,Hive有时候查询的速度并不能达到我们的想象,再做以不impala映射,用impala的查询引擎,会明显快很多:


INVALIDATE METADATA;

总结
flume+kafka+spark stream+hbase是目前比较常用的组合,相信对这种组合存疑的有不少,下面稍微总结下:


为什么不用kafka直接接收源数据,而用flume作为Kafka的源?
从配置方面讲,flume提供了多种源接收方式,且只需做简单的配置即可,灵活的多种源配置也方便后续的收集扩展,kafka作为源会比flume稍微麻烦点,需在前面写一层生产者,实际上cloudera官方也建议,当存在多给消费者时,用kafka会更好,当存在多个多种生产者时,用flume会更加方便,同时,如果并发很高,可以采用kafka做flume的channel。

为什么用spark stream作为kafka的消费者而不是其他?
就目前spark stream的性能来看,spark stream还不能完全称之为实时流处理,更合适的叫法应该是准实时批处理,但是由于其最低延迟可以达到秒级,基本满足了大部分系统需要,对于对实时性要求不高的可以胜任,同时Spark stream内部封装的仍是Spark RDD结构,对于熟悉spark家族的开发者会更友好,且相应的处理解决方案会更多更成熟。另外Storm也是目前spark stream外比较流行的流处理,其实时性比spark stream更高,但属于spark体系外,要求相关开发者具备的能力会更高,所以可以根据不同场景和技术体系,做相应选择。


为什么是入到hbase而不是其他Nosql?
无他,HBase是目前Hadoop家族里BigTable最完善的,列式存储结构最成熟的方案。
————————————————
版权声明:本文为CSDN博主「杨铖」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/yc_1993/article/details/80865009

楼主热帖
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 转播转播 分享分享 分享淘帖 赞 踩

168大数据 - 论坛版权1.本主题所有言论和图片纯属网友个人见解,与本站立场无关
2.本站所有主题由网友自行投稿发布。若为首发或独家,该帖子作者与168大数据享有帖子相关版权。
3.其他单位或个人使用、转载或引用本文时必须同时征得该帖子作者和168大数据的同意,并添加本文出处。
4.本站所收集的部分公开资料来源于网络,转载目的在于传递价值及用于交流学习,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。
5.任何通过此网页连接而得到的资讯、产品及服务,本站概不负责,亦不负任何法律责任。
6.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源,若标注有误或遗漏而侵犯到任何版权问题,请尽快告知,本站将及时删除。
7.168大数据管理员和版主有权不事先通知发贴者而删除本文。

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

站长推荐上一条 /1 下一条

关于我们|小黑屋|Archiver|168大数据 ( 京ICP备14035423号|申请友情链接

GMT+8, 2024-5-6 20:50

Powered by BI168大数据社区

© 2012-2014 168大数据

快速回复 返回顶部 返回列表