马上注册,结交更多数据大咖,获取更多知识干货,轻松玩转大数据
您需要 登录 才可以下载或查看,没有帐号?立即注册
x
1 系统介绍WhereHows是linkedin开源的数据发现平台,它从各种数据源收集元数据,标准化处理,提供统一的元数据服务。WhereHows的名字与warehouse谐音。同时,还包含了两层含义: - Where:数据在哪,即元数据收集。
- How:数据的前世今生,生产者、消费者是谁,即血缘。
在统一元数据的基础上,平台提供了一系列功能: - 数据发现:搜索数据。
- 数据血缘:跟踪数据集和任务的上下游。
- 工作流发现:通过项目、流、子流、任务的树结构,查看任务血缘。
- 提供页面访问及后台API。
2 架构介绍2.1 总览官方架构图: 上图中,除了缺少Kafka部分外,还是比较准确的。可以看到,核心是akka分布式框架。通过akka actor执行各个存储系统和调度系统元数据的收集,并转换为统一模型存入WhereHows的mysql中。而不同存储系统和调度系统的元数据的收集、转换、存储的逻辑存在差异,所以具体的ETL逻辑是通过jython脚本编写的,这样就实现了面对不同存储和引擎的扩展性。新增存储系统或者计算调度系统,只需要对应的ETL job jython脚本。 2.2 数据模型官方数据模型: 数据模型分为四部分: - 数据集:数据集schema、备注、样本等。
- 操作数据:流组、流、任务、任务依赖、执行、ID映射表等。
- 血缘数据:任务的输入和输出、依赖、分区、位点等。
- ETL和Web服务配置。
dataset_前缀的表与数据集相关;flow_和job_前缀的表与流和任务相关;cfg_前缀开头的表与物理配置相关;wh_前缀开头的表与内部定时执行的元数据收集任务相关,负责记录定时执行的任务信息,以及任务执行状态等。 最顶层的抽象是dataset数据集的概念,以URN作为数据集唯一标识,schema抽象了不同的数据结构。除此之外还包括app应用系统信息(cfg_application)和db数据库系统信息(cfg_database)。 dataset作为顶层抽象,通过dataset_id关联到底层实际物理元数据信息(dict_dataset_instance、cfg_object_name_map、cfg_database等),包括hdfs、hive、teradata、dalids等。其中有DB模型的(hive、teradata、dalids),也有非DB模型的(hdfs)。cfg_object_name_map.sub_type区分具体的物理类型。 任务执行信息和数据集之间通过job_execution_data_lineage关联,即血缘表。数据集和操作数据是两个端点,而血缘数据就是连接两者的桥梁。用户可以从一个数据集出发,找到它的生产者任务或消费者任务;也可以从一个任务触发,找到这个任务读\写的数据集。 2.3 ETLWhereHows使用java和jython语言。以java构建框架,定义每个ETL阶段的输入和输出;使用jython资源文件实现ETL处理过程。这使得结构更加清晰,对于需要自定义的内容一目了然。同时,提高了引入新的源系统的灵活性和扩展性。框架定义的元数据收集步骤按照ETL三阶段执行:extract、transform、load。 2.3.1 数据集ETL数据集元数据包括:schema、path、partition、sample data、watermark、owner等等。WhereHows收集了HDFS、Teradata等数据集元数据。 HDFS: 使用具有读权限的账号,扫描白名单下的目录,并收集数据集信息。程序将schemaFetch.jar文件copy到hadoop gateway上,执行扫描,再将结果copy回本地,经过转换存储到mysql。 Teradata: 直接在jython脚本中查询库中的元数据。 2.3.2 操作数据ETL操作数据包括:流组定义、流定义、流执行信息、任务定义、任务执行信息、owner信息、调度信息等。WhereHows的操作数据ETL定时从调度系统的数据库中爬取信息到WhereHows的数据库中。 - 痛点1:不同的调度系统具有不同的数据模型,需要将他们转换成统一的模型。例如:Azkaban的执行是在流级别,流执行具有唯一ID,但是任务执行没有唯一ID。反观Oozie,每个任务执行都具有唯一ID。为了解决这种情况,WhereHows结合流执行ID与任务在流中的顺序为Azkaban任务生成任务ID。同时,将所有的唯一ID映射为整型来适配相同的模型。每当添加调度系统,都要添加不同的映射规则将原生数据模型转换成WhereHows数据模型。
- 痛点2:很多源系统没有跟踪版本,或者版本很不直观。例如,Oozie和Azkaban这些调度系统没有流版本。在这种情况下,WhereHows收集流元数据之后,与老的DAG结构进行比较,在内部推导维护版本信息。WhereHows将这种推导的DAG版本用作所有操作数据的通用版本。
2.3.3 血缘ETL血缘信息包括:任务和数据集之间的依赖关系,读还是写,涉及多少条记录等。 How: 基本策略是从日志和配置文件中收集血缘信息。比如:一个Pig任务有很多Hadoop任务,在Hadoop任务的配置文件中可以看到“mapreduce.input.fileinputformat.inputdir”一类的属性,可以代表Hadoop任务的血缘。WhereHows收集所有子Hadoop任务的输入和输出,将它们聚合到父级别任务,并存储下来。 When: 血缘ETL任务需要一个入口点,即任务ID信息列表,根据这个列表提取血缘信息。WhereHows作为一个元数据仓库,延迟容忍度高,5到10分钟的延迟是可以接受的,所以采用“微批拉取”的解决方案。通过一个拉取任务,检查最近30分钟(窗口可配)有哪些任务结束,然后以此作为入口点,拉取它们的日志和配置文件分析血缘。 3 代码解读3.1 调度WhereHows使用了akka分布式并发框架,其中三个核心的actor包括:SchedulerActor、EtlJobActor、TreeBuilderActor。这三个actor以静态变量的形式注册在ActorRegistry中: [AppleScript] 纯文本查看 复制代码
public class ActorRegistry {
public static Scheduler scheduler = actorSystem.scheduler();
public static ActorRef schedulerActor = actorSystem.actorOf(Props.create(SchedulerActor.class), "SchedulerActor");
public static ActorRef etlJobActor =
actorSystem.actorOf(new SmallestMailboxPool(ETL_POOL_SIZE).props(Props.create(EtlJobActor.class)), "EtlJobActor");
public static ActorRef treeBuilderActor =
actorSystem.actorOf(Props.create(TreeBuilderActor.class), "TreeBuilderActor");
}
通过SchedulerUtil启动SchedulerActor,则启动了整个调度引擎。定时向SchedulerActor发送“checking”消息触发调度检查: [AppleScript] 纯文本查看 复制代码 public class SchedulerUtil {
public static synchronized void start(Long mins) {
if (schedulerRef != null) {
schedulerRef.cancel();
}
schedulerRef = ActorRegistry.scheduler
.schedule(Duration.create(0, TimeUnit.MILLISECONDS), Duration.create(mins, TimeUnit.MINUTES),
ActorRegistry.schedulerActor, "checking", ActorRegistry.dispatcher, null);
}
} 3.1.1 SchedulerActorSchedulerActor在收到“checking”消息后,从应用配置的目录位置,读取所有需要执行的job文件: 并从DB中读取已经记录的定时调度job(包括job的下一次调度时间)。 遍历所有的job文件,将每个job的下一次调度时间更新到DB中。如果已经记录的job中存在当前job,则判断是否到了调度时间,到了就执行,没到则跳过。而触发job执行就是将对应的job信息作为消息发送给EtlJobActor。 [AppleScript] 纯文本查看 复制代码 public void onReceive(Object message) throws Exception {
if (message.equals("checking")) {
runDueJobs();
}
}
private void runDueJobs() throws Exception {
Map<String, Properties> enabledJobs = JobsUtil.getScheduledJobs(ETL_JOBS_DIR);
Map<String, Long> scheduledJobs = getScheduledJobs();
long now = System.currentTimeMillis() / 1000;
for (Map.Entry<String, Properties> entry : enabledJobs.entrySet()) {
String etlJobName = entry.getKey();
Properties properties = entry.getValue();
EtlJobMessage etlMsg = new EtlJobMessage(etlJobName, properties);
// Schedule next run if a cron expr is defined.
String cronExpr = etlMsg.getCronExpr();
if (cronExpr != null) {
EtlJobDao.updateNextRun(etlJobName, cronExpr, new Date());
}
if (scheduledJobs.getOrDefault(etlJobName, Long.MAX_VALUE) > now) {
continue;
}
Long whExecId = EtlJobDao.insertNewRun(etlJobName);
etlMsg.setWhEtlExecId(whExecId);
if (Global.getCurrentRunningJob().contains(etlJobName)) {
EtlJobDao.endRun(etlMsg.getWhEtlExecId(), JobStatus.ERROR, "Previous is still running, Aborted!");
} else {
Global.getCurrentRunningJob().add(etlJobName);
ActorRegistry.etlJobActor.tell(etlMsg, getSelf());
}
}
} 3.1.2 EtlJobActorEtlJobActor的任务是启动一个新的进程执行SchedulerActor发送的job。这个新的进程的执行框架在Launcher中定义。 3.1.3 LauncherLauncher负责job的ETL的触发。ETL的实际处理class在job文件中定义,以HDFS_METADATA_ETL.job文件为例,该job用来提取HDFS数据集的元数据: [AppleScript] 纯文本查看 复制代码 # Fetch HDFS dataset metadata
# Common ETL configs
job.class=metadata.etl.dataset.hdfs.HdfsMetadataEtl
job.cron.expr=0 0 5 ? * TUE,THU,SUN *
job.timeout=3000
job.ref.id=11
# Cluster name
hdfs.cluster=your_cluster_name
# Run job on remote machine, true/false
hdfs.remote.mode=false
# Remote Hadoop gateway machine name (could be localhost)
hdfs.remote.machine=
# Private key used to log in to the remote machine
hdfs.private_key_location=
# JAR file location on the remote machine
hdfs.remote.jar=
# User login on remote machine
hdfs.remote.user=
# Metadata JSON file location on remote machine
hdfs.remote.raw_metadata=
# Sample data CSV file location on remote machine
hdfs.remote.sample=
# Place to store field metadata file
hdfs.local.field_metadata=
# Place to store metadata CSV file
hdfs.local.metadata=hdfs_metadata.csv
# Place to store metadata JSON file
hdfs.local.raw_metadata=hdfs_raw_metadata.json
# Place to store sample file
hdfs.local.sample=hdfs_sample.dat
# The whitelist of folder to collect metadata
hdfs.white_list=
# Optional. number of threads you want to scrape the HDFS
hdfs.num_of_thread=1
# Optional. The map of file path regex and dataset source. e.g. [{"/data/tracking.":"Kafka"},{"/data/retail.":"Teradata"}]
hdfs.file_path_regex_source_map=
文件中的“job.class”属性定义了实际执行的class为“metadata.etl.dataset.hdfs.HdfsMetadataEtl”。Launcher在实例化具体的job class之后,调用其run方法触发ETL执行。 3.2 ETL所有的ETL类都继承EtlJob。EtlJob定义了extract、transform、load三个抽象方法,分别对应着ETL的三个执行步骤,由具体的实现类实现。在EtlJob.run()方法中顺序执行三个方法: [AppleScript] 纯文本查看 复制代码 public abstract void extract() throws Exception;
public abstract void transform() throws Exception;
public abstract void load() throws Exception;
public void run() throws Exception {
setup();
extract();
transform();
load();
close();
}
以实现类HdfsMetadataEtl为例: (1)extract 将schemaFetch.jar拷贝到HDFS文件系统上,并执行。执行过程中,扫描白名单中的路径,将所有文件的元数据信息记录在HDFS文件系统的指定路径。执行结束后,将结果拷贝回本地。 (2)transform 根据名为“HdfsTransform.py”的jython脚本中的逻辑,对上一步中拷贝到本地的数据进行转换处理: [AppleScript] 纯文本查看 复制代码 public void transform() throws Exception {
InputStream inputStream = classLoader.getResourceAsStream("jython/HdfsTransform.py");
interpreter.execfile(inputStream);
inputStream.close();
}
(3)load 在名为“HdfsLoad.py”的jython脚本中,将transform转换结果存入DB: [AppleScript] 纯文本查看 复制代码 public void load() throws Exception {
InputStream inputStream = classLoader.getResourceAsStream("jython/HdfsLoad.py");
interpreter.execfile(inputStream);
inputStream.close();
[/color][/size][/font][p=26, null, left][font=微软雅黑][size=3][color=#000000]} 3.3 KafkaWhereHows集成了kafka,可以通过消息触发元数据和血缘的更新。kafka的consumer和producer同样封装为akka的actor。 3.3.1 KafkaClientMasterKafkaClientMaster是主actor,在ApplicationStart中启动: [AppleScript] 纯文本查看 复制代码 public static void main(String[] args) {
final ActorSystem actorSystem = ActorSystem.create("KAFKA");
String kafkaJobDir = config.getString("kafka.jobs.dir");
actorSystem.actorOf(Props.create(KafkaClientMaster.class, kafkaJobDir), "KafkaMaster");
}
其中,配置属性“kafka.jobs.dir”代表kafka消费/生产配置文件的目录。 KafkaClientMaster在preStart阶段从“kafka.jobs.dir”获取所有的kafka任务文件,遍历文件,构造对应的KafkaWorker(actor)并启动。在构造KafkaWorker的过程中,根据文件中的consumer topic和consumer属性构造kafka consumer,作为KafkaWorker的数据来源;根据“kafka.processor”属性实例化处理数据的Processor class;如果processor在处理过程中需要发送消息,则根据producer topic和producer属性构造kafka producer。通过向KafkaWorker发送“WORKER_START”消息触发kafka consumer的消费。 KafkaClientMaster由于在preStart阶段已经启动了所有worker,不需要接收、处理消息。 [AppleScript] 纯文本查看 复制代码 public void preStart() throws Exception {
_kafkaJobList = JobsUtil.getEnabledJobs(KAFKA_JOB_DIR);
log.info("Kafka jobs: {}", _kafkaJobList.keySet());
if (_kafkaJobList.size() == 0) {
context().stop(getSelf());
return;
}
log.info("Starting KafkaClientMaster...");
for (Map.Entry<String, Properties> entry : _kafkaJobList.entrySet()) {
// handle one kafka topic
final String kafkaJobName = entry.getKey();
final Properties props = entry.getValue();
final int numberOfWorkers = Integer.parseInt(props.getProperty(Constant.KAFKA_WORKER_COUNT, "1"));
log.info("Create Kafka client with config: " + props);
try {
// create worker
for (int i = 0; i < numberOfWorkers; i++) {
ActorRef worker = makeKafkaWorker(kafkaJobName, props);
_kafkaWorkers.add(worker);
worker.tell(KafkaWorker.WORKER_START, getSelf());
log.info("Started Kafka worker #{} for job {}", i, kafkaJobName);
}
} catch (Exception e) {
log.error("Error starting Kafka job: " + kafkaJobName, e);
}
}
} 3.3.2 KafkaWorkerKafkaWorker收到“WORKER_START”消息后,开始不断从kafka consumer拉取数据,并传给processor处理: [AppleScript] 纯文本查看 复制代码 public void onReceive(@Nonnull Object message) throws Exception {
if (!message.equals(WORKER_START)) {
log.warn("Must send WORKER_START message first!");
unhandled(message);
}
while (RUNNING) {
ConsumerRecords<String, IndexedRecord> records;
try {
records = _consumer.poll(POLL_TIMEOUT_MS);
} catch (SerializationException e) {
log.error("Serialization Error: ", e);
moveOffset(extractPartition(e), 1);
continue;
}
try {
process(records);
} catch (Exception e) {
log.error("Unhandled processing exception: ", e);
break;
}
}
getContext().stop(getSelf());
}
private void process(@Nonnull ConsumerRecords<String, IndexedRecord> records) {
for (ConsumerRecord<String, IndexedRecord> record : records) {
_receivedRecordCount++;
_processor.process(record.value());
if (_receivedRecordCount % 1000 == 0) {
log.info("{}: received {} messages", _topic, _receivedRecordCount);
}
}
_consumer.commitAsync();
} 3.3.3 KafkaMessageProcessor上面提到“kafka.processor”属性指定实际处理消息的class。所有处理kafka消息的class都继承抽象类KafkaMessageProcessor,并实现process抽象方法。WhereHows提供了三个KafkaMessageProcessor实现类:MetadataChangeProcessor、MetadataInventoryProcessor、MetadataLineageProcessor。 MetadataChangeProcessor 从命名可以看出来,MetadataChangeProcessor处理单个数据集元数据的变更,输入消息为MetadataChangeEvent。它解析消息内容,并将变更作用于存储中。MetadataChangeEvent的changeType字段代表操作类型,包括创建、更新、删除。 处理失败的消息再通过kafka发送出去,便于记录、跟踪和重试。 MetadataInventoryProcessor MetadataInventoryProcessor负责某个集群的数据集的批量处理。目前WhereHows代码中,只实现了删除。MetadataInventoryProcessor处理的消息为MetadataInventoryEvent,其中包含某个集群当前所有的数据集名称(nativeNames)。MetadataInventoryProcessor先根据消息中的集群信息,查询该集群在WhereHows中已存的数据集名称,再将存量数据集名称与消息中的数据集名称列表做减法,得到需要从元数据中删除的数据集名称。最后将这些需要删除的数据集包装为MetadataChangeEvent通过kafka发送出去,由上面的MetadataChangeProcessor进行消费处理。 MetadataLineageProcessor MetadataLineageProcessor处理的消息是MetadataLineageEvent。MetadataLineageProcessor遍历消息中的血缘列表,对每个血缘的source列表和destination列表进行去重和校验,然后将血缘列表插入到数据库中。不过入库这一步暂未实现。 4 总结WhereHows作为一个post-hoc型数据集管理系统,可以认为是GOODS思想的一个具体实现,高扩展性是其一个优点。引入新的源系统,可以通过jython脚本动态引入ETL逻辑,但仍要写java代码实现ETL job,灵活性在这里受到了限制。 5 参考资料[1] WhereHows官方:https://github.com/linkedin/WhereHows
|