最具影响力的数字化技术在线社区

168大数据

 找回密码
 立即注册

QQ登录

只需一步,快速开始

1 2 3 4 5
打印 上一主题 下一主题
开启左侧

[实践案例] WhereHows解读

[复制链接]
跳转到指定楼层
楼主
发表于 2021-1-26 16:31:57 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式

马上注册,结交更多数据大咖,获取更多知识干货,轻松玩转大数据

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
1 系统介绍

WhereHows是linkedin开源的数据发现平台,它从各种数据源收集元数据,标准化处理,提供统一的元数据服务。WhereHows的名字与warehouse谐音。同时,还包含了两层含义:

  • Where:数据在哪,即元数据收集。
  • How:数据的前世今生,生产者、消费者是谁,即血缘。

在统一元数据的基础上,平台提供了一系列功能:

  • 数据发现:搜索数据。
  • 数据血缘:跟踪数据集和任务的上下游。
  • 工作流发现:通过项目、流、子流、任务的树结构,查看任务血缘。
  • 提供页面访问及后台API。
2 架构介绍2.1 总览

官方架构图:

上图中,除了缺少Kafka部分外,还是比较准确的。可以看到,核心是akka分布式框架。通过akka actor执行各个存储系统和调度系统元数据的收集,并转换为统一模型存入WhereHows的mysql中。而不同存储系统和调度系统的元数据的收集、转换、存储的逻辑存在差异,所以具体的ETL逻辑是通过jython脚本编写的,这样就实现了面对不同存储和引擎的扩展性。新增存储系统或者计算调度系统,只需要对应的ETL job jython脚本。

2.2 数据模型

官方数据模型:

数据模型分为四部分:

  • 数据集:数据集schema、备注、样本等。
  • 操作数据:流组、流、任务、任务依赖、执行、ID映射表等。
  • 血缘数据:任务的输入和输出、依赖、分区、位点等。
  • ETL和Web服务配置。

dataset_前缀的表与数据集相关;flow_和job_前缀的表与流和任务相关;cfg_前缀开头的表与物理配置相关;wh_前缀开头的表与内部定时执行的元数据收集任务相关,负责记录定时执行的任务信息,以及任务执行状态等。

最顶层的抽象是dataset数据集的概念,以URN作为数据集唯一标识,schema抽象了不同的数据结构。除此之外还包括app应用系统信息(cfg_application)和db数据库系统信息(cfg_database)。

dataset作为顶层抽象,通过dataset_id关联到底层实际物理元数据信息(dict_dataset_instance、cfg_object_name_map、cfg_database等),包括hdfs、hive、teradata、dalids等。其中有DB模型的(hive、teradata、dalids),也有非DB模型的(hdfs)。cfg_object_name_map.sub_type区分具体的物理类型。

任务执行信息和数据集之间通过job_execution_data_lineage关联,即血缘表。数据集和操作数据是两个端点,而血缘数据就是连接两者的桥梁。用户可以从一个数据集出发,找到它的生产者任务或消费者任务;也可以从一个任务触发,找到这个任务读\写的数据集。

2.3 ETL

WhereHows使用java和jython语言。以java构建框架,定义每个ETL阶段的输入和输出;使用jython资源文件实现ETL处理过程。这使得结构更加清晰,对于需要自定义的内容一目了然。同时,提高了引入新的源系统的灵活性和扩展性。框架定义的元数据收集步骤按照ETL三阶段执行:extract、transform、load。

2.3.1 数据集ETL

数据集元数据包括:schema、path、partition、sample data、watermark、owner等等。WhereHows收集了HDFS、Teradata等数据集元数据。

HDFS:

使用具有读权限的账号,扫描白名单下的目录,并收集数据集信息。程序将schemaFetch.jar文件copy到hadoop gateway上,执行扫描,再将结果copy回本地,经过转换存储到mysql。

Teradata:

直接在jython脚本中查询库中的元数据。

2.3.2 操作数据ETL

