Akka Classic Scala Cheat Sheet

Creating your first actor

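import akka.actor.{Actor, ActorSystem, Props}

class MyActor extends Actor {
  def receive: Receive = {
    case msg => // do something with that message
  }
}

// Companion object with a Props factory
object MyActor {
  def props(): Props = Props(new MyActor)
}

// Create the actor system that hosts the actors
val system = ActorSystem("name")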

Creating an Actor Reference

Root Actor Reference

val actor = system.actorOf(MyActor.props())

Creating a child actor

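val actor = context.actorOf(MyActor.props())

// Tell: fire-and-forget
actor ! "First message"

// Forward: the receiver sees the original sender
actor.forward("keep original sender")

// Ask: returns a Future of the reply
import akka.pattern.ask
import scala.util.{Failure, Success}
// implicit Timeout
// implicit ExecutionContext
val response = myActor ? Message
response.mapTo[Response] onComplete {
  case Success(result) => // handle the reply
  case Failure(reason) => // handle the failure or the timeout
}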

// alternatively
import akka.pattern.pipe
(myActor ? Message).pipeTo(self) // preferred: the result arrives as a message, so no actor state is touched from a Future callback
// self receives either the successful result or a Status.Failure

Use pipeTo for any Future whose result the actor needs, rather than handling it in a callback.
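
As a minimal sketch of the pipeTo pattern, the actor below starts a Future and pipes its outcome back to itself, so the result is handled as an ordinary message. The names PriceService, Lookup, Price, Fetched and fetchPrice are hypothetical and only serve the illustration.

```scala
import akka.actor.{Actor, ActorLogging, ActorRef, Status}
import akka.pattern.pipe
import scala.concurrent.Future

// Hypothetical protocol, for illustration only
final case class Lookup(item: String)
final case class Price(item: String, amount: BigDecimal)
final case class Fetched(price: Price, replyTo: ActorRef)

class PriceService extends Actor with ActorLogging {
  import context.dispatcher // ExecutionContext for the Future and for pipeTo

  // Stand-in for any asynchronous call (database, HTTP client, ...)
  private def fetchPrice(item: String): Future[Price] =
    Future(Price(item, BigDecimal(42)))

  def receive: Receive = {
    case Lookup(item) =>
      val replyTo = sender() // capture now; never call sender() inside a Future callback
      fetchPrice(item).map(price => Fetched(price, replyTo)).pipeTo(self)

    case Fetched(price, replyTo) =>
      replyTo ! price // processed as a normal message, so actor state is safe to use here

    case Status.Failure(cause) =>
      log.error(cause, "Price lookup failed") // a failed Future arrives as Status.Failure
  }
}
```

Capturing sender() in a local value before the Future runs matters because sender() is only valid while the actor is processing the current message.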

Actor Selection

context.actorSelection("/user/path/to/specific/actor")
context.actorSelection("../sibling")
context.actorSelection("/user/wild-card/*") // All children of "wild-card"
  • An ActorSelection is not verified to point at a live actor, so messages may end up in deadLetters
  • Less performant than messaging the ActorRef directly
  • Can’t be used for death watch

Actor Identification

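actor ! Identify(key)

// The reply is an ActorIdentity carrying the same key; backticks match against the existing value
override def receive: Receive = {
  case ActorIdentity(`key`, Some(actorRef)) => // the actor exists and is alive
  case ActorIdentity(`key`, None) => // no actor is alive at that path
}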

Actor Lifecycle

Lifecycle Hooks

  • preStart
  • postStop
  • preRestart(reason: Throwable, message: Option[Any]) // on the failing actor
  • postRestart(reason: Throwable) // on the new actor

Stopping an actor

context.stop(self)
context.stop(anotherActor)

actor ! PoisonPill  // Queued like a normal message; the actor stops when it reaches it
actor ! Kill        // The actor throws ActorKilledException, which its supervisor handles (stop by default)

Death Watch

context.watch(anotherActor)

case Terminated(otherActor) => // Do something

If the watching actor does not handle the Terminated message, a DeathPactException is thrown by default.
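
A minimal sketch of death watch in context, assuming a hypothetical Watcher actor that creates a worker from the given Props, forwards everything to it, and stops itself when the worker terminates:

```scala
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Terminated}

// Hypothetical parent that watches its worker and reacts to its termination
class Watcher(workerProps: Props) extends Actor with ActorLogging {
  // context.watch returns the watched ActorRef, so creation and watch can be chained
  private val worker: ActorRef = context.watch(context.actorOf(workerProps, "worker"))

  def receive: Receive = {
    case Terminated(`worker`) =>
      log.info("Worker {} stopped, stopping the watcher too", worker.path)
      context.stop(self)

    case msg =>
      worker.forward(msg) // everything else goes to the worker
  }
}
```

Handling Terminated explicitly is what keeps the DeathPactException from being thrown.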

Scheduling

Timers trait

  • Only sends messages to the actor it is mixed-in with
  • Tied to the lifecycle of the actor. If the actor dies, the timer is cancelled
  • If a timer is cancelled while its message is already in the mailbox, that message is discarded

class MyActor extends Actor with ActorLogging with Timers {
  // Send `message` to self once, after `duration`
  timers.startSingleTimer(
    key,
    message,
    duration
  )

  // Cancel the timer registered under `key`
  timers.cancel(key)
}

Scheduler

  • Needs an execution context
  • Not tied to an actor. As such, every incarnation of an actor might schedule a new task
  • Not tied to the lifecycle of the actor, so it keeps sending messages even if the recipient is dead

import context.dispatcher // the ExecutionContext the scheduler needs

// Inside an actor, the system is reached through the context
context.system.scheduler.scheduleOnce(
  duration,
  actorRef,
  Message
)
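
Because a scheduled task outlives the actor that created it, a common precaution is to keep the returned Cancellable and cancel it in postStop. A minimal sketch, assuming a hypothetical Reminder actor that sends "reminder" to a target after a delay:

```scala
import akka.actor.{Actor, ActorRef, Cancellable}
import scala.concurrent.duration.FiniteDuration

// Hypothetical actor that reminds `target` once after `delay`
class Reminder(target: ActorRef, delay: FiniteDuration) extends Actor {
  import context.dispatcher // ExecutionContext required by the scheduler

  // Keep the handle so the task can be cancelled later
  private val task: Cancellable =
    context.system.scheduler.scheduleOnce(delay, target, "reminder")

  // Runs when the actor stops and, with the default preRestart, before a restart too
  override def postStop(): Unit = {
    task.cancel()
    ()
  }

  def receive: Receive = {
    case "cancel" => context.stop(self)
  }
}
```

With the Timers trait this bookkeeping is not needed, since timers are cancelled together with the actor.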

Testing

AfterAll

Shut down the actor system after all tests are done.
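
A minimal sketch of such a spec, assuming ScalaTest 3.1 or later (for AnyWordSpecLike) and akka-testkit on the test classpath. The spec name and the echo test are placeholders, not part of the original post.

```scala
import akka.actor.ActorSystem
import akka.testkit.{ImplicitSender, TestActors, TestKit}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

// Hypothetical spec; TestKit provides `system`, `testActor` and the expect* methods
class MyActorSpec
    extends TestKit(ActorSystem("MyActorSpec"))
    with ImplicitSender
    with AnyWordSpecLike
    with Matchers
    with BeforeAndAfterAll {

  // Called once after the last test: terminates the system and waits for it
  override def afterAll(): Unit =
    TestKit.shutdownActorSystem(system)

  "An echo actor" should {
    "send back the message unchanged" in {
      val echo = system.actorOf(TestActors.echoActorProps)
      echo ! "hello"
      expectMsg("hello")
    }
  }
}
```

Without the afterAll hook, the actor system's threads keep running after the tests have finished.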

TestActorRef

// implicit actorSystem in scope
val actor = TestActorRef(new MyActor)
val myUnderlyingActor = actor.underlyingActor
actor ! message  // Processed synchronously
myUnderlyingActor.field shouldEqual expectedValue
  • Accessing the underlying actor breaks encapsulation
  • Real actors process messages asynchronously, so this synchronous setup does not reflect production behaviour

TestProbe

val testProbe = TestProbe()
val myActor = system.actorOf(MyActor.props(testProbe.ref))
myActor ! message
testProbe.expectMsg(expectedMessage)

testProbe.within(minDuration, maxDuration){
  myActor ! anotherMessage
  testProbe.expectMsg(anotherExpectedMessage)
}

Fault Tolerance (error handling)

class MyActor extends Actor {
  override val supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries, withinTimeRange) {
    // decider: case <exception> => <directive>
  }
  // rest of the actor code
}

// Extend the default decider with a custom case
override val supervisorStrategy: SupervisorStrategy = {
  val decider: Decider = {
    case Guest.CaffeineException => Stop
  }
  OneForOneStrategy() {
    decider.orElse(super.supervisorStrategy.decider)
  }
}
  • OneForOneStrategy: applies the directive only to the child that failed
  • AllForOneStrategy: applies the directive to the failed child and all its siblings

Directives:

  • Resume (maintain the state of the actor)
  • Restart (reset the state of the actor)
  • Stop
  • Escalate (the default when the decider does not match the failure)

The message that caused the failure is dropped; it is not redelivered after a Resume or Restart.
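
To make the difference between Resume and Restart concrete, here is a minimal sketch with a hypothetical Counter child and CounterSupervisor parent. The string commands "resume" and "restart" exist only to trigger the two failure types.

```scala
import akka.actor.{Actor, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy.{Restart, Resume}

// Hypothetical child that keeps a running total
class Counter extends Actor {
  private var total = 0

  def receive: Receive = {
    case n: Int    => total += n
    case "get"     => sender() ! total
    case "resume"  => throw new ArithmeticException("supervisor resumes, total is kept")
    case "restart" => throw new IllegalStateException("supervisor restarts, total is reset")
  }
}

class CounterSupervisor extends Actor {
  // Failures not matched by this decider are escalated
  override val supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy() {
      case _: ArithmeticException   => Resume
      case _: IllegalStateException => Restart
    }

  private val counter = context.actorOf(Props(new Counter), "counter")

  def receive: Receive = {
    case msg => counter.forward(msg)
  }
}
```

After sending 5 and then "resume", a "get" still returns 5. After "restart", a fresh Counter instance answers "get" with 0, while messages already waiting in the mailbox are kept.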

Default strategy

Stop:

  • ActorInitializationException
  • ActorKilledException
  • DeathPactException

Restart:

  • Other Exceptions

Escalate:

  • Other Throwables

Routing

context.actorOf(FromConfig.props(MyActor.props()), "router-name") // Pull the router settings from the config file

context.actorOf(RoundRobinPool(4).props(MyActor.props()), "router-name") // Configure the router in code

Attach a router to an actor

# The path must match the router's actor path below /user
akka.actor.deployment {
  /path/myActor {
    router          = round-robin-pool
    nr-of-instances = 4
  }
}

FromConfig.props(MyActor.props())
  • An optional resizer can be given to a router to dynamically adjust the number of routees
  • By default, a pool router escalates all routee failures to its own supervisor

Router Type

  • Pool Router (built in, creates and supervises a number of routees)
  • Group Router (Built in, routes to existing actors that are supervised by their own parent(s))

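Routers pass messages straight to their routees, bypassing the router actor's own mailbox, except for a few specially handled messages:

  • PoisonPill and Kill are handled by the router itself and are not delivered to any routee
  • A message wrapped in Broadcast is delivered to all routees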
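
A minimal sketch of the difference between a normal send and a Broadcast, assuming a hypothetical EchoWorker routee and a pool of three created in code:

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.routing.{Broadcast, RoundRobinPool}

// Hypothetical routee that reports which instance handled the message
class EchoWorker extends Actor {
  def receive: Receive = {
    case msg => sender() ! s"${self.path.name} handled $msg"
  }
}

object RouterExample {
  // Creates a pool router that supervises three EchoWorker routees
  def startPool(system: ActorSystem): ActorRef =
    system.actorOf(RoundRobinPool(3).props(Props(new EchoWorker)), "echo-pool")
}
```

With `val router = RouterExample.startPool(system)`, `router ! "job"` reaches a single routee, while `router ! Broadcast("refresh")` delivers the unwrapped "refresh" to all three.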

Routing Logic

  • RandomRoutingLogic
  • RoundRobinRoutingLogic
  • SmallestMailboxRoutingLogic (Doesn’t work well for remote routees)
  • ConsistentHashingRoutingLogic (picks the routee from a hash key derived from the message)
  • BroadcastRoutingLogic
  • ScatterGatherFirstCompletedRoutingLogic
  • TailChoppingRoutingLogic (sends to one randomly picked routee, then after a short delay to the next one, and so on; the first reply is forwarded to the original sender)
  • Custom: extend RoutingLogic (the implementation must be thread-safe)

Dispatcher

context.actorOf(MyActor.props().withDispatcher("dispatcher-name"))

Dispatchers provided by Akka

  • Dispatcher (default): sharing threads from a thread pool for its actors
  • PinnedDispatcher: Dedicates a unique thread for each actor
  • CallingThreadDispatcher: For testing, runs invocations on the current thread only

Implement a MessageDispatcherConfigurator to plug in your own dispatcher.

In the configuration, throughput (default 5) is the maximum number of messages an actor processes before the thread moves on to the next actor.

Use a separate dispatcher for actors that run long-running blocking operations, so they do not starve the default dispatcher.
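
A minimal sketch of a dedicated dispatcher for blocking work. The dispatcher name blocking-io-dispatcher, the pool size of 16 and the BlockingWorker actor are assumptions for illustration; in a real project the config block would normally live in application.conf.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

object BlockingDispatcherExample {
  // Hypothetical dispatcher definition, inlined here to keep the sketch self-contained
  val config: Config = ConfigFactory.parseString(
    """
    blocking-io-dispatcher {
      type = Dispatcher
      executor = "thread-pool-executor"
      thread-pool-executor {
        fixed-pool-size = 16
      }
      throughput = 1
    }
    """).withFallback(ConfigFactory.load())

  // Hypothetical actor whose message handling blocks the thread
  class BlockingWorker extends Actor {
    def receive: Receive = {
      case millis: Long =>
        Thread.sleep(millis) // stands in for a blocking call (JDBC, file IO, ...)
        sender() ! "done"
    }
  }

  // The worker runs on the dedicated pool instead of the default dispatcher
  def startWorker(system: ActorSystem): ActorRef =
    system.actorOf(Props(new BlockingWorker).withDispatcher("blocking-io-dispatcher"))
}
```

The system has to be created with this configuration, for example `ActorSystem("example", BlockingDispatcherExample.config)`, for the dispatcher name to resolve.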

Modifying Actor Behaviour

context.become(newBehaviour, discardOld)

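Stashing messages: mix in the Stash trait and use stash() and unstashAll(). An unbounded stash can grow without limit and leak memory. The stash capacity can be configured (stash-capacity in the mailbox settings); once it is exceeded, stash() throws a StashOverflowException.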
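
A minimal sketch combining become and stash, assuming a hypothetical Resource actor that only accepts Write messages once it has been opened. Open, Close and Write are illustrative messages, not part of Akka.

```scala
import akka.actor.{Actor, Stash}

// Hypothetical protocol, for illustration only
case object Open
case object Close
final case class Write(data: String)

class Resource extends Actor with Stash {
  def receive: Receive = closed

  private def closed: Receive = {
    case Open =>
      unstashAll()           // put stashed messages back at the front of the mailbox
      context.become(opened) // discardOld defaults to true: replace the current behaviour
    case _: Write =>
      stash()                // not ready yet, keep the message for later
  }

  private def opened: Receive = {
    case Write(data) => sender() ! s"wrote $data"
    case Close       => context.become(closed)
  }
}
```

A Write sent before Open is answered only after Open arrives, and the reply still goes to the original sender because the stash keeps the whole envelope.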

This post is licensed under CC BY 4.0 by the author.