Actor - FSM

Source Code

package sample.redelivery

import akka.actor._
import scala.concurrent.duration._
import java.util.concurrent.ThreadLocalRandom
import java.util.UUID

object SimpleOrderedRedeliverer {

def props(retryTimeout: FiniteDuration) = Props(classOf[SimpleOrderedRedeliverer], retryTimeout)


case class Deliver(to: ActorRef, msg: Any, uuid: UUID)

case class Delivered(uuid: UUID)

case class AcceptedForDelivery(uuid: UUID)

case class Busy(refused: UUID, currentlyProcessing: UUID)


case class Ackable(from: ActorRef, msg: Any, uuid: UUID)

case class Ack(uuid: UUID)


sealed trait State

case object Idle extends State

case object AwaitingAck extends State

sealed trait Data

case object NoData extends Data


case class LastRequest(last: Deliver, requester: ActorRef) extends Data


private case object Retry

}

class SimpleOrderedRedeliverer(retryTimeout: FiniteDuration) extends Actor with FSM[SimpleOrderedRedeliverer.State, SimpleOrderedRedeliverer.Data] {

import SimpleOrderedRedeliverer._

// So that we don't make a typo when referencing this timer.
val RetryTimer = "retry"

// Start idle with neither last request, nor most recent requester.
startWith(Idle, NoData)


def process(request: Deliver, requester: ActorRef): State = {
request.to ! Ackable(requester, request.msg, request.uuid)
setTimer(RetryTimer, Retry, retryTimeout, repeat = false)
goto(AwaitingAck) using LastRequest(request, requester)
}


when(Idle) {
case Event(request: Deliver, _) =>
println("idle now, processing")
process(request, sender()) replying AcceptedForDelivery(request.uuid)
}

when(AwaitingAck) {

case Event(Retry, LastRequest(request, requester)) =>
println("will retry")
process(request, requester)

case Event(Ack(uuid), LastRequest(request, requester)) if uuid == request.uuid =>
cancelTimer(RetryTimer)
requester ! Delivered(uuid)
println(s"matched - $uuid")
goto(Idle) using NoData

case Event(request: Deliver, LastRequest(current, _)) =>
println("reject")
stay() replying Busy(request.uuid, current.uuid)
}

}

object Receiver {

def props = Props(classOf[Receiver])
}

class Receiver extends Actor {

def shouldSendAck = ThreadLocalRandom.current.nextDouble() < 0.25

def receive = {
case SimpleOrderedRedeliverer.Ackable(from, msg, uuid) =>
val goingToSendAck = shouldSendAck
println(s""" [Receiver] got "$msg"; ${if (goingToSendAck) "" else " ***NOT***"} going to send Ack this time""")
// Send a [[SimpleOrderedRedeliverer.Ack]] -- if they're lucky!
if (goingToSendAck) sender() ! SimpleOrderedRedeliverer.Ack(uuid)
}
}

object Requester {

def props = Props(classOf[Requester])

private case object Tick

}

class Requester extends Actor {

import Requester._
import context.dispatcher

val redeliverer = context.actorOf(SimpleOrderedRedeliverer.props(retryTimeout = 3.seconds))
val receiver = context.actorOf(Receiver.props)


val messages = List("Hello!", "Ping!", "Howdy!")


self ! Tick


def nextTickIn: FiniteDuration = (1.0 + ThreadLocalRandom.current.nextDouble() * 9.0).seconds

def receive = {
case Tick =>
val msg = util.Random.shuffle(messages).head
val uuid = UUID.randomUUID()
println(s"""[Requester] requesting ("$msg", $uuid) to be sent to [Receiver]...""")

redeliverer ! SimpleOrderedRedeliverer.Deliver(receiver, msg, uuid)


context.system.scheduler.scheduleOnce(nextTickIn, self, Tick)

// case msg => println(s"[Requester] got $msg")
}

}

object FsmSimpleRedelivery extends App {

val system = ActorSystem()

system.actorOf(Requester.props)

}

Output

[Requester] requesting ("Ping!", 5da81190-5089-403b-9561-7147e3568fc0) to be sent to [Receiver]...
idle now, processing
[Receiver] got "Ping!"; going to send Ack this time
matched - 5da81190-5089-403b-9561-7147e3568fc0
[Requester] requesting ("Howdy!", 4e26f458-5b72-470f-98bf-913cee94dc95) to be sent to [Receiver]...
idle now, processing
[Receiver] got "Howdy!"; ***NOT*** going to send Ack this time
will retry
[Receiver] got "Howdy!"; going to send Ack this time
matched - 4e26f458-5b72-470f-98bf-913cee94dc95
[Requester] requesting ("Howdy!", 4f5686bb-5f88-4eca-9983-cddc1364de14) to be sent to [Receiver]...
idle now, processing
[Receiver] got "Howdy!"; ***NOT*** going to send Ack this time
will retry
[Receiver] got "Howdy!"; ***NOT*** going to send Ack this time
will retry
[Receiver] got "Howdy!"; ***NOT*** going to send Ack this time
[Requester] requesting ("Hello!", 730788c3-a1ab-4593-aa54-8ab240592031) to be sent to [Receiver]...
reject
will retry
[Receiver] got "Howdy!"; going to send Ack this time
matched - 4f5686bb-5f88-4eca-9983-cddc1364de14

解释

  1. requester 发送一个新消息Ping
  2. 初始状态Idle 处理 判断这次要ACK 切换状态
  3. uuid匹配所以match, 切换为Idle
  4. requester发送 一个新消息 Howdy
  5. idle下判断是否ack, 这次不去验证, 一定时间内没有收到ack验证消息则retry
  6. retry判断这次要ack,uuid匹配match 切换回idle
  7. 继续发新消息 Howdy
  8. idle下处理切换 等待状态,判断不走ack 就一直retry,在retry过程中,收到另一个新消息Hello
  9. 由于uuid不同,直接reject,之前的Howdy 仍然在retry 直到最后match