Bucketed Sink in scalaz-stream
I am trying to make a sink that writes a stream to bucketed files: when a particular condition (time, file size, etc.) is reached, the current output stream is closed and a new one is opened to a new bucket file.

I checked how the different sinks are created in the io object, but there aren't many examples, so I tried to follow how resource and chunkW were written. I ended up with the following bit of code where, for simplicity, buckets are represented by an Int for now, but they would eventually be some type of output stream.

  val buckets: Channel[Task, String, Int] = {

    // recursion to step through the stream
    def go(step: Task[String => Task[Int]]): Process[Task, String => Task[Int]] = {

      // emit the value and repeat
      def next(msg: String => Task[Int]) =
        Process.emit(msg) ++ go(step)

      Process.await[Task, String => Task[Int], String => Task[Int]](step)(
        next
        , Process.halt   // TODO ???
        , Process.halt)  // TODO ???
    }

    // starting bucket
    val acquire: Task[Int] = Task.delay {
      val startBuck = nextBucket(0)
      println(s"opening bucket $startBuck")
      startBuck
    }

    // the write step
    def step(os: Int): Task[String => Task[Int]] =
      Task.now((msg: String) => Task.delay {
        write(os, msg)
        val newBuck = nextBucket(os)
        if (newBuck != os) {
          println(s"closing bucket $os")
          println(s"opening bucket $newBuck")
        }
        newBuck
      })

    // start the Channel
    Process.await(acquire)(
      buck => go(step(buck))
      , Process.halt, Process.halt)
  }

  def write(bucket: Int, msg: String) { println(s"$bucket\t$msg") }
  def nextBucket(b: Int) = b + 1

There are a number of issues in this:

  1. step is passed the bucket once at the start, and this never changes during the recursion. I am not sure how, inside the recursive go, to create a new step task that uses the bucket (Int) produced by the previous task, as I have to provide a String to get at that task.
  2. the fallback and cleanup callbacks of the await calls do not receive the result of rcv (if there is one). In the io.resource function this works fine because the resource is fixed; in my case, however, the resource might change at any step. How would I go about passing a reference to the currently open bucket to these callbacks?
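To make issue 1 concrete, here is a minimal plain-Scala sketch (no scalaz-stream) of the state-threading that is missing: each write step returns the bucket for the *next* step, and a fold carries that state forward. `nextBucket` mirrors the helper from the question; the log of (bucket, message) pairs is purely illustrative.

```scala
// Minimal sketch: thread the current bucket through each step by folding
// over the messages. Each step returns the bucket to use for the next
// message, which is exactly the state that go/step above fails to carry.
object BucketFold {
  def nextBucket(b: Int): Int = b + 1

  // Returns the log of (bucket, msg) pairs and the final bucket.
  def run(start: Int, msgs: List[String]): (List[(Int, String)], Int) =
    msgs.foldLeft((List.empty[(Int, String)], start)) {
      case ((log, bucket), msg) =>
        val entry = (bucket, msg)          // "write" msg to the current bucket
        (log :+ entry, nextBucket(bucket)) // pass new state to the next step
    }
}
```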
Filagree answered 31/3, 2014 at 22:24
ok, so in the meantime I have created my own BucketedWriter extends Writer that I can use with resource, but it's pretty imperative (implementing the Java API). – Filagree
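A hedged sketch of that workaround, not the commenter's actual code: a `java.io.Writer` that rolls over to a fresh bucket once a character threshold is exceeded. The `maxChars` parameter and the in-memory `StringWriter` buckets are illustrative choices to keep the sketch self-contained; a real version would open files instead.

```scala
import java.io.{StringWriter, Writer}
import scala.collection.mutable.ListBuffer

// Imperative bucketed writer: delegates to the latest StringWriter and
// opens a new one when the next write would exceed maxChars.
class BucketedWriter(maxChars: Int) extends Writer {
  val buckets = ListBuffer(new StringWriter)
  private var written = 0

  private def current: StringWriter = buckets.last

  override def write(cbuf: Array[Char], off: Int, len: Int): Unit = {
    if (written + len > maxChars && written > 0) { // roll to a fresh bucket
      buckets += new StringWriter
      written = 0
    }
    current.write(cbuf, off, len)
    written += len
  }
  override def flush(): Unit = current.flush()
  override def close(): Unit = current.close()
}
```

Such a writer can then be handed to `io.resource` like any other, since the bucket switching is hidden behind the `Writer` interface.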
Well, for one of the conditions (i.e. time) an option may be to use a simple go on the sink. This one is time-based, essentially reopening the file every hour:

val metronome =  Process.awakeEvery(1.hour).map(true)


def writeFileSink(file:String):Sink[Task,ByteVector] = ???


def timeBasedSink(prefix:String) = {
  def go(index:Int) : Sink[Task,ByteVector] = {
    metronome.wye(writeFileSink(prefix + "_" + index))(wye.interrupt) ++ go(index + 1)
  }

  go(0)
} 

For the other conditions (e.g. bytes written) you can use a similar technique: keep a signal of the bytes written and combine it with the Sink.
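As an illustration of the size-based variant (plain Scala bookkeeping, not the scalaz-stream signal machinery), one way to sketch the decision logic: track the bytes written to the current bucket and open a new one when the next chunk would cross the threshold.

```scala
// Sketch: assign each chunk (given by its byte size) a bucket index so
// that no bucket exceeds maxBytes. A single oversized chunk still gets
// a bucket of its own rather than being split.
object SizeBuckets {
  def assign(chunks: List[Int], maxBytes: Int): List[(Int, Int)] = {
    val (out, _, _) = chunks.foldLeft((List.empty[(Int, Int)], 0, 0)) {
      case ((acc, bucket, size), chunk) =>
        if (size + chunk > maxBytes && size > 0)
          (acc :+ (bucket + 1, chunk), bucket + 1, chunk) // open new bucket
        else
          (acc :+ (bucket, chunk), bucket, size + chunk)  // stay in current
    }
    out
  }
}
```

In the streaming version, the same counter would live in a signal that the Sink consults (and resets) on each write.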

Akela answered 29/8, 2015 at 4:35
