Using zipWith
You could zip the original Source with a Source of Booleans that emits true only for the first element. The zipped Source can then be processed.
First we'll need a Source that emits the Booleans:
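    import akka.stream.scaladsl.Source

    // true, false, false, false, ...
    def firstTrueIterator() : Iterator[Boolean] =
      (Iterator single true) ++ (Iterator continually false)

    // fromIterator takes a factory, so each materialization gets a fresh iterator
    def firstTrueSource : Source[Boolean, _] =
      Source.fromIterator(() => firstTrueIterator())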
We can then define a function that handles the two different cases:
    type Data = ???        // placeholder: your element type
    type OutputData = ???  // placeholder: your result type

    def processData(data : Data, firstRun : Boolean) : OutputData =
      if(firstRun) { ... }
      else         { ... }
This function can then be used in a zipWith of your original Source:
    val originalSource : Source[Data,_] = ???

    val contingentSource : Source[OutputData,_] =
      originalSource.zipWith(firstTrueSource)(processData)
Using Stateful Flow
You could create a Flow that contains state, similar to the example in the question but with a more functional approach:
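    import akka.stream.scaladsl.Flow

    // Returns a function that calls firstCall once, then otherCalls afterwards.
    // The flag lives in the returned closure, so each call to firstRunner
    // yields a function with its own independent state.
    def firstRunner(firstCall : (Data) => OutputData,
                    otherCalls : (Data) => OutputData) : (Data) => OutputData = {
      var firstRun = true
      (data : Data) => {
        if(firstRun) {
          firstRun = false
          firstCall(data)
        }
        else
          otherCalls(data)
      }
    }//end def firstRunner

    // Note: the closure is created when the Flow is built, so materializing
    // the same Flow value more than once would share the flag.
    def firstRunFlow(firstCall : (Data) => OutputData,
                     otherCalls : (Data) => OutputData) : Flow[Data, OutputData, _] =
      Flow[Data] map firstRunner(firstCall, otherCalls)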
This Flow can then be applied to your original Source:
    def firstElementFunc(data : Data) : OutputData = ???
    def remainingElsFunc(data : Data) : OutputData = ???

    val firstSource : Source[OutputData, _] =
      originalSource via firstRunFlow(firstElementFunc, remainingElsFunc)
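Since firstRunner is plain Scala, its behaviour can be checked on an ordinary List before wiring it into a Flow. The following is only a sketch: the concrete types (Int and String) and the labelled output strings are assumptions made for illustration and are not part of the original answer. It also shows the caveat of the stateful approach, namely that a runner which has already seen its first element will not treat a later input as "first" again.

```scala
object FirstRunnerCheck {
  // Hypothetical concrete types standing in for Data / OutputData
  type Data = Int
  type OutputData = String

  // Same shape as firstRunner above: a closure holding a one-shot flag
  def firstRunner(firstCall: Data => OutputData,
                  otherCalls: Data => OutputData): Data => OutputData = {
    var firstRun = true
    (data: Data) =>
      if (firstRun) { firstRun = false; firstCall(data) }
      else otherCalls(data)
  }

  // Illustrative handlers only
  val runner: Data => OutputData =
    firstRunner(d => s"first:$d", d => s"rest:$d")

  // The first element goes through firstCall, the others through otherCalls
  val firstPass: List[OutputData] = List(1, 2, 3).map(runner)

  // Reusing the same runner: the flag is already false, so no "first" here
  val secondPass: List[OutputData] = List(4, 5).map(runner)

  // A fresh runner starts with its own flag
  val freshPass: List[OutputData] =
    List(4, 5).map(firstRunner(d => s"first:$d", d => s"rest:$d"))

  def main(args: Array[String]): Unit = {
    println(firstPass)
    println(secondPass)
    println(freshPass)
  }
}
```

If the stream may be run more than once, building a new Flow per run (for example by calling firstRunFlow each time) is one way to avoid sharing the flag.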
"Idiomatic Way"
Answering your question directly requires dictating the "idiomatic way". I answer that part last because it is the least verifiable by the compiler and is therefore closer to opinion. I would never claim to be a valid classifier of idiomatic code.
My personal experience with akka-streams has been that it is best to switch my perspective to imagining an actual stream (I think of a train with boxcars) of Data elements. Do I need to break it up into multiple fixed-size trains? Do only certain boxcars make it through? Can I attach another train side-by-side that contains Boolean cars which can signal the front? I would prefer the zipWith method because of how I think about streams (as trains). My initial approach is always to connect existing stream parts together.
Also, I find it best to embed as little code in an akka Stream component as possible. firstTrueIterator and processData have no dependency on akka at all. Conversely, the firstTrueSource and contingentSource definitions have virtually no logic. This allows you to test the logic independently of a clunky ActorSystem, and the same logic can be reused in Futures or Actors.
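To make that last point concrete, here is a minimal sketch of checking the zipWith logic without any akka dependency, using a plain Iterator in place of the Source. The concrete types (Int and String) and the bodies of the two branches are assumptions for illustration; the original leaves them unspecified.

```scala
object FirstRunLogicCheck {
  // Hypothetical concrete types standing in for Data / OutputData
  type Data = Int
  type OutputData = String

  // true, false, false, false, ...
  def firstTrueIterator(): Iterator[Boolean] =
    Iterator.single(true) ++ Iterator.continually(false)

  // Illustrative branches only; the real ones depend on your use case
  def processData(data: Data, firstRun: Boolean): OutputData =
    if (firstRun) s"first:$data" else s"rest:$data"

  // Iterator.zip stops at the shorter side, so the infinite
  // Boolean iterator is safe to pair with a finite input
  val results: List[OutputData] =
    Iterator(10, 20, 30)
      .zip(firstTrueIterator())
      .map { case (data, firstRun) => processData(data, firstRun) }
      .toList

  def main(args: Array[String]): Unit =
    println(results) // List(first:10, rest:20, rest:30)
}
```

The same processData can then be passed unchanged to zipWith in the akka version.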
fst is kind of irrelevant, since head is already a Seq. At least my use case (CSV files) is one where the 'head' is consumed and used in creating the processing for the 'tail'. I used the Akka doc.akka.io/docs/akka/current/stream/operators/Flow/… page, but it also had some quirks. See github.com/akka/akka/issues/31284 – Obligato