How to write an enumeratee to chunk an enumerator along different boundaries

So the Play 2.0 Enumeratee page shows an example of using the &> (or through) method to change an Enumerator[String] into an Enumerator[Int]:

val toInt: Enumeratee[String,Int] = Enumeratee.map[String]{ s => s.toInt }
val ints: Enumerator[Int] = strings &> toInt

There is also an Enumeratee.grouped enumeratee to create an enumerator of chunks from individual elements. That seemed to work fine.

But what I see is that the usual input is an Array[Byte] (which is what Enumerator.fromFile and Enumerator.fromStream return). With that in mind I would like to take those Array[Byte] inputs and turn them into an Enumerator[String], for instance where each string is a line (terminated by a '\n'). The line boundaries and the Array[Byte] element boundaries usually won't match. How do I write an enumeratee that can convert the chunked arrays into chunked strings?

The purpose is to chunk those lines back to the browser as each Array[Byte] becomes available, and keep the leftover bytes that were not part of a complete line until the next input chunk comes along.
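The carry-over idea itself does not depend on Play. Here is a minimal plain-Scala sketch, assuming UTF-8 input and '\n' as the only terminator (so a '\r' stays part of the line, unlike the `line` iteratee further down). `step` prepends the previous leftover to each chunk, emits every complete line, and returns the incomplete tail as the new leftover. `LineCarry`, `step` and `splitAll` are hypothetical names, not Play API.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical plain-Scala model (no Play): bytes that do not yet form a
// complete line are carried over and prepended to the next chunk.
object LineCarry {
  private val NL: Byte = '\n'.toByte

  // Split one incoming chunk after prepending the previous leftover.
  // Returns the complete lines (terminator removed) and the new leftover.
  def step(leftover: Array[Byte], chunk: Array[Byte]): (Vector[String], Array[Byte]) = {
    val buf = leftover ++ chunk
    val lines = Vector.newBuilder[String]
    var start = 0
    for (i <- buf.indices if buf(i) == NL) {
      lines += new String(buf, start, i - start, UTF_8)
      start = i + 1
    }
    (lines.result(), buf.drop(start))
  }

  // Feed the chunks in order; at end of input, emit a non-empty leftover as a last line.
  def splitAll(chunks: Seq[Array[Byte]]): Vector[String] = {
    val (lines, rest) = chunks.foldLeft((Vector.empty[String], Array.emptyByteArray)) {
      case ((acc, left), chunk) =>
        val (found, newLeft) = step(left, chunk)
        (acc ++ found, newLeft)
    }
    if (rest.nonEmpty) lines :+ new String(rest, UTF_8) else lines
  }
}
```

Splitting on the raw '\n' byte before decoding is safe for UTF-8, because that byte value never occurs inside a multi-byte sequence, and only whole lines are ever decoded.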

Ideally I'd love to have a method that given an iter: Iteratee[Array[Byte], T] and an Enumerator[Array[Byte]] will give me back an Enumerator[T], where my T elements were parsed by iter.
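The wished-for signature can be modeled outside Play with a plain `Iterator`. In this sketch a parser function stands in for the `Iteratee[Array[Byte], T]`: it either returns one element plus the bytes it did not consume, or None when it needs more input. `ParseChunks`, `Parser`, `parseAll` and `line` are hypothetical, and this is not how Play's Enumeratee is implemented. Elements are produced one at a time as chunks are pulled, and bytes still buffered when the input runs out are dropped.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical plain-Scala model (no Play) of "parser + chunks => elements".
object ParseChunks {
  // Stand-in for an Iteratee[Array[Byte], T]: given the buffered bytes, return
  // one element plus the unconsumed bytes, or None when more input is needed.
  type Parser[T] = Array[Byte] => Option[(T, Array[Byte])]

  // Lazily turns byte chunks into parsed elements. Unconsumed bytes stay in
  // `buffer` and are prepended to the next chunk; whatever is still buffered
  // when the chunks run out is dropped.
  def parseAll[T](parser: Parser[T], chunks: Iterator[Array[Byte]]): Iterator[T] =
    new Iterator[T] {
      private var buffer: Array[Byte] = Array.emptyByteArray
      private var pending: Option[T] = None

      // Run the parser until it yields an element or the input is exhausted.
      private def fill(): Unit = {
        var exhausted = false
        while (pending.isEmpty && !exhausted) {
          parser(buffer) match {
            case Some((t, rest)) =>
              pending = Some(t)
              buffer = rest
            case None if chunks.hasNext =>
              buffer = buffer ++ chunks.next()
            case None =>
              exhausted = true
          }
        }
      }

      def hasNext: Boolean = { fill(); pending.isDefined }

      def next(): T = {
        fill()
        val t = pending.getOrElse(throw new NoSuchElementException("no more elements"))
        pending = None
        t
      }
    }

  // Example parser: one '\n'-terminated line, decoded as UTF-8.
  val line: Parser[String] = buf => {
    val i = buf.indexOf('\n'.toByte)
    if (i < 0) None
    else Some((new String(buf, 0, i, UTF_8), buf.drop(i + 1)))
  }
}
```

A parser must consume at least one byte whenever it returns an element; otherwise `parseAll` would keep producing elements from the same buffer forever.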

Additional Info: I had a bit of time to clean up my code and here is a specific example of what I'm trying to do. I have the following iteratees that detect the next line:

import play.api.libs.iteratee._
type AB = Array[Byte]

def takeWhile(pred: Byte => Boolean): Iteratee[AB, AB] = {
  def step(e: Input[AB], acc: AB): Iteratee[AB, AB] = e match {
    case Input.EOF => Done(acc, Input.EOF)
    case Input.Empty => Cont(step(_, acc))
    case Input.El(arr) =>
      val (taking, rest) = arr.span(pred)
      if (rest.length > 0) Done(acc ++ taking, Input.El(rest)) 
      else Cont(step(_, acc ++ taking)) 
  }
  Cont(step(_, Array()))
}

val line = for {
  bytes <- takeWhile(b => !(b == '\n' || b == '\r'))
  _     <- takeWhile(b =>   b == '\n' || b == '\r')
} yield bytes
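To see what `line` does within one buffer, here is a plain-Scala sketch of the same two `span` steps, assuming the whole buffer is available at once (`LineSpan` and the `atEof` flag are hypothetical). It shows why a line that ends exactly at a chunk boundary is not emitted yet: the separator run might continue in the next chunk. Because the whole run of '\n'/'\r' bytes is swallowed, blank lines produce no empty element.

```scala
// Hypothetical plain-Scala model (no Play) of the `line` iteratee on one buffer.
object LineSpan {
  private def isSep(b: Byte): Boolean = b == '\n'.toByte || b == '\r'.toByte

  // First takeWhile: span off the non-separator bytes.
  // Second takeWhile: drop the run of '\n'/'\r' bytes that follows.
  // Returns Some((lineBytes, rest)), or None when no separator has been seen
  // or the separator run reaches the end of the buffer (the next chunk might
  // extend it). With atEof = true it always returns Some.
  def line(buf: Array[Byte], atEof: Boolean): Option[(Array[Byte], Array[Byte])] = {
    val (text, afterText) = buf.span(b => !isSep(b))
    val rest = afterText.dropWhile(isSep)
    if (rest.nonEmpty || atEof) Some((text, rest)) else None
  }
}
```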

And what I'd like to do is something like this, where chunkBy is the enumeratee I'm trying to write:

Ok.stream(Enumerator.fromFile(filename) &> chunkBy(line)).as("text/plain")
Thetos answered 27/4, 2012 at 7:34

https://github.com/playframework/Play20/commit/f979006a7e2c1c08ca56ee0bae67b5463ee099c1#L3R131 does something similar to what you are doing. I fixed grouped to take care of the remaining input. The code basically looks like this:

val upToNewLine = 
  Traversable.splitOnceAt[String,Char](_ != '\n')  &>>
  Iteratee.consume()

Enumeratee.grouped(upToNewLine)

I also have to fix repeat in the same way.

Semang answered 8/5, 2012 at 21:4

Here is what I have after some hours of experimentation. I'm hoping that somebody can come up with a more elegant implementation, as I can barely follow mine.

def chunkBy(chunker: Iteratee[AB, AB]) = new Enumeratee[AB, AB] {
  def applyOn[A](inner: Iteratee[AB, A]): Iteratee[AB, Iteratee[AB, A]] = {
    def step(e: Input[AB], in: Iteratee[AB, A], leftover: Input[AB]):
          Iteratee[AB, Iteratee[AB, A]] = {
      e match {
        case Input.EOF =>
          // if we have a leftover and it's a chunk, then output it
          leftover match {
            case Input.EOF | Input.Empty => Done(in, leftover)
            case Input.El(_) =>
              val lastChunk = Iteratee.flatten(Enumerator.enumInput(leftover)
                >>> Enumerator.eof |>> chunker)
              lastChunk.pureFlatFold(
                done = { (chunk, rest) =>
                  val nextIn = Iteratee.flatten(Enumerator(chunk) |>> in)
                  nextIn.pureFlatFold(
                    done = (a, e2) => Done(nextIn, e2),
                    // nothing more will come
                    cont = k => Done(nextIn, Input.EOF),
                    error = (msg, e2) => Error(msg, e2))
                },
                // not enough content to get a chunk, so drop content
                cont = k => Done(in, Input.EOF),
                error = (msg, e2) => Error(msg, e2))
          }
        case Input.Empty => Cont(step(_, in, leftover))
        case Input.El(arr) =>
          // feed through chunker
          val iChunks = Iteratee.flatten(
            Enumerator.enumInput(leftover)
              >>> Enumerator(arr)
              >>> Enumerator.eof // to extract the leftover
              |>> repeat(chunker))
          iChunks.pureFlatFold(
            done = { (chunks, rest) =>
              // we have our chunks, feed them to the inner iteratee
              val nextIn = Iteratee.flatten(Enumerator(chunks: _*) |>> in)
              nextIn.pureFlatFold(
                done = (a, e2) => Done(nextIn, e2),
                // inner iteratee needs more data
                cont = k => Cont(step(_: Input[AB], nextIn,
                  // we have to ignore the EOF we fed to repeat
                  if (rest == Input.EOF) Input.Empty else rest)),
                error = (msg, e2) => Error(msg, e2))
            },
            // not enough content to get a chunk, continue
            cont = k => Cont(step(_: Input[AB], in, leftover)),
            error = (msg, e2) => Error(msg, e2))
      }
    }
    Cont(step(_, inner, Input.Empty))
  }
}

Here is the definition of my custom repeat:

// withhold the last chunk so that it may be concatenated with the next one
def repeat(chunker: Iteratee[AB, AB]) = {
  def loop(e: Input[AB], ch: Iteratee[AB, AB], acc: Vector[AB], 
        leftover: Input[AB]): Iteratee[AB, Vector[AB]] = e match {
    case Input.EOF => ch.pureFlatFold(
      done = (a, e) => Done(acc, leftover),
      cont = k => k(Input.EOF).pureFlatFold(
        done = (a, e) => Done(acc, Input.El(a)),
        cont = k => sys.error("divergent iter"),
        error = (msg, e) => Error(msg, e)),
      error = (msg, e) => Error(msg, e))
    case Input.Empty => Cont(loop(_, ch, acc, leftover))
    case Input.El(_) =>
      val i = Iteratee.flatten(Enumerator.enumInput(leftover) 
          >>> Enumerator.enumInput(e) |>> ch)
      i.pureFlatFold(
        done = (a, e) => loop(e, chunker, acc :+ a, Input.Empty),
        cont = k => Cont(loop(_, i, acc, Input.Empty)),
        error = (msg, e) => Error(msg, e))
  }
  Cont(loop(_: Input[AB], chunker, Vector(), Input.Empty))
}

This works on a few samples including this one:

val source = Enumerator(
  "bippy".getBytes,
  "foo\n\rbar\n\r\n\rbaz\nb".getBytes,
  "azam\ntoto\n\n".getBytes)
Ok.stream(source
  &> chunkBy(line)
  // Enumeratee.map needs its input type given explicitly, as in Enumeratee.map[String] above
  &> Enumeratee.map[AB](l => l ++ ".\n".getBytes)
).as("text/plain")

Which prints:

bippyfoo.
bar.
baz.
bazam.
toto.
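As a sanity check on this output, here is a plain-Scala replay of the same sample with no Play involved (`ReplaySample` and its helpers are hypothetical). It applies the `line` logic to a growing buffer, withholds the incomplete remainder until the next chunk arrives, and flushes once at end of input, which is the behaviour chunkBy and repeat aim for. With "." appended it yields the same five lines as above.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical plain-Scala replay (no Play) of feeding chunks through `line`.
object ReplaySample {
  private def isSep(b: Byte): Boolean = b == '\n'.toByte || b == '\r'.toByte

  // Same two-step logic as the `line` iteratee: text, then the separator run.
  private def line(buf: Array[Byte], atEof: Boolean): Option[(Array[Byte], Array[Byte])] = {
    val (text, afterText) = buf.span(b => !isSep(b))
    val rest = afterText.dropWhile(isSep)
    if (rest.nonEmpty || atEof) Some((text, rest)) else None
  }

  def chunkBy(chunks: Seq[Array[Byte]]): Vector[String] = {
    val out = Vector.newBuilder[String]
    var buffer = Array.emptyByteArray
    // Take as many complete lines as possible; keep the remainder in `buffer`.
    def drain(atEof: Boolean): Unit = {
      var more = buffer.nonEmpty
      while (more) {
        line(buffer, atEof) match {
          case Some((text, rest)) =>
            out += new String(text, UTF_8)
            buffer = rest
            more = buffer.nonEmpty
          case None =>
            more = false
        }
      }
    }
    for (chunk <- chunks) { buffer = buffer ++ chunk; drain(atEof = false) }
    drain(atEof = true) // end of input: flush what is left
    out.result()
  }
}
```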
Thetos answered 29/4, 2012 at 6:28