操作数据包括:流组定义、流定义、流执行信息、任务定义、任务执行信息、owner信息、调度信息等。WhereHows的操作数据ETL定时从调度系统的数据库中爬取信息到WhereHows的数据库中。

  • 痛点1:不同的调度系统具有不同的数据模型,需要将他们转换成统一的模型。例如:Azkaban的执行是在流级别,流执行具有唯一ID,但是任务执行没有唯一ID。反观Oozie,每个任务执行都具有唯一ID。为了解决这种情况,WhereHows结合流执行ID与任务在流中的顺序为Azkaban任务生成任务ID。同时,将所有的唯一ID映射为整型来适配相同的模型。每当添加调度系统,都要添加不同的映射规则将原生数据模型转换成WhereHows数据模型。
  • 痛点2:很多源系统没有跟踪版本,或者版本很不直观。例如,Oozie和Azkaban这些调度系统没有流版本。在这种情况下,WhereHows收集流元数据之后,与老的DAG结构进行比较,在内部推导维护版本信息。WhereHows将这种推导的DAG版本用作所有操作数据的通用版本。
2.3.3 血缘ETL

血缘信息包括:任务和数据集之间的依赖关系,读还是写,涉及多少条记录等。

How:

基本策略是从日志和配置文件中收集血缘信息。比如:一个Pig任务有很多Hadoop任务,在Hadoop任务的配置文件中可以看到“mapreduce.input.fileinputformat.inputdir”一类的属性,可以代表Hadoop任务的血缘。WhereHows收集所有子Hadoop任务的输入和输出,将它们聚合到父级别任务,并存储下来。

When:

血缘ETL任务需要一个入口点,即任务ID信息列表,根据这个列表提取血缘信息。WhereHows作为一个元数据仓库,延迟容忍度高,5到10分钟的延迟是可以接受的,所以采用“微批拉取”的解决方案。通过一个拉取任务,检查最近30分钟(窗口可配)有哪些任务结束,然后以此作为入口点,拉取它们的日志和配置文件分析血缘。

3 代码解读3.1 调度

WhereHows使用了akka分布式并发框架,其中三个核心的actor包括:SchedulerActor、EtlJobActor、TreeBuilderActor。这三个actor以静态变量的形式注册在ActorRegistry中:

[AppleScript] 纯文本查看 复制代码
public class ActorRegistry {
 
  public static Scheduler scheduler = actorSystem.scheduler();
 
  public static ActorRef schedulerActor = actorSystem.actorOf(Props.create(SchedulerActor.class), "SchedulerActor");
 
  public static ActorRef etlJobActor =
      actorSystem.actorOf(new SmallestMailboxPool(ETL_POOL_SIZE).props(Props.create(EtlJobActor.class)), "EtlJobActor");
 
  public static ActorRef treeBuilderActor =
      actorSystem.actorOf(Props.create(TreeBuilderActor.class), "TreeBuilderActor");
}

通过SchedulerUtil启动SchedulerActor,则启动了整个调度引擎。定时向SchedulerActor发送“checking”消息触发调度检查:

[AppleScript] 纯文本查看 复制代码
public class SchedulerUtil {
 
  public static synchronized void start(Long mins) {
    if (schedulerRef != null) {
      schedulerRef.cancel();
    }
 
    schedulerRef = ActorRegistry.scheduler
      .schedule(Duration.create(0, TimeUnit.MILLISECONDS), Duration.create(mins, TimeUnit.MINUTES),
        ActorRegistry.schedulerActor, "checking", ActorRegistry.dispatcher, null);
  }
 }

3.1.1 SchedulerActor

SchedulerActor在收到“checking”消息后,从应用配置的目录位置,读取所有需要执行的job文件:

并从DB中读取已经记录的定时调度job(包括job的下一次调度时间)。

遍历所有的job文件,将每个job的下一次调度时间更新到DB中。如果已经记录的job中存在当前job,则判断是否到了调度时间,到了就执行,没到则跳过。而触发job执行就是将对应的job信息作为消息发送给EtlJobActor。

