最具影响力的数字化技术在线社区

168大数据

 找回密码
 立即注册

QQ登录

只需一步,快速开始

1 2 3 4 5
打印 上一主题 下一主题
开启左侧

[Flume] Flume的Source

[复制链接]
跳转到指定楼层
楼主
发表于 2019-10-25 11:32:05 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式

马上注册,结交更多数据大咖,获取更多知识干货,轻松玩转大数据

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
本帖最后由 168主编 于 2019-10-25 11:44 编辑

flume中提供了多种source来应对不同场景的数据传输,最常用的有exec source和spooling directory source。下面对flume的source做详细的说明。
1.Spooling Directory Source
这种方式是将要传输的文件放在磁盘的某个目录下,这个目录可以理解为一个池子,当池子中有文件的时候就会被放入channel,当确认文件已经放入channel,原始文件会被重命名或者删除。
这种方式是可靠的、并且不会丢失数据,当agent挂掉重启后也能接着传输。需要注意的是放入池子的文件不能再被写入或者不能出现重名文件,否则传输会报错。
需要注意的是尽管这种方式提供了可靠性保证,但是下游的某种失败也会导致event重复。
以下是该source的配置属性,加粗的为必选属性。

属性名称
默认值
说明
channels
channel,可以为多个
type
组件的类型名称,这里为spooldir
spoolDir
读取文件的目录
fileSuffix
.COMPLETED
为已经读取的文件添加的后缀
deletePolicy
never
删除spool中文件的策略,有never和immediate
fileHeader
false
是否在全路径的文件名上添加header
fileHeaderKey
file
将一个绝对路径的文件名添加到event header时的key
basenameHeader
false
是否在基本文件名上添加一个header
basenameHeaderKey
basename
将一个基本文件添加到event header时的key
includePattern
^.*$
用来标识需要被传输的文件名的正则表达式,可以与ignorePattern同时使用
如果同时命中了两条规则,则这个文件会被忽略。
ignorePattern
^$
用来标识将要被忽略的文件的正则表达式。
trackerDir
.flumespool
用来存储正在处理的文件的元数据的目录,如果这个目录不是一个绝对路径,
可以将其理解为一个和poolDir相关的目录
consumeOrder
oldest
消耗spool中文件的顺序,有oldest,youngest,random,当为oldest或者youngest时,
会通过文件最后一次修改时间去排序,若修改时间一致,则按照文件名称的字典顺序排序。
这两种方式每次都会扫描整个目录,当有大量文件的时候速度会比较慢。
当为random时会随机的消耗文件,这种方式速度快但是有可能造成最早生成的文件最后消耗,对于时序有要求的不建议使用。
pollDelay
500
轮巡新文件的延时,单位为毫秒
recursiveDirectorySearch
false
是否监控并读取子目录的文件。
maxBackoff
4000
两次写入channel的时间间隔,单位为毫秒,
source会以一个较大的间隔时间开始,然后每次以指数减少的方式缩短间隔时间,直到channel写满抛出异常,是一个动态的参数。
batchSize
100
每个批次的大小
inputCharset
UTF-8
deserializers用来处理文件的字符集
decodeErrorPolicy
FAIL
字符无法解码时的策略。
FAIL:抛出一个解码失败的异常。
REPLACE:替换字符,一般使用Unicode字符U+FFFD。
IGNORE:丢弃无法解析的队列
deserializer
LINE
deserializer用于将file解析成event,默认将文件的每一行解析为一个event。
deserializer.*

可以使用多个 deserializer对一个event处理。
bufferMaxLineLength
5000
每个需要提交的buffer中的最大文件行数,使用deserializer.maxLineLength替代。
selector.type
replicating
取值有replicating、multiplexing
selector.*

依赖于selector.type
interceptors
拦截器,可配置多个,使用空格分割。
配置示例
[AppleScript] 纯文本查看 复制代码
a1.channels = ch-1
a1.sources = src-1
a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.fileHeader = true
2.Scribe Source
scribe是facebook 提供的另外一种数据传输系统,使用scribe source相当于在scribe之后又套接了flume传输数据。以下是各个数据传输系统的对比图。

scribe source 的属性配置如下:

属性
默认值
说明
type
org.apache.flume.source.scribe.ScribeSource
port
1499
端口
maxReadBufferBytes
16384000
thrift默认的配置
workerThreads
5
工作线程数
selector.type
replicating
取值有replicating、multiplexing
selector.*

