最具影响力的数字化技术在线社区

168大数据

 找回密码
 立即注册

QQ登录

只需一步,快速开始

1 2 3 4 5
打印 上一主题 下一主题
开启左侧

[Flume] flume的 sink

[复制链接]
跳转到指定楼层
楼主
发表于 2019-10-25 11:48:10 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式

马上注册,结交更多数据大咖,获取更多知识干货,轻松玩转大数据

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
1.HDFS Sink
hdfs sink支持将text类型或者sequenceFile类型的event写入hdfs,同时也支持在写入的时候进行压缩。
flume使用hadoop提供的FileSystem相关的操作工具,使用hsync或者hflush的方式将数据写入hdfs。

HDFS在写数据务必要保证数据的一致性与持久性,从HDFS最初的版本到2.0版本HDFS提供了两种同步语义。
1.将client端写入的数据刷到每个DataNode的OS缓存中,如果每个副本所在的DataNode同时crash时(例如机房断电)就会导致数据丢失(sync和hflush方法)。
2. 将client端写入的数据刷到每个DataNode的磁盘中(hsync方法);
在Hadoop2.0和cdh4中DFSOutputStream提供了sync,hflush和hsync三个方法,sync和hflush均是语义1,而hsync是语义2,hsync比sync和hflush的同步性更强

[AppleScript] 纯文本查看 复制代码
// Write the data to HDFS
        try {
          bucketWriter.append(event);
        } catch (BucketClosedException ex) {
          LOG.info("Bucket was closed while trying to append, " +
                   "reinitializing bucket and writing event.");
          hdfsWriter = writerFactory.getWriter(fileType);
          bucketWriter = initializeBucketWriter(realPath, realName,
            lookupPath, hdfsWriter, closeCallback);
          synchronized (sfWritersLock) {
            sfWriters.put(lookupPath, bucketWriter);
          }
          bucketWriter.append(event);
        }

        // track the buckets getting written in this transaction
        if (!writers.contains(bucketWriter)) {
          writers.add(bucketWriter);
        }
      }

      if (txnEventCount == 0) {
        sinkCounter.incrementBatchEmptyCount();
      } else if (txnEventCount == batchSize) {
        sinkCounter.incrementBatchCompleteCount();
      } else {
        sinkCounter.incrementBatchUnderflowCount();
      }

      // flush all pending buckets before committing the transaction
      for (BucketWriter bucketWriter : writers) {
        bucketWriter.flush();
      }

      transaction.commit();
2.Kafka Sink
kafka sink支持将数据打入kafka以供其他系统通过topic接入。
flume发送消息到kafka源码解析:
[AppleScript] 纯文本查看 复制代码
kafkaFutures.add(producer.send(record, new SinkCallback(startTime)));
producer.flush();
for (Future<RecordMetadata> future : kafkaFutures) {
          future.get();
      }
以上是flume将数据发送到kafka的功能性代码,可以看到使用的是kafka的producer,整个发送过程都有事务保证,同时提供了一个发送后的回调函数,但是遗憾的是如果发生异常这个回调函数只是打印错误日志。源码如下:
[AppleScript] 纯文本查看 复制代码
class SinkCallback implements Callback {
  private static final Logger logger = LoggerFactory.getLogger(SinkCallback.class);
  private long startTime;

  public SinkCallback(long startTime) {
    this.startTime = startTime;
  }