[AppleScript] 纯文本查看 复制代码
public void onReceive(Object message) throws Exception {
    if (message.equals("checking")) {
      runDueJobs();
    }
  }
 
  private void runDueJobs() throws Exception {
    Map<String, Properties> enabledJobs = JobsUtil.getScheduledJobs(ETL_JOBS_DIR);
    Map<String, Long> scheduledJobs = getScheduledJobs();
 
    long now = System.currentTimeMillis() / 1000;
    for (Map.Entry<String, Properties> entry : enabledJobs.entrySet()) {
      String etlJobName = entry.getKey();
      Properties properties = entry.getValue();
      EtlJobMessage etlMsg = new EtlJobMessage(etlJobName, properties);
 
      // Schedule next run if a cron expr is defined.
      String cronExpr = etlMsg.getCronExpr();
      if (cronExpr != null) {
        EtlJobDao.updateNextRun(etlJobName, cronExpr, new Date());
      }
 
      if (scheduledJobs.getOrDefault(etlJobName, Long.MAX_VALUE) > now) {
        continue;
      }
 
      Long whExecId = EtlJobDao.insertNewRun(etlJobName);
      etlMsg.setWhEtlExecId(whExecId);
 
      if (Global.getCurrentRunningJob().contains(etlJobName)) {
        EtlJobDao.endRun(etlMsg.getWhEtlExecId(), JobStatus.ERROR, "Previous is still running, Aborted!");
      } else {
        Global.getCurrentRunningJob().add(etlJobName);
        ActorRegistry.etlJobActor.tell(etlMsg, getSelf());
      }
    }
}

3.1.2 EtlJobActor

EtlJobActor的任务是启动一个新的进程执行SchedulerActor发送的job。这个新的进程的执行框架在Launcher中定义。

3.1.3 Launcher

Launcher负责job的ETL的触发。ETL的实际处理class在job文件中定义,以HDFS_METADATA_ETL.job文件为例,该job用来提取HDFS数据集的元数据:

[AppleScript] 纯文本查看 复制代码
# Fetch HDFS dataset metadata

# Common ETL configs

job.class=metadata.etl.dataset.hdfs.HdfsMetadataEtl

job.cron.expr=0 0 5 ? * TUE,THU,SUN *

job.timeout=3000

job.ref.id=11

# Cluster name

hdfs.cluster=your_cluster_name

# Run job on remote machine, true/false

hdfs.remote.mode=false

# Remote Hadoop gateway machine name (could be localhost)

hdfs.remote.machine=

# Private key used to log in to the remote machine

hdfs.private_key_location=

# JAR file location on the remote machine

hdfs.remote.jar=

# User login on remote machine

hdfs.remote.user=

# Metadata JSON file location on remote machine

hdfs.remote.raw_metadata=

# Sample data CSV file location on remote machine

hdfs.remote.sample=

# Place to store field metadata file

hdfs.local.field_metadata=

# Place to store metadata CSV file

hdfs.local.metadata=hdfs_metadata.csv

# Place to store metadata JSON file

hdfs.local.raw_metadata=hdfs_raw_metadata.json

# Place to store sample file

hdfs.local.sample=hdfs_sample.dat

# The whitelist of folder to collect metadata

hdfs.white_list=

# Optional. number of threads you want to scrape the HDFS

hdfs.num_of_thread=1

# Optional. The map of file path regex and dataset source. e.g. [{"/data/tracking.":"Kafka"},{"/data/retail.":"Teradata"}]

hdfs.file_path_regex_source_map=

文件中的“job.class”属性定义了实际执行的class为“metadata.etl.dataset.hdfs.HdfsMetadataEtl”。Launcher在实例化具体的job class之后,调用其run方法触发ETL执行。

3.2 ETL

所有的ETL类都继承EtlJob。EtlJob定义了extract、transform、load三个抽象方法,分别对应着ETL的三个执行步骤,由具体的实现类实现。在EtlJob.run()方法中顺序执行三个方法:

[AppleScript] 纯文本查看 复制代码
public abstract void extract() throws Exception;
  public abstract void transform() throws Exception;
  public abstract void load() throws Exception;
  public void run() throws Exception {
    setup();
    extract();
    transform();
    load();
    close();
  }

以实现类HdfsMetadataEtl为例:

(1)extract