依赖于selector.type
配置示例:
[AppleScript] 纯文本查看 复制代码
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.scribe.ScribeSource
a1.sources.r1.port = 1463
a1.sources.r1.workerThreads = 5
a1.sources.r1.channels = c1
3.Exec Source
exec source是一种比较简单的用来实时传输的source,它会启动一个用户给定的类Unix命令,这个命令会不断的产生数据流,exec source接收它产生的数据流。一般常见的命令有tail -f ,也可以使用一些比较复杂的命令。
但是这种方式无法保证可靠性,source无法感知event放入channel时产生的错误。比如当channel写满的时候,source还会不断的发送数据到channel,这会导致数据丢失。
exec source的属性配置如下:

属性
默认值
说明
channels
channel,可以为多个
type
组件的类型名称,这里为exec
command
需要执行的命令。
shell
一个用户执行命令的shell调用。
restartThrottle
10000
重启之前等待的时间。
restart
false
当执行的命令挂掉后是否需要重启。
logStdErr
false
是否要将命令的错误日志记录。
batchSize
20
读取并发送到channel的最大行数。
batchTimeout
3000
批次间隔时间。
selector.type
replicating
取值有replicating、multiplexing
selector.*

依赖于selector.type
interceptors
拦截器,可配置多个,使用空格分割。
配置示例:
[AppleScript] 纯文本查看 复制代码
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
4.Taildir Source
这种方式会监听指定的文件,并以接近实时的频率去读取文件新产生的行。这种方式是可靠的并且不会丢失数据,因为在读取文件的过程中,会周期的记录文件读取的位置到一个json格式的文件中。当agent出现问题重启时会去读取位置文件中的信息,然后重新传输,也可以从指定的位置进行传输。taildir source不支持传输二进制的文件。
Taildir source的属性配置如下:

属性
默认值
说明
channels
channel,可以为多个
type
组件的类型名称,这里为TAILDIR
filegroups
空格分割的多个文件组,每个组中有一组需要被tail的文件。
filegroupName
文件组的绝对路径。
positionFile
~/.flume/taildir_position.json
用于tail文件时记录最新的位置。
headers
多个header可以用来标识一个文件组
byteOffsetHeader
false
是否为已经tail的行添加一个叫byteoffset的字节偏移
skipToEnd
false
在位置文件没有写入的情况下是否跳过位置信息直接到末尾
idleTimeout
120000
当一个文件不活动的时间达到idleTimeout指定的时间,
则关闭对该文件的传输,
当有新的行写入时,会自动重新传输。
writePosInterval
3000
将每个文件最后一次读取位置写入位置文件的间隔时间。
batchSize
100
一次读取并发送到channel的文件行数的大小。
backoffSleepIncrement
1000
上一次没有读取到新数据,再一次读取的时延的增量,单位毫秒。
maxBackoffSleep
5000
上一次没有读取到新数据,再一次读取数据时间隔的最大时间,单位毫秒。
cachePatternMatching
true
当一个目录有成千上万个文件,使用正则表达式匹配比较耗时,
缓存文件列表能改善这个问题。文件消耗的顺序也会被缓存
要求文件系统保持至少一秒的间隔去跟踪文件的修改时间。
fileHeader
false
是否添加一个header去存储文件的绝对路径。
fileHeaderKey
file
上述header的key
配置示例:
[AppleScript] 纯文本查看 复制代码
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
来源:大数据随笔
楼主热帖
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 转播转播 分享分享 分享淘帖 赞 踩

168大数据 - 论坛版权1.本主题所有言论和图片纯属网友个人见解,与本站立场无关
2.本站所有主题由网友自行投稿发布。若为首发或独家,该帖子作者与168大数据享有帖子相关版权。
3.其他单位或个人使用、转载或引用本文时必须同时征得该帖子作者和168大数据的同意,并添加本文出处。
4.本站所收集的部分公开资料来源于网络,转载目的在于传递价值及用于交流学习,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。
5.任何通过此网页连接而得到的资讯、产品及服务,本站概不负责,亦不负任何法律责任。
6.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源,若标注有误或遗漏而侵犯到任何版权问题,请尽快告知,本站将及时删除。
7.168大数据管理员和版主有权不事先通知发贴者而删除本文。

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

站长推荐上一条 /1 下一条

关于我们|小黑屋|Archiver|168大数据 ( 京ICP备14035423号|申请友情链接

GMT+8, 2024-5-7 06:25

Powered by BI168大数据社区

© 2012-2014 168大数据

快速回复 返回顶部 返回列表