马上注册,结交更多数据大咖,获取更多知识干货,轻松玩转大数据
您需要 登录 才可以下载或查看,没有帐号?立即注册
x
1 序对ETL系统中数据转换和存储操作的相关日志进行记录以及实时分析有助于我们更好的观察和监控ETL系统的相关指标(如单位时间某些操作的处理时间),发现系统中出现的缺陷和性能瓶颈。 由于需要对日志进行实时分析,所以Storm是我们想到的首个框架。Storm是一个分布式实时计算系统,它可以很好的处理流式数据。利用storm我们几乎可以直接实现一个日志分析系统,但是将日志分析系统进行模块化设计可以收到更好的效果。模块化的设计至少有两方面的优点: - 模块化设计可以使功能更加清晰。整个日志分析系统可以分为“数据采集-数据缓冲-数据处理-数据存储”四个步骤。Apache项目下的flumeng框架可以很好的从多源目标收集数据,所以我们用它来从ETL系统中收集日志信息;由于采集数据与处理数据的速度可能会出现不一致,所以我们需要一个消息中间件来作为缓冲,kafka是一个极好的选择;然后对流式数据的处理,我们将选择大名鼎鼎的storm了,同时为了更好的对数据进行处理,我们把drools与storm进行了整合,分离出了数据处理规则,这样更有利于管理规则;最后,我们选择redis作为我们处理数据的存储工具,redis是一个内存数据库,可以基于健值进行快速的存取。
- 模块化设计之后,storm和前两个步骤之间就获得了很好的解耦,storm集群如果出现问题,数据采集以及数据缓冲的操作还可以继续运行,数据不会丢失。
2 相关框架的介绍和安装2.1 flumeng2.1.1 原理介绍Flume是一个高可用、高可靠、分布式的海量日志采集、聚合和传输系统。Flume支持在日志系统中定制日志发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接收方的能力。它拥有一个简单的、可扩展的流式数据流架构,如下图所示:
日志收集系统就是由一个或者多个agent(代理)组成,每个agent由source、channel、sink三部分组成,source是数据的来源,channel是数据进行传输的通道,sink用于将数据传输到指定的地方。我们可以把agent看做一段水管,source是水管的入口,sink是水管的出口,数据流就是水流。 Agent本质上是一个jvm进程,agent各个组件之间是通过event来进行触发和协调的。 2.1.2 flumeng的安装2.2 kafka2.2.1 原理介绍Kafka是linkedin用于日志处理的分布式消息队列。Kafka的架构如下图所示: Kafka的存储策略有一下几点: - kafka以topic来进行消息管理,每个topic包括多个partition,每个partition包括一个逻辑log,由多个segment组成。
- 每个segment中存储多条消息,消息id由其逻辑位置决定,即从消息id可直接定位到消息的存储位置,避免id到位置的额外映射。
- 每个partition在内存中对应一个index,记录每个segment中的第一条消息的偏移。
- 发布者发到某个topic的消息会被均匀的分布到多个partition上(随机或根据用户指定的回调函数进行分布),broker收到发布消息往对应partition的最后一个segment上添加该消息,当某个segment上的消息条数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息订阅者才能订阅到,segment达到一定的大小后将不会再往该segment写数据,broker会创建新的segment。
2.2.2 kafka集群的搭建Kafka集群的搭建需要依赖zookeeper来进行负载均衡,所以我们需要在安装kafka之前搭建zookeeper集群。 2.3 storm2.3.1 原理介绍Storm是一个分布式的、高容错的实时计算系统。Storm对于实时计算的的意义相当于hadoop对于批处理的意义。hadoop为我们提供了Map和Reduce原语,使我们对数据进行批处理变的非常的简单和优美。同样,Storm也对数据的实时计算提供了简单Spout和Bolt原语。 Strom集群里面有两种节点,控制节点和工作节点,控制节点上面运行一个nimbus(类似于hadoop中的JobTracker)后台程序,Nimbus负责在集群里面分布代码,分配工作给机器, 并且监控状态。每一个工作节点上面运行一个叫做Supervisor(类似Hadoop中的TaskTracker)的节点。Supervisor会监听分配给它那台机器的工作,根据需要启动/关闭工作进程。每一个工作进程执行一个Topology(类似hadoop中的Job)的一个子集;一个运行的Topology由运行在很多机器上的很多工作进程 Worker(类似Hadoop中的Child)组成。结构如下图所示: Stream是storm里面的关键抽象。一个stream是一个没有边界的tuple序列。storm提供一些原语来分布式地、可靠地把一个stream传输进一个新的stream。比如: 你可以把一个tweets流传输到热门话题的流。 storm提供的最基本的处理stream的原语是spout和bolt。你可以实现Spout和Bolt对应的接口以处理你的应用的逻辑。 Spout是流的源头。比如一个spout可能从Kestrel队列里面读取消息并且把这些消息发射成一个流。通常Spout会从外部数据源(队列、数据库等)读取数据,然后封装成Tuple形式,之后发送到Stream中。Spout是一个主动的角色,在接口内部有个nextTuple函数,Storm框架会不停的调用该函数。 Bolt可以接收任意多个输入stream。Bolt处理输入的Stream,并产生新的输出Stream。Bolt可以执行过滤、函数操作、Join、操作数据库等任何操作。Bolt是一个被动的角色,其接口中有一个execute(Tuple input)方法,在接收到消息之后会调用此函数,用户可以在此方法中执行自己的处理逻辑。 spout和bolt所组成一个网络会被打包成topology, topology是storm里面最高一级的抽象(类似 Job), 你可以把topology提交给storm的集群来运行。Topology的结构如下图所示: 2.3.2 storm集群的搭建Storm集群的搭建也要依赖于zookeeper,本系统中storm与kafka共用同样一个zookeeper集群。 - 下载安装包storm-0.9.0.1.tar.gz,并对该包进行解压。
配置nimbus。 修改storm的conf/storm.yaml文件如下:[AppleScript] 纯文本查看 复制代码 storm.zookeeper.servers:
//zookeeper集群
-
“10.200.187.71″
-
“10.200.187.73″
storm.local.dir:
“/usr/endy/fks/storm-workdir
“
storm.messaging.transport:
“backtype.storm.messaging.netty.Context”
storm.messaging.netty.server_worker_threads:
1
storm.messaging.netty.client_worker_threads:
1
storm.messaging.netty.buffer_size:
5242880
storm.messaging.netty.max_retries:
100
storm.messaging.netty.max_wait_ms:
1000
storm.messaging.netty.min_wait_ms:
100
注意:在每个配置项前面必须留有空格,否则会无法识别。storm.messaging.* 部分是Netty的配置。如果没有该部分。那么Storm默认还是使用ZeroMQ。 配置supervisor 修改storm的conf/storm.yaml文件如下:[AppleScript] 纯文本查看 复制代码 storm.zookeeper.servers:
- “10.200.187.71″
- “10.200.187.73″
nimbus.host: “10.200.187.71″
supervisor.slots.ports:
- 6700
- 6701
- 6702
storm.local.dir: “/usr/endy/fks/storm-workdir”
storm.messaging.transport: “backtype.storm.messaging.netty.Context”
storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.buffer_size: 5242880
storm.messaging.netty.max_retries: 100
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 100
注意 - nimbus.host是nimbus的IP或hostname
- supervisor.slots.ports 是配置slot的ip地址。配了几个地址,就有几个slot,即几个worker。如果尝试提交的topology所声明的worker数超过当前可用的slot,该topology提交会失败。
- storm.messaging 部分是Netty的配置。
2.4 droolsDrools是一个基于Java的、开源的规则引擎,可以将复杂多变的规则从硬编码中解放出来,以规则脚本的形式存放在文件中,使得规则的变更不需要修正代码重启机器就可以立即在线上环境生效。 日志分析系统中,drools的作用是利用不同的规则对日志信息进行处理,以获得我们想要的数据。但是,Drools本身不是一个分布式框架,所以规则引擎对log的处理无法做到分布式。我们的策略是将drools整合到storm的bolt中去,这就就解决了drools无法分布式的问题。这是因为bolt可以作为task分发给多个worker来处理,这样drools中的规则也自然被多个worker处理了。 2.5 redisRedis是key-value存储系统,它支持较为丰富的数据结构,有String,list,set,hash以及zset。与memcached一样,为了保证效率,数据都是缓存在内存中。区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。 Redis是内存数据库,所以有非常快速的存取效率。日志分析系统数据量并不是特别大,但是对存取的速度要求较高,所以选择redis有很大的优势。 3 各个框架的整合3.1 ETL系统整合flumengFlume如何收集ETL系统中的日志是我需要考虑的第一个问题。log4j2提供了专门的Appender-FlumeAppender用于将log信息发送到flume系统,并不需要我们来实现。我们在log4j2的配置文件中配置了ETL系统将log信息发送到的目的地,即avro服务器端。该服务器端我们在flume的配置文件中进行了配置。配置信息如下所示:
[AppleScript] 纯文本查看 复制代码
producer.sources
=
s
producer.channels
=
c
producer.sinks
=
r
producer.sources.s.type
=
avro
producer.sources.s.channels
=
c
producer.sources.s.bind
=
10.200.187.71
producer.sources.s.port
=
4141
3.2 flumeng与kafka的整合 我们从ETL系统中获得了日志信息,将该信息不作任何处理传递到sink端,sink端发送数据到kafka。这个发送过程需要我们编写代码来实现,我们的实现代码为KafkaSink类。主要代码如下所示: [AppleScript] 纯文本查看 复制代码 public class KafkaSink extends AbstractSink implements Configurable {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSink.class);
private Properties parameters;
private Producer<String, String> producer;
private Context context;
@Override
public void configure(Context context) {
this.context = context;
ImmutableMap<String, String> props = context.getParameters();
parameters = new Properties();
for (String key : props.keySet()) {
String value = props.get(key);
this.parameters.put(key, value);
}
}
@Override
public synchronized void start() {
super.start();
ProducerConfig config = new ProducerConfig(this.parameters);
this.producer = new Producer<String, String>(config);
}
@Override
public Status process() throws EventDeliveryException {
Status status = null;
// Start transaction
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
// This try clause includes whatever Channel operations you want to do
Event event = ch.take();
String partitionKey = (String) parameters.get(KafkaFlumeConstans.PARTITION_KEY_NAME);
String encoding = StringUtils.defaultIfEmpty(
(String) this.parameters.get(KafkaFlumeConstans.ENCODING_KEY_NAME),
KafkaFlumeConstans.DEFAULT_ENCODING);
String topic = Preconditions.checkNotNull(
(String) this.parameters.get(KafkaFlumeConstans.CUSTOME_TOPIC_KEY_NAME),
“custom.topic.name is required”);
String eventData = new String(event.getBody(), encoding);
KeyedMessage<String, String> data;
// if partition key does’nt exist
if (StringUtils.isEmpty(partitionKey)) {
data = new KeyedMessage<String, String>(topic, eventData);
} else {
data = new KeyedMessage<String, String>(topic, String.valueOf(new Random().nextInt(Integer.parseInt(partitionKey))), eventData);
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info(“Send Message to Kafka : [" + eventData + "] — [" + EventHelper.dumpEvent(event) + "]“);
}
producer.send(data);
txn.commit();
status = Status.READY;
} catch (Throwable t) {
txn.rollback();
status = Status.BACKOFF;
// re-throw all Errors
if (t instanceof Error) {
throw (Error) t;
}
} finally {
txn.close();
}
return status;
}
@Override
public void stop() {
producer.close();
}
}
该类中,我们读取了一些配置信息,这些配置信息我们在flumeng的flume-conf.properties文件中进行了定义,定义内容如下:
[AppleScript] 纯文本查看 复制代码
producer.sinks.r.type
=
org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list=10.200.187.71:9092
producer.sinks.r.partition.key=0
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=0
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=sync
producer.sinks.r.custom.encoding=UTF-8
producer.sinks.r.custom.topic.name=fks1
producer.sinks.r.channel
=
c
producer.channels.c.type
=
memory
producer.channels.c.capacity
=
1000
将上面的KafkaSink类打包成flumeng-kafka.jar,并将该jar包以及kafka_2.9.2-0.8.1.jar、metrics-annotation-2.2.0.jar、metrics-core-2.2.0.jar、Scala-compiler.jar、scala-library.jar、zkclient-0.3.jar放到flume的lib目录下,启动flume,我们就可以将ETL系统中产生的日志信息发送到kafka中的fks1这个topic中去了。 3.3 kafka与storm的整合Storm中的spout如何主动消费kafka中的消息需要我们编写代码来实现,httpsgithub.comwurstmeisterstorm-kafka-0.8-plus实现了一个kafka与storm整合的插件,下载该插件,将插件中的jar包以及metrics-core-2.2.0.jar、scala-compiler2.9.2.jar放到storm的lib目录下。利用插件中的StormSpout类,我们就可以消费kafka中的消息了。主要代码如下所示:
[AppleScript] 纯文本查看 复制代码 public class KafkaSpout extends BaseRichSpout {
public static class MessageAndRealOffset {
public Message msg;
public long offset;
public MessageAndRealOffset(Message msg, long offset) {
this.msg = msg;
this.offset = offset;
}
}
static enum EmitState {
EMITTED_MORE_LEFT,
EMITTED_END,
NO_EMITTED
}
public static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
String _uuid = UUID.randomUUID().toString();
SpoutConfig _spoutConfig;
SpoutOutputCollector _collector;
PartitionCoordinator _coordinator;
DynamicPartitionConnections _connections;
ZkState _state;
long _lastUpdateMs = 0;
int _currPartitionIndex = 0;
public KafkaSpout(SpoutConfig spoutConf) {
_spoutConfig = spoutConf;
}
@Override
public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
_collector = collector;
Map stateConf = new HashMap(conf);
List zkServers = _spoutConfig.zkServers;
if (zkServers == null) {
zkServers = (List) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
}
Integer zkPort = _spoutConfig.zkPort;
if (zkPort == null) {
zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
}
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
_state = new ZkState(stateConf);
_connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));
// using TransactionalState like this is a hack
int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
if (_spoutConfig.hosts instanceof StaticHosts) {
_coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
} else {
_coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
}
context.registerMetric(“kafkaOffset”, new IMetric() {
KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_spoutConfig.topic, _connections);
@Override
public Object getValueAndReset() {
List pms = _coordinator.getMyManagedPartitions();
Set latestPartitions = new HashSet();
for (PartitionManager pm : pms) {
latestPartitions.add(pm.getPartition());
}
_kafkaOffsetMetric.refreshPartitions(latestPartitions);
for (PartitionManager pm : pms) {
_kafkaOffsetMetric.setLatestEmittedOffset(pm.getPartition(), pm.lastCompletedOffset());
}
return _kafkaOffsetMetric.getValueAndReset();
}
}, 60);
context.registerMetric(“kafkaPartition”, new IMetric() {
@Override
public Object getValueAndReset() {
List pms = _coordinator.getMyManagedPartitions();
Map concatMetricsDataMaps = new HashMap();
for (PartitionManager pm : pms) {
concatMetricsDataMaps.putAll(pm.getMetricsDataMap());
}
return concatMetricsDataMaps;
}
}, 60);
}
@Override
public void close() {
_state.close();
}
@Override
public void nextTuple() {
List managers = _coordinator.getMyManagedPartitions();
for (int i = 0; i < managers.size(); i++) { // in case the number of managers decreased _currPartitionIndex = _currPartitionIndex % managers.size(); EmitState state = managers.get(_currPartitionIndex).next(_collector); if (state != EmitState.EMITTED_MORE_LEFT) { _currPartitionIndex = (_currPartitionIndex + 1) % managers.size(); } if (state != EmitState.NO_EMITTED) { break; } } long now = System.currentTimeMillis(); if ((now – _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
commit();
}
}
@Override
public void ack(Object msgId) {
KafkaMessageId id = (KafkaMessageId) msgId;
PartitionManager m = _coordinator.getManager(id.partition);
if (m != null) {
m.ack(id.offset);
}
}
@Override
public void fail(Object msgId) {
KafkaMessageId id = (KafkaMessageId) msgId;
PartitionManager m = _coordinator.getManager(id.partition);
if (m != null) {
m.fail(id.offset);
}
}
@Override
public void deactivate() {
commit();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(_spoutConfig.scheme.getOutputFields());
}
private void commit() {
_lastUpdateMs = System.currentTimeMillis();
for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {
manager.commit();
}
}
}
3.4
storm中bolt与drools的整合 Drools可以将storm中处理数据的规则提取到一个drl文件中,该文件就成了唯一处理规则的文件。任何时候规则出现变化,我们只需要修改该drl文件,而不会改变其它的代码。Bolt与drools的整合代码如下所示: [AppleScript] 纯文本查看 复制代码 public
class
LogRulesBolt
implements
IBasicBolt
{
Logger
logger=
LoggerFactory.getLogger(LogRulesBolt.class);
private
static
final
long
serialVersionUID
=
1L;
public
static
final
String
LOG_ENTRY
=
“str”;
private
StatelessKnowledgeSession
ksession;
private
String
drlFile;
public
LogRulesBolt()
{}
public
LogRulesBolt(String
drlFile)
{
this.drlFile=drlFile;
}
@Override
public
void
prepare(Map
stormConf,
TopologyContext
context)
{
KnowledgeBuilder
kbuilder
=
KnowledgeBuilderFactory.newKnowledgeBuilder();
try
{
kbuilder.add(
ResourceFactory.newInputStreamResource(new
FileInputStream(new
File(drlFile))),
ResourceType.DRL
);
}
catch
(FileNotFoundException
e)
{
logger.error(e.getMessage());
}
KnowledgeBase
kbase
=
KnowledgeBaseFactory.newKnowledgeBase();
kbase.addKnowledgePackages(
kbuilder.getKnowledgePackages()
);
ksession
=
kbase.newStatelessKnowledgeSession();
}
@Override
public
void
execute(Tuple
input,
BasicOutputCollector
collector)
{
String
logContent=(String)input.getValueByField(LOG_ENTRY);
logContent=logContent.trim();
if(!””.equals(logContent)&&logContent!=null)
{
LogEntry
entry
=
new
LogEntry(logContent);
try{
ksession.execute(
entry
);
}catch(Exception
e)
{
logger.error(“drools
to
handle
log
["+logContent+"]
is
failure
!”);
logger.error(e.getMessage());
}
collector.emit(new
Values(entry));
}
else
{
logger.error(“log
content
is
empty
!”);
}
}
@Override
public
void
cleanup()
{
}
@Override
public
void
declareOutputFields(OutputFieldsDeclarer
declarer)
{
declarer.declare(new
Fields(LOG_ENTRY));
}
@Override
public
Map<String,
Object>
getComponentConfiguration()
{
return
null;
}
}
通过规则处理数据之后,我们就可以将处理过的数据发送到下一个bolt中,然后将数据存储到redis中。 4 相关思考4.1 系统的优点- 模块化的设计,使功能分散到各个模块中,对各个功能进行了解耦,使系统的容错性更高。
- kafka作为中间缓冲,解决了flume和storm速度不匹配的问题。
- 利用drools将规则和数据进行了解耦。把规则写到一个配置文件中,避免了每次修改规则就要修改代码的缺点。
- storm和drools整合解决了drools的规则引擎无法并行化的问题。
- redis是内存数据库,可以很快速的写数据到数据库中,加快了整个系统的处理速度,避免了数据库的瓶颈。
4.2 待考虑的问题- 整个系统还没有用大量数据进行测试,稳定性以及性能瓶颈需要进一步的考虑、发现和改进。
- 在现有的系统中,flume只能发送数据到kafka的单个broker的单个partition中,后期需要修改代码以适应多个broker多个partition。这点是可以实现的,我已经实现了一部分。可以将数据发送到单个broker的多个partition中。
- 现有的系统,修改规则文件之后,需要重新启动topology,无法进行热加载。这点是需要进一步考虑的。
- drools是一个优异的规则引擎。但是它的速度仍然让我有点担心。这个问题可能在以后数据变大之后会体现出来。我们思考了esper这个开源的规则引擎,它的速度更快,但是它类sql语言的规则处理语言不是太适合我们的日志分析系统。以后是不是能够作进一步的开发,用esper代替drools是我们要考虑的一个问题。
- 思考现有的架构,flume并不是缺一不可的模块,我们可以在ETL系统中直接将log信息发送到kafka中,然后利用storm进行处理。但是为了整个系统的可扩展性(例如我们还想要将log信息发送到HDFS中,利用flume可以直接配置)和易配置性,利用flume会更好。是否要用flume,flume是否会影响整个系统的速度,需要以后进一步的论证。
- flume、kafka、storm、redis的各个参数的取值对系统的影响也较大。所以这些参数需要在以后的应用中选定合适的值。
4.3 框架层面的思考- flume是纯java实现的框架,比较有趣的是各种source接口(如avro source、thrift source)以及sink(HDFS sink、Logger sink)接口的实现。以后有兴趣可以进一步阅读源代码。
- kafka的思路很好,充分利用了磁盘顺序写入和顺序读取的路子,存储的性能很好,只要几个节点就能处理大量的消息了;另外,它突破了常规的一些消息中间件由服务端来记录消息消费状态的传统,彻底由客户端自己来记录究竟处理到哪里了,失败也罢成功也罢,客户端本来是最清楚的了,由它来记录消费状态是最适合不过了。Kafka中这种处理思路是我们值得学习的地方,我们也可以看代码来体会这种设计。Kafka是由scala实现的,没有scala基础的可以先看看scala编程。
- storm主要是用clojure、java来实现的,还包括部分的Python代码。代码量25000行左右。在它的源代码中,用java实现框架结构,clojure实现功能细节。storm中的模拟本地集群的实现,保证消息只处理一次的功能的实现,都很巧妙,值得我们去看代码,不管是现在用得到还是用不到。
- redis是c实现的,速度很快,代码量不大。
|