Sources
Source[+Out, +Mat]
- Out: The type of the elements that the Source produces.
- Mat: The type of materialized value.
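import akka.{Done, NotUsed}
import akka.actor.{ActorRef, Cancellable}
import akka.stream.{IOResult, OverflowStrategy}
import akka.stream.scaladsl._
import akka.stream.scaladsl.Tcp.{IncomingConnection, ServerBinding}
import akka.util.ByteString
import scala.concurrent.Future
import scala.concurrent.duration._

val emptySource: Source[String, NotUsed] = Source.empty[String]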
val singleElementSource: Source[String, NotUsed] = Source.single("element")
val repeatingElementSource: Source[String, NotUsed] = Source.repeat("element")
val scheduledSource: Source[String, Cancellable] = Source.tick(
  initialDelay = 1.second,
  interval = 5.seconds,
  tick = "message"
) // Ticks are dropped while there is no downstream demand
val sourceFromIterable: Source[Int, NotUsed] = Source(1 to 10)
val sourceFromIterator: Source[Int, NotUsed] = Source.fromIterator{
() => Iterator.from(0)
}
val cyclingSourceFromIterator: Source[Int, NotUsed] = Source.cycle{
() => Iterator.range(1, 10)
}
val statefulSource: Source[Int, NotUsed] = Source.unfold(initialValue) { // Source.unfoldAsync works the same way, but the function returns a Future[Option[(S, E)]]
  case value if condition => Some((value.updated(), value)) // (next state, element to emit)
  case _ => None // completes when None is returned
}
val sourceFromActor: Source[Message, ActorRef] = // materialized as an actorRef. Completes on `Status.Success` (drain the buffer) or `PoisonPill` (terminates immediately)
Source.actorRef[Message](
bufferSize = n,
overflowStrategy = OverflowStrategy.dropNew // downstream is slow
)
val sourceFromFile: Source[ByteString, Future[IOResult]] = FileIO.fromPath(path, chunkSize)
val sourceFromTcp: Source[IncomingConnection, Future[ServerBinding]] = Tcp().bind(address, portNumber) //stream of connections
val sourceFromJavaStream: Source[ByteString, Future[IOResult]] = StreamConverters.fromInputStream {
() => new FileInputStream(fileName)
}
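As a rough illustration of the `Source.unfold` pattern above, the following sketch emits the Fibonacci numbers below a limit and collects them with `Sink.seq`. The object name `SourceSketch` and the helper `fibonacciBelow` are made up for this example, and it assumes Akka 2.6 or later, where an implicit `ActorSystem` is enough to materialize a stream.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical names; assumes akka-stream 2.6+ on the classpath.
object SourceSketch {
  // The state is the pair (current, next). Each step emits `current`
  // and moves on to (next, current + next). Returning None completes the stream.
  def fibonacciBelow(limit: Int): Source[Int, NotUsed] =
    Source.unfold((0, 1)) {
      case (current, next) if current < limit => Some(((next, current + next), current))
      case _                                  => None
    }

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("source-sketch")
    try Await.result(fibonacciBelow(50).runWith(Sink.seq[Int]), 5.seconds)
    finally system.terminate()
  }
}
```

Calling `SourceSketch.run()` should yield the ten Fibonacci numbers from 0 to 34. Blocking with `Await` is only there to keep the sketch short.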
Sinks
Sink[-In, +Mat]
- In: the type of elements the sink consumes.
- Mat: the type of materialized value.
val discardingSink: Sink[Any, Future[Done]] = Sink.ignore
val sinkWithNoMaterializedValue: Sink[Int, Future[Done]] = Sink.foreach[Int](i => println(s"$i"))
val singleElementSink: Sink[Int, Future[Int]] = Sink.head[Int] // the Future fails with NoSuchElementException if the stream is empty
val singleOptionalElementSink: Sink[Int, Future[Option[Int]]] = Sink.headOption[Int]
val pullAllElementsSink: Sink[Int, Future[Seq[Int]]] = Sink.seq[Int]
val statefulSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0){ case (sum, elem) => sum + elem}
val actorSink: Sink[Int, NotUsed] = Sink.actorRef[Int](actor, onCompleteMessage = DoneMessage) // No backpressure, use `actorRefWithAck` for backpressure
val fileSink: Sink[ByteString, Future[IOResult]] = FileIO.toPath(path)
val javaStreamSink: Sink[ByteString, Future[IOResult]] = StreamConverters.fromOutputStream( () => new FileOutputStream(file))
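To make the materialized values of these sinks a little more concrete, here is a small sketch that runs three of them and waits for their futures. The object name `SinkSketch` is hypothetical, and the code assumes Akka 2.6 or later so that the implicit `ActorSystem` provides the materializer.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical name; assumes akka-stream 2.6+ on the classpath.
object SinkSketch {
  def run(): (Int, Option[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("sink-sketch")
    try {
      // Sink.fold materializes a Future holding the final accumulated value.
      val sum = Source(1 to 10).runWith(Sink.fold[Int, Int](0)(_ + _))
      // Sink.headOption completes with None on an empty stream instead of failing.
      val firstOfEmpty = Source.empty[Int].runWith(Sink.headOption[Int])
      // Sink.seq collects every element, so it only suits bounded streams.
      val all = Source(1 to 3).runWith(Sink.seq[Int])
      (
        Await.result(sum, 5.seconds),
        Await.result(firstOfEmpty, 5.seconds),
        Await.result(all, 5.seconds)
      )
    } finally system.terminate()
  }
}
```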
Flows
Flow[-In, +Out, +Mat]
- In: type of elements Flow consumes
- Out: type of elements flow produces
- Mat: type of materialized value
val mapSyncFlow: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)
val mapAsyncFlowOrdered: Flow[Int, Int, NotUsed] = Flow[Int].mapAsync(parallelism = 4) { i => Future { i * 2 } } // results are emitted in upstream order
val mapAsyncFlowUnordered: Flow[Int, Int, NotUsed] = Flow[Int].mapAsyncUnordered(parallelism = 4) { i => Future { i * 2 } } // results are emitted as soon as they complete
val flattenedCollectionFlow: Flow[String, String, NotUsed] = Flow[String].mapConcat(s => s.split("\\s").toVector) // similar to flatMap on collections
val groupingFlow: Flow[Int, Seq[Int], NotUsed] = Flow[Int].grouped(10) // There is also `sliding(10, step)` for sliding window grouping
val statefulFlowFold: Flow[Int, Int, NotUsed] = Flow[Int].fold(0){ case (sum, value) => sum + value} // will only emit when completed
val statefulFlowScan: Flow[Int, Int, NotUsed] = Flow[Int].scan(0){ case (sum, value) => sum + value} // emits every intermediate result
val filteringFlow: Flow[Int, Int, NotUsed] = Flow[Int].filter(predicate) // there is also `collect`, which filters and maps at the same time
val timeBasedFlow: Flow[Int, Int, NotUsed] = Flow[Int].takeWithin(duration) // There is also `dropWithin` and `groupWithin`
val zipFlow: Flow[String, (String, Int), NotUsed] = Flow[String].zip(otherSource)
val flatMapOrdered: Flow[Int, Int, NotUsed] = Flow[Int].flatMapConcat(sources) // `sources` maps each element to a Source; the resulting sources are consumed sequentially, in order
val flatMapUnordered: Flow[Int, Int, NotUsed] = Flow[Int].flatMapMerge(breadth, sources) // up to `breadth` sources are consumed in parallel; order is not guaranteed between them
val bufferingFlow: Flow[Int, Int, NotUsed] = Flow[Int].buffer(size, overflowStrategy)
val loggingFlow: Flow[Int, Int, NotUsed] = Flow[Int].log(logKey) // logs each element, tagged with logKey. An optional extract function can be supplied. Use `withAttributes` to change the log level
Some Flow methods are also available on Source for convenience (e.g. `map`).
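The sketch below combines a few of the flows listed above to show what they emit: `mapAsync` keeps upstream order, `grouped` chunks elements, and `scan` emits its initial value followed by every intermediate result. The object name `FlowSketch` and the value names are invented for illustration, and Akka 2.6 or later is assumed.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical names; assumes akka-stream 2.6+ on the classpath.
object FlowSketch {
  // Up to 4 futures run concurrently, but results keep the upstream order.
  val doubleAsync: Flow[Int, Int, NotUsed] =
    Flow[Int].mapAsync(parallelism = 4)(i => Future(i * 2))

  // scan emits the zero element first, then each running total.
  val runningSum: Flow[Int, Int, NotUsed] =
    Flow[Int].scan(0)(_ + _)

  def run(): (Seq[Seq[Int]], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("flow-sketch")
    try {
      // Flow operators such as `grouped` are available directly on Source.
      val grouped = Source(1 to 5).via(doubleAsync).grouped(2).runWith(Sink.seq)
      val sums    = Source(1 to 4).via(runningSum).runWith(Sink.seq)
      (Await.result(grouped, 5.seconds), Await.result(sums, 5.seconds))
    } finally system.terminate()
  }
}
```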
Flows for slow consumers/producers
- expand: extrapolates elements for a fast consumer until the next upstream value arrives
- batch: aggregates elements while downstream is busy, up to a limit (e.g. batch writes to a database)
- conflate: summarizes elements while downstream is busy
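A minimal sketch of `conflate` in front of a slow consumer follows. The consumer is simulated with `throttle`, and the object name `ConflateSketch` is hypothetical. How many elements come out depends on timing, so the only property to rely on is that nothing is lost: the conflated sums add up to the original total.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical name; assumes akka-stream 2.6+ on the classpath.
object ConflateSketch {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("conflate-sketch")
    try {
      val result = Source(1 to 100)
        .conflate(_ + _)        // add up elements while downstream is not ready
        .throttle(1, 20.millis) // simulated slow consumer: one element per 20 ms
        .runWith(Sink.seq)
      Await.result(result, 10.seconds)
    } finally system.terminate()
  }
}
```

The number of emitted elements varies between runs, but their sum should always be 5050.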
Junctions
Fan Out Junctions
- Broadcast[T]: emits to all outputs
- Balance[T]: emits to the first available output
- UnzipWith[In, A, B, ...]: converts 1 input to N outputs
- Unzip[A, B]: splits a stream of tuples into two streams
Fan In Junctions
- Merge[In]: randomly selects from N inputs and pushes to an output
- MergePreferred[In]: one input is given higher priority
- ZipWith[A, B, ..., Out]: takes one element from each input and creates a single output
- Zip[A, B]: takes one element from each of two inputs and emits them as a tuple
- Concat[A]: consumes one stream, then the other
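Junctions are wired together with the graph DSL. The following sketch fans out with `Broadcast`, transforms each branch, and fans back in with `Zip`. The object name `JunctionSketch` is made up, and the code assumes the `GraphDSL.create(sink)` overload as found in Akka 2.6.

```scala
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source, Zip}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical name; assumes akka-stream 2.6+ on the classpath.
object JunctionSketch {
  def run(): Seq[(Int, Int)] = {
    implicit val system: ActorSystem = ActorSystem("junction-sketch")
    try {
      // Passing the sink to `create` exposes its materialized value on the graph.
      val graph = RunnableGraph.fromGraph(
        GraphDSL.create(Sink.seq[(Int, Int)]) { implicit builder => out =>
          import GraphDSL.Implicits._

          val broadcast = builder.add(Broadcast[Int](2)) // fan out: copy to 2 outputs
          val zip       = builder.add(Zip[Int, Int]())   // fan in: pair elements up

          Source(1 to 3) ~> broadcast
          broadcast.out(0) ~> Flow[Int].map(_ * 2)     ~> zip.in0
          broadcast.out(1) ~> Flow[Int].map(i => i * i) ~> zip.in1
          zip.out ~> out
          ClosedShape
        }
      )
      Await.result(graph.run(), 5.seconds)
    } finally system.terminate()
  }
}
```

Each input element ends up as a pair of its double and its square.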
Overflow strategies
- backpressure
- dropHead
- dropTail
- dropNew
- dropBuffer
- fail
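As a sketch of how an overflow strategy is applied, the example below puts a small `dropHead` buffer in front of a simulated slow consumer. The object name `OverflowSketch`, the buffer size, and the throttle rate are arbitrary choices for illustration. Which elements get dropped depends on timing, so the code makes no claim about the exact output. It only returns whatever survived.

```scala
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical name; assumes akka-stream 2.6+ on the classpath.
object OverflowSketch {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("overflow-sketch")
    try {
      val result = Source(1 to 100)
        .buffer(5, OverflowStrategy.dropHead) // when full, drop the oldest buffered element
        .throttle(1, 10.millis)               // simulated slow consumer
        .runWith(Sink.seq)
      Await.result(result, 10.seconds)
    } finally system.terminate()
  }
}
```

Swapping in `OverflowStrategy.backpressure` would slow the source down instead of dropping elements, so all 100 elements would arrive.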
Runnable Graph
source.via(flow).via(anotherFlow).to(sink).run()
Materialized value
- run(): returns the materialized value of the runnable graph
- to: keeps the materialized value of the stage it is called on (the left side)
- toMat and viaMat: take a second argument that selects which materialized value to keep, left or right, e.g. .toMat(sink)(Keep.right)
Shortcuts
These run the stream and return the materialized value of the sink:
- runWith(sink)
- runForeach(...)
- runFold(...)
- runReduce(...)
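The difference between `to`, `toMat`, and the `runWith` shortcut can be sketched as follows. The object name `MatSketch` and the value names are hypothetical, and Akka 2.6 or later is assumed.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical names; assumes akka-stream 2.6+ on the classpath.
object MatSketch {
  def run(): (Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("mat-sketch")
    try {
      val source  = Source(1 to 10)
      val sumSink = Sink.fold[Int, Int](0)(_ + _)

      // `to` keeps the left (source) materialized value, which is NotUsed here.
      val keepsLeft: RunnableGraph[NotUsed] = source.to(sumSink)
      val ignored: NotUsed = keepsLeft.run()

      // `toMat` with Keep.right keeps the sink's Future[Int] instead.
      val keepsRight: RunnableGraph[Future[Int]] = source.toMat(sumSink)(Keep.right)

      // `runWith(sink)` is shorthand for `toMat(sink)(Keep.right).run()`.
      val viaShortcut: Future[Int] = source.runWith(sumSink)

      (Await.result(keepsRight.run(), 5.seconds), Await.result(viaShortcut, 5.seconds))
    } finally system.terminate()
  }
}
```

Both futures complete with the same sum, since the source and sink blueprints can be materialized any number of times.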