最具影响力的数字化技术在线社区

168大数据

 找回密码
 立即注册

QQ登录

只需一步,快速开始

1 2 3 4 5
打印 上一主题 下一主题
开启左侧

[基础] Hadoop心跳机制源码分析

[复制链接]
跳转到指定楼层
楼主
发表于 2015-5-20 15:10:02 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式

马上注册,结交更多数据大咖,获取更多知识干货,轻松玩转大数据

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
本帖最后由 168主编 于 2015-5-20 15:13 编辑

  hadoop心跳机制源码分析
前言:


这些天遇到了一些感情方面的问题,着实比较痛苦。不过最终在亲人、朋友的开导下,我已度过了最艰难的那段时期。在这里我要谢谢我的爸爸,是你一直相信着我,支持着我,虽然我一年在家只能呆上一两个月,可我却一直能够感受到家的温暖;我要感谢马总,是你用切身的经历为我开导,是你告诉我该如何正确地对待。我要感谢亮仔,虽然你就是个小屁孩,但谢谢你喝醉了酒还要陪我回铁道,用几乎是孩子的话来安慰我。我要感谢汤帅,是你让我知道了,其实你的情况比我还糟糕(呵呵,开个玩笑)。还有信管院的那些美女程序媛们,我也要感谢你们,是你们让我看到了希望,你懂的。

突然发现,我已经很长时间没有写技术博客了,因为一直被各种各样的事情给耽搁着,以至于这篇文章到现在才发表出来。

正文:

一.体系背景

首先和大家说明一下:hadoop的心跳机制的底层是通过RPC机制实现的,这篇文章我只介绍心跳实现的代码,对于底层的具体实现,大家可以参考我的另几篇博客:

