Scala: how to traverse stream/iterator collecting results into several different collections

I'm going through a log file that is too big to fit into memory and collecting two types of expressions. What is a better functional alternative to my iterative snippet below?

def streamData(file: File, errorPat: Regex, loginPat: Regex): List[(String, String)]={
  val lines : Iterator[String] = io.Source.fromFile(file).getLines()

  val logins: mutable.Map[String, String] = new mutable.HashMap[String, String]()
  val errors: mutable.ListBuffer[(String, String)] = mutable.ListBuffer.empty

  for (line <- lines){
    line match {
      case errorPat(date,ip)=> errors.append((ip,date))
      case loginPat(date,user,ip,id) =>logins.put(ip, id)
      case _ => () // ignore lines that match neither pattern
    }
  }

  errors.toList.map(line => (logins.getOrElse(line._1,"none") + " " + line._1,line._2))
}
Evelineevelinn asked 5/2, 2013 at 5:10 Comment(4)
As a matter of good style, I suggest your snippet be compilable. In this case, it is not. Some imports are required: import java.io.File, import scala.util.matching.Regex and import scala.collection.mutable. – Wizard
I tried to avoid unnecessary wrapping in a class or object, and the regexps aren't important for the question; a whole compilable example would contain too much "noise". – Evelineevelinn
Is there any chance a line will match both patterns? – Edric
No, the patterns are totally different. – Evelineevelinn

Here is a possible solution:

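import java.io.File
import scala.io.Source
import scala.util.matching.Regex

def streamData(file: File, errorPat: Regex, loginPat: Regex): List[(String,String)] = {
  val lines = Source.fromFile(file).getLines()
  // Tag each matching line: errors go in the first slot, logins in the second.
  val (err, log) = lines.collect {
        case errorPat(inf, ip) => (Some((ip, inf)), None)
        case loginPat(_, _, ip, id) => (None, Some((ip, id)))
      }.toList.unzip
  val ip2id = log.flatten.toMap
  // Join the user id and the IP with a space, as in the question.
  err.collect{ case Some((ip,inf)) => (ip2id.getOrElse(ip,"none") + " " + ip, inf) }
}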
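The same tag-and-split idea can also be sketched with `Either` instead of a pair of `Option`s, so no `None` placeholders are needed. This is a variant, assuming Scala 2.13 or later (for `partitionMap`); it takes an `Iterator[String]` instead of a `File` so it can be exercised without a log file, and the object name `PartitionSketch` is illustrative. Like the code above, it keeps only the matching lines in memory.

```scala
import scala.util.matching.Regex

object PartitionSketch {
  def streamData(lines: Iterator[String], errorPat: Regex, loginPat: Regex): List[(String, String)] = {
    // Left = error (ip, date), Right = login (ip, id); other lines are dropped by collect.
    val tagged: List[Either[(String, String), (String, String)]] =
      lines.collect {
        case errorPat(date, ip)     => Left((ip, date))
        case loginPat(_, _, ip, id) => Right((ip, id))
      }.toList

    // partitionMap (Scala 2.13+) splits the Lefts and Rights in one pass over the list.
    val (errors, logins) = tagged.partitionMap(identity)
    val ip2id = logins.toMap

    errors.map { case (ip, date) => (ip2id.getOrElse(ip, "none") + " " + ip, date) }
  }
}
```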
Galvanism answered 5/2, 2013 at 9:23 Comment(1)
I tried using collect myself; I just didn't know about the Option/Some/None pattern. Thanks for this simple and logical solution. – Evelineevelinn

Corrections:
1) removed unnecessary type declarations
2) tuple deconstruction instead of ugly ._1
3) left fold instead of mutable accumulators
4) used the more convenient operator-like methods :+ and +

import java.io.File
import scala.util.matching.Regex

def streamData(file: File, errorPat: Regex, loginPat: Regex): List[(String, String)] = {
    val lines = io.Source.fromFile(file).getLines()

    // `z /: xs` is the symbolic form of `xs.foldLeft(z)`. A Vector accumulator
    // keeps `:+` cheap; appending to the default Seq (a List) costs O(n) per element.
    val (logins, errors) =
        ((Map.empty[String, String], Vector.empty[(String, String)]) /: lines) {
            case ((loginsAcc, errorsAcc), next) =>
                next match {
                    case errorPat(date, ip) => (loginsAcc, errorsAcc :+ (ip -> date))
                    case loginPat(date, user, ip, id) => (loginsAcc + (ip -> id), errorsAcc)
                    case _ => (loginsAcc, errorsAcc)
                }
        }

    // more concise equivalent for
    // errors.toList.map { case (ip, date) => (logins.getOrElse(ip, "none") + " " + ip) -> date }
    for ((ip, date) <- errors.toList)
    yield (logins.getOrElse(ip, "none") + " " + ip) -> date
}
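The file-based snippets here never close the `Source` they open. A minimal sketch of one way to do that, assuming Scala 2.13+ for `scala.util.Using`: the same left fold lives in a function over `Iterator[String]`, and a thin wrapper opens the file, runs the fold and closes the file. `ResourceSketch` and `fromLines` are hypothetical names.

```scala
import java.io.File
import scala.io.Source
import scala.util.Using
import scala.util.matching.Regex

object ResourceSketch {
  // Pure part: works on any Iterator[String], so it can be tested without a file.
  def fromLines(lines: Iterator[String], errorPat: Regex, loginPat: Regex): List[(String, String)] = {
    val (logins, errors) =
      lines.foldLeft((Map.empty[String, String], Vector.empty[(String, String)])) {
        case ((ls, es), errorPat(date, ip))     => (ls, es :+ (ip -> date))
        case ((ls, es), loginPat(_, _, ip, id)) => (ls + (ip -> id), es)
        case (acc, _)                           => acc // neither pattern matched
      }
    errors.toList.map { case (ip, date) => (logins.getOrElse(ip, "none") + " " + ip) -> date }
  }

  // File part: Using.resource closes the Source even if the fold throws.
  def streamData(file: File, errorPat: Regex, loginPat: Regex): List[(String, String)] =
    Using.resource(Source.fromFile(file)) { src =>
      fromLines(src.getLines(), errorPat, loginPat)
    }
}
```

Because `fromLines` returns a fully built `List`, nothing lazy escapes the `Using` block; returning the raw `getLines()` iterator from inside it would typically fail once the file has been closed.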
Riddell answered 5/2, 2013 at 5:18 Comment(4)
Looks like the answer to my question, though it's a really mind-blowing use of foldLeft; I'm going to wait a little for a simpler alternative before accepting. – Evelineevelinn
For me, this is a little too hardcoded to two patterns: adding a third means adding a new accumulator, etc. Have you considered using Iteratees? (composable consumers of the output of an iterator; see jsuereth.com/scala/2012/02/29/iteratees.html for some discussion) – Edric
Yes, using Iteratees would be a more general solution, but in this case (parsing log files) it may be overkill. And my knowledge of them isn't enough to write a good answer. If you can do it, I'd be interested too. – Riddell
No time now (and I've never actually written anything with Iteratees), but I will try to find some time; it looks interesting. – Edric

I have a few suggestions:

  • Instead of a pair/tuple, it's often better to use your own class. It gives meaningful names to both the type and its fields, which makes the code much more readable.
  • Split the code into small parts. In particular, try to decouple pieces of code that don't need to be tied together. This makes your code easier to understand, more robust, less prone to errors and easier to test. In your case it'd be good to separate producing your input (lines of a log file) from consuming it to produce a result. For example, you'd be able to write automated tests for your function without having to store sample data in a file.
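A rough plain-Scala illustration of both suggestions (no Scalaz): a named result type, a state class with one method per kind of line, and a consumer that takes an `Iterator[String]` rather than a file. `ErrorReport`, `LogState`, `analyse` and the field names are hypothetical, chosen for this sketch; adding a third pattern would mean one more method and one more `case`.

```scala
import scala.util.matching.Regex

object CaseClassSketch {
  // Named result instead of a bare (String, String) pair.
  final case class ErrorReport(user: String, ip: String, date: String) {
    // Same shape as the question's result: ("user ip", date).
    def render: (String, String) = (user + " " + ip, date)
  }

  // Accumulated state; each method returns a new, updated copy.
  final case class LogState(logins: Map[String, String], errors: Vector[(String, String)]) {
    def addError(date: String, ip: String): LogState = copy(errors = errors :+ (ip -> date))
    def addLogin(ip: String, id: String): LogState = copy(logins = logins + (ip -> id))
    def result: List[ErrorReport] =
      errors.toList.map { case (ip, date) => ErrorReport(logins.getOrElse(ip, "none"), ip, date) }
  }

  object LogState {
    val empty: LogState = LogState(Map.empty, Vector.empty)
  }

  // Consumer: knows nothing about files, only about lines.
  def analyse(lines: Iterator[String], errorPat: Regex, loginPat: Regex): List[ErrorReport] =
    lines.foldLeft(LogState.empty) { (state, line) =>
      line match {
        case errorPat(date, ip)     => state.addError(date, ip)
        case loginPat(_, _, ip, id) => state.addLogin(ip, id)
        case _                      => state
      }
    }.result
}
```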

As an example and exercise, I tried to make a solution based on Scalaz iteratees. It's a bit longer (it includes some auxiliary code for IteratorEnumerator) and may be overkill for the task, but someone may find it helpful.

import java.io._;
import scala.util.matching.Regex
import scalaz._
import scalaz.IterV._

object MyApp extends App {
  // A type for the result. Having names keeps things
  // clearer and shorter.
  type LogResult = List[(String,String)]

  // Represents a state of our computation. Not only does it
  // give a name to the data, we can also put here the
  // functions that modify the state. This nicely
  // separates what we're computing and how.
  sealed case class State(
    logins: Map[String,String],
    errors: Seq[(String,String)]
  ) {
    def this() = {
      this(Map.empty[String,String], Seq.empty[(String,String)])
    }

    def addError(date: String, ip: String): State =
      State(logins, errors :+ (ip -> date));
    def addLogin(ip: String, id: String): State =
      State(logins + (ip -> id), errors);

    // Produce the final result from accumulated data.
    def result: LogResult =
      for ((ip, date) <- errors.toList)
        yield (logins.getOrElse(ip, "none") + " " + ip) -> date
  }

  // An iteratee that consumes lines of our input. Based
  // on the given regular expressions, it produces an
  // iteratee that parses the input and uses State to
  // compute the result.
  def logIteratee(errorPat: Regex, loginPat: Regex):
            IterV[String,List[(String,String)]] = {
    // Consumes a single line.
    def consume(line: String, state: State): State =
      line match {
        case errorPat(date, ip)           => state.addError(date, ip);
        case loginPat(date, user, ip, id) => state.addLogin(ip, id);
        case _                            => state
      }

    // The core of the iteratee. Every time we consume a
    // line, we update our state. When done, compute the
    // final result.
    def step(state: State)(s: Input[String]): IterV[String, LogResult] =
      s(el    = line => Cont(step(consume(line, state))),
        empty = Cont(step(state)),
        eof   = Done(state.result, EOF[String]))
    // Return the iteratee waiting for its first input.
    Cont(step(new State()));
  }


  // Converts an iterator into an enumerator. This
  // should probably be moved into Scalaz.
  // Adapted from scalaz.ExampleIteratee
  implicit val IteratorEnumerator = new Enumerator[Iterator] {
    @annotation.tailrec def apply[E, A](e: Iterator[E], i: IterV[E, A]): IterV[E, A] = {
      val next: Option[(Iterator[E], IterV[E, A])] =
        if (e.hasNext) {
          val x = e.next();
          i.fold(done = (_, _) => None, cont = k => Some((e, k(El(x)))))
        } else
          None;
      next match {
        case None => i
        case Some((es, is)) => apply(es, is)
      }
    }
  }


  // main ---------------------------------------------------
  {
    // Read a file as an iterator of lines:
    // val lines: Iterator[String] =
    //    io.Source.fromFile("test.log").getLines();

    // Create our testing iterator:
    val lines: Iterator[String] = Seq(
      "Error: 2012/03 1.2.3.4",
      "Login: 2012/03 user 1.2.3.4 Joe",
      "Error: 2012/03 1.2.3.5",
      "Error: 2012/04 1.2.3.4"
    ).iterator;

    // Create an iteratee.
    val iter = logIteratee("Error: (\\S+) (\\S+)".r, 
                           "Login: (\\S+) (\\S+) (\\S+) (\\S+)".r);
    // Run the iteratee against the input
    // (the enumerator is implicit)
    println(iter(lines).run);
  }
}
Gay answered 5/2, 2013 at 19:50 Comment(1)
Too much overhead for an operational task; in addition, I'd like a pure Scala solution for a fairly simple task without pulling in Scalaz, though your solution could well be better in a full-scale Scala project. – Evelineevelinn
