Hello World
An akka application is usually formed of a single “ActorSystem” with a “root” behavior. The ActorSystem controls and coordinate all the Akka magic behind the scene. The root behavior is where you set up all your actors, more on that later. We will just take this hello world example as is for now.
1
2
3
4
5
6
7
8
9
10
11
12
13
import akka.NotUsed
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
object HelloWorld extends App {
val root: Behavior[NotUsed] = Behaviors.setup { context =>
context.log.info("Hello World")
Behaviors.empty
}
ActorSystem(root, "root")
}
Actor
An actor is an object of type ActorRef[T]
. This object has some properties that make it ideal for concurrent and distributed applications. We send messages to an actor. Those messages are guaranteed to be processed one at a time. Messages coming from the same sender will be processed in the same order they were sent.
An actor of type ActorRef[T]
can only process messages of type T
. To send a message to an actor we use the method !
which is called “tell”
1
2
val actor: ActorRef[String] = // We will see how to "spawn" and actor later `Behaviors.logMessages` which logs all messages it receives.
actor ! "Hello World"
To “spawn” an actor we use context.spawn(behavior, actorName)
. Where behavior is an object of type Behavior[T]
and the actorName is a unique string. For this example, we will use Behaviors.ignore
which ignores any message it receives. We will wrap that with
1
2
3
val behavior: Behavior[String] = Behaviors.logMessages(Behaviors.ignore)
val actor: ActorRef[String] = context.spawn(behavior, "actorName")
actor ! "Hello World"
Actors live in a hierarchy. Think of a family tree. This hierarchy has a root actor. we can use Behaviors.setup
to create that root actor behavior. This method takes a function ActorContext[T] => Behavior[T]
. A simple root actor behavior typically is of type [NotUsed]
as it typically doesn’t handle messages. It would return Behaviors.empty
which would deal with any messages it may receive as “unhandled”.
1
2
3
4
val root: Behavior[NotUsed] = Behaviors.setup { context =>
// ...
Behaviors.empty
}
So putting it all together we get
1
2
3
4
5
6
val root: Behavior[NotUsed] = Behaviors.setup { context =>
val behavior: Behavior[String] = Behaviors.logMessages(Behaviors.ignore)
val actor: ActorRef[String] = context.spawn(behavior, "actorName")
actor ! "Hello World"
Behaviors.empty
}
To kick of our system and start the magic we need an ActorSystem
. The ActorSystem is the heart of Akka. We need to create the ActorSystem for Akka to run.
1
val system = ActorSystem(root, "name")
Finally, for a graceful shutdown of the ActorSystem we need to call system.terminate()
The complete example would be
1
2
3
4
5
6
7
8
9
10
11
object HelloWorld extends App {
val root: Behavior[NotUsed] = Behaviors.setup { context =>
val behavior: Behavior[String] = Behaviors.logMessages(Behaviors.ignore)
val actor: ActorRef[String] = context.spawn(behavior, "actorName")
actor ! "Hello World!"
Behaviors.empty
}
ActorSystem(root, "name")
}
Let’s replace Bhaviors.logMessages(Behavior.ignore)
by a behavior which will do more or less the same thing.
1
2
3
4
val behavior: Behavior[String] = Behaviors.receive{(context, message) =>
context.log.info(message)
Behaviors.same
}
The most interesting part here is Behaviors.same
. It indicates that when the message is processed the behavior of the actor will not change.
By convention behaviors are created via the apply method of an object. This object is usually named to represent the business name of the actor.
1
2
3
4
5
6
object Actor{
def apply(): Behavior[String] = Behaviors.receive{(context, message) =>
context.log.info(message)
Behaviors.same
}
}
Thus the full example would be
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
object HelloWorld extends App {
val root: Behavior[NotUsed] = Behaviors.setup { context =>
val actor: ActorRef[String] = context.spawn(Actor(), "actorName")
actor ! "Hello World"
Behaviors.empty
}
ActorSystem(root, "root")
}
object Actor { // Lives in a separate file
def apply(): Behavior[String] = Behaviors.receive{ (context, message) =>
context.log.info(message)
Behaviors.same
}
}
Behaviors
We have seen Behaviors.receive( (context, message) => behavior )
and we have seen Behaviors.setup(context => behavior)
. There is a third commonly used method Behaviors.receiveMessage(message => behavior)
which can be used if we are not interested in the context. It can also be used in combination with Behaviors.setup
. This could be used if you want to run some initialization once when creating a behavior.
1
2
3
4
5
6
7
8
9
object Actor {
def apply(): Behavior[String] = Behaviors.setup { context =>
// some initialization logic
Behaviors.receiveMessage { message =>
context.log.info(s"Message received: $message")
Behaviors.same
}
}
}
stopped
If we want to stop and actor we can use Behaviors.stopped
which stops the actor. That will result in a PostStop
signal that the actor can intercept to clean up resources
1
2
3
4
5
6
7
8
9
10
Behaviors
.receiveMessage[Message] {
case EndOfTheLine =>
Behaviors.stopped
}
.receiveSignal {
case (context, PostStop) =>
context.log.info("Stopped")
Behaviors.same
}
Schedulers and Timers
Schedulers
1
2
val scheduler = context.scheduleOnce(1.second, actor, Request("message", context.self))
scheduler.cancel()
System has more powerful scheduling facilities
1
system.scheduler.scheduleOnce(1.second, () => doSomething())
Timers
1
2
3
4
5
Behaviors.withTimers { timer =>
timer.startSingleTimer(Request("ola", replyTo), 10.hours)
// ...
}
Routers
Pool Router
1
2
3
context.spawnAnonymous(Routers.pool(poolSize = 4)(Worker()))
Routers.pool(poolSize = 4)(Worker()).withBroadcastPredicate(_ == true) // will broadcast all messages to all routees
Group Router
De-registering an actor might result in losing messages. Thus, it is recommended to stop the actor after the deregisteration notification is received. Which decreases the chance of messages being lost but doesn’t eliminate it.
1
2
3
4
5
6
7
8
9
10
11
12
13
val serviceKey = ServiceKey[Actor.Message]("id")
val actor = context.spawn(Actor(), "actor")
context.system.receptionist ! Receptionist.Register(serviceKey, actor) // you can register more actors at any point
val groupRouter: GroupRouter[Actor.Message] = Routers.group(serviceKey) // random routing by default
val routerActor: ActorRef[Actor.Message] = context.spawn(groupRouter, "groupRouter")
routerActor ! Actor.Request("message")
context.system.receptionist ! Receptionist.Deregister(serviceKey, actor) // may take a third argument for an actor to be notified with the deregistration
Watching an actor
An actor may watch another actor using context.watch(actorRef)
. If the watched actor stops. The watching actor will receive a Terminated(actorRef)
signal.
1
2
3
4
5
.receiveSignal {
case (context, Terminated(actor)) =>
context.log.info(s"Actor ${actor.path.name} was terminated")
Behaviors.same
}
You can also use watchWith
which sends a custom message instead of the terminated signal. We can stop watching an actor using context.unwatch
If you watch an already stopped actor, you will still receive the Terminated signal.
Message Adapters
Let’s say we have a customer and a restaurant. When the customer receives a message to order from a restaurant, it will send a message to that restaurant to create the order. The restaurant will respond with a message indicating the order has been created.
We will have two actors. a restaurant with behavior Behavior[Restaurant.Message]
and a customer with behavior Behavior[Customer.Message]
. Let’s look at an implementation of Restaurant.
1
2
3
4
5
6
7
8
9
10
11
12
object Restaurant {
trait Message
case class CreateOrder(replyTo: ActorRef[Message]) extends Message
case object OrderCreated extends Message
def apply(): Behavior[Message] = Behaviors.receiveMessage {
case CreateOrder(replyTo) =>
replyTo ! OrderCreated
Behaviors.same
}
}
There is a challenge here. The restaurant will send a message to the replyTo
actor. But that actor is of type ActorRef[Restaurant.Message]
and our customer is of type ActorRef[Customer.Message]
. We can create a message adapter, in the customer side that does that conversion for us.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
object Customer {
trait Message
case class OrderFromRestaurant(restaurant: ActorRef[Restaurant.Message]) extends Message
case object OrderCreated extends Message
def apply(): Behavior[Message] = Behaviors.setup { context =>
val restaurantMessageToCustomerMessageAdapter =
context.messageAdapter[Restaurant.Message] {
case Restaurant.OrderCreated => OrderCreated
}
Behaviors.receiveMessage {
case OrderFromRestaurant(restaurant) =>
restaurant ! Restaurant.CreateOrder(restaurantMessageToCustomerMessageAdapter)
Behaviors.same
case OrderCreated =>
context.log.info("Order Created")
Behaviors.same
}
}
}
Fault Tolerance
By default, when an exception is thrown, the actor is stopped. To modify that behavior we need a supervisory strategy.
There are three types of supervisory strategies:
- stop: default.
- resume: drop the message where an error has occurred and process the next message. Actor state will be maintained/
- restart: drop the message where an error has occurred and reset the actor state.
To handle different types of failures in different ways. Behaviors.supervise
can be nested.
1
2
3
4
Behaviors.supervise(
Behaviors.supervise(worker)
.onFailure[IllegalStateException](SupervisorStrategy.resume)
).onFailure(SupervisorStrategy.restart.withLimit(maxNrOfRetries = 3, withinTimeRange = 10.seconds))
By default, child actors created in the setup phase is stopped when an actor is restarted. To override this behavior we can use iSupervisorStrategy.restart.withStopChildren(false)
Signals
- A restarted actor will receive a
PreRestart
signal - A parent watching a child actor would receive
ChildFailed
signal. Note thatChildFailed
extendsTerminated
signal. - To move up the failure up the hierarchy we need to watch for Terminated and rethrow the exception
Patterns
Ask
1
2
3
4
5
6
import akka.actor.typed.scaladsl.AskPattern._
context.ask(recipient , replyTo => Recipient.Message(replyTo, ???)) {
case Success(a) => Sender.Message(a)
case Failure(exception) => Sender.ErrorMessage(exception)
}
Handling async computations (pipe)
1
2
3
4
5
val result: Future[String] = asyncCall()
context.pipeToSelf(result){
case Success(m) => MessageToSelf(m)
case Failure(exception) => ErrorIndicatorToSelf(exception)
}
You might need to stash messages until the computation is completed.
Stash
1
2
3
4
5
6
Behaviors.withStash (capacity){ stash =>
Behaviors.receive{message =>
stash.stash(message)
stash.unstashAll(behaviorProcessingTheUnstashedMessages())
}
}
Serialization
Akka supports both Jackson and Google Protocol Buffers for serialization. We will explore Jackson serialization in this section.
Will need the dependency
"com.typesafe.akka" %% "akka-serialization-jackson" % AkkaVersion,
Create a trait that all serializable objects using the same serializer would use
1
2
3
trait Serializable
trait Event extends com.mfarag.Serializable
- And some configuration to bind a serializer with that trait:
1 2 3 4 5 6 7 8
actor { serializers { jackson-json = "akka.serialization.jackson.JacksonJsonSerializer" } serialization-bindings { "com.mfarag.Serializable" = jackson-json } }
Persistence
Dependencies and setup
As of the time of this writing, Akka documentation uses org.fusesource.leveldbjni
for local file-based db development. org.fusesource.leveldbjni
is not easily supported on Apple Mac M1 chip based computers.
You will need to enable serialization for persisted events
- Akka Persistence and Akka Persistence JDBC Dependency
1 2 3 4 5
libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-persistence-typed" % AkkaVersion, "com.lightbend.akka" %% "akka-persistence-jdbc" % AkkaPersistenceJdbcVersion, "com.typesafe.akka" %% "akka-persistence-testkit" % AkkaVersion % Test )
- Slick or alternative database access library.
1 2 3 4
libraryDependencies ++= Seq( "com.typesafe.slick" %% "slick" % SlickVersion, "com.typesafe.slick" %% "slick-hikaricp" % SlickVersion )
- Database drivers
1 2 3
libraryDependencies ++= Seq( "org.postgresql" % "postgresql" % PostgresVersion )
Database Schema Database Schema
- Database configurations Akka Persistence JDBC Configurations
Event Sourced Persistence
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Behaviors.setup { context =>
val logState: State => Unit = state => context.log.info(s"Current State is: $state")
val commandHandler: (State, Command) => Effect[Event, State] = {
case (_, Increment) =>
Effect.persist(Incremented)
.thenRun(logState)
}
val eventHandler: (State, Event) => State = {
case (state, Incremented) => state.incremented()
}
EventSourcedBehavior[Command, Event, State](
PersistenceId.ofUniqueId("abc"),
Zero,
commandHandler,
eventHandler
)
}
Snapshots
Make sure that the state you’re taking snapshots of is serializable. Make sure that you enabled snapshots in configuration
- You can persist snapshot based on a predicate
(State, Event, Long) => Boolean
1
2
3
4
5
6
val snapshotPredicate: (State, Event, Long) => Boolean = {
case (Value(n), event, sequenceNumber) if someCondition(n) => true
case _ => false
}
eventSourcedBehaviour.snapshotWhen(snapshotPredicate)
- You can persist every N events
1
eventSourcedBehaviour.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents, keepNSnapshots))
- You can use both for example snapshot every n events or when a specific business condition is met
Testing
1
2
3
4
5
6
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import org.scalatest.wordspec.AnyWordSpecLike
class TestingSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike { // AnyWordSpecLike can be replaced with any scalatest style trait and dependency
}
1
2
3
4
5
6
7
8
9
10
11
12
val actor: ActorRef[Actor.Message] = testKit.spawn(Actor(), "actor")
val probe: TestProbe[Actor.Message] = testKit.createTestProbe[Actor.Message]()
actor ! Actor.Request("Hello", probe.ref)
probe.expectNoMessage()
probe.expectMessage(Actor.Response("World"))
probe.expectMessage(1 second, Actor.Response("World"))
probe.within(1.second, 2.second){
probe.expectMessage(Actor.Response("World"))
}
val replies: Seq[Actor.Response] = probe.receiveMessages(numberOfMessages, timeoutDuration).collect { case m: Actor.Response => m }
Mocking actor behavior
1
2
3
4
5
6
7
8
9
10
11
12
13
val mockProbe: TestProbe[Actor.Message] = testKit.createTestProbe[Actor.Message]()
val mockedBehavior: Behaviors.Receive[Actor.Message] = Behaviors.receiveMessage[Actor.Message] {
case Actor.Request(_, replyTo) =>
replyTo ! Actor.Response("World")
Behaviors.same
}
val mockedActor: ActorRef[Actor.Message] = testKit.spawn(
Behaviors.monitor(
mockProbe.ref,
mockedBehavior
))
Manipulating Time
1
2
3
4
5
6
7
8
9
10
import akka.actor.testkit.typed.scaladsl.ManualTime
class TimeTester extends ScalaTestWithActorTestKit(ManualTime.config){
val time = ManualTime()
time.timePasses(10.hours)
time.expectNoMessageFor(duration, testProbe)
}
Intercepting Logs
I think this should be a last resort. Logs are very fragile and do not really represent business logic. The logic can change and the logs may stay the same.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import akka.actor.testkit.typed.scaladsl.LoggingTestKit
LoggingTestKit
.info(filter) // intercept logs of level info containing the string in filter
.withMessageRegex(regex) // and satisfies this regex
.withOccurrences(1)
.expect {
actor ! Actor.Request(replyToProbe.ref)
}
LoggingTestKit
.error[IllegalArgumentException]
.expect{
actor ! Actor.ErrorOut
}
Synchronous Testing
1
2
3
4
5
6
val actor: BehaviorTestKit[Actor.Message] = BehaviorTestKit(Actor())
val replyToInbox: TestInbox[Actor.Message] = TestInbox[Actor.Message]()
actor.run(Actor.Request(replyToInbox.ref))
actor.expectEffect(NoEffects)
replyToInbox.expectMessage(Actor.Response(expectedMessage))
actor.logEntries() should contain(CapturedLogEvent(Level.INFO, expectedLogMessage))
dependencies
1
2
3
4
5
6
7
val AkkaVersion = "2.6.18"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor-typed" % AkkaVersion,
"ch.qos.logback" % "logback-classic" % "1.2.10",
"com.typesafe.akka" %% "akka-actor-testkit-typed" % AkkaVersion % Test,
"org.scalatest" %% "scalatest" % "3.2.11" % Test // You may add additional dependencies for specific scalatest test style
)
Comments powered by Disqus.