  public void onCompletion(RecordMetadata metadata, Exception exception) {
    if (exception != null) {
      logger.debug("Error sending message to Kafka {} ", exception.getMessage());
    }

    if (logger.isDebugEnabled()) {
      long eventElapsedTime = System.currentTimeMillis() - startTime;
      if (metadata != null) {
        logger.debug("Acked message partition:{} ofset:{}", metadata.partition(),
                metadata.offset());
      }
      logger.debug("Elapsed time for send: {}", eventElapsedTime);
    }
  }
}
所以,flume自身内部有事务机制保证数据,但是与kafka交互过程中不能保证数据的准确性。
3.Hive Sink
支持将含有分隔符的text或者json数据导入hive,通过连接hive server,只要有数据导入hive,则在hive中这些数据就是立即可见的。同时也可以在hive中创建分区。
flume数据导入hive使用thrift方式,并且具有事务性。核心代码如下:
[AppleScript] 纯文本查看 复制代码
 private int drainOneBatch(Channel channel)
          throws HiveWriter.Failure, InterruptedException {
    int txnEventCount = 0;
    try {
      Map<HiveEndPoint,HiveWriter> activeWriters = Maps.newHashMap();
      for (; txnEventCount < batchSize; ++txnEventCount) {
        // 0) Read event from Channel
        Event event = channel.take();
        if (event == null) {
          break;
        }

        //1) Create end point by substituting place holders
        HiveEndPoint endPoint = makeEndPoint(metaStoreUri, database, table,
                partitionVals, event.getHeaders(), timeZone,
                needRounding, roundUnit, roundValue, useLocalTime);

        //2) Create or reuse Writer
        HiveWriter writer = getOrCreateWriter(activeWriters, endPoint);

        //3) Write
        LOG.debug("{} : Writing event to {}", getName(), endPoint);
        writer.write(event);

      } // for

      //4) Update counters
      if (txnEventCount == 0) {
        sinkCounter.incrementBatchEmptyCount();
      } else if (txnEventCount == batchSize) {
        sinkCounter.incrementBatchCompleteCount();
      } else {
        sinkCounter.incrementBatchUnderflowCount();
      }
      sinkCounter.addToEventDrainAttemptCount(txnEventCount);

      // 5) Flush all Writers
      for (HiveWriter writer : activeWriters.values()) {
        writer.flush(true);
      }

      sinkCounter.addToEventDrainSuccessCount(txnEventCount);
      return txnEventCount;
    } catch (HiveWriter.Failure e) {
      // in case of error we close all TxnBatches to start clean next time
      LOG.warn(getName() + " : " + e.getMessage(), e);
      abortAllWriters();
      closeAllWriters();
      throw e;
    }
  }

  private HiveWriter getOrCreateWriter(Map<HiveEndPoint, HiveWriter> activeWriters,
                                       HiveEndPoint endPoint)
          throws HiveWriter.ConnectException, InterruptedException {
    try {
      HiveWriter writer = allWriters.get( endPoint );
      if (writer == null) {
        LOG.info(getName() + ": Creating Writer to Hive end point : " + endPoint);
        writer = new HiveWriter(endPoint, txnsPerBatchAsk, autoCreatePartitions,
                callTimeout, callTimeoutPool, proxyUser, serializer, sinkCounter);

        sinkCounter.incrementConnectionCreatedCount();
        if (allWriters.size() > maxOpenConnections) {
          int retired = closeIdleWriters();
          if (retired == 0) {
            closeEldestWriter();
          }
        }
        allWriters.put(endPoint, writer);
        activeWriters.put(endPoint, writer);
      } else {
        if (activeWriters.get(endPoint) == null)  {
          activeWriters.put(endPoint,writer);
        }
      }
      return writer;
    } catch (HiveWriter.ConnectException e) {
      sinkCounter.incrementConnectionFailedCount();
      throw e;
    }
  }

  private HiveEndPoint makeEndPoint(String metaStoreUri, String database, String table,
                                    List<String> partVals, Map<String, String> headers,
                                    TimeZone timeZone, boolean needRounding,
                                    int roundUnit, Integer roundValue,
                                    boolean useLocalTime)  {
    if (partVals == null) {
      return new HiveEndPoint(metaStoreUri, database, table, null);
    }

    ArrayList<String> realPartVals = Lists.newArrayList();
    for (String partVal : partVals) {
      realPartVals.add(BucketPath.escapeString(partVal, headers, timeZone,
              needRounding, roundUnit, roundValue, useLocalTime));
    }
    return new HiveEndPoint(metaStoreUri, database, table, realPartVals);
  }

  public Status process() throws EventDeliveryException {
    // writers used in this Txn

    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    transaction.begin();
    boolean success = false;
    try {
      // 1 Enable Heart Beats
      if (timeToSendHeartBeat.compareAndSet(true, false)) {
        enableHeartBeatOnAllWriters();
      }

      // 2 Drain Batch
      int txnEventCount = drainOneBatch(channel);
      transaction.commit();
      success = true;

      // 3 Update Counters
      if (txnEventCount < 1) {
        return Status.BACKOFF;
      } else {
        return Status.READY;
      }
    } catch (InterruptedException err) {
      LOG.warn(getName() + ": Thread was interrupted.", err);
      return Status.BACKOFF;
    } catch (Exception e) {
      throw new EventDeliveryException(e);
    } finally {
      if (!success) {
        transaction.rollback();
      }
      transaction.close();
    }
  }

楼主热帖
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 转播转播 分享分享 分享淘帖 赞 踩

168大数据 - 论坛版权1.本主题所有言论和图片纯属网友个人见解,与本站立场无关
2.本站所有主题由网友自行投稿发布。若为首发或独家,该帖子作者与168大数据享有帖子相关版权。
3.其他单位或个人使用、转载或引用本文时必须同时征得该帖子作者和168大数据的同意,并添加本文出处。
4.本站所收集的部分公开资料来源于网络,转载目的在于传递价值及用于交流学习,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。
5.任何通过此网页连接而得到的资讯、产品及服务,本站概不负责,亦不负任何法律责任。
6.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源,若标注有误或遗漏而侵犯到任何版权问题,请尽快告知,本站将及时删除。
7.168大数据管理员和版主有权不事先通知发贴者而删除本文。

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

站长推荐上一条 /1 下一条

关于我们|小黑屋|Archiver|168大数据 ( 京ICP备14035423号|申请友情链接

GMT+8, 2024-5-7 00:16

Powered by BI168大数据社区

© 2012-2014 168大数据

快速回复 返回顶部 返回列表