Creating your first actor
1
2
3
4
5
6
7
8
9
| class MyActor extends Actor {
def receive: Receive = {
case msg => // do something with that message
}
}
object MyActor{
def props():Props = Props(new MyActor)
}
|
1
| val system = ActorSystem("name")
|
Creating an Actor Reference
Root Actor Reference
1
| val actor = system.actorOf(MyActor.props())
|
Creating a child actor
1
| val actor = context.actorOf(MyActor.props())
|
1
| actor ! "First message"
|
1
| actor.forward("keep original sender")
|
1
2
3
4
5
6
7
8
9
10
11
12
13
| import akka.patter.ask
// implicit Timeout
// implicit ExecutionContext
val response = myActor ? Message
response.mapTo[Response] onComplete {
case Success(result) =>
case Failure(reason) =>
}
// alternatively
import akka.pattern.pipe
myActor ? Message pipeTo self // preferred to avoid running concurrent calculations
// That will return either the successful result, or Status.Failure
|
pipeTo should be used with any future
Actor Selection
1
2
3
| context.actorSelection("/user/path/to/specific/actor")
context.actorSelection("../sibling")
context.actorSelection("/user/wild-card/*") // All children of "wild-card"
|
- ActorSelection is unverified and the message might end up on deadLetters
- Less performant compared to messaging the actorRef directly.
- Can’t be used for death watch
Actor Identification
1
2
3
4
| override def receive: Receive = {
case ActorIdentity(key, Some(actorRef)) => //actor exists and alive
case ActorIdentity(key, None) => // No one is alive here
}
|
Actor Lifecycle
Lifecycle Hooks
- preStart
- postStop
- preRestart(reason: Throwable, message: Option[Any]) // on the failing actor
- postRestart(reason: Throwable) // on the new actor
Stopping an actor
1
2
| context.stop(self)
context.stop(anotherActor)
|
1
2
| actor ! PoisonPill // context.stop(self)
actor ! Kill // Throws ActorKilledException
|
Death Watch
1
2
3
| context.watch(anotherActor)
case Terminated(otherActor) => // Do something
|
If Terminated message is not handled, a DeathPactException
will be thrown by default.
Scheduling
Timer trait
- Only sends messages to the actor it is mixed-in with
- Tied to the lifecycle of the actor. If the actor dies, the timer is cancelled
- If the timer is cancelled and a scheduled message in flight, that message will be discarded
1
2
3
4
5
6
7
8
9
| class MyActor extends ActorLogging with Timers{
timers.startSingleTimer(
key,
message,
duration
)
timers.cancel(key)
}
|
Scheduler
- Needs an execution context
- Not tied to an actor. As such every incarnation of an actor might start a new scheduler
- Not tied to the lifecycle of the timer and would continue to send messages even if the recipient is dead
1
2
3
4
5
6
7
| import context.dispatcher
system.scheduler.scheduleOnce(
duration,
actorRef,
Message
)
|
Testing
AfterAll
shutdown the actor system after all tests are done.
TestActorRef
1
2
3
4
5
| // implicit actorSystem in scope
val actor = TestActorRef(new MyActor)
val myUnderlyingActor = actor.underlyingActor
actor ! message // Processed synchronously
myUnderlyingActor.field shouldEqual expectedValue
|
- This breaks encapsulation
- In reality this scenario doesn’t happen
TestProbe
1
2
3
4
5
6
7
8
9
10
| val testProbe = TestProbe()
val myActor = system.actorOf(MyActor.props(testProbe.ref))
myActor ! message
testProbe.expectMsg(expectedMessage)
testProbe.within(minDuration, maxDuration){
myActor ! anotherMessage
testProbe.expectMsg(anotherExpectedMessage)
}
|
Fault Tolerance (error handling)
1
2
3
4
5
6
| class MyActor extends Actor{
override val supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maximumNumberOfRetries, withinTimeRange) {
}
// rest of the actor code
}
|
1
2
3
4
5
6
7
8
| override val supervisorStrategy: SupervisorStrategy = {
val decider: Decider = {
case Guest.CaffeineException => Stop
}
OneForOneStrategy() {
decider.orElse(super.supervisorStrategy.decider)
}
}
|
- OneForOneStrategy: Only fail the affected actor
- AllForOneStrategy: Fail the affected actor and all its siblings
Directives:
- Resume (maintain the state of the actor)
- Restart (reset the state of the actor)
- Stop
- Escalate (default if you ignore it)
The faulty message will be dropped
Default strategy
Stop: - ActorInitializationException - ActorKilledException - DeathPactException Restart: - Other Exceptions Escalate: - Other Throwables
Routing
1
2
3
| context.actorOf(MyActor.props.withRouter(Config()), "router-name") // Pull config from config file
context.actorOf(RoundRobinPool(4).props(MyActor.props), "router-name")
|
Attach a router to an actor
1
2
3
4
5
| deployment {
/path/myActor {
router = round-robin-pool
nr-of-instances = 4
}
|
1
| FromConfig.props(MyActor.props)
|
- An optional resizer can be given to a router to dynamically adjust the number of routees
- By default, all failures are escalated
Router Type
- Pool Router (built in, creates and supervises a number of routees)
- Group Router (Built in, routes to existing actors that are supervised by their own parent(s))
Routers do not have a mailbox, they route messages to their routees directly except for
PoisonPill
and Kill
are not delivered to any routeeBroadcast
is delivered to all routees
Routing Logic
- RandomRoutingLogic
- RoundRobinRoutingLogic
- SmallestMailboxRoutingLogic (Doesn’t work well for remote routees)
- ConsistentHashingRoutingLogic (by key?)
- BroadcastRoutingLogic
- ScatterGatherFirstCompletedRoutingLogic
- TailChoppingRoutingLogic (send to one, waits for response, if it doesn’t receive it, it moves to the next and so on)
- extend RoutingLogic (must be thread safe)
Dispatcher
1
| context.actorOv(MyActor.props.withDispatcher("dispatcher-name"))
|
Dispatchers provided by Akka
- Dispatcher (default): sharing threads from a thread pool for its actors
- PinnedDispatcher: Dedicates a unique thread for each actor
- CallingThreadDispatcher: For testing, runs invocations on the current thread only
Provide MessageDispatcherConfigurator to provide your own dispatcher
In configuration throughput = 5
the number of messages that an actor processes before yielding the thread.
You might need to use a separate dispatcher when there is long-running blocking operations
Modifying Actor Behaviour
1
| context.become(newBhaviour, discardOld)
|
stashing messages stash
and unstash
-> Could create a memory leak! stash size can be configured, and excess messages will be deleted.
Comments powered by Disqus.