Use Scala Iterator to break up large stream (from string) into chunks using a RegEx match, and then operate on those chunks?

I'm currently using a not-very-Scala-like approach to parse large Unix mailbox (mbox) files. I'm still learning the language and would like to challenge myself to find a better way, but I don't think I have a solid grasp of what can be done with an Iterator or how to use it effectively.

I'm currently using Apache James Mime4j (org.apache.james.mime4j), and I use org.apache.james.mime4j.mboxiterator.MboxIterator to iterate over the messages in a file, like so:

 import java.nio.charset.StandardCharsets
 import org.apache.james.mime4j.mboxiterator.MboxIterator
 import org.apache.james.mime4j.parser.{ContentHandler, MimeStreamParser}

 // an implementation of ContentHandler that lets me
 // construct an object representing an email
 // using callbacks
 val handler: ContentHandler = new MyHandler()

 // creates a parser that parses a SINGLE email from a given InputStream
 // (configBuilder is configured elsewhere)
 val parser: MimeStreamParser = new MimeStreamParser(configBuilder.build())
 // register my handler
 parser.setContentHandler(handler)

 // Build an iterator over the messages in the mbox file
 val iterator = MboxIterator.fromFile(fileName).build()
 // For each email, process it using the handler above
 iterator.forEach(p => parser.parse(p.asInputStream(StandardCharsets.UTF_8)))

From my understanding, Scala's Iterator is much more robust and probably a lot more capable of handling something like this, which matters because I won't always be able to fit the whole file in memory.
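If the main goal is to get Scala's `Iterator` methods on top of existing Java code, the standard converters can wrap a Java iterator (or `Iterable`) lazily, without copying it. Note that a Scala `Iterator` is still single-pass, just like the Java one; what it adds is the collection-style API. A minimal sketch, assuming Scala 2.13 (`scala.jdk.CollectionConverters`; Scala 2.12 has the older `scala.collection.JavaConverters`), with a plain `java.util.List` standing in for the mbox iterator:

```scala
import scala.jdk.CollectionConverters._

object JavaIteratorDemo {
  // Stand-in for a Java-side iterator (hypothetical data, not from mime4j)
  def javaIterator(): java.util.Iterator[String] =
    java.util.Arrays.asList("msg1", "msg2", "msg3").iterator()

  // asScala wraps the Java iterator without copying it; filter and map stay lazy
  // until toList pulls the elements through
  def process(): List[String] =
    javaIterator().asScala.filter(_ != "msg1").map(_.toUpperCase).toList
}
```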

I'd like to build my own version of MboxIterator. I dug through the MboxIterator source and found a good regex for detecting the start of each email message, but I'm drawing a blank on what to do next.

I created the regex like so:

 import java.util.regex.Pattern
 import org.apache.james.mime4j.mboxiterator.FromLinePatterns

 val MESSAGE_START = Pattern.compile(FromLinePatterns.DEFAULT, Pattern.MULTILINE)
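With `Pattern.MULTILINE`, `^` and `$` match at every line boundary, so repeated `find()` calls locate each From line inside a block of text, and the match offsets can be used to cut the text into messages. A sketch under one assumption: the pattern below is a hypothetical stand-in, since the exact text of `FromLinePatterns.DEFAULT` isn't shown here. It also works on a `String` that is already in memory, so for a very large file it would have to be applied buffer by buffer.

```scala
import java.util.regex.Pattern

object FromLineOffsets {
  // Hypothetical stand-in for FromLinePatterns.DEFAULT; adjust to the real pattern
  val MessageStart: Pattern = Pattern.compile("^From \\S+ .*\\d{4}$", Pattern.MULTILINE)

  // Start offset of every From line in `text` (MULTILINE makes ^ and $ match per line)
  def starts(text: String): List[Int] = {
    val m = MessageStart.matcher(text)
    Iterator.continually(m.find()).takeWhile(found => found).map(_ => m.start()).toList
  }

  // Cut the text at those offsets; each chunk keeps its own From line
  def messages(text: String): List[String] = {
    val ss = starts(text)
    ss.zip(ss.drop(1) :+ text.length).map { case (from, until) => text.substring(from, until) }
  }
}
```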

What I want to do (based on what I know so far):

  • Build a FileInputStream from an MBOX file.
  • Use Iterator.continually(stream.read()) to read through the stream.
  • Use .takeWhile() to keep reading until the end of the stream.
  • Chunk the stream using something like MESSAGE_START.matcher(someString).find(), or use it to find the indexes that separate the messages.
  • Process the resulting chunks, i.e. the text between those indexes.

I feel like I should be able to use map(), find(), filter() and collect() to accomplish this, but I'm getting thrown off by the fact that stream.read() only gives me Ints (one byte at a time) to work with.
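The `Int`s come from `InputStream.read()` itself: it returns one byte as a value from 0 to 255, or -1 at the end of the stream. A small sketch of the difference, assuming UTF-8 text: wrapping the stream in a `BufferedReader` lets the same `Iterator.continually(...).takeWhile(...)` idiom yield whole lines, which is what a From-line regex needs to look at. The object name is hypothetical, and the caller is assumed to own (and close) the stream.

```scala
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

object ReadingStreams {
  // read() returns one byte as an Int (0-255), or -1 at end of stream
  def bytes(in: InputStream): Iterator[Int] =
    Iterator.continually(in.read()).takeWhile(_ != -1)

  // A reader on top of the stream yields whole lines instead of Ints;
  // readLine() returns null at end of stream
  def lines(in: InputStream): Iterator[String] = {
    val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
    Iterator.continually(reader.readLine()).takeWhile(_ != null)
  }
}
```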

How would I accomplish this?

EDIT:

After thinking about it some more, here is another way to describe what I think I need to do:

  1. I need to keep reading from the stream until I get a string that matches my RegEx

  2. Maybe group the previously read bytes?

  3. Send it off to be processed somewhere

  4. Remove it from the scope somehow so it doesn't get grouped the next time I run into a match

  5. Continue to read the stream until I find the next match.

  6. Profit???
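Steps 1 to 5 translate fairly directly into a loop over lines with a buffer that is flushed whenever a new header line appears. A minimal sketch, assuming the input is already an `Iterator[String]` of lines and using a hypothetical From-line pattern; only the message currently being built is held in memory. Any lines before the first header would form a group of their own.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.matching.Regex

object StepByStep {
  // Hypothetical From-line pattern; adjust to the real mbox rule
  val Header: Regex = "From .+ \\d{4}".r

  def isHeader(line: String): Boolean = Header.pattern.matcher(line).matches()

  // Read lines, group them, and hand each finished group to `process`
  def forEachMessage(lines: Iterator[String])(process: List[String] => Unit): Unit = {
    val current = ListBuffer.empty[String]      // step 2: lines read since the last header
    for (line <- lines) {
      if (isHeader(line) && current.nonEmpty) {
        process(current.toList)                 // step 3: send the finished group off
        current.clear()                         // step 4: drop it before the next group
      }
      current += line                           // a header line starts the new group
    }
    if (current.nonEmpty) process(current.toList) // the last message has no header after it
  }
}
```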

EDIT 2:

I think I'm getting closer. Using a method like the one below gets me an iterator of iterators. However, there are two issues:

  1. Is this a waste of memory? Does it mean everything gets read into memory?

  2. I still need to figure out a way to split on the match but still include the matching element in the iterator returned.

def split[T](iter: Iterator[T])(breakOn: T => Boolean): Iterator[Iterator[T]] =
  new Iterator[Iterator[T]] {
    def hasNext = iter.hasNext

    def next = {
      val cur = iter.takeWhile(!breakOn(_))
      iter.dropWhile(breakOn)
      cur
    }
  }.withFilter(l => l.nonEmpty)
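Both issues can be handled by peeking instead of consuming: a `BufferedIterator` exposes `head`, so the loop can stop in front of the next delimiter without swallowing it, and the delimiter becomes the first element of the following group. A sketch (the object name is hypothetical): only `hasNext`, `next` and `head` are called on the single buffered iterator, in line with the Scaladoc advice not to reuse an iterator after calling other methods on it. Each group is materialized as a `Vector`, so memory is bounded by the largest group rather than the whole input, unless the caller keeps the groups.

```scala
object SplitKeepingDelimiter {
  // Split `iter` into groups that each start with an element satisfying `breakOn`
  // (the very first group may start with a non-delimiter element).
  def split[T](iter: Iterator[T])(breakOn: T => Boolean): Iterator[Vector[T]] = {
    val it = iter.buffered // from here on, only `it` is used
    new Iterator[Vector[T]] {
      def hasNext: Boolean = it.hasNext
      def next(): Vector[T] = {
        if (!it.hasNext) throw new NoSuchElementException("next on empty iterator")
        val group = Vector.newBuilder[T]
        group += it.next()                              // the delimiter (or leading element)
        while (it.hasNext && !breakOn(it.head))         // peek without consuming
          group += it.next()
        group.result()
      }
    }
  }
}
```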
Dillondillow answered 14/7, 2019 at 4:17 Comment(4)
Although you've explained very well how you are attempting to solve your problem, you haven't explained what the problem is. Are you trying to convert your existing working code to use Scala iterators (if so, see the Java conversion shims)? Are you worried about exception handling or the memory use of the Java library? At a glance, MboxIterator should be properly streaming the file content (as opposed to loading it all into memory). - Kolodgie
@Kolodgie I suppose I'm just not satisfied with any of the solutions I've found. They should be more idiomatic Scala, i.e. more concise. I'm trying to break up a large text file of objects using a regex that matches the first line of each object. Splitting the stream of lines at the regex matches and processing each group is my central issue. - Dillondillow
Your split() method might work, but it appears to break the first rule of iterators: "one should never use an iterator after calling a method on it. The two most important exceptions are also the sole abstract methods: next and hasNext." (From the Scaladoc page.) - Wiskind
@Wiskind What would be a good alternative? - Dillondillow

If I understand correctly, you want to lazily chunk a large file that is delimited by a regex-recognizable pattern.

You could try to return an Iterator for each request, but managing those iterators correctly would not be trivial.

I'd be inclined to hide all file and iterator management from the client.

class MBox(filePath :String) {
  private val file   = io.Source.fromFile(filePath)
  private val itr    = file.getLines().buffered
  private val header = "From .+ \\d{4}".r  //adjust to taste

  def next() :Option[String] =
    if (itr.hasNext) {
      val sb = new StringBuilder()
      sb.append(itr.next() + "\n")
      while (itr.hasNext && !header.matches(itr.head))
        sb.append(itr.next() + "\n")
      Some(sb.mkString)
    } else {
      file.close()
      None
    }
}

testing:

val mbox = new MBox("so.txt")
mbox.next()
//res0: Option[String] =
//Some(From MAILER-DAEMON Fri Jul  8 12:08:34 2011
//some text AAA
//some text BBB
//)

mbox.next()
//res1: Option[String] =
//Some(From MAILER-DAEMON Mon Jun  8 12:18:34 2012
//small text
//)

mbox.next()
//res2: Option[String] =
//Some(From MAILER-DAEMON Tue Jan  8 11:18:14 2013
//some text CCC
//some text DDD
//)

mbox.next()  //res3: Option[String] = None
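Because `next()` returns `None` once the file is exhausted (closing it at that point), a caller who wants `map`, `filter` or `foreach` can adapt any such "call until `None`" source into a standard `Iterator`. A small helper sketch; the name `asIterator` is hypothetical:

```scala
object OptionSource {
  // Turn a "call until None" source (such as MBox.next()) into a lazy Iterator.
  // takeWhile stops at the first None, so the source is not called again after that.
  def asIterator[A](next: () => Option[A]): Iterator[A] =
    Iterator.continually(next()).takeWhile(_.isDefined).map(_.get)
}
```

Usage would look like `OptionSource.asIterator(() => mbox.next()).foreach(println)`.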

There is only one Iterator per open file and only the safe methods are invoked on it. The file text is realized (loaded) only on request and the client gets just what's requested, if available. Instead of all lines in one long String you could return each line as part of a collection, Seq[String], if that's more applicable.
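A sketch of the `Seq[String]` variant mentioned above. Besides returning lines, it changes one thing for illustration: it takes a `scala.io.Source` instead of a file path, so it can be tried on a string with `Source.fromString` (passing `Source.fromFile(path)` reads a file as before). It also uses `header.pattern.matcher(...).matches()`, which performs the same whole-line match as `Regex.matches` but also compiles on Scala 2.12. The class name is hypothetical.

```scala
import scala.io.Source

// Variant of MBox that returns each message as a Seq of lines
class MBoxLines(src: Source) {
  private val itr    = src.getLines().buffered
  private val header = "From .+ \\d{4}".r  // adjust to taste, as in the original

  def next(): Option[Seq[String]] =
    if (itr.hasNext) {
      val lines = Vector.newBuilder[String]
      lines += itr.next()                                   // the From line
      while (itr.hasNext && !header.pattern.matcher(itr.head).matches())
        lines += itr.next()                                 // body lines up to the next From line
      Some(lines.result())
    } else {
      src.close()
      None
    }
}
```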


UPDATE: This can be modified for easy iteration.

class MBox(filePath :String) extends Iterator[String] {
  private val file   = io.Source.fromFile(filePath)
  private val itr    = file.getLines().buffered
  private val header = "From .+ \\d{4}".r  //adjust to taste

  def next() :String = {
    val sb = new StringBuilder()
    sb.append(itr.next() + "\n")
    while (itr.hasNext && !header.matches(itr.head))
      sb.append(itr.next() + "\n")
    sb.mkString
  }

  def hasNext: Boolean =
    if (itr.hasNext) true else {file.close(); false}
}

Now you can .foreach(), .map(), .flatMap(), etc. But you can also do dangerous things like .toList which will load the entire file.
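To see why `map` and `foreach` are safe while `toList` is not: transformations such as `map` and `take` only describe work, and elements are pulled from the source one at a time as a terminal operation asks for them; `toList` then keeps every pulled element. A sketch with a hypothetical counting wrapper around an endless source (on which `toList` alone would never return):

```scala
// Hypothetical wrapper that records how many elements have been pulled from `underlying`
final class Counting[A](underlying: Iterator[A]) extends Iterator[A] {
  var pulled: Int = 0
  def hasNext: Boolean = underlying.hasNext
  def next(): A = { pulled += 1; underlying.next() }
}

object LazyDemo {
  // An endless stand-in for "messages"; only what take(2) asks for is ever produced
  def firstTwo(): (List[String], Int) = {
    val source = new Counting(Iterator.from(1).map(i => s"message $i"))
    val result = source.map(_.toUpperCase).take(2).toList
    (result, source.pulled)
  }
}
```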

Wiskind answered 22/7, 2019 at 23:45 Comment(3)
I haven't had the chance to test this yet. But, reading through it, it makes a lot of sense and seems much cleaner than my implementation. Thank you! - Dillondillow
How would I go about adding foreach or map functionality, since I'm not implementing Iterator? Should I just use a while loop on the MBox value? Or is this the wrong question because I'm fundamentally misunderstanding something? - Dillondillow
Why is it bad to add class MBox(filePath: String) extends Iterator[Option[String]] and def hasNext: Boolean = itr.hasNext so that I can use map and foreach? I feel like something isn't clicking with me about Iterator safety and how it mutates. - Dillondillow
