I am trying to make a sink that writes a stream to bucketed files: when a particular condition is reached (time, size of file, etc.), the current output stream is closed and a new one is opened to a new bucket file.
I checked how the different sinks were created in the `io` object, but there aren't many examples. So I tried to follow how `resource` and `chunkW` were written. I ended up with the following bit of code, where for simplicity, buckets are just represented by an `Int` for now, but would eventually be some type of output stream.

    val buckets: Channel[Task, String, Int] = {

      // recursion to step through the stream
      def go(step: Task[String => Task[Int]]): Process[Task, String => Task[Int]] = {
        // Emit the value and repeat
        def next(msg: String => Task[Int]) =
          Process.emit(msg) ++
            go(step)

        Process.await[Task, String => Task[Int], String => Task[Int]](step)(
          next
          , Process.halt // TODO ???
          , Process.halt) // TODO ???
      }

      // starting bucket
      val acquire: Task[Int] = Task.delay {
        val startBuck = nextBucket(0)
        println(s"opening bucket $startBuck")
        startBuck
      }

      // the write step
      def step(os: Int): Task[String => Task[Int]] =
        Task.now((msg: String) => Task.delay {
          write(os, msg)
          val newBuck = nextBucket(os)
          if (newBuck != os) {
            println(s"closing bucket $os")
            println(s"opening bucket $newBuck")
          }
          newBuck
        })

      // start the Channel
      Process.await(acquire)(
        buck => go(step(buck))
        , Process.halt, Process.halt)
    }

    def write(bucket: Int, msg: String) { println(s"$bucket\t$msg") }
    def nextBucket(b: Int) = b+1

There are a number of issues in this:

- `step` is passed the bucket once at the start and this never changes during the recursion. I am not sure how, in the recursive `go`, to create a new `step` task that will use the bucket (`Int`) from the previous task, as I have to provide a `String` to get to that task.
- the `fallback` and `cleanup` of the `await` calls do not receive the result of `rcv` (if there is one). In the `io.resource` function, it works fine as the resource is fixed; however, in my case, the resource might change at any step. How would I go about passing a reference to the currently open bucket to these callbacks?
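
To make the first issue concrete, here is a minimal sketch in plain Scala, deliberately without scalaz-stream, of what "using the bucket from the previous step" could look like: the bucket is an explicit parameter of the recursion, and each step returns the bucket for the next one. All names (`BucketThreading`, the pair-returning `step`, the three-argument `go`) are hypothetical and only illustrate the shape of the state threading, not how it would be expressed with `Process` or `Channel`.

```scala
// Hypothetical illustration only; none of this is scalaz-stream API.
object BucketThreading {
  // Rollover rule from the question: every write moves to the next bucket.
  def nextBucket(b: Int): Int = b + 1

  // One write step: takes the current bucket and a message, and returns
  // the line that would be written plus the bucket the NEXT step should use.
  def step(bucket: Int, msg: String): (String, Int) =
    (s"$bucket\t$msg", nextBucket(bucket))

  // The current bucket is threaded through the recursion as a parameter,
  // so each iteration sees the bucket produced by the previous one.
  @annotation.tailrec
  def go(bucket: Int, msgs: List[String], written: Vector[String]): (Vector[String], Int) =
    msgs match {
      case Nil =>
        // The last open bucket is still in scope here, e.g. for cleanup.
        (written, bucket)
      case m :: rest =>
        val (line, next) = step(bucket, m)
        go(next, rest, written :+ line)
    }
}
```

Because the final bucket is returned alongside the output, whatever runs at the end of the stream can see which bucket is still open, which is the information the `fallback` and `cleanup` callbacks are missing.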
`BucketedWriter extends Writer` that I can use with `resource`, but it's pretty imperative (implementing Java API). – Filagree
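
For reference, an imperative writer of the kind the comment describes might look roughly like the sketch below. It is not the commenter's actual code: the constructor parameters `open` (creates the underlying writer for a bucket index) and `shouldRoll` (decides, from the bucket index and the characters written so far, whether to roll over) are assumptions made for illustration.

```scala
import java.io.{StringWriter, Writer}

// Hypothetical sketch: `open` and `shouldRoll` are assumed parameters,
// not part of any existing API.
class BucketedWriter(open: Int => Writer,
                     shouldRoll: (Int, Long) => Boolean) extends Writer {
  private var bucket = 0                      // index of the current bucket
  private var written = 0L                    // chars written to the current bucket
  private var current: Writer = open(bucket)  // currently open underlying writer

  override def write(cbuf: Array[Char], off: Int, len: Int): Unit = {
    // Roll over BEFORE writing if the current bucket has hit its limit.
    if (shouldRoll(bucket, written)) {
      current.close()
      bucket += 1
      written = 0L
      current = open(bucket)
    }
    current.write(cbuf, off, len)
    written += len
  }

  override def flush(): Unit = current.flush()

  // Closing the wrapper closes whichever bucket is open at that point.
  override def close(): Unit = current.close()
}
```

Since the mutable state lives inside the writer, a fixed-resource helper only ever sees one `Writer`, at the cost of hiding the bucket changes from the stream itself.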