实例解读
- 首先创建顶级Actor
worker
listener
demoActor
- 订阅deadLetter 发给
demoActor
worker
内的sender指向listener
,并向worker
发送Start
消息
Worker
- 设置监管策略
// Stop the CounterService child if it throws ServiceUnavailable |
- 当收到 CounterService.ServiceUnavailable类型的exception时 stop相应的actor 1-1
- 创建子Actor
counterService
- 消息处理
def receive = LoggingReceive { |
Start => 如果progressListener为None的话就用sender来做progressListener,并且没1秒给自己发一个Do的消息
Do => 给counterService连续发3个Increment消息,然后 给counterService发送GetCurrentCount消息并把结果给listener
CounterService
- 设置监管策略
override val supervisorStrategy = OneForOneStrategy( |
1-1, 当接受到 Storage.StorageException 类型的exception时,重启对方,重试次数为1,如果在5秒内用尽重试次数,则直接stop对方
- 重写preStart
override def preStart() { |
preStart是在这当前Actor重新初始化的时候会被调用,重启也会
这里直接初始化storage
def initStorage() { |
- 创建CounterService的子Actor
storage
- 如果有counter这个actor就发送
UseStorage(storage)
,第一次启动的时候没有counter,但是当storage重启的时候counter还是存在的,让他使用新的storage - 发送storage的Get(key)消息来获取初始值
Get(key)做了什么
case Get(key) => sender() ! Entry(key, db.load(key).getOrElse(0L)) |
- db是个Map
Get是给 CounterService 返回一个消息, 内容是db离对应的那个key和value,类型是Entry(key, db.load(key).getOrElse(0L))
CounterService 接到 Entry干了什么
case Entry(k, v) if k == key && counter == None => |
- 如果当前没有counter 就创建一个子Actor, 当storage彻底停止后,会重连
def forwardOrPlaceInBacklog(msg: Any) { |
- 当CounterService收到消息的时候会检查当前有没有counter,有的话就forward 这个消息,没有的话会暂时存储这个
(sender() -> msg)
- 所以当storage重连的时候,新建counter,会先把存储的消息都发给新counter,并且清空存储容器
backlog
正常情况下counter也有看看怎么处理消息
def receive = LoggingReceive { |
- 第一次的时候UseStorage, 会发
Store(Entry(key, count))
key是actor构造里带进来的
存数据
|
- 就是存在一个Map里,并且为了得到exception,写了个条件
Log
- 几个顶级Actor started
- 顺带counterService started 后面是 storage
- storage 获取初始值
Get(counterService)
- counterService收到返回的消息
Entry(counterService,0)
- 这个时候还没有counter所以会创建一个counter
- 给新counter 应用storage
UseStorage(Some(Actor[akka://FaultToleranceSample/user/worker/counterService/storage#-1172001374]))
- 应用后给他存初始值
Store(Entry(counterService,0))
- 这时候
Worker
刚收到Start
消息 - 刚开始没有
progressListener
,所以默认给了listener
,每秒钟给worker自己发一个Do消息 - Do会发3个
Increment
和一个GetCurrentCount
10-1.Increment
如果创建counter比较慢,这时候消息会暂时存储在 backlog, 等counter有的时候会先执行 backlog里的消息,有counter的话就去storage里存储
…
遇到条件的时候storage 抛exception
- CounterService 收到 让storage重启,重试次数用尽后,storage就停止了
- 因为watch了所以就能去做重连
case Terminated(actorRef) if Some(actorRef) == storage => |
这里清掉了storage,10秒后重连
Reconnect就会重新初始化storage,以为这时候没有counter了,所以还会新建一个counter
- 在没开始 重连的时间内,会有消息发到deadLetter里,但counter一直存在, 所以虽然没有存储在storage,但Counter里的count一直是累加的
- 重连会重新初始化 storage,新建storage 给counter应用新的storage,并且从db里重新拿新的初始值
- 当新的消息来的时候,会吧counter里存的count 插入到 db,得到最新的值
Source Code
|