将schemaFetch.jar拷贝到HDFS文件系统上,并执行。执行过程中,扫描白名单中的路径,将所有文件的元数据信息记录在HDFS文件系统的指定路径。执行结束后,将结果拷贝回本地。

(2)transform

根据名为“HdfsTransform.py”的jython脚本中的逻辑,对上一步中拷贝到本地的数据进行转换处理:

[AppleScript] 纯文本查看 复制代码
public void transform() throws Exception {
    InputStream inputStream = classLoader.getResourceAsStream("jython/HdfsTransform.py");
    interpreter.execfile(inputStream);
    inputStream.close();
  }

(3)load

在名为“HdfsLoad.py”的jython脚本中,将transform转换结果存入DB:

[AppleScript] 纯文本查看 复制代码
public void load() throws Exception {
    InputStream inputStream = classLoader.getResourceAsStream("jython/HdfsLoad.py");
    interpreter.execfile(inputStream);
    inputStream.close();
[/color][/size][/font][p=26, null, left][font=微软雅黑][size=3][color=#000000]}

3.3 Kafka

WhereHows集成了kafka,可以通过消息触发元数据和血缘的更新。kafka的consumer和producer同样封装为akka的actor。

3.3.1 KafkaClientMaster

KafkaClientMaster是主actor,在ApplicationStart中启动:

[AppleScript] 纯文本查看 复制代码
public static void main(String[] args) {
    final ActorSystem actorSystem = ActorSystem.create("KAFKA");
    String kafkaJobDir = config.getString("kafka.jobs.dir");
    actorSystem.actorOf(Props.create(KafkaClientMaster.class, kafkaJobDir), "KafkaMaster");
  }


其中,配置属性“kafka.jobs.dir”代表kafka消费/生产配置文件的目录。

KafkaClientMaster在preStart阶段从“kafka.jobs.dir”获取所有的kafka任务文件,遍历文件,构造对应的KafkaWorker(actor)并启动。在构造KafkaWorker的过程中,根据文件中的consumer topic和consumer属性构造kafka consumer,作为KafkaWorker的数据来源;根据“kafka.processor”属性实例化处理数据的Processor class;如果processor在处理过程中需要发送消息,则根据producer topic和producer属性构造kafka producer。通过向KafkaWorker发送“WORKER_START”消息触发kafka consumer的消费。

KafkaClientMaster由于在preStart阶段已经启动了所有worker,不需要接收、处理消息。

[AppleScript] 纯文本查看 复制代码
public void preStart() throws Exception {
 
    _kafkaJobList = JobsUtil.getEnabledJobs(KAFKA_JOB_DIR);
    log.info("Kafka jobs: {}", _kafkaJobList.keySet());
 
    if (_kafkaJobList.size() == 0) {
      context().stop(getSelf());
      return;
    }
    log.info("Starting KafkaClientMaster...");
 
    for (Map.Entry<String, Properties> entry : _kafkaJobList.entrySet()) {
      // handle one kafka topic
      final String kafkaJobName = entry.getKey();
      final Properties props = entry.getValue();
 
      final int numberOfWorkers = Integer.parseInt(props.getProperty(Constant.KAFKA_WORKER_COUNT, "1"));
 
      log.info("Create Kafka client with config: " + props);
      try {
        // create worker
        for (int i = 0; i < numberOfWorkers; i++) {
          ActorRef worker = makeKafkaWorker(kafkaJobName, props);
          _kafkaWorkers.add(worker);
          worker.tell(KafkaWorker.WORKER_START, getSelf());
          log.info("Started Kafka worker #{} for job {}", i, kafkaJobName);
        }
      } catch (Exception e) {
        log.error("Error starting Kafka job: " + kafkaJobName, e);
      }
    }
  }

3.3.2 KafkaWorker

KafkaWorker收到“WORKER_START”消息后,开始不断从kafka consumer拉取数据,并传给processor处理:

[AppleScript] 纯文本查看 复制代码
public void onReceive(@Nonnull Object message) throws Exception {
    if (!message.equals(WORKER_START)) {
      log.warn("Must send WORKER_START message first!");
      unhandled(message);
    }
 
    while (RUNNING) {
      ConsumerRecords<String, IndexedRecord> records;
      try {
        records = _consumer.poll(POLL_TIMEOUT_MS);
      } catch (SerializationException e) {
        log.error("Serialization Error: ", e);
        moveOffset(extractPartition(e), 1);
        continue;
      }
 
      try {
        process(records);
      } catch (Exception e) {
        log.error("Unhandled processing exception: ", e);
        break;
      }
    }
 
    getContext().stop(getSelf());
  }
 
  private void process(@Nonnull ConsumerRecords<String, IndexedRecord> records) {
    for (ConsumerRecord<String, IndexedRecord> record : records) {
      _receivedRecordCount++;
      _processor.process(record.value());
      if (_receivedRecordCount % 1000 == 0) {
        log.info("{}: received {} messages", _topic, _receivedRecordCount);
      }
    }
    _consumer.commitAsync();
}

3.3.3 KafkaMessageProcessor

上面提到“kafka.processor”属性指定实际处理消息的class。所有处理kafka消息的class都继承抽象类KafkaMessageProcessor,并实现process抽象方法。WhereHows提供了三个KafkaMessageProcessor实现类:MetadataChangeProcessor、MetadataInventoryProcessor、MetadataLineageProcessor。

MetadataChangeProcessor

从命名可以看出来,MetadataChangeProcessor处理单个数据集元数据的变更,输入消息为MetadataChangeEvent。它解析消息内容,并将变更作用于存储中。MetadataChangeEvent的changeType字段代表操作类型,包括创建、更新、删除。

处理失败的消息再通过kafka发送出去,便于记录、跟踪和重试。

MetadataInventoryProcessor

MetadataInventoryProcessor负责某个集群的数据集的批量处理。目前WhereHows代码中,只实现了删除。MetadataInventoryProcessor处理的消息为MetadataInventoryEvent,其中包含某个集群当前所有的数据集名称(nativeNames)。MetadataInventoryProcessor先根据消息中的集群信息,查询该集群在WhereHows中已存的数据集名称,再将存量数据集名称与消息中的数据集名称列表做减法,得到需要从元数据中删除的数据集名称。最后将这些需要删除的数据集包装为MetadataChangeEvent通过kafka发送出去,由上面的MetadataChangeProcessor进行消费处理。

MetadataLineageProcessor

MetadataLineageProcessor处理的消息是MetadataLineageEvent。MetadataLineageProcessor遍历消息中的血缘列表,对每个血缘的source列表和destination列表进行去重和校验,然后将血缘列表插入到数据库中。不过入库这一步暂未实现。

4 总结

WhereHows作为一个post-hoc型数据集管理系统,可以认为是GOODS思想的一个具体实现,高扩展性是其一个优点。引入新的源系统,可以通过jython脚本动态引入ETL逻辑,但仍要写java代码实现ETL job,灵活性在这里受到了限制。

5 参考资料

[1] WhereHows官方:https://github.com/linkedin/WhereHows


楼主热帖
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 转播转播 分享分享 分享淘帖 赞 踩

168大数据 - 论坛版权1.本主题所有言论和图片纯属网友个人见解,与本站立场无关
2.本站所有主题由网友自行投稿发布。若为首发或独家,该帖子作者与168大数据享有帖子相关版权。
3.其他单位或个人使用、转载或引用本文时必须同时征得该帖子作者和168大数据的同意,并添加本文出处。
4.本站所收集的部分公开资料来源于网络,转载目的在于传递价值及用于交流学习,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。
5.任何通过此网页连接而得到的资讯、产品及服务,本站概不负责,亦不负任何法律责任。
6.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源,若标注有误或遗漏而侵犯到任何版权问题,请尽快告知,本站将及时删除。
7.168大数据管理员和版主有权不事先通知发贴者而删除本文。

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

站长推荐上一条 /1 下一条

关于我们|小黑屋|Archiver|168大数据 ( 京ICP备14035423号|申请友情链接

GMT+8, 2024-4-20 22:31

Powered by BI168大数据社区

© 2012-2014 168大数据

快速回复 返回顶部 返回列表