Akka Scala Cheat Sheet

Hello World

An Akka application usually consists of a single “ActorSystem” with a “root” behavior. The ActorSystem controls and coordinates all the Akka magic behind the scenes. The root behavior is where you set up all your actors; more on that later. For now, we will take this hello world example as is.

import akka.NotUsed
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors

object HelloWorld extends App {
  
  val root: Behavior[NotUsed] = Behaviors.setup { context =>
    context.log.info("Hello World")
    Behaviors.empty
  }
  ActorSystem(root, "root")
  
}

Actor

An actor is addressed through a reference of type ActorRef[T]. Actors have properties that make them ideal for concurrent and distributed applications. We send messages to an actor, and those messages are guaranteed to be processed one at a time. Messages coming from the same sender will be processed in the same order they were sent.

An actor of type ActorRef[T] can only process messages of type T. To send a message to an actor we use the method !, which is called “tell”.

val actor: ActorRef[String] = ??? // placeholder; we will see how to "spawn" an actor later
actor ! "Hello World"

To “spawn” an actor we use context.spawn(behavior, actorName), where behavior is an object of type Behavior[T] and actorName is a string that is unique among the actor’s siblings. For this example, we will use Behaviors.ignore, which ignores any message it receives. We will wrap that with Behaviors.logMessages, which logs all messages it receives.

val behavior: Behavior[String] = Behaviors.logMessages(Behaviors.ignore)
val actor: ActorRef[String] = context.spawn(behavior, "actorName")
actor ! "Hello World"

Actors live in a hierarchy; think of a family tree. This hierarchy has a root actor, and we can use Behaviors.setup to create that root actor’s behavior. This method takes a function ActorContext[T] => Behavior[T]. A simple root behavior is typically of type Behavior[NotUsed], as it usually doesn’t handle messages. It returns Behaviors.empty, which treats any message it receives as “unhandled”.

val root: Behavior[NotUsed] = Behaviors.setup { context =>
  // ...
  Behaviors.empty
}

Putting it all together, we get:

val root: Behavior[NotUsed] = Behaviors.setup { context =>
  val behavior: Behavior[String] = Behaviors.logMessages(Behaviors.ignore)
  val actor: ActorRef[String] = context.spawn(behavior, "actorName")
  actor ! "Hello World"
  Behaviors.empty
}

To kick off our system and start the magic, we need an ActorSystem. The ActorSystem is the heart of Akka; nothing runs until we create one.

val system = ActorSystem(root, "name")

Finally, for a graceful shutdown of the ActorSystem, we need to call system.terminate().
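A minimal sketch of that shutdown, assuming we keep a reference to the system. The object name ShutdownExample, the system name, and the 10-second wait are illustrative choices, not part of the original example. terminate() only triggers the shutdown, so the sketch waits on whenTerminated before exiting.

```scala
import akka.NotUsed
import akka.actor.typed.{ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.Await
import scala.concurrent.duration._

object ShutdownExample extends App {

  val root: Behavior[NotUsed] = Behaviors.setup { context =>
    context.log.info("Started")
    Behaviors.empty
  }

  val system: ActorSystem[NotUsed] = ActorSystem(root, "shutdown-example")

  // terminate() returns immediately; the shutdown itself happens asynchronously
  system.terminate()

  // whenTerminated completes once the system has fully stopped
  Await.result(system.whenTerminated, 10.seconds)
}
```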

The complete example would be:

object HelloWorld extends App {

  val root: Behavior[NotUsed] = Behaviors.setup { context =>
    val behavior: Behavior[String] = Behaviors.logMessages(Behaviors.ignore)
    val actor: ActorRef[String] = context.spawn(behavior, "actorName")
    actor ! "Hello World!"
    Behaviors.empty
  }

  ActorSystem(root, "name")
}

Let’s replace Behaviors.logMessages(Behaviors.ignore) with a behavior that does more or less the same thing.

val behavior: Behavior[String] = Behaviors.receive{(context, message) =>
  context.log.info(message)
  Behaviors.same
}

The most interesting part here is Behaviors.same. It indicates that when the message is processed the behavior of the actor will not change.
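For contrast, here is a small sketch of a behavior that does change after each message. Instead of Behaviors.same, it returns a new behavior that carries an updated count. The Counter object and its count parameter are hypothetical names used only for illustration.

```scala
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors

object Counter {
  // The current count lives in the behavior itself, not in a mutable variable
  def apply(count: Int = 0): Behavior[String] = Behaviors.receive { (context, message) =>
    context.log.info(s"Message #${count + 1}: $message")
    // Returning a different behavior: the next message is handled with count + 1
    Counter(count + 1)
  }
}
```

Returning a new behavior this way is a common alternative to keeping mutable state inside the actor.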

By convention behaviors are created via the apply method of an object. This object is usually named to represent the business name of the actor.

object Actor{
  def apply(): Behavior[String] = Behaviors.receive{(context, message) =>
    context.log.info(message)
    Behaviors.same
  }
}

Thus the full example would be:

object HelloWorld extends App {

  val root: Behavior[NotUsed] = Behaviors.setup { context =>
    val actor: ActorRef[String] = context.spawn(Actor(), "actorName")
    actor ! "Hello World"
    Behaviors.empty
  }

  ActorSystem(root, "root")
}

object Actor { // Lives in a separate file
  def apply(): Behavior[String] = Behaviors.receive{ (context, message) =>
    context.log.info(message)
    Behaviors.same
  }
}

Behaviors

We have seen Behaviors.receive( (context, message) => behavior ) and we have seen Behaviors.setup(context => behavior). There is a third commonly used method Behaviors.receiveMessage(message => behavior) which can be used if we are not interested in the context. It can also be used in combination with Behaviors.setup. This could be used if you want to run some initialization once when creating a behavior.

object Actor {
  def apply(): Behavior[String] = Behaviors.setup { context =>
    // some initialization logic
    Behaviors.receiveMessage { message =>
      context.log.info(s"Message received: $message")
      Behaviors.same
    }
  }
}

stopped

If we want to stop an actor, we can return Behaviors.stopped. This results in a PostStop signal that the actor can intercept to clean up resources.

Behaviors
  .receiveMessage[Message] {
    case EndOfTheLine =>
      Behaviors.stopped
  }
  .receiveSignal {
    case (context, PostStop) =>
      context.log.info("Stopped")
      Behaviors.same
  }

Schedulers and Timers

Schedulers

val scheduler = context.scheduleOnce(1.second, actor, Request("message", context.self))
scheduler.cancel()

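The ActorSystem’s scheduler has more powerful scheduling facilities, such as running arbitrary code:

implicit val ec: ExecutionContext = system.executionContext // the scheduler needs an implicit ExecutionContext
system.scheduler.scheduleOnce(1.second, () => doSomething())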

Timers

Behaviors.withTimers { timer =>
  timer.startSingleTimer(Request("ola", replyTo), 10.hours)
  // ...
}

Routers

Pool Router

context.spawnAnonymous(Routers.pool(poolSize = 4)(Worker()))

// `_ => true` matches every message, so all messages are broadcast to all routees
Routers.pool(poolSize = 4)(Worker()).withBroadcastPredicate(_ => true)

Group Router

De-registering an actor might result in losing messages. It is therefore recommended to stop the actor only after the deregistration notification is received, which decreases the chance of messages being lost but doesn’t eliminate it.

val serviceKey = ServiceKey[Actor.Message]("id")
    
val actor = context.spawn(Actor(), "actor")

context.system.receptionist ! Receptionist.Register(serviceKey, actor) // you can register more actors at any point

val groupRouter: GroupRouter[Actor.Message] = Routers.group(serviceKey) // random routing by default

val routerActor: ActorRef[Actor.Message] = context.spawn(groupRouter, "groupRouter")

routerActor ! Actor.Request("message")

context.system.receptionist ! Receptionist.Deregister(serviceKey, actor) // may take a third argument for an actor to be notified with the deregistration
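The recommendation above (stop the actor only once the deregistration notification has arrived) could be sketched as follows. This is an illustration under assumptions: WorkerManager, Retire, and WorkerDeregistered are hypothetical names, and the worker is a stand-in that ignores its messages.

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.scaladsl.Behaviors

object WorkerManager {
  sealed trait Command
  case object Retire extends Command
  private case object WorkerDeregistered extends Command

  val key: ServiceKey[String] = ServiceKey[String]("worker")

  def apply(): Behavior[Command] = Behaviors.setup { context =>
    // Stand-in worker; a real one would do something with its messages
    val worker: ActorRef[String] = context.spawn(Behaviors.ignore[String], "worker")
    context.system.receptionist ! Receptionist.Register(key, worker)

    // Translate the receptionist's acknowledgement into our own protocol
    val deregisteredAdapter: ActorRef[Receptionist.Deregistered] =
      context.messageAdapter[Receptionist.Deregistered](_ => WorkerDeregistered)

    Behaviors.receiveMessage {
      case Retire =>
        // Third argument: the actor to notify once deregistration is done
        context.system.receptionist ! Receptionist.Deregister(key, worker, deregisteredAdapter)
        Behaviors.same
      case WorkerDeregistered =>
        // Only now stop the worker; messages already routed to it may still be lost
        context.stop(worker)
        Behaviors.same
    }
  }
}
```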

Watching an actor

An actor may watch another actor using context.watch(actorRef). If the watched actor stops, the watching actor will receive a Terminated(actorRef) signal.

.receiveSignal {
  case (context, Terminated(actor)) =>
    context.log.info(s"Actor ${actor.path.name} was terminated")
    Behaviors.same
}

You can also use watchWith, which sends a custom message instead of the Terminated signal. We can stop watching an actor using context.unwatch. If you watch an already stopped actor, you will still receive the Terminated signal.
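A small sketch of watchWith, assuming a hypothetical Parent actor with its own protocol and a child that stops on the first message it receives. None of these names come from the original text.

```scala
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors

object Parent {
  sealed trait Message
  case object StopChild extends Message
  case object ChildStopped extends Message

  // Hypothetical child: stops as soon as it receives any message
  private val child: Behavior[String] =
    Behaviors.receiveMessage[String](_ => Behaviors.stopped)

  def apply(): Behavior[Message] = Behaviors.setup { context =>
    val childRef = context.spawn(child, "child")

    // Deliver ChildStopped as a regular message instead of the Terminated signal
    context.watchWith(childRef, ChildStopped)
    // context.unwatch(childRef) would cancel the watch again

    Behaviors.receiveMessage {
      case StopChild =>
        childRef ! "stop"
        Behaviors.same
      case ChildStopped =>
        context.log.info("Child stopped")
        Behaviors.same
    }
  }
}
```

Because ChildStopped is part of the parent’s own protocol, it is handled in receiveMessage rather than in receiveSignal.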

Message Adapters

Let’s say we have a customer and a restaurant. When the customer receives a message to order from a restaurant, it will send a message to that restaurant to create the order. The restaurant will respond with a message indicating the order has been created.

We will have two actors: a restaurant with behavior Behavior[Restaurant.Message] and a customer with behavior Behavior[Customer.Message]. Let’s look at an implementation of Restaurant.

object Restaurant {
  
  trait Message
  case class CreateOrder(replyTo: ActorRef[Message]) extends Message
  case object OrderCreated extends Message

  def apply(): Behavior[Message] = Behaviors.receiveMessage {
    case CreateOrder(replyTo) =>
      replyTo ! OrderCreated
      Behaviors.same
  }
}

There is a challenge here. The restaurant sends its reply to the replyTo actor, which must be of type ActorRef[Restaurant.Message], but our customer is of type ActorRef[Customer.Message]. We can create a message adapter on the customer side that does that conversion for us.

object Customer {
  trait Message
  case class OrderFromRestaurant(restaurant: ActorRef[Restaurant.Message]) extends Message
  case object OrderCreated extends Message

  def apply(): Behavior[Message] = Behaviors.setup { context =>
    val restaurantMessageToCustomerMessageAdapter =
      context.messageAdapter[Restaurant.Message] {
        case Restaurant.OrderCreated => OrderCreated
      }

    Behaviors.receiveMessage {
      case OrderFromRestaurant(restaurant) =>
        restaurant ! Restaurant.CreateOrder(restaurantMessageToCustomerMessageAdapter)
        Behaviors.same
      case OrderCreated =>
        context.log.info("Order Created")
        Behaviors.same
    }
  }

}

Fault Tolerance

By default, when an exception is thrown, the actor is stopped. To modify that behavior we need a supervisory strategy.

There are three types of supervisory strategies:

  • stop: default.
  • resume: drop the message where an error has occurred and process the next message. Actor state will be maintained.
  • restart: drop the message where an error has occurred and reset the actor state.

To handle different types of failures in different ways, Behaviors.supervise can be nested.

Behaviors.supervise(
  Behaviors.supervise(worker)
    .onFailure[IllegalStateException](SupervisorStrategy.resume)
).onFailure(SupervisorStrategy.restart.withLimit(maxNrOfRetries = 3, withinTimeRange = 10.seconds))

By default, child actors created in the setup phase are stopped when an actor is restarted. To override this behavior we can use SupervisorStrategy.restart.withStopChildren(false).
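A sketch of keeping children across restarts, assuming a hypothetical KeepChildren actor that forwards messages to one child and fails on the message "fail". The child is spawned in setup, outside the supervised behavior, so a restart does not run the spawn again under the same name.

```scala
import akka.actor.typed.{Behavior, SupervisorStrategy}
import akka.actor.typed.scaladsl.Behaviors

object KeepChildren {
  def apply(): Behavior[String] = Behaviors.setup { context =>
    // Spawned once, outside the supervised part, so it is not recreated on restart
    val child = context.spawn(Behaviors.logMessages(Behaviors.ignore[String]), "child")

    Behaviors
      .supervise(
        Behaviors.receiveMessage[String] {
          case "fail" =>
            throw new IllegalStateException("boom")
          case message =>
            child ! message
            Behaviors.same
        }
      )
      // Restart the supervised behavior on failure, but leave the child running
      .onFailure[IllegalStateException](SupervisorStrategy.restart.withStopChildren(false))
  }
}
```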

Signals

  • A restarted actor will receive a PreRestart signal.
  • A parent watching a child actor will receive a ChildFailed signal if the child fails. Note that ChildFailed extends the Terminated signal.
  • To propagate a failure up the hierarchy, we need to watch the child, handle the Terminated (ChildFailed) signal, and rethrow the exception.
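The signals listed above can be sketched roughly like this. RestartingWorker and Boss are hypothetical names: the first reacts to PreRestart under a restart strategy, the second watches an unsupervised child and rethrows the cause it gets from ChildFailed.

```scala
import akka.actor.typed.{Behavior, ChildFailed, PreRestart, SupervisorStrategy}
import akka.actor.typed.scaladsl.Behaviors

object RestartingWorker {
  def apply(): Behavior[String] =
    Behaviors
      .supervise(
        Behaviors
          .receiveMessage[String] { message =>
            if (message == "fail") throw new IllegalStateException("boom")
            Behaviors.same
          }
          .receiveSignal {
            // Delivered to the failing instance just before it is restarted
            case (context, PreRestart) =>
              context.log.info("Restarting, releasing resources")
              Behaviors.same
          }
      )
      .onFailure[IllegalStateException](SupervisorStrategy.restart)
}

object Boss {
  def apply(): Behavior[String] = Behaviors.setup[String] { context =>
    // No supervision: this child stops on its first failure (the default strategy)
    val worker = context.spawn(
      Behaviors.receiveMessage[String](_ => throw new IllegalStateException("boom")),
      "worker"
    )
    context.watch(worker)

    Behaviors
      .receiveMessage[String] { message =>
        worker ! message
        Behaviors.same
      }
      .receiveSignal {
        case (ctx, ChildFailed(ref, cause)) =>
          ctx.log.error(s"Child ${ref.path.name} failed", cause)
          throw cause // rethrow so that Boss fails too and its own parent can react
      }
  }
}
```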

Patterns

Ask

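import akka.actor.typed.scaladsl.AskPattern._ // only needed when asking from outside an actor
import akka.util.Timeout
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// context.ask needs an implicit Timeout (3 seconds is an arbitrary example value)
implicit val timeout: Timeout = 3.seconds

context.ask(recipient, replyTo => Recipient.Message(replyTo, ???)) {
  case Success(a) => Sender.Message(a)
  case Failure(exception) => Sender.ErrorMessage(exception)
}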

Handling async computations (pipe)

val result: Future[String] = asyncCall()
context.pipeToSelf(result){
  case Success(m) => MessageToSelf(m)
  case Failure(exception) => ErrorIndicatorToSelf(exception)
}

You might need to stash messages until the computation is completed.

Stash

Behaviors.withStash(capacity) { stash =>
  Behaviors.receiveMessage { message =>
    stash.stash(message) // buffer the message instead of handling it now
    stash.unstashAll(behaviorProcessingTheUnstashedMessages()) // replay the buffer into the next behavior
  }
}
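Combining pipeToSelf with a stash might look like the following sketch: messages that arrive while an asynchronous load is still running are buffered and replayed once the result is in. Cache, Get, Loaded, LoadFailed, the load parameter, and the capacity of 100 are all assumptions made for illustration.

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.Future
import scala.util.{Failure, Success}

object Cache {
  sealed trait Message
  final case class Get(replyTo: ActorRef[String]) extends Message
  private final case class Loaded(value: String) extends Message
  private final case class LoadFailed(cause: Throwable) extends Message

  def apply(load: () => Future[String]): Behavior[Message] =
    Behaviors.withStash[Message](capacity = 100) { stash =>
      Behaviors.setup[Message] { context =>
        // Turn the future's outcome into a message sent to this actor
        context.pipeToSelf(load()) {
          case Success(value) => Loaded(value)
          case Failure(cause) => LoadFailed(cause)
        }

        Behaviors.receiveMessage[Message] {
          case Loaded(value) =>
            // Replay everything that was buffered into the "ready" behavior
            stash.unstashAll(ready(value))
          case LoadFailed(cause) =>
            throw cause
          case other =>
            // Not ready yet: keep the message for later
            stash.stash(other)
            Behaviors.same
        }
      }
    }

  private def ready(value: String): Behavior[Message] =
    Behaviors.receiveMessage[Message] {
      case Get(replyTo) =>
        replyTo ! value
        Behaviors.same
      case _ =>
        Behaviors.unhandled
    }
}
```

Note that stash.stash throws once the buffer exceeds its capacity, so the capacity should be chosen with the expected load in mind.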

Serialization

Akka supports both Jackson and Google Protocol Buffers for serialization. We will explore Jackson serialization in this section.

  1. Add the dependency "com.typesafe.akka" %% "akka-serialization-jackson" % AkkaVersion.

  2. Create a marker trait that all serializable objects using the same serializer will extend:

trait Serializable
trait Event extends com.mfarag.Serializable

  3. Add some configuration to bind a serializer to that trait:

    akka.actor {
      serializers {
        jackson-json = "akka.serialization.jackson.JacksonJsonSerializer"
      }
      serialization-bindings {
        "com.mfarag.Serializable" = jackson-json
      }
    }


Persistence

Dependencies and setup

At the time of writing, the Akka documentation uses org.fusesource.leveldbjni for local file-based development, but that library is not easily supported on Apple M1 based Macs. The setup below uses Akka Persistence JDBC instead.

You will need to enable serialization for persisted events.

  1. Akka Persistence and Akka Persistence JDBC Dependency
    
    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-persistence-typed" % AkkaVersion,
      "com.lightbend.akka" %% "akka-persistence-jdbc" % AkkaPersistenceJdbcVersion,
      "com.typesafe.akka" %% "akka-persistence-testkit" % AkkaVersion % Test
    )
    
  2. Slick or alternative database access library.
    
    libraryDependencies ++= Seq(
      "com.typesafe.slick" %% "slick" % SlickVersion,
      "com.typesafe.slick" %% "slick-hikaricp" % SlickVersion
    )
    
  3. Database drivers
    
    libraryDependencies ++= Seq(
      "org.postgresql" % "postgresql" % PostgresVersion
    )
    
  4. Database schema: see “Database Schema”.

  5. Database configuration: see “Akka Persistence JDBC Configurations”.

Event Sourced Persistence

Behaviors.setup { context =>
  val logState: State => Unit = state => context.log.info(s"Current State is: $state")

  val commandHandler: (State, Command) => Effect[Event, State] = {
    case (_, Increment) =>
      Effect.persist(Incremented)
        .thenRun(logState)
  }

  val eventHandler: (State, Event) => State = {
    case (state, Incremented) => state.incremented()
  }

  EventSourcedBehavior[Command, Event, State](
    PersistenceId.ofUniqueId("abc"),
    Zero,
    commandHandler,
    eventHandler
  )
}

Snapshots

Make sure that the state you’re taking snapshots of is serializable, and that you have enabled snapshots in the configuration.

  1. You can persist a snapshot based on a predicate (State, Event, Long) => Boolean:

val snapshotPredicate: (State, Event, Long) => Boolean = {
  case (Value(n), event, sequenceNumber) if someCondition(n) => true
  case _ => false
}

eventSourcedBehaviour.snapshotWhen(snapshotPredicate)

  2. You can persist a snapshot every N events:

eventSourcedBehaviour.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents, keepNSnapshots))

  3. You can use both, for example to snapshot every N events or when a specific business condition is met.
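Combining both options could look like the sketch below. PersistentCounter, its protocol, and the thresholds (a snapshot whenever the value is a multiple of 100, plus one every 50 events, keeping 2 snapshots) are made-up values for illustration. Running it still requires a configured journal, snapshot store, and serialization for the events and state.

```scala
import akka.actor.typed.Behavior
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior, RetentionCriteria}

object PersistentCounter {
  sealed trait Command
  case object Increment extends Command

  sealed trait Event
  case object Incremented extends Event

  final case class State(value: Int)

  def apply(id: String): Behavior[Command] =
    EventSourcedBehavior[Command, Event, State](
      PersistenceId.ofUniqueId(id),
      State(0),
      // Every command persists one Incremented event
      (_, _) => Effect.persist(Incremented),
      // Every event adds one to the state
      (state, _) => state.copy(value = state.value + 1)
    )
      // Business condition: snapshot whenever the value reaches a multiple of 100
      .snapshotWhen((state, _, _) => state.value % 100 == 0)
      // Additionally snapshot every 50 events and keep the 2 latest snapshots
      .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 50, keepNSnapshots = 2))
}
```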

Testing

import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import org.scalatest.wordspec.AnyWordSpecLike

class TestingSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike { // AnyWordSpecLike can be replaced with any scalatest style trait and dependency
  
}
val actor: ActorRef[Actor.Message] = testKit.spawn(Actor(), "actor")
val probe: TestProbe[Actor.Message] = testKit.createTestProbe[Actor.Message]()

actor ! Actor.Request("Hello", probe.ref)

probe.expectNoMessage()
probe.expectMessage(Actor.Response("World"))
probe.expectMessage(1.second, Actor.Response("World")) // 1.second avoids the postfix-operator syntax of `1 second`
probe.within(1.second, 2.second){
  probe.expectMessage(Actor.Response("World"))
}
val replies: Seq[Actor.Response] = probe.receiveMessages(numberOfMessages, timeoutDuration).collect { case m: Actor.Response => m }

Mocking actor behavior

val mockProbe: TestProbe[Actor.Message] = testKit.createTestProbe[Actor.Message]()

val mockedBehavior: Behaviors.Receive[Actor.Message] = Behaviors.receiveMessage[Actor.Message] {
  case Actor.Request(_, replyTo) =>
    replyTo ! Actor.Response("World")
    Behaviors.same
}

val mockedActor: ActorRef[Actor.Message] = testKit.spawn(
  Behaviors.monitor(
    mockProbe.ref,
    mockedBehavior
  ))

Manipulating Time

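import akka.actor.testkit.typed.scaladsl.{ManualTime, ScalaTestWithActorTestKit}
import org.scalatest.wordspec.AnyWordSpecLike
import scala.concurrent.duration._

class TimeTester extends ScalaTestWithActorTestKit(ManualTime.config) with AnyWordSpecLike {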

  val time = ManualTime()
  time.timePasses(10.hours)

  time.expectNoMessageFor(duration, testProbe)
}

Intercepting Logs

I think this should be a last resort. Logs are very fragile and do not really represent business logic. The logic can change and the logs may stay the same.

import akka.actor.testkit.typed.scaladsl.LoggingTestKit

LoggingTestKit
  .info(filter) // intercept logs of level info containing the string in filter
  .withMessageRegex(regex) // and satisfies this regex
  .withOccurrences(1)
  .expect {
    actor ! Actor.Request(replyToProbe.ref)
  }


LoggingTestKit
  .error[IllegalArgumentException]
  .expect{
    actor ! Actor.ErrorOut
  }

Synchronous Testing

val actor: BehaviorTestKit[Actor.Message] = BehaviorTestKit(Actor())
val replyToInbox: TestInbox[Actor.Message] = TestInbox[Actor.Message]()
actor.run(Actor.Request(replyToInbox.ref))
actor.expectEffect(NoEffects)
replyToInbox.expectMessage(Actor.Response(expectedMessage))
actor.logEntries() should contain(CapturedLogEvent(Level.INFO, expectedLogMessage))

Dependencies

val AkkaVersion = "2.6.18"
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor-typed" % AkkaVersion,
  "ch.qos.logback" % "logback-classic" % "1.2.10",
  "com.typesafe.akka" %% "akka-actor-testkit-typed" % AkkaVersion % Test,
  "org.scalatest" %% "scalatest" % "3.2.11" % Test // You may add additional dependencies for specific scalatest test style
)
This post is licensed under CC BY 4.0 by the author.