最具影响力的数字化技术在线社区

168大数据

 找回密码
 立即注册

QQ登录

只需一步,快速开始

1 2 3 4 5
打印 上一主题 下一主题
开启左侧

scala 基础 5

[复制链接]
跳转到指定楼层
楼主
发表于 2018-3-16 13:27:47 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式

马上注册,结交更多数据大咖,获取更多知识干货,轻松玩转大数据

您需要 登录 才可以下载或查看,没有帐号?立即注册

x

Actor
为什么要学习actor?
我们现在学的Scala Actor是Scala 2.10.x版本及以前版本的Actor
Scala在2.11.x版本中将Akka加入其中,作为默认的Actor,老版本的Actor已经废弃
我们学习Actor的目的就是为了学习Akka做铺垫

什么是Actor?
Actor是消息并发模型
Scala中的Actor能够实现并行编程的强大功能,它是基于事件模型的并发机制。
Scala是运用消息(message)的发送、接收来实现多线程的。
使用Scala能够更容易地实现多线程应用的开发。

Java并发编程与Scala Actor编程的区别
Scala的Actor类似于Java中的多线程编程,但是不同的是,Scala的Actor提供的模型与多线程有所不同。
Scala的Actor尽可能地避免锁和共享状态,从而避免多线程并发时出现资源争用的情况。进而提升多线程编程的性能。
此外,Scala Actor的这种模型还可以避免死锁等一系列传统多线程编程的问题。
原因就在于Java中多数使用的是可变状态的对象资源,对这些资源进行共享来实现多线程编程的话,
控制好资源竞争与防止对象状态被意外修改是非常重要的,而对象状态的不变性也是较难以保证的。
而在Scala中,我们可以通过复制不可变状态的资源(即对象,Scala中一切都是对象,连函数、方法也是)的一个副本,
再基于Actor的消息发送、接收机制进行并行编程

Actor方法执行顺序
1.调用start()方法启动Actor
2.执行act()方法
3.向Actor发送消息

发送消息的方式
! 发送异步消息,没有返回值
!? 发送同步消息,等待返回值
!! 发送异步消息,返回值是Future[Any]

Akka简介
Spark的RPC是通过Akka类库实现的,Akka用Scala语言开发,基于Actor并发模型实现
Akka具有高可靠、高性能、可扩展等特点,使用Akka可以轻松实现分布式RPC功能。
Actor是Akka中最核心的概念,它是一个封装了状态和行为的对象,Actor之间可以通过交换消息的方式进行通信
每个Actor都有自己的收件箱(MailBox)。通过Actor能够简化锁及线程管理,
可以非常容易地开发出正确地并发程序和并行系统,Actor具有如下特性:
1.提供了一种高级抽象,能够简化在并发(Concurrency)/并行(Parallelism)应用场景下的编程开发
2.提供了异步非阻塞的、高性能的事件驱动编程模型
3.超级轻量级事件处理(每GB堆内存几百万Actor)

ActorSystem
在Akka中,ActorSystem是一个重量级的结构
他需要分配多个线程,所以在实际应用中,ActorSystem通常是一个单例对象
我们可以使用这个ActorSystem创建很多Actor

Actor
在Akka中,Actor负责通信,在Actor中有一些重要的生命周期方法
1.preStart()方法:该方法在Actor对象构造方法执行后执行,整 个Actor生命周期中仅执行一次
2.receive()方法:该方法在Actor的preStart方法执行完成后执行,用于接收消息,会被反复执行


掌握的内容
1.创建Actor
2.Actor的消息接受和发送
3.用Actor并发编程实现WordCount
actor的简单实现


import scala.actors.Actorobject ActorDemo1 extends Actor{override def act(): Unit = {for(i <- 1 to 10){println(s"actor1i")Thread.sleep(1000) }}}object ActorDemo2 extends Actor{override def act(): Unit = {for(i <- 1 to 10){println(s"actor2i")Thread.sleep(2000)}}}object ActorTest{def main(args: Array[String): Unit = {ActorDemo1.start()ActorDemo2.start()}}

actor通信

class ActorDemo3 extends Actor {override def act(): Unit = { while(true){receive({case "start" => {println("starting...")sender ! "started"}case AsynMsg(id,msg) => {println(s"idid,AsynMsgmsg")Thread.sleep(2000) sender ! ReplyMeg(10,"success")}case SyncMsg(id,msg) => {println(s"idid,AsynMsgmsg")Thread.sleep(2000)sender ! ReplyMeg(20,"success")}})}}}object ActorDemo3{def main(args: Array[String): Unit = {val demo = new ActorDemo3demo.start()//异步发送消息,没有返回值demo ! AsynMsg(1,"hainiu1")println("没有返回值的异步消息发送完成")//同步发送消息,线程等待返回值 val res: Any = demo !? SyncMsg(2,"hainiu2")println("有返回值的同步消息发送完成")println(res)//异步发送消息,有返回值,返回类型是Future[Any]val res2: Future[Any = demo !! AsynMsg(3,"hainiu3")Thread.sleep(3000)println("有返回值的异步消息发送完成") if(res2.isSet){val value = res2.apply()println(value)}else{print("None")}//发送string的同步有返回值,当actor中没有写相应的返回逻辑的时候 !? 方法会一直阻塞val res3: Any = demo !? "start"println(res3)}}case class AsynMsg(id:Int,msg:String)case class SyncMsg(id:Int,msg:String)//参数的长度最多是23个case class ReplyMeg(id:Int,msg:String)

用actor实现wordcount

import scala.actors.{Actor, Future}import scala.collection.mutable.ListBufferimport scala.io.Sourceobject ActorWordCount {def main(args: Array[String): Unit = {val files: Array[String = Array("C:\\tmp\\input\\111.txt", "C:\\tmp\\input\\111.txt", "C:\\tmp\\input\\111.txt")//存放接收到的每个actor处理的结果数据val replys: ListBuffer[Future[Any = new ListBuffer[Future[Any//存放有actor返回结果的Future数据val contens: ListBuffer[Map[String, Int = new ListBuffer[Map[String, Intfor (file <- files) {//      val linesist[String] = Source.fromFile(file).getLines().toList//      val wordsist[String] = lines.flatMap(_.split(" ")) //      val res: Map[String, Int] = words.map((_,1)).groupBy(_._1).mapValues(_.size)val task = new Tasktask.start()//异步发送有返回值val res: Future[Any = task !! file//把每个文件的结果数据存储到ListBufferreplys += res//List(Map("a"->1,"b"->2),Map("a",1))}while (replys.size > 0) {//过滤每个Futrue对象,如果没有数据就过滤掉val dones: ListBuffer[Future[Any = replys.filter(_.isSet)for (done <- dones) {contens += done.apply().asInstanceOf[Map[String, Intreplys -= done}}//List(Map("a"->1,"b"->2),Map("c",1))//List(Map("a"->1,"b"->2,("c",1)))      flatten//map("a" -> List(("a",1),("a",2)))      groupBy//map("a" -> 3)     mapValues(_.foldLeft(0)(_ + _._2))val result: Map[String, Int = contens.flatten.groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2))println(result)}}class Task extends Actor {override def act(): Unit = {receive({case file: String => {val lines: List[String = Source.fromFile(file).getLines().toListval words: List[String = lines.flatMap(_.split(" "))val res: Map[String, Int = words.map((_, 1)).groupBy(_._1).mapValues(_.size)//异步发送结果数据sender ! res}})}}


版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海牛部落-青牛,http://hainiubl.com/

楼主热帖
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 转播转播 分享分享 分享淘帖 赞 踩

168大数据 - 论坛版权1.本主题所有言论和图片纯属网友个人见解,与本站立场无关
2.本站所有主题由网友自行投稿发布。若为首发或独家,该帖子作者与168大数据享有帖子相关版权。
3.其他单位或个人使用、转载或引用本文时必须同时征得该帖子作者和168大数据的同意,并添加本文出处。
4.本站所收集的部分公开资料来源于网络,转载目的在于传递价值及用于交流学习,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。
5.任何通过此网页连接而得到的资讯、产品及服务,本站概不负责,亦不负任何法律责任。
6.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源,若标注有误或遗漏而侵犯到任何版权问题,请尽快告知,本站将及时删除。
7.168大数据管理员和版主有权不事先通知发贴者而删除本文。

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

站长推荐上一条 /1 下一条

关于我们|小黑屋|Archiver|168大数据 ( 京ICP备14035423号|申请友情链接

GMT+8, 2024-4-26 08:52

Powered by BI168大数据社区

© 2012-2014 168大数据

快速回复 返回顶部 返回列表