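This post demonstrates Akka Actor lifecycle management and the default supervision strategy. It is based on Akka 2.3.4 and Scala 2.11.
Akka Actor lifecycle hooks
Once an actor is instantiated, the actor runtime schedules and runs it.
Akka Actor defines the following lifecycle callback hooks:
- preStart: called after the actor is instantiated and started. The default postRestart calls preStart, so it also runs on restart unless postRestart is overridden without delegating to it (as in the example below).
- postStop: called after the actor has stopped. The default preRestart calls postStop, so it also runs on restart unless preRestart is overridden without delegating to it (as in the example below).
- preRestart: called on the old instance before a restart caused by a failure; this is the place to save current state. The default implementation stops all children and then calls postStop.
- postRestart: called on the new instance after a restart caused by a failure; this is the place to restore the state saved before the restart. The default implementation calls preStart.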
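For reference, the sketch below spells out as explicit overrides what the default hook implementations in akka.actor.Actor do in Akka 2.3. The class name DefaultHooksActor is made up for illustration and is not part of the example that follows.

```scala
import akka.actor.Actor

// Hypothetical actor whose overrides mirror the default hook behavior.
class DefaultHooksActor extends Actor {
  def receive = {
    case msg => println("received " + msg)
  }

  // Default: do nothing.
  override def preStart(): Unit = ()

  // Default: do nothing.
  override def postStop(): Unit = ()

  // Default: stop every child, then call postStop on the old instance.
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    context.children.foreach { child =>
      context.unwatch(child)
      context.stop(child)
    }
    postStop()
  }

  // Default: call preStart on the new instance.
  override def postRestart(reason: Throwable): Unit = preStart()
}
```

An actor that overrides preRestart or postRestart without calling the super implementation opts out of these defaults: its children are not stopped on restart, and postStop and preStart are not invoked around the restart.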
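Example
There are three source files:
Main.scala: sets up the actor system and sends messages to the actors.
ParentActor.scala: receives the messages sent from Main.scala and creates or controls ChildActor instances.
ChildActor.scala: the child actor, controlled by ParentActor.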
ChildActor.scala
package net.coderbee.akka.simple

import akka.actor.Actor

class ChildActor extends Actor {
  def receive = {
    case "abc"       => println("get abc string ")
    case "exception" => throw new NullPointerException()
    case _           => println("child can't handle unknown message")
  }

  override def preStart(): Unit = {
    println("actor:" + self.path + ",child preStart .")
  }

  override def postStop(): Unit = {
    println("actor:" + self.path + ",child postStop .")
  }

  // Does not call super.preRestart, so postStop is not invoked on restart.
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    println("actor:" + self.path + ",preRestart child, reason:" + reason + ", message:" + message)
  }

  // Does not call super.postRestart, so preStart is not invoked on restart.
  override def postRestart(reason: Throwable): Unit = {
    println("actor:" + self.path + ",postRestart child, reason:" + reason)
  }
}
ParentActor.scala
package net.coderbee.akka.simple

import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.Props
import akka.actor.ReceiveTimeout
import scala.concurrent.duration

class ParentActor extends Actor with ActorLogging {
  println("start myactor ")
  // context.setReceiveTimeout(duration.Duration.apply(30, duration.SECONDS))
  // val log = Logging(context.system, this)

  def receive = {
    case "test" => log.info("received test")
    case ("newChild", name: String) => context.actorOf(Props[ChildActor], name)
    case ("stop", name: String) =>
      // Look up the direct child by name and stop it if it exists.
      context.child(name).foreach(child => context.stop(child))
    // case ReceiveTimeout => throw new RuntimeException("received timeout") // throws whenever no message arrives within the receive timeout
    case "suicide" => // placeholder: currently does nothing
    case x: Any => log.info("received unknown message :" + x)
  }

  /**
   * Called when the actor first starts. Not called on restart here,
   * because postRestart below does not delegate to preStart.
   */
  override def preStart(): Unit = {
    println("actor:" + self.path + ", parent preStart ")
  }

  /**
   * Called after the actor has stopped. Not called on restart here,
   * because preRestart below does not delegate to postStop.
   */
  override def postStop(): Unit = {
    println("actor:" + self.path + ",parent postStop .")
  }

  /**
   * Called on the old instance before a restart; the place to save state.
   * Does not call super.preRestart, so the children are not stopped.
   */
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    println("actor:" + self.path + ", preRestart parent, reason:" + reason + ", message:" + message)
  }

  /**
   * Called on the new instance after a restart; the place to restore state.
   */
  override def postRestart(reason: Throwable): Unit = {
    println("actor:" + self.path + ", postRestart parent, reason:" + reason)
  }
}
Main.scala
package net.coderbee.akka.simple

import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.Kill

object Main extends App {
  val sysname = "mysystem"
  val system = ActorSystem(sysname)
  val myActor = system.actorOf(Props[ParentActor], name = "myactor")

  myActor ! ("newChild", "child-1")
  myActor ! ("newChild", "child-2")
  myActor ! "test"

  // Address the same parent by path instead of by ActorRef.
  val parent = system.actorSelection("akka://mysystem/user/myactor")
  parent ! "test"
  // parent ! ("stop", "child-2")

  val child2 = system.actorSelection("akka://mysystem/user/myactor/child-2")
  child2 ! Kill // kill child2
  // child2 ! "exception"

  Thread.sleep(5000) // wait for child2 to be killed
  myActor ! ("newChild", "child-2")
  // Thread.sleep(5000)
  // myActor ! ("newChild", "child-2")
}
The code above (actors stopped and started normally) produces the following output:
start myactor
actor:akka://mysystem/user/myactor, parent preStart
actor:akka://mysystem/user/myactor/child-1,child preStart .
actor:akka://mysystem/user/myactor/child-2,child preStart .
[INFO] [08/14/2014 22:58:34.214] [mysystem-akka.actor.default-dispatcher-5] [akka://mysystem/user/myactor] received test
[INFO] [08/14/2014 22:58:34.214] [mysystem-akka.actor.default-dispatcher-5] [akka://mysystem/user/myactor] received test
[ERROR] [08/14/2014 22:58:34.230] [mysystem-akka.actor.default-dispatcher-2] [akka://mysystem/user/myactor/child-2] Kill (akka.actor.ActorKilledException)
actor:akka://mysystem/user/myactor/child-2,child postStop .
actor:akka://mysystem/user/myactor/child-2,child preStart .
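In this run child-2 is stopped rather than restarted: Kill makes the child fail with an ActorKilledException, and the default supervision strategy answers that with Stop. As a rough sketch, the default decider in Akka 2.3 can be written out as an explicit supervisorStrategy like this. The class name ExplicitStrategyParent and its message protocol are assumptions made up for illustration.

```scala
import akka.actor.{Actor, ActorInitializationException, ActorKilledException,
  DeathPactException, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy.{Restart, Stop}

// Hypothetical parent that spells out the default decisions explicitly.
class ExplicitStrategyParent extends Actor {
  override val supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case _: ActorInitializationException => Stop    // child failed during creation
    case _: ActorKilledException         => Stop    // child received Kill
    case _: DeathPactException           => Stop    // child ignored a Terminated message
    case _: Exception                    => Restart // any other exception
  }

  def receive = {
    // Create a child from the given Props under the given name.
    case (props: Props, name: String) => context.actorOf(props, name)
  }
}
```

Throwables that the decider does not match, such as errors that are not exceptions, are escalated to the parent's own supervisor.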
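In Main.scala, if an actor with the same name is created without waiting for child-2 to be stopped, the parent actor myactor throws an InvalidActorNameException and is restarted by its supervisor. Under Akka's supervision rules a restarted actor also restarts its remaining children, and because ParentActor overrides preRestart without calling super.preRestart, the children are not stopped first. As a result child-1 and child-2 are restarted as well, and the output is: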
start myactor
actor:akka://mysystem/user/myactor, parent preStart
actor:akka://mysystem/user/myactor/child-1,child preStart .
actor:akka://mysystem/user/myactor/child-2,child preStart .
[INFO] [08/14/2014 22:59:35.742] [mysystem-akka.actor.default-dispatcher-4] [akka://mysystem/user/myactor/child-2] Message [akka.actor.Kill$] from Actor[akka://mysystem/deadLetters] to Actor[akka://mysystem/user/myactor/child-2] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [08/14/2014 22:59:35.745] [mysystem-akka.actor.default-dispatcher-7] [akka://mysystem/user/myactor] received test
[INFO] [08/14/2014 22:59:35.745] [mysystem-akka.actor.default-dispatcher-7] [akka://mysystem/user/myactor] received test
[ERROR] [08/14/2014 22:59:35.759] [mysystem-akka.actor.default-dispatcher-2] [akka://mysystem/user/myactor] actor name [child-2] is not unique!
akka.actor.InvalidActorNameException: actor name [child-2] is not unique!
at akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:130)
at akka.actor.dungeon.Children$class.reserveChild(Children.scala:77)
at akka.actor.ActorCell.reserveChild(ActorCell.scala:369)
at akka.actor.dungeon.Children$class.makeChild(Children.scala:202)
at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
at akka.actor.ActorCell.actorOf(ActorCell.scala:369)
at net.coderbee.akka.simple.ParentActor$$anonfun$receive$1.applyOrElse(ParentActor.scala:18)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at net.coderbee.akka.simple.ParentActor.aroundReceive(ParentActor.scala:10)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
actor:akka://mysystem/user/myactor, preRestart parent, reason:akka.actor.InvalidActorNameException: actor name [child-2] is not unique!, message:Some((newChild,child-2))
start myactor
actor:akka://mysystem/user/myactor, postRestart parent, reason:akka.actor.InvalidActorNameException: actor name [child-2] is not unique!
actor:akka://mysystem/user/myactor/child-2,preRestart child, reason:akka.actor.InvalidActorNameException: actor name [child-2] is not unique!, message:None
actor:akka://mysystem/user/myactor/child-1,preRestart child, reason:akka.actor.InvalidActorNameException: actor name [child-2] is not unique!, message:None
actor:akka://mysystem/user/myactor/child-1,postRestart child, reason:akka.actor.InvalidActorNameException: actor name [child-2] is not unique!
actor:akka://mysystem/user/myactor/child-2,postRestart child, reason:akka.actor.InvalidActorNameException: actor name [child-2] is not unique!
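The output shows the order of events: the parent actor's preRestart is called first, then a new instance is created (the constructor prints "start myactor" again), then postRestart is called, and finally its child actors are restarted, each following the same steps.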
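One way to avoid the name clash without sleeping is to watch the child and create its replacement only once the Terminated message arrives, since stopping is asynchronous and the name stays reserved until then. The sketch below is a minimal illustration under that assumption. The class name WatchingParent, the "replace" message, and the childProps parameter are hypothetical and not part of the example above.

```scala
import akka.actor.{Actor, Props, Terminated}

// Hypothetical parent that reuses a child name only after the old child is gone.
class WatchingParent(childProps: Props) extends Actor {
  // Names of children that should be recreated once they have terminated.
  private var pending = Set.empty[String]

  def receive = {
    case ("newChild", name: String) =>
      context.watch(context.actorOf(childProps, name))

    case ("replace", name: String) =>
      // Stop the existing child and remember to recreate it later.
      context.child(name).foreach { child =>
        pending += name
        context.stop(child)
      }

    case Terminated(ref) if pending(ref.path.name) =>
      // The old child is fully stopped, so its name can be reused now.
      pending -= ref.path.name
      context.watch(context.actorOf(childProps, ref.path.name))

    case Terminated(ref) =>
      // Handled explicitly: an unhandled Terminated raises DeathPactException.
      println("child terminated: " + ref.path)
  }
}
```

With the classes from this post, such a parent could be created with system.actorOf(Props(new WatchingParent(Props[ChildActor])), "myactor").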