Fault Tolerance

Walkthrough of the example

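  • First, create the top-level actors worker, listener and demoActor.
  • Subscribe demoActor to DeadLetter on the system event stream, so dead letters are delivered to demoActor.
  • Send the Start message to worker with listener as the sender, so that sender() inside worker refers to listener.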

Worker

  • Set the supervision strategy:
// Stop the CounterService child if it throws ServiceUnavailable
override val supervisorStrategy = OneForOneStrategy() {
  case _: CounterService.ServiceUnavailable => Stop
}
  • OneForOneStrategy (one-for-one): when a child throws CounterService.ServiceUnavailable, only that child, here counterService, is stopped.
  • Create the child actor counterService.
  • Message handling:
def receive = LoggingReceive {
  case Start if progressListener.isEmpty =>
    progressListener = Some(sender())
    context.system.scheduler.schedule(Duration.Zero, 1.second, self, Do)

  case Do =>
    counterService ! Increment(1)
    counterService ! Increment(1)
    counterService ! Increment(1)

    // Send current progress to the initial sender
    counterService ? GetCurrentCount map {
      case CurrentCount(_, count) => Progress(100.0 * count / totalCount)
    } pipeTo progressListener.get
}

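Start => if progressListener is None, the sender becomes the progressListener, and a Do message is scheduled to self every second.
Do => send three Increment(1) messages to counterService, then ask counterService with GetCurrentCount, map the reply to a Progress and pipe it to the listener.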
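To check the numbers in the Do handler without starting an actor system, here is a plain-Scala sketch (no Akka). ProgressModel, toProgress, progressOf and ticksUntilDone are hypothetical names; a completed Future stands in for the reply to `counterService ? GetCurrentCount`, and the sketch assumes every Increment is counted before the GetCurrentCount that follows it.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
// Sketch only: the global pool stands in for the actor's `import context.dispatcher`
import scala.concurrent.ExecutionContext.Implicits.global

object ProgressModel {
  final case class CurrentCount(key: String, count: Long)
  final case class Progress(percent: Double)

  val totalCount = 51       // same value as Worker.totalCount
  val incrementsPerTick = 3 // each Do sends three Increment(1)

  // Same formula as in the Worker's Do handler
  def toProgress(reply: CurrentCount): Progress =
    Progress(100.0 * reply.count / totalCount)

  // Mirrors `counterService ? GetCurrentCount map { ... }`: the reply is
  // transformed inside the Future; in the actor, pipeTo then delivers the result
  def progressOf(reply: Future[CurrentCount])(implicit ec: ExecutionContext): Future[Progress] =
    reply.map(toProgress)

  // Number of Do ticks until the Listener sees at least 100%,
  // assuming no increments are lost
  def ticksUntilDone: Int =
    Iterator.from(1)
      .find(n => toProgress(CurrentCount("counterService", n.toLong * incrementsPerTick)).percent >= 100.0)
      .get

  def main(args: Array[String]): Unit = {
    val p = Await.result(progressOf(Future.successful(CurrentCount("counterService", 48L))), 1.second)
    println(s"$p after 16 ticks; done after $ticksUntilDone ticks")
  }
}
```

With totalCount = 51 and three increments per tick, the listener reaches 100% on the 17th Do tick (the first tick fires immediately, then one per second).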

CounterService

  • Set the supervision strategy:
override val supervisorStrategy = OneForOneStrategy(
  maxNrOfRetries = 1,
  withinTimeRange = 5.seconds) {
  case _: Storage.StorageException => Restart
}

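One-for-one: when a child throws Storage.StorageException, that child is restarted. maxNrOfRetries = 1 together with withinTimeRange = 5 seconds allows one restart per 5-second window; if the child fails again within that window, it is stopped instead of restarted.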
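The decision this strategy makes for a failing child can be modeled in plain Scala. This is a simplified sketch, not Akka's implementation: SupervisionModel, decide, RestartBudget and handle are hypothetical names, and the model assumes the behavior described above plus Akka's default of escalating any exception the decider does not match.

```scala
object SupervisionModel {
  sealed trait Directive
  case object Resume extends Directive
  case object Restart extends Directive
  case object Stop extends Directive
  case object Escalate extends Directive

  class StorageException(msg: String) extends RuntimeException(msg)

  // Same shape as CounterService's decider: only StorageException is handled
  val decider: PartialFunction[Throwable, Directive] = {
    case _: StorageException => Restart
  }

  // Exceptions the decider does not match fall back to Escalate
  def decide(cause: Throwable): Directive =
    decider.applyOrElse[Throwable, Directive](cause, _ => Escalate)

  // Restart budget for one child: at most maxNrOfRetries restarts per time window
  final class RestartBudget(maxNrOfRetries: Int, windowMillis: Long) {
    private var windowStart = -1L
    private var retries = 0

    // true = restart the child; false = limit exceeded, stop it
    def requestRestart(nowMillis: Long): Boolean = {
      if (windowStart < 0 || nowMillis - windowStart > windowMillis) {
        windowStart = nowMillis // open a new window
        retries = 0
      }
      retries += 1
      retries <= maxNrOfRetries
    }
  }

  // Restart turns into Stop once the budget is used up
  def handle(cause: Throwable, budget: RestartBudget, nowMillis: Long): Directive =
    decide(cause) match {
      case Restart => if (budget.requestRestart(nowMillis)) Restart else Stop
      case other   => other
    }
}
```

With maxNrOfRetries = 1 and a 5-second window, the first StorageException restarts storage and a second one within 5 seconds stops it, which is what triggers the Terminated case in CounterService.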

  • Override preStart:
override def preStart(): Unit = {
  println("preStart, initStorage")
  initStorage()
}

preStart is called when the actor starts, and also after a restart, because the default postRestart calls preStart.
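A tiny plain-Scala model of this rule, assuming the defaults described above: a restart discards the failed instance, creates a fresh one and calls postRestart, which calls preStart. LifecycleModel, Hooks and CounterServiceLike are hypothetical names, and the model ignores what Akka does with children and the mailbox during a restart.

```scala
object LifecycleModel {
  // Hypothetical stand-in for the two hooks discussed above (not Akka's Actor trait)
  trait Hooks {
    def preStart(): Unit = ()
    // Default behaviour described above: postRestart simply calls preStart
    def postRestart(reason: Throwable): Unit = preStart()
  }

  var initStorageCalls = 0

  class CounterServiceLike extends Hooks {
    var counter: Option[String] = None // per-instance state
    override def preStart(): Unit = initStorage()
    def initStorage(): Unit = initStorageCalls += 1
  }

  // Start: create an instance, then call preStart
  def start(): CounterServiceLike = {
    val a = new CounterServiceLike
    a.preStart()
    a
  }

  // Restart: the failed instance is discarded and a fresh one gets postRestart
  def restart(reason: Throwable): CounterServiceLike = {
    val a = new CounterServiceLike
    a.postRestart(reason)
    a
  }
}
```

This also shows why a restarted CounterService would start over with counter = None: its fields belong to the discarded instance.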

preStart simply initializes storage via initStorage():

def initStorage(): Unit = {
  storage = Some(context.watch(context.actorOf(Props[Storage](), name = "storage")))
  // Tell the counter, if any, to use the new storage
  counter foreach {
    _ ! UseStorage(storage)
  }
  // We need the initial value to be able to operate
  storage.get ! Get(key)
}
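  • Create storage as a child actor of CounterService and watch it, so CounterService receives Terminated when it stops.
  • If a counter actor already exists, send it UseStorage(storage). There is no counter on the first start, but when storage is re-created by Reconnect after being stopped, the counter still exists and is told to use the new storage.
  • Send Get(key) to storage to fetch the initial value.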

What Get(key) does

case Get(key) => sender() ! Entry(key, db.load(key).getOrElse(0L))
...
@throws(classOf[StorageException])
def load(key: String): Option[Long] = synchronized {
  db.get(key)
}
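  • db here is DummyDB, a singleton object that keeps the data in an immutable Map[String, Long]. Because it is a global object, its contents survive restarts of the Storage actor.
    Get replies to the sender (CounterService) with an Entry holding the key and its stored value, or 0 if the key is absent: Entry(key, db.load(key).getOrElse(0L)).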

What CounterService does when it receives Entry

case Entry(k, v) if k == key && counter == None =>
  // Reply from Storage of the initial value, now we can create the Counter
  val c = context.actorOf(Props(classOf[Counter], key, v))
  counter = Some(c)
  println(s"--------------- ${counter.get.path} --------------- ")
  // Tell the counter to use current storage
  c ! UseStorage(storage)
  // and send the buffered backlog to the counter
  for ((replyTo, msg) <- backlog) c.tell(msg, sender = replyTo)
  backlog = IndexedSeq.empty
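  • If there is no counter yet, CounterService creates the Counter child with the initial value v, tells it to use the current storage, and replays the buffered backlog to it, keeping each message's original sender. (Reconnecting after storage has been stopped for good is handled separately, through Terminated and Reconnect.) Until the counter exists, incoming messages go through forwardOrPlaceInBacklog: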
def forwardOrPlaceInBacklog(msg: Any): Unit = {
  // We need the initial value from storage before we can start delegating to
  // the counter. Before that we place the messages in a backlog, to be sent
  // to the counter when it is initialized.
  counter match {
    case Some(c) =>
      c forward msg
    case None =>
      println("&&&&&&&&&&&&&&& NO COUNTER &&&&&&&&&&&&&&&&&&&")
      if (backlog.size >= MaxBacklog)
        throw new ServiceUnavailable(
          "CounterService not available, lack of initial value")
      backlog :+= (sender() -> msg)
  }
}
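  • When CounterService receives Increment or GetCurrentCount, it checks whether a counter exists. If so, it forwards the message, so the counter sees the original sender; if not, it buffers (sender() -> msg) in backlog, and throws ServiceUnavailable once the backlog holds MaxBacklog messages (which makes Worker stop CounterService).
  • So when the initial Entry arrives and the counter is created, the buffered messages are sent to the new counter first and backlog is cleared.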
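The buffering logic can be tried in isolation. This is a plain-Scala sketch of the same idea, not Akka code: BacklogBuffer, receive, attach and buffered are hypothetical names, the deliver function stands in for `c forward msg` and `c.tell(msg, sender = replyTo)`, and IllegalStateException stands in for ServiceUnavailable.

```scala
// Plain-Scala model of forwardOrPlaceInBacklog and the replay in the Entry case.
// S = sender type, M = message type.
final class BacklogBuffer[S, M](maxBacklog: Int) {
  private var target: Option[(S, M) => Unit] = None
  private var backlog = Vector.empty[(S, M)]

  def receive(sender: S, msg: M): Unit = target match {
    case Some(deliver) => deliver(sender, msg) // like `c forward msg`
    case None =>
      if (backlog.size >= maxBacklog)
        throw new IllegalStateException("not available, lack of initial value")
      backlog = backlog :+ ((sender, msg))
  }

  // Called once the target exists: replay buffered messages in order, then clear
  def attach(deliver: (S, M) => Unit): Unit = {
    backlog.foreach { case (s, m) => deliver(s, m) }
    backlog = Vector.empty
    target = Some(deliver)
  }

  def buffered: Int = backlog.size
}
```

In the actor, the replay happens inside the handler for the Entry message, and an actor processes one message at a time, so no new message can be interleaved between the buffered ones.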

In the normal case the counter exists; here is how Counter handles messages:

def receive = LoggingReceive {
  case UseStorage(s) =>
    storage = s
    storeCount()

  case Increment(n) =>
    count += n
    storeCount()

  case GetCurrentCount =>
    sender() ! CurrentCount(key, count)

}

def storeCount(): Unit = {
  // Delegate dangerous work, to protect our valuable state.
  // We can continue without storage.
  storage foreach {
    _ ! Store(Entry(key, count))
  }
}
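  • On the first UseStorage, the counter sends Store(Entry(key, count)) to storage. key is passed in through the Counter constructor; CounterService uses its own actor name, counterService, as the key. Every Increment also calls storeCount(), and while storage is None the count is only kept in memory.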

Storing data


case Store(Entry(key, count)) => db.save(key, count)
...
@throws(classOf[StorageException])
def save(key: String, value: Long): Unit = synchronized {
  println(s"*******************$value**********************")
  // Simulated failure for values in the range 11..240
  if (11 <= value && value <= 240)
    throw new StorageException("Simulated store failure " + value)
  db += (key -> value)
}
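  • It simply stores the value in a Map, with a condition added to trigger StorageException on purpose. Note that the range check needs &&: written as `11 <= value || value <= 240`, the condition is true for every value, so every store, including the initial Store(0), would fail.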
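A quick check of the two forms of the condition in plain Scala; FailureCondition and its methods are hypothetical names used only for this comparison.

```scala
object FailureCondition {
  // || form: every Long is either >= 11 or <= 240, so this is always true
  def failsWithOr(value: Long): Boolean = 11 <= value || value <= 240

  // && form: a real range check, true only for 11..240
  def failsWithAnd(value: Long): Boolean = 11 <= value && value <= 240

  def main(args: Array[String]): Unit =
    for (v <- Seq(0L, 10L, 11L, 51L, 241L))
      println(s"$v: || -> ${failsWithOr(v)}, && -> ${failsWithAnd(v)}")
}
```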

Log

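  1. The top-level actors start.
  2. counterService starts along with worker, followed by storage.
  3. storage receives Get(counterService), the request for the initial value.
  4. counterService receives the reply Entry(counterService,0).
  5. There is no counter yet, so counterService creates one.
  6. The new counter is given the storage: UseStorage(Some(Actor[akka://FaultToleranceSample/user/worker/counterService/storage#-1172001374]))
  7. After that, the counter stores the initial value: Store(Entry(counterService,0))
  8. Only now does Worker receive the Start message.
  9. There is no progressListener yet, so the sender (listener) becomes the progressListener, and Worker sends itself a Do message every second.
  10. Each Do sends three Increment messages and one GetCurrentCount.
    10-1. If creating the counter is slow, incoming Increment messages are buffered in backlog; once the counter exists, the backlog is replayed to it first. With a counter in place, each Increment makes it store the new count through storage.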

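    When the failure condition is met, storage throws a StorageException.
  • CounterService, as the supervisor, restarts storage; once the retry limit is exceeded, storage is stopped.
  • Because CounterService watches storage, it receives Terminated and can reconnect: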
case Terminated(actorRef) if Some(actorRef) == storage =>
  // Once the retry limit is exceeded the storage child is stopped.
  // We receive Terminated because we watch the child, see initStorage.
  storage = None
  // Tell the counter that there is no storage for the moment
  counter foreach {
    _ ! UseStorage(None)
  }
  // Try to re-establish storage after a while
  context.system.scheduler.scheduleOnce(10.seconds, self, Reconnect)

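Here storage is cleared (set to None), the counter is told that there is no storage for the moment, and a Reconnect message is scheduled for 10 seconds later.
Reconnect calls initStorage() again. The counter still exists at this point, so no new Counter is created: the new storage's Entry reply does not match the Entry case, which only applies while counter is None.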

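  • Before the reconnect happens, some messages end up in deadLetters (for example Store messages still addressed to the stopped storage). The counter, however, keeps running, so although nothing is persisted, its in-memory count keeps increasing.
  • Reconnect re-initializes storage: it creates a new storage child, sends UseStorage with the new storage to the counter, and sends Get(key) to read the stored value from db again.
  • As soon as the counter receives UseStorage with the new storage, and on every following Increment, it writes its in-memory count to db, so db catches up with the latest value (as long as that value is outside the simulated failure range).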
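The outage and reconnect can be modeled without Akka to see why the count is not lost. In this plain-Scala sketch all names are hypothetical: Db stands in for DummyDB, StorageLike for a Storage child and CounterLike for Counter. The same CounterLike instance is kept across the reconnect, just as CounterService keeps its existing counter, and the simulated failure range is left out.

```scala
object ReconnectModel {
  // Stand-in for DummyDB: a global object, so its data outlives any storage instance
  object Db {
    private var data = Map.empty[String, Long]
    def save(key: String, value: Long): Unit = synchronized { data = data.updated(key, value) }
    def load(key: String): Option[Long] = synchronized { data.get(key) }
  }

  // Stand-in for a Storage child; a fresh instance is created on each (re)connect
  final class StorageLike {
    def store(key: String, value: Long): Unit = Db.save(key, value)
  }

  // Stand-in for Counter: the count lives in memory, storage is optional
  final class CounterLike(key: String, initialValue: Long) {
    private var storage: Option[StorageLike] = None
    var count: Long = initialValue

    // Like UseStorage(s): switch storage and immediately store the current count
    def useStorage(s: Option[StorageLike]): Unit = { storage = s; storeCount() }

    // Like Increment(n): always counts, stores only if storage is available
    def increment(n: Int): Unit = { count += n; storeCount() }

    private def storeCount(): Unit = storage.foreach(_.store(key, count))
  }
}
```

The count survives because it lives in the counter, and the db value survives because it lives in a global object; only the storage child in between is replaced.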

Source Code


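import DemoActor._
import akka.actor._
import akka.actor.SupervisorStrategy._

import scala.concurrent.duration._
import akka.util.Timeout
import akka.event.LoggingReceive
import akka.pattern.{ask, pipe}
import com.typesafe.config.ConfigFactory

// DemoActor is not part of the listing in this post. This is a hypothetical,
// minimal version so the file compiles: it only logs the DeadLetter events
// it is subscribed to in FaultHandlingDocSample.
object DemoActor {
  def props(magicNumber: Int): Props = Props(new DemoActor(magicNumber))
}

class DemoActor(magicNumber: Int) extends Actor with ActorLogging {
  def receive = {
    case DeadLetter(msg, from, to) =>
      log.info("DeadLetter {} from {} to {}", msg, from, to)
  }
}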

/**
 * Runs the sample
 */
object FaultHandlingDocSample extends App {

  import Worker._

  val config = ConfigFactory.parseString(
    """
    akka.loglevel = "DEBUG"
    akka.actor.debug {
      receive = on
      lifecycle = on
    }
    """)

  val system = ActorSystem("FaultToleranceSample", config)
  val worker = system.actorOf(Props[Worker](), name = "worker")
  val listener = system.actorOf(Props[Listener](), name = "listener")
  val demoActor = system.actorOf(props(42), "demoActor")
  // Deliver DeadLetter events from the event stream to demoActor
  system.eventStream.subscribe(demoActor, classOf[DeadLetter])
  // start the work and listen on progress
  // note that the listener is used as sender of the tell,
  // i.e. it will receive replies from the worker
  worker.tell(Start, sender = listener)
}

/**
 * Listens on progress from the worker and shuts down the system when enough
 * work has been done.
 */
class Listener extends Actor with ActorLogging {

  import Worker._

  // If we don't get any progress within 15 seconds then the service is unavailable
  context.setReceiveTimeout(15.seconds)

  def receive = {
    case Progress(percent) =>
      log.info("Current progress: {} %", percent)
      if (percent >= 100.0) {
        log.info("That's all, shutting down")
        context.system.terminate()
      }

    case ReceiveTimeout =>
      // No progress within 15 seconds, ServiceUnavailable
      log.error("Shutting down due to unavailable service")
      context.system.terminate()
  }
}

object Worker {

  case object Start

  case object Do

  final case class Progress(percent: Double)

}

/**
 * Worker performs some work when it receives the `Start` message.
 * It will continuously notify the sender of the `Start` message
 * of current `Progress`. The `Worker` supervises the `CounterService`.
 */
class Worker extends Actor with ActorLogging {

  import Worker._
  import CounterService._

  implicit val askTimeout: Timeout = Timeout(5.seconds)

  // Stop the CounterService child if it throws ServiceUnavailable
  override val supervisorStrategy = OneForOneStrategy() {
    case _: CounterService.ServiceUnavailable => Stop
  }

  // The sender of the initial Start message will continuously be notified
  // about progress
  var progressListener: Option[ActorRef] = None
  val counterService = context.actorOf(Props[CounterService](), name = "counterService")
  val totalCount = 51

  // Use this actor's dispatcher as ExecutionContext
  import context.dispatcher

  def receive = LoggingReceive {
    case Start if progressListener.isEmpty =>
      progressListener = Some(sender())
      context.system.scheduler.schedule(Duration.Zero, 1.second, self, Do)

    case Do =>
      counterService ! Increment(1)
      counterService ! Increment(1)
      counterService ! Increment(1)

      // Send current progress to the initial sender
      counterService ? GetCurrentCount map {
        case CurrentCount(_, count) => Progress(100.0 * count / totalCount)
      } pipeTo progressListener.get
  }
}

object CounterService {

  final case class Increment(n: Int)

  sealed abstract class GetCurrentCount

  case object GetCurrentCount extends GetCurrentCount

  final case class CurrentCount(key: String, count: Long)

  class ServiceUnavailable(msg: String) extends RuntimeException(msg)

  private case object Reconnect

}

/**
 * Adds the value received in `Increment` message to a persistent
 * counter. Replies with `CurrentCount` when it is asked for `CurrentCount`.
 * `CounterService` supervises `Storage` and `Counter`.
 */
class CounterService extends Actor {

  import CounterService._
  import Counter._
  import Storage._

  // Restart the storage child when StorageException is thrown.
  // If it fails again after its one allowed restart (maxNrOfRetries = 1)
  // within 5 seconds, it is stopped instead.
  override val supervisorStrategy = OneForOneStrategy(
    maxNrOfRetries = 1,
    withinTimeRange = 5.seconds) {
    case _: Storage.StorageException => Restart
  }

  val key = self.path.name
  var storage: Option[ActorRef] = None
  var counter: Option[ActorRef] = None
  var backlog = IndexedSeq.empty[(ActorRef, Any)]
  val MaxBacklog = 10000

  // Use this actor's dispatcher as ExecutionContext
  import context.dispatcher

  override def preStart(): Unit = {
    println("preStart, initStorage")
    initStorage()
  }

  /**
   * The child storage is restarted in case of failure, but once it exceeds
   * the retry limit and is still failing it will be stopped. Better to back off
   * than continuously failing. When it has been stopped we will schedule a
   * Reconnect after a delay.
   * Watch the child so we receive Terminated message when it has been terminated.
   */
  def initStorage(): Unit = {
    storage = Some(context.watch(context.actorOf(Props[Storage](), name = "storage")))
    // Tell the counter, if any, to use the new storage
    counter foreach {
      _ ! UseStorage(storage)
    }
    // We need the initial value to be able to operate
    storage.get ! Get(key)
  }

  def receive = LoggingReceive {

    case Entry(k, v) if k == key && counter == None =>
      // Reply from Storage of the initial value, now we can create the Counter
      val c = context.actorOf(Props(classOf[Counter], key, v))
      counter = Some(c)
      println(s"--------------- ${counter.get.path} --------------- ")
      // Tell the counter to use current storage
      c ! UseStorage(storage)
      // and send the buffered backlog to the counter
      for ((replyTo, msg) <- backlog) c.tell(msg, sender = replyTo)
      backlog = IndexedSeq.empty

    case msg: Increment =>
      forwardOrPlaceInBacklog(msg)

    case msg: GetCurrentCount => forwardOrPlaceInBacklog(msg)

    case Terminated(actorRef) if Some(actorRef) == storage =>
      // Once the retry limit is exceeded the storage child is stopped.
      // We receive Terminated because we watch the child, see initStorage.
      storage = None
      // Tell the counter that there is no storage for the moment
      counter foreach {
        _ ! UseStorage(None)
      }
      // Try to re-establish storage after a while
      context.system.scheduler.scheduleOnce(10.seconds, self, Reconnect)

    case Reconnect =>
      // Re-establish storage after the scheduled delay
      initStorage()
  }

  def forwardOrPlaceInBacklog(msg: Any): Unit = {
    // We need the initial value from storage before we can start delegating to
    // the counter. Before that we place the messages in a backlog, to be sent
    // to the counter when it is initialized.
    counter match {
      case Some(c) =>
        c forward msg
      case None =>
        println("&&&&&&&&&&&&&&& NO COUNTER &&&&&&&&&&&&&&&&&&&")
        if (backlog.size >= MaxBacklog)
          throw new ServiceUnavailable(
            "CounterService not available, lack of initial value")
        backlog :+= (sender() -> msg)
    }
  }

}

object Counter {

  final case class UseStorage(storage: Option[ActorRef])

}

/**
 * The in-memory count variable that will send current
 * value to the `Storage`, if there is any storage
 * available at the moment.
 */
class Counter(key: String, initialValue: Long) extends Actor {

  import Counter._
  import CounterService._
  import Storage._

  var count = initialValue
  var storage: Option[ActorRef] = None

  def receive = LoggingReceive {
    case UseStorage(s) =>
      storage = s
      storeCount()

    case Increment(n) =>
      count += n
      storeCount()

    case GetCurrentCount =>
      sender() ! CurrentCount(key, count)

  }

  def storeCount(): Unit = {
    // Delegate dangerous work, to protect our valuable state.
    // We can continue without storage.
    storage foreach {
      _ ! Store(Entry(key, count))
    }
  }

}

object Storage {

  final case class Store(entry: Entry)

  final case class Get(key: String)

  final case class Entry(key: String, value: Long)

  class StorageException(msg: String) extends RuntimeException(msg)

}

/**
 * Saves key/value pairs to persistent storage when receiving `Store` message.
 * Replies with current value when receiving `Get` message.
 * Will throw StorageException if the underlying data store is out of order.
 */
class Storage extends Actor {

  import Storage._

  val db = DummyDB

  def receive = LoggingReceive {
    case Store(Entry(key, count)) => db.save(key, count)
    case Get(key) => sender() ! Entry(key, db.load(key).getOrElse(0L))
  }
}

object DummyDB {

  import Storage.StorageException

  private var db = Map[String, Long]()

  @throws(classOf[StorageException])
  def save(key: String, value: Long): Unit = synchronized {
    println(s"*******************$value**********************")
    // Simulated failure for values in the range 11..240 (&& makes it a range check)
    if (11 <= value && value <= 240)
      throw new StorageException("Simulated store failure " + value)
    db += (key -> value)
  }

  @throws(classOf[StorageException])
  def load(key: String): Option[Long] = synchronized {
    db.get(key)
  }
}