最具影响力的数字化技术在线社区

168大数据

 找回密码
 立即注册

QQ登录

只需一步,快速开始

1 2 3 4 5
打印 上一主题 下一主题
开启左侧

Dump Plugin并行化实践

[复制链接]
跳转到指定楼层
楼主
发表于 2014-12-28 18:14:45 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式

马上注册,结交更多数据大咖,获取更多知识干货,轻松玩转大数据

您需要 登录 才可以下载或查看,没有帐号?立即注册

x

先简单介绍下Dump Plugin的由来,在搜索Dump中心服务化的项目中,我们把Dump中心的增量数据产出分为2个阶段,Loader阶段和Join阶段,Loader阶段把数据准备成Key-Values形式,Join阶段将数据取出,计算各种业务逻辑并产出最终数据。业务逻辑的计算是相当繁琐且易出错,这类事情做一遍足以,所以设计了一个接口,按照业务自身划分成一个个小块逻辑实现接口。这些个小业务逻辑模块即构成Dump的业务Plugin。
这样做的好处:
1,  按业务本身划分,结构相对清晰,容易维护。
2,  架构和业务通过接口交互,重构架构将尽可能少的影响业务代码
3,  每个业务模块的耗时能准确统计出并能做针对性的优化。

在最初的版本中,先根据依赖关系计算好plugin的执行顺序,然后顺序执行,是一个串行的过程,如下图:

此种方式,计算耗时与业务的复杂程度成正比。而目前Dump中心已经有十几个个业务逻辑Plugin,并且plugin之间有复杂的依赖关系。所以我们尝试用更高效的并发方式去运行这些plugin。这个项目用的开发语言是Java,Java的多线程有多种成熟的设计模式,结合现有框架,我们设计了两种方案并分别尝试。

方案1,以单条数据为粒度,在一条数据的运行内部实现并行化,如下图:

简单的来说,就是起一个工作线程组来运行plugin,来一条数据后,工作线程根据依赖关系获取当前可运行的plugin,当所有plugin都运行完毕后,输出数据。类似于Work Thread模式,工作线程没数据就等着,来了数据就做。主要代码流程如下:

[AppleScript] 纯文本查看 复制代码
public class Main {
 private Semaphore mainSemaphore, workSemaphore;
 private Data data;
 private int workThreadNum;
 
 public Data run(Data data) {
   this.data = data;
   workSemaphore.release(workThreadNum);
   mainSemaphore.acquire(workThreadNum);
   return this.data;
 }
 
 class WorkThread implements Runnable {
 private boolean loop = true;
 public void run() {
   while(loop) {
     workSemaphore.acquire();
     //getValidPlugin: 一个synchronized的调用,获得未运行的Plguin
     Plugin plugin = getValidPlugin();
     if(plugin != null)
       plugin.run(data);
     else
       mainSemaphore.release(1);
     }
   }
 }
}

代码中使用两个Semaphore信号量来同步主线程和工作线程,每条数据都需要激活和同步,并有一个synchronized的方法来获取当前可运行的Plugin,线程同步开销比较大。实现过程中,采用重任务优先,预先计算等方法,降低并行额外引入的开销。在单个Plugin耗时长,关键路径和非关键路径上的plugin耗时相差不大的情况下,此种方案效果不错。但在目前的业务情况下,效果提升不明显,实测约提升了10%。

通过分析plugin的依赖关系,发现目前业务逻辑下,有两个耗时大的plugin均是关键路径上的,方案1的并行是针对单个宝贝的,我们想能否在批量数据或数据流中实现数据维度的并行。数据维度的并行,最简单的方案是将数据逐条扔给ThreadPoolExecutor,每个线程串行执行,但这种方案对于现有结构来说不合适,原因是plugin的代码无法保证线程安全,于是就有了方案2,如下图:

每个Plugin都起一个工作线程,数据像流水线一样从Plugin中间流过,plugin的依赖关系决定数据的流向,类似于Guarded Suspension模式,工作线程维护一个Queue来缓存,等plugin准备好,就从Queue中取数据处理。主要代码流程如下:

[AppleScript] 纯文本查看 复制代码
public interface QueuePutter {
 public void put(Data data);
}
 
public class Main implements QueuePutter{
 private BlockingQueue<Data> resultQueue = new LinkedBlockingQueue<Data>();
 
 public List<Data> run(List<Data> dataList) {
   List<Data> resultList = new ArrayList<Data>();
   for(Data data : dataList) {
     firstPluginThread.put(data);
   }
   putLastData();
   while(true) {
     Data data = resultQueue.take();
     if(isLastData(data)) break;
     resultList.add(data);
   }
   return resultList;
 }
 
 public void put(Data data) {
   this.resultQueue.put(data);
 }
}
 
public class PluginThread implements Runnable,QueuePutter {
   private Plugin plugin = null;
   private PluginThread nextPluginThread = null;
   private boolean loop = true;
   private BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>(10);
 
   public PluginThread(Plugin plugin, QueuePutter next) {
     this.plugin = plugin;
     this.nextPluginThread = next;
   }
 
   public void run() {
     while(loop) {
       Data data = this.queue.take();
       data = this.plugin.run(data);
       this.nextPluginThread.put(data);
     }
   }
 
   public void put(Data data) {
     this.queue.put(data);
   }
 }


代码中同步操作通过BlockingQueue来实现。主线程将数据分发给第一个plugin线程,而最后一个plugin线程负责将数据写回给主线程。主线程用一条特殊的数据来标识这组数据的结尾,而后在主线程队列里一直扫描特殊数据,FIFO队列保证了处理的时序。逻辑上来说,方案2的单条数据的处理还是串行,而是多条数据之间的并行,整体性能只取决于最慢的Plugin的耗时,实测中对于批量数据来说,效果要好于方案1。

总结:实践Dump Plugin并行的两种实现方式,对单数据的列并行和对批量数据/数据流的行并行。



楼主热帖
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 转播转播 分享分享 分享淘帖 赞 踩

168大数据 - 论坛版权1.本主题所有言论和图片纯属网友个人见解,与本站立场无关
2.本站所有主题由网友自行投稿发布。若为首发或独家,该帖子作者与168大数据享有帖子相关版权。
3.其他单位或个人使用、转载或引用本文时必须同时征得该帖子作者和168大数据的同意,并添加本文出处。
4.本站所收集的部分公开资料来源于网络,转载目的在于传递价值及用于交流学习,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。
5.任何通过此网页连接而得到的资讯、产品及服务,本站概不负责,亦不负任何法律责任。
6.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源,若标注有误或遗漏而侵犯到任何版权问题,请尽快告知,本站将及时删除。
7.168大数据管理员和版主有权不事先通知发贴者而删除本文。

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

站长推荐上一条 /1 下一条

关于我们|小黑屋|Archiver|168大数据 ( 京ICP备14035423号|申请友情链接

GMT+8, 2024-5-3 00:08

Powered by BI168大数据社区

© 2012-2014 168大数据

快速回复 返回顶部 返回列表