Akka Actor Lifecycle

This post demonstrates Akka Actor lifecycle management and the default supervision strategy. It is based on Akka 2.3.4 and Scala 2.11.

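Akka Actor lifecycle hooks

Once an actor has been instantiated, the actor runtime schedules and runs it.

Akka Actor defines the following lifecycle callback hooks:

  • preStart: called after the actor instance is created. On a restart it runs only through the default postRestart; the example below overrides postRestart without calling super, so preStart does not run on restart there.
  • postStop: called after the actor has stopped. On a restart it runs only through the default preRestart, which also stops all children; the example below overrides preRestart without calling super, so postStop does not run on restart there.
  • preRestart: called on the old instance before a restart caused by an exception, with the exception and the message that was being processed. This is the place to save the current state.
  • postRestart: called on the new instance after a restart caused by an exception. This is the place to restore the state saved before the restart.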
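
To make the relationship between the hooks concrete, here is a minimal sketch of an actor that logs in each hook and then delegates to the default implementation. HookLoggingActor and the "boom" message are hypothetical names used only for illustration; they are not part of the example below. With the super calls in place, a restart should run preRestart, postStop, postRestart and preStart in that order.

```scala
import akka.actor.Actor

// Hypothetical actor, for illustration only.
class HookLoggingActor extends Actor {

  def receive: Receive = {
    // An ordinary Exception leads to a restart under the default supervisor strategy.
    case "boom" => throw new IllegalStateException("boom")
    case msg    => println("received " + msg)
  }

  override def preStart(): Unit = println("preStart")

  override def postStop(): Unit = println("postStop")

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    println("preRestart, reason: " + reason)
    // Default behaviour: stop all children, then call postStop().
    super.preRestart(reason, message)
  }

  override def postRestart(reason: Throwable): Unit = {
    println("postRestart, reason: " + reason)
    // Default behaviour: call preStart().
    super.postRestart(reason)
  }
}
```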

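Example

There are three source files:

Main.scala: initializes the actor system and sends messages to the actor.
ParentActor.scala: receives the messages sent from Main.scala, and creates or controls ChildActor.
ChildActor.scala: the child actor, controlled by ParentActor.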

ChildActor.scala

package net.coderbee.akka.simple

import akka.actor.Actor

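class ChildActor extends Actor {

    override def receive: Receive = {
        case "abc" => println("get abc string ")
        // Throwing here lets the supervisor (the parent) decide what to do with this actor.
        case "exception" => throw new NullPointerException()
        case _ => println("child can't handle unknown message")
    }

    // The hooks below only log, so the output shows when each one runs.
    override def preStart(): Unit = {
        println("actor:" + self.path + ",child preStart .")
    }

    override def postStop(): Unit = {
        println("actor:" + self.path + ",child postStop .")
    }

    // super.preRestart is not called, so postStop does not run on restart.
    override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
        println("actor:" + self.path + ",preRestart child, reason:" + reason + ", message:" + message)
    }

    // super.postRestart is not called, so preStart does not run on restart.
    override def postRestart(reason: Throwable): Unit = {
        println("actor:" + self.path + ",postRestart child, reason:" + reason)
    }
}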

ParentActor.scala

package net.coderbee.akka.simple

import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.Props
import akka.actor.ReceiveTimeout
import scala.concurrent.duration
import akka.actor.ActorSystem

class ParentActor extends Actor with ActorLogging {
    println("start myactor ")

//    context.setReceiveTimeout(duration.Duration.apply(30, duration.SECONDS))
    //    val log = Logging(context.system, this)

    def receive = {
        case "test" => log.info("received test")
        case ("newChild", name: String) => context.actorOf(Props[ChildActor], name)
        case ("stop", name: String) =>
            // context.child looks up a direct child by name; actorFor is deprecated.
            context.child(name).foreach(context.stop)
//        case ReceiveTimeout => throw new RuntimeException("received timeout") // throw whenever no message arrives within the receive timeout
        case "suicide" => // no action is taken for this message in this example
        case x: Any => log.info("received unknown message :" + x)
    }

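    /**
     * Runs after the actor is instantiated. It does not run on restart here,
     * because postRestart below does not call super.postRestart.
     */
    override def preStart(): Unit = {
        println("actor:" + self.path + ", parent preStart ")
    }

    /**
     * Runs after the actor has stopped. It does not run on restart here,
     * because preRestart below does not call super.preRestart.
     */
    override def postStop(): Unit = {
        println("actor:" + self.path + ",parent postStop .")
    }

    /**
     * Runs on the old instance before a restart caused by an exception; the place to save state.
     */
    override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
        println("actor:" + self.path + ", preRestart parent, reason:" + reason + ", message:" + message)
    }

    /**
     * Runs on the new instance after a restart caused by an exception; the place to restore state.
     */
    override def postRestart(reason: Throwable): Unit = {
        println("actor:" + self.path + ", postRestart parent, reason:" + reason)
    }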
}
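
ParentActor does not define a supervisor strategy, so it uses Akka's default: a child that fails with ActorInitializationException, ActorKilledException or DeathPactException is stopped, a child that fails with any other Exception is restarted, and other Throwables are escalated. As a rough sketch of how a parent could state its own policy instead, the following overrides supervisorStrategy. StrictParentActor, the retry limits and the choice of exception types are assumptions made up for illustration; they are not part of this example project.

```scala
import akka.actor.{Actor, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy.{Escalate, Restart, Stop}
import scala.concurrent.duration._

// Hypothetical parent, for illustration only.
class StrictParentActor extends Actor {

  // One-for-one: the decision applies only to the child that failed.
  // The limits (3 restarts within 1 minute) are arbitrary example values.
  override val supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1.minute) {
      case _: NullPointerException     => Restart
      case _: IllegalArgumentException => Stop
      case _: Exception                => Escalate
    }

  def receive: Receive = {
    // Create a child from the given Props under the given name.
    case (props: Props, name: String) => context.actorOf(props, name)
  }
}
```

Exceptions that the decider does not match are escalated to this actor's own supervisor.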

Main.scala

package net.coderbee.akka.simple

import akka.actor.ActorSystem
import akka.actor.Kill
import akka.actor.Props

object Main extends App {
    val sysname = "mysystem"
    val system = ActorSystem(sysname)

    val myActor = system.actorOf(Props[ParentActor], name = "myactor")

    myActor ! ("newChild", "child-1")
    myActor ! ("newChild", "child-2")
    myActor ! "test"

    // Look the parent up by path; actorSelection replaces the deprecated actorFor.
    val parent = system.actorSelection("akka://mysystem/user/myactor")
    parent ! "test"
//    parent ! ("stop", "child-2")

    val child2 = system.actorSelection("akka://mysystem/user/myactor/child-2")
    child2 ! Kill     // kill child2
//    child2 ! "exception"
    Thread.sleep(5000)     // wait for child2 to be killed
    myActor ! ("newChild", "child-2")
//    Thread.sleep(5000)
//    myActor ! ("newChild", "child-2")
}

The code above (child-2 is stopped by Kill, then a new child-2 is started) produces the following output:

start myactor 
actor:akka://mysystem/user/myactor, parent preStart 
actor:akka://mysystem/user/myactor/child-1,child preStart .
actor:akka://mysystem/user/myactor/child-2,child preStart .
[INFO] [08/14/2014 22:58:34.214] [mysystem-akka.actor.default-dispatcher-5] [akka://mysystem/user/myactor] received test
[INFO] [08/14/2014 22:58:34.214] [mysystem-akka.actor.default-dispatcher-5] [akka://mysystem/user/myactor] received test
[ERROR] [08/14/2014 22:58:34.230] [mysystem-akka.actor.default-dispatcher-2] [akka://mysystem/user/myactor/child-2] Kill (akka.actor.ActorKilledException)
actor:akka://mysystem/user/myactor/child-2,child postStop .
actor:akka://mysystem/user/myactor/child-2,child preStart .

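In Main.scala, creating an actor with the same name without waiting for child-2 to be killed makes the parent actor named myactor fail with InvalidActorNameException, so it is restarted. In the run shown here the log also reports that the Kill message ended up in dead letters, so child-2 was still alive and its name was still taken. Because ParentActor overrides preRestart without calling super, its children are not stopped; they are restarted together with the parent, so child-1 and child-2 are restarted as well. The output is: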

start myactor 
actor:akka://mysystem/user/myactor, parent preStart 
actor:akka://mysystem/user/myactor/child-1,child preStart .
actor:akka://mysystem/user/myactor/child-2,child preStart .
[INFO] [08/14/2014 22:59:35.742] [mysystem-akka.actor.default-dispatcher-4] [akka://mysystem/user/myactor/child-2] Message [akka.actor.Kill$] from Actor[akka://mysystem/deadLetters] to Actor[akka://mysystem/user/myactor/child-2] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [08/14/2014 22:59:35.745] [mysystem-akka.actor.default-dispatcher-7] [akka://mysystem/user/myactor] received test
[INFO] [08/14/2014 22:59:35.745] [mysystem-akka.actor.default-dispatcher-7] [akka://mysystem/user/myactor] received test
[ERROR] [08/14/2014 22:59:35.759] [mysystem-akka.actor.default-dispatcher-2] [akka://mysystem/user/myactor] actor name [child-2] is not unique!
akka.actor.InvalidActorNameException: actor name [child-2] is not unique!
     at akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:130)
     at akka.actor.dungeon.Children$class.reserveChild(Children.scala:77)
     at akka.actor.ActorCell.reserveChild(ActorCell.scala:369)
     at akka.actor.dungeon.Children$class.makeChild(Children.scala:202)
     at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
     at akka.actor.ActorCell.actorOf(ActorCell.scala:369)
     at net.coderbee.akka.simple.ParentActor$$anonfun$receive$1.applyOrElse(ParentActor.scala:18)
     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
     at net.coderbee.akka.simple.ParentActor.aroundReceive(ParentActor.scala:10)
     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

actor:akka://mysystem/user/myactor, preRestart parent, reason:akka.actor.InvalidActorNameException: actor name [child-2] is not unique!, message:Some((newChild,child-2))
start myactor 
actor:akka://mysystem/user/myactor, postRestart parent, reason:akka.actor.InvalidActorNameException: actor name [child-2] is not unique!
actor:akka://mysystem/user/myactor/child-2,preRestart child, reason:akka.actor.InvalidActorNameException: actor name [child-2] is not unique!, message:None
actor:akka://mysystem/user/myactor/child-1,preRestart child, reason:akka.actor.InvalidActorNameException: actor name [child-2] is not unique!, message:None
actor:akka://mysystem/user/myactor/child-1,postRestart child, reason:akka.actor.InvalidActorNameException: actor name [child-2] is not unique!
actor:akka://mysystem/user/myactor/child-2,postRestart child, reason:akka.actor.InvalidActorNameException: actor name [child-2] is not unique!

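The output shows the order of events: the parent actor's preRestart is called first, then a new parent instance is created (the second "start myactor" line), then postRestart is called, and finally its child actors are restarted, each following the same steps.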
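
Waiting with Thread.sleep is only a timing guess. Stopping an actor is asynchronous, and its name can be reused only after it has terminated. As a sketch of an alternative, a parent can watch its children and create the replacement when the Terminated message arrives. RecreatingParent, the "replace" message and the pending set are hypothetical names introduced here; they are not part of the original example.

```scala
import akka.actor.{Actor, Props, Terminated}

// Hypothetical parent, for illustration only.
class RecreatingParent(childProps: Props) extends Actor {

  // Names of children that should be recreated once they have terminated.
  private var pending = Set.empty[String]

  def receive: Receive = {
    case ("newChild", name: String) =>
      // Watch every child so that a Terminated message is delivered when it stops.
      context.watch(context.actorOf(childProps, name))

    case ("replace", name: String) =>
      context.child(name) match {
        case Some(child) =>
          pending += name
          context.stop(child) // asynchronous: the name is still taken at this point
        case None =>
          context.watch(context.actorOf(childProps, name))
      }

    case Terminated(ref) =>
      val name = ref.path.name
      if (pending(name)) {
        // The old child is gone, so its name can be reused now.
        pending -= name
        context.watch(context.actorOf(childProps, name))
      }
  }
}
```

Assuming the ChildActor from this post, such a parent could be created with system.actorOf(Props(classOf[RecreatingParent], Props[ChildActor]), "myactor"). The Terminated case is handled for every watched child because an unhandled Terminated message makes the watcher fail with DeathPactException.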

