Home Akka Streams
Post
Cancel

Akka Streams

Sources

Source[+Out, +Mat]

  • Out: The type of the elements that the Source produces.
  • Mat: The type of materialized value.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
val emptySource: Source[String, NotUsed] = Source.empty[String]
val singleElementSource: Source[String, NotUsed] = Source.single("element")
val repeatingElementSource: Source[String, NotUsed] = Source.repeat("element")
val scheduledSource: Source[String, Cancellable] = Source.tick(
  initialDelay = 1 second,
  interval = 5 seconds,
  tick = "message"
) // When there is no demand, the tick will be lost
val sourceFromIterable: Source[Int, NotUsed] = Source(1 to 10)
val sourceFromIterator: Source[Int, NotUsed] = Source.fromIterator{
  () => Iterator.from(0)
}
val cyclingSourceFromIterator: Source[Int, NotUsed] = Source.cycle{
  () => Iterator.range(1, 10)
}
val statefulSource: Source[Int, NotUsed] = Source.unfold(initialValue) { // Source.unfoldAsync(...): Future[Option[T]]
  case value if condition => Some((value.updated(), value))
  case _ => None // completes when None is returned
}
val sourceFromActor: Source[Message, ActorRef] = // materialized as an actorRef. Completes on `Status.Success` (drain the buffer) or `PoisonPill` (terminates immediately) 
  Source.actorRef[Message](
    bufferSize = n,
    overflowStrategy = OverflowStrategy.dropNew // downstream is slow
  )
val sourceFromFile: Source[ByteString, Future[IOResult]] = FileIO.fromPath(path, chunkSize)
val sourceFromTcp: Source[IncomingConnection, Future[ServerBinding]] = Tcp().bind(address, portNumber) //stream of connections
val sourceFromJavaStream: Source[ByteString, Future[IOResult]] = StreamConverters.fromInputStream {
  () => new FileInputStream(fileName)
}

Sinks

Sink[-In, +Mat]

  • In: the type of elements the sink consumes.
  • Mat: the type of materialized value.
1
2
3
4
5
6
7
8
9
val discardingSink: Sink[Any, Future[Done]] = Sink.ignore
val sinkWithNoMaterializedValue: Sink[Int, Future[Done]] = Sink.foreach[Int](i => println(s"$i"))
val singleElementSink: Sink[Int, Future[Int]] = Sink.head[Int] // throws NoSuchElementException if stream is empty
val singleOptionalElementSink: Sink[Int, Future[Option[Int]]] = Sink.headOption[Int]
val pullAllElementsSink: Sink[Int, Future[Seq[Int]]] = Sink.seq[Int]
val statefulSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0){ case (sum, elem) => sum + elem}
val actorSink: Sink[Int, NotUsed] = Sink.actorRef[Int](actor, onCompleteMessage = DoneMessage) // No backpressure, use `actorRefWithAck` for backpressure
val fileSink: Sink[ByteString, Future[IOResult]] = FileIO.toPath(path)
val javaStreamSink: Sink[ByteString, Future[IOResult]] = StreamConverters.fromOutputStream( () => new FileOutputStream(file))

Flows

Flow[-In, +Out, +Mat]

  • In: type of elements Flow consumes
  • Out: type of elements flow produces
  • Mat: type of materialized value
1
2
3
4
5
6
7
8
9
10
11
12
13
14
val mapSyncFlow: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)
val mapAsyncFlowOrdered: Flow[Int, Int, NotUsed] = Flow[Int].(parallelism = 4).mapAsync(_ * 2) { i => Future{ i * 2 }}
val mapAsyncFlowUnordered: Flow[Int, Int, NotUsed] = Flow[Int].(parallelism = 4).mapAsyncUnordered(_ * 2) { i => Future{ i * 2 }}
val flattenedCollectionFlow: Flow[String, String, NotUsed] = Flow[String].mapConcat(s => s.split("\\s").toVector) // similar to flatMap on collections
val groupingFlow: Flow[Int, Seq[Int], NotUsed] = Flow[Int].grouped(10) // There is also `sliding(10, step)` for sliding window grouping
val statefulFlowFold: Flow[Int, Int, NotUsed] = Flow[Int].fold(0){ case (sum, value) => sum + value} // will only emit when completed
val statefulFlowScan: Flow[Int, Int, NotUsed] = Flow[Int].scan(0){ case (sum, value) => sum + value} // emits every intermediate result
val filteringFlow: Flow[Int, Int, NotUsed] = Flow[Int].filter(predicate) // there is also `collect` which filters and maps in the same time
val timeBasedFlow: Flow[Int, Int, NotUsed] = Flow[Int].takeWithin(duration) // There is also `dropWithin` and `groupWithin`
val zipFlow: Flow[String, (String, Int), NotUsed] = Flow[String].zip(otherSource)
val flatMapOrdered: Flow[Int, Int, NotUsed] = Flow[Int].flatMapConcat( sources) // sources are processed sequentially in order
val flatMapUnordered: Flow[Int, Int, NotUsed] = Flow[Int].flatMapMerge(breadth, sources) // streams processed in parallel, order can't be guaranteed between streams
val bufferingFlow: Flow[Int, Int, NotUsed] = Flow.buffer(size, overflowStrategy)
val loggingFlow: Flow[Int, Int, NotUsed] = Flow[Int].log(logKey) // log each element and mark it with logKey. optional extract function can be supplied. `withAttributes` to change the log level 

some flow methods are available on sources for convenience (e.g. map)

Flows for slow consumers/producers

  • expand: extrapolate data until the next value comes
  • batch: e.g. batch writing to a db
  • conflate: summarize elements until the next phase is ready

Junctions

Fan Out Junctions

  • Broadcast[T] to all outputs
  • Balance[T] emits to the first available output
  • UnzipWith[In, A, B, ...] convert 1 input to N outputs
  • UnZip[A, B] Stream of tuple into two streams

Fan In Junctions

  • Merge[In] randomly selects from N inputs and pushes to an output
  • MergePreferred[In] one input is given higher priority
  • zipWith[A, B, ..., Out] take one from each input and create a single output
  • Zip[A,B]
  • Concat[A] consume one stream then the other

Overflow strategies

  • backpressure
  • dropHead
  • dropTail
  • dropNew
  • dropBuffer
  • fail

Runnable Graph

source.via(flow).via(anotherFlow).to(sink).run()

Materialized value

  • run() returns the materialized value
  • to materializes the value from the stage it is called on
  • toMat and viaMat takes a second argument that allows us to keep the materialized value from left of right. viaMat(sink)(Keep.right)

Shortcuts

returns materialized value of the sink:

  • runWith(sink)
  • runForeach(...)
  • runFold(...)
  • runReduce(...)
This post is licensed under CC BY 4.0 by the author.
Contents

Akka Classic Scala Cheat Sheet

Akka Scala Cheat Sheet

Comments powered by Disqus.