1. hadoop的RPC机制(参考:http://weixiaolu.iteye.com/blog/1504898
2. 动态代理(参考 :http://weixiaolu.iteye.com/blog/1477774
3. Java NIO(参考 :http://weixiaolu.iteye.com/blog/1479656

以上三篇文章和这篇文章完整地分析了hadoop的数据传输过程。大家可以当成一个体系来阅读。

二.心跳机制

1. hadoop集群是master/slave模式,master包括Namenode和Jobtracker,slave包括Datanode和Tasktracker。

2. master启动的时候,会开一个ipc server在那里,等待slave心跳。

3. slave启动时,会连接master,并每隔3秒钟主动向master发送一个“心跳”,这个时间可 以通过”heartbeat.recheck.interval”属性来设置。将自己的状态信息告诉master,然后master也是通过这个心跳的返回值,向slave节点传达指令。

4. 需要指出的是:namenode与datanode之间的通信,jobtracker与tasktracker之间的通信,都是通过“心跳”完成的。

三.Datanode、Namenode心跳源码分析

既然“心跳”是Datanode主动给Namenode发送的。那Datanode是怎么样发送的呢?下面贴出Datanode.class中的关键代码:

代码一:
Java代码  

  • /**
  •    * 循环调用“发送心跳”方法,直到shutdown
  •    * 调用远程Namenode的方法
  •    */  
  •   public void offerService() throws Exception {  
  • •••  
  •     while (shouldRun) {  
  •       try {  
  •         long startTime = now();  
  •          // heartBeatInterval是在启动Datanode时根据配置文件设置的,是心跳间隔时间  
  •         if (startTime - lastHeartbeat > heartBeatInterval) {  
  •           lastHeartbeat = startTime;  
  • //Datanode发送心跳  
  •           DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration,  
  •                                                        data.getCapacity(),  
  •                                                        data.getDfsUsed(),  
  •                                                        data.getRemaining(),  
  •                                                        xmitsInProgress.get(),  
  •                                                        getXceiverCount());  
  •           myMetrics.addHeartBeat(now() - startTime);  
  •            
  •           if (!processCommand(cmds))  
  •             continue;  
  •         }  
  •          
  •       •••  
  •       }  
  •     } // while (shouldRun)  
  •   } // offerService  


需要注意的是:发送心跳的对象并不是datanode,而是一个名为namenode的对象,难道在datanode端就直接有个namenode的引用吗?其实不然,我们来看看这个namenode吧:

代码二:
Java代码

  • public DatanodeProtocol namenode = null;  


namenode其实是一个DatanodeProtocol的引用,在对hadoop RPC机制分析的文章中我提到过,这是一个Datanode和Namenode通信的协议,其中有许多未实现的接口方法,sendHeartbeat()就是其中的一个。下面看看这个namenode对象是怎么被实例化的吧:

代码三:
Java代码  [url=][/url]

  • this.namenode = (DatanodeProtocol)   
  •     RPC.waitForProxy(DatanodeProtocol.class,  
  •                      DatanodeProtocol.versionID,  
  •                      nameNodeAddr,   
  •                      conf);  


其实这个namenode并不是Namenode的一个对象,而只是一个Datanode端对Namenode的代理对象,正是这个代理完成了“心跳”。代理的底层实现就是RPC机制了。参考博客:http://weixiaolu.iteye.com/blog/1504898

四.Tasktracker、Jobtracker心跳源码分析

同样我们从Tasktracker入手,下面贴出Tasktracker.class的关键代码:

代码四:
Java代码

  • 代码一:  
  • State offerService() throws Exception {  
  •     long lastHeartbeat = System.currentTimeMillis();  
  •     while (running && !shuttingDown) {  
  •      •••  
  •          
  •         // 发送心跳,调用代码二  
  •         HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);  
  •   
  •       •••  
  •     return State.NORMAL;  
  •   }  
  •   
  • 代码二:  
  • HeartbeatResponse transmitHeartBeat(long now) throws IOException {  
  •    •••  
  •     HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,   
  •                                                               justStarted,  
  •                                                               justInited,  
  •                                                               askForNewTask,   
  •                                                          heartbeatResponseId);                                
  • •••  
  •     return heartbeatResponse;  
  •   }  


其实我觉得分析到这里大家就可以自己分析了,jobClient也是一个协议:

代码五:
Java代码  

  • InterTrackerProtocol jobClient;  


该协议用于定义Tasktracker和Jobtracker的通信。同样,它也是一个代理对象:

代码六:
Java代码

  • this.jobClient = (InterTrackerProtocol)   
  • UserGroupInformation.getLoginUser().doAs(  
  •      new PrivilegedExceptionAction<Object>() {  
  •    public Object run() throws IOException {  
  •      return RPC.waitForProxy(InterTrackerProtocol.class,  
  •          InterTrackerProtocol.versionID,  
  •          jobTrackAddr, fConf);  
  •    }  
  • });  

代理的底层实现也是RPC机制。参考博客:http://weixiaolu.iteye.com/blog/1504898

终于,hadoop底层通信整个系列的源码分析全部完成了。我可以好好地复习学校的功课了。呵呵。



楼主热帖
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 转播转播 分享分享 分享淘帖 赞1 踩

168大数据 - 论坛版权1.本主题所有言论和图片纯属网友个人见解,与本站立场无关
2.本站所有主题由网友自行投稿发布。若为首发或独家,该帖子作者与168大数据享有帖子相关版权。
3.其他单位或个人使用、转载或引用本文时必须同时征得该帖子作者和168大数据的同意,并添加本文出处。
4.本站所收集的部分公开资料来源于网络,转载目的在于传递价值及用于交流学习,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。
5.任何通过此网页连接而得到的资讯、产品及服务,本站概不负责,亦不负任何法律责任。
6.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源,若标注有误或遗漏而侵犯到任何版权问题,请尽快告知,本站将及时删除。
7.168大数据管理员和版主有权不事先通知发贴者而删除本文。

沙发
发表于 2015-6-6 22:29:36 | 只看该作者
看帖回帖是一种美德,感谢分享!
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

站长推荐上一条 /1 下一条

关于我们|小黑屋|Archiver|168大数据 ( 京ICP备14035423号|申请友情链接

GMT+8, 2024-4-20 07:08

Powered by BI168大数据社区

© 2012-2014 168大数据

快速回复 返回顶部 返回列表