Treating an SQL ResultSet like a Scala Stream

10

46

When I query a database and receive a (forward-only, read-only) ResultSet back, the ResultSet acts like a list of database rows.

I am trying to find a way to treat this ResultSet like a Scala Stream. This would allow operations such as filter and map without consuming large amounts of RAM.

I implemented a tail-recursive method to extract the individual items, but this requires that all items be in memory at the same time, a problem if the ResultSet is very large:

import java.sql.ResultSet
import scala.annotation.tailrec

// Iterate through the result set, gather all of the String values into a list,
// then return that list
@tailrec
def loop(resultSet: ResultSet,
         accumulator: List[String] = List()): List[String] = {
  if (!resultSet.next()) accumulator.reverse
  else {
    val value = resultSet.getString(1)
    loop(resultSet, value :: accumulator)
  }
}
Superabundant answered 9/3, 2012 at 15:25 Comment(4)
Could you use an Iterable instead of a Stream to do what you want? - Siliculose
Also, a Stream retains its values in memory anyway, so you won't actually save memory by the time you reach the end of the list. - Arundell
I think that without a JDBC flag/option that makes JDBC itself stream the results, you still have one full copy of the data in memory, built by your JDBC API. - Diatribe
This is a case where the class ResultSet itself fights the recursion pattern. Here's a solution allowing for a less memory-intensive alternative, i.e. do the per-row work within the lambda and initiate the iterator with a .foreach(): https://mcmap.net/q/373165/-scala-exposing-a-jdbc-resultset-through-a-generator-iterable - Dropkick
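The point about a Stream retaining its values can be checked without a database. A minimal sketch, assuming a hypothetical counter-backed source in place of a real ResultSet (the names StreamVsIterator, produced and source are made up for illustration); on Scala 2.13+ Stream is deprecated in favour of LazyList, which caches elements in the same way:

```scala
object StreamVsIterator {
  // Counts how many elements the underlying source has had to produce.
  var produced = 0

  // Hypothetical stand-in for rows coming from a cursor.
  def source(n: Int): Iterator[Int] =
    Iterator.tabulate(n) { i => produced += 1; i }

  def main(args: Array[String]): Unit = {
    val stream = source(5).toStream
    stream.sum          // forces and caches all five elements
    stream.sum          // second pass is served from the cache
    println(produced)   // prints 5: nothing was recomputed, every cell is still held

    val it = source(5)
    it.sum              // single pass, elements are not cached
    println(it.hasNext) // prints false: an Iterator cannot be traversed again
  }
}
```

As long as a reference to the head of the Stream is kept, every row read so far stays reachable, so an Iterator is the safer choice when each row is needed only once.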
78

I didn't test it, but why wouldn't it work?

new Iterator[String] {
  def hasNext = resultSet.next()
  def next() = resultSet.getString(1)
}.toStream
Tadd answered 9/3, 2012 at 17:50 Comment(8)
That looks perfect. I'll test it as soon as I get my database set up. I don't even think I need to convert it to a Stream. I can apply map, filter, etc. directly to it. - Superabundant
I would like to give you a second up-vote. I've added this code fragment to my Scala snippets library. It's quickly becoming one of my favorites. - Superabundant
It's a cool solution, but I worry. I think the usual contract of Iterator is that hasNext is side-effect-free. It could be called any number of times between two calls to next. Is there something preventing this from becoming an issue? - Potpourri
Good answer, but what is the actual implementation? - Godwin
Didn't work for me with mysql-connector-java version 6. Not sure if I did anything wrong, but my ResultSet got closed on the second next() call, so I could only retrieve one result row. The only way to keep it from being auto-closed before I got all rows seems to be using while (rs.next()) {...}, so I add items individually to a scala.collection.mutable.ListBuffer within the while. It doesn't seem pretty, but I couldn't figure out any other way. - Bender
@Bender Using new Iterator[String]{ ... }.toList instead of .toStream will fetch the entire set of results immediately, instead of just the first row. - Zephyr
This converts one column from rs into a stream. Is there a way of converting an rs with multiple columns into a multi-dimensional array/list? - Fernyak
Same here: the ResultSet got closed on the second next() call, so I can only get the first record. - Demark
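The concern about hasNext raised in the comments can be reproduced without a database. A minimal sketch, assuming a hypothetical in-memory FakeCursor that mimics only the next() and getString behaviour of a forward-only ResultSet (neither FakeCursor nor HasNextDemo is part of JDBC or of the answer):

```scala
// Hypothetical stand-in for a forward-only cursor such as java.sql.ResultSet.
final class FakeCursor(rows: Vector[String]) {
  private var pos = -1
  def next(): Boolean = { pos += 1; pos < rows.length }
  def getString: String = rows(pos)
}

object HasNextDemo {
  // Same shape as the iterator in the answer: hasNext advances the cursor.
  def naive(c: FakeCursor): Iterator[String] = new Iterator[String] {
    def hasNext: Boolean = c.next()
    def next(): String = c.getString
  }

  def main(args: Array[String]): Unit = {
    val it = naive(new FakeCursor(Vector("a", "b", "c")))
    it.hasNext         // moves the cursor to "a"
    it.hasNext         // moves the cursor to "b"; "a" is skipped
    println(it.next()) // prints b
  }
}
```

A plain while (it.hasNext) it.next() loop calls hasNext exactly once per row, so the naive version behaves there; any caller that checks hasNext more than once per row silently drops rows.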
13

Utility function for @elbowich's answer:

def results[T](resultSet: ResultSet)(f: ResultSet => T) = {
  new Iterator[T] {
    def hasNext = resultSet.next()
    def next() = f(resultSet)
  }
}

Allows you to use type inference. E.g.:

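stmt.execute("SELECT mystr, myint FROM mytable")

// Example 1: map each row to a (String, Int) pair
val it = results(stmt.getResultSet) {
  case rs => rs.getString(1) -> 100 * rs.getInt(2)
}
val m = it.toMap // Map[String, Int]

// Example 2 (an alternative to Example 1; a forward-only ResultSet can be consumed only once):
val strings = results(stmt.getResultSet)(_.getString(1)) // Iterator[String]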
Denby answered 30/4, 2015 at 8:33 Comment(0)
11

This sounds like a great opportunity for an implicit class. First define the implicit class somewhere:

import java.sql.ResultSet

object Implicits {

    implicit class ResultSetStream(resultSet: ResultSet) {

        def toStream: Stream[ResultSet] = {
            new Iterator[ResultSet] {
                def hasNext = resultSet.next()

                def next() = resultSet
            }.toStream
        }
    }
}

Next, simply import this implicit class wherever you have executed your query and defined the ResultSet object:

import com.company.Implicits._

Finally get the data out using the toStream method. For example, get all the ids as shown below:

val allIds = resultSet.toStream.map(result => result.getInt("id"))
Ecstatics answered 29/9, 2016 at 16:31 Comment(3)
Are you sure it works? It fails on DB2 with the ResultSet being closed. If this worked in your case, perhaps it depends on the specific database brand and/or configuration? - Acromegaly
It does, but you can only use the stream as long as your connection remains open. If you close your connection, the stream will fail, as will the iterator. - Ecstatics
I have a similar answer with slightly more elaboration: https://mcmap.net/q/373165/-scala-exposing-a-jdbc-resultset-through-a-generator-iterable - Dropkick
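The remark about the connection having to stay open applies to any lazy wrapper: rows are only read when the iterator or stream is consumed. A minimal sketch of the usual workaround, materializing the rows before closing, assuming a hypothetical in-memory ClosableCursor in place of a real ResultSet and connection:

```scala
// Hypothetical stand-in for a ResultSet whose connection can be closed.
final class ClosableCursor(rows: Vector[Int]) extends AutoCloseable {
  private var pos = -1
  private var closed = false
  def next(): Boolean = {
    if (closed) throw new IllegalStateException("cursor is closed")
    pos += 1
    pos < rows.length
  }
  def getInt: Int = rows(pos)
  def close(): Unit = { closed = true }
}

object MaterializeBeforeClose {
  // Lazy view over the cursor: nothing is read until the iterator is consumed.
  def ids(c: ClosableCursor): Iterator[Int] =
    Iterator.continually(c).takeWhile(_.next()).map(_.getInt)

  // Reads every row while the cursor is still open, then closes it.
  def readAll(c: ClosableCursor): Vector[Int] =
    try ids(c).toVector finally c.close()
}
```

Calling readAll trades memory for safety; when the result is too large to hold, the per-row work has to happen inside the scope that owns the connection instead.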
3

I needed something similar. Building on elbowich's very cool answer, I wrapped it a bit, and instead of the string, I return the ResultSet itself (so you can read any column):

def resultSetItr(resultSet: ResultSet): Stream[ResultSet] = {
  new Iterator[ResultSet] {
    def hasNext = resultSet.next()
    def next() = resultSet
  }.toStream
}

I needed to access table metadata, but this will work for table rows (could do a stmt.executeQuery(sql) instead of md.getColumns):

val md = connection.getMetaData()
val columnItr = resultSetItr(md.getColumns(null, null, "MyTable", null))
val columns = columnItr.map(col => {
  val columnType = col.getString("TYPE_NAME")
  val columnName = col.getString("COLUMN_NAME")
  val columnSize = col.getString("COLUMN_SIZE")
  new Column(columnName, columnType, columnSize.toInt, false)
})
Hillie answered 19/8, 2014 at 19:40 Comment(1)
If you don't need to go back on the stream (i.e., forward iteration only), you can just use an iterator. This greatly reduces the memory overhead of using a stream: return an Iterator[ResultSet] and drop the toStream. - Hillie
2

Because ResultSet is just a mutable object being navigated by next, we need to define our own concept of a next row. We can do so with an input function as follows:

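class ResultSetIterator[T](rs: ResultSet, nextRowFunc: ResultSet => T)
  extends Iterator[T] {

  // Row read ahead by hasNext but not yet handed out by next().
  private var nextVal: Option[T] = None
  // Set once rs.next() has returned false, so the cursor is never advanced again.
  private var exhausted = false

  // Advances the cursor only when no row is buffered, so repeated calls are safe.
  override def hasNext: Boolean = {
    if (nextVal.isEmpty && !exhausted) {
      if (rs.next()) nextVal = Some(nextRowFunc(rs))
      else exhausted = true
    }
    nextVal.isDefined
  }

  override def next(): T = {
    if (!hasNext) throw new ResultSetIteratorOutOfBoundsException
    val value = nextVal.get
    nextVal = None // hand the buffered row out exactly once
    value
  }

  class ResultSetIteratorOutOfBoundsException extends Exception("ResultSetIterator reached end of list and next can no longer be called. hasNext should return false.")
}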

EDIT: Translate to stream or something else as per above.

Fling answered 17/5, 2016 at 4:57 Comment(0)
2
Iterator.continually(rs.next())
  .takeWhile(identity)
  .map(_ => Model(
      id = rs.getInt("id"),
      text = rs.getString("text")
   ))
Latecomer answered 13/3, 2019 at 20:38 Comment(1)
I have a similar answer with slightly more elaboration: https://mcmap.net/q/373165/-scala-exposing-a-jdbc-resultset-through-a-generator-iterable - Dropkick
1

Here is an alternative, similar to Sergey Alaev's and thoredge's solutions, for when we need a solution which honors the Iterator contract where hasNext is side-effect free.

Assuming a function f: ResultSet => T (Iterator.unfold and Option.when require Scala 2.13 or later):

Iterator.unfold(resultSet.next()) { hasNext =>
  // Explicit tuple: read the current row first, then advance the cursor.
  Option.when(hasNext)((f(resultSet), resultSet.next()))
}

I've found it useful to have this as a map "extension method" on ResultSet.

implicit class ResultSetOps(resultSet: ResultSet) {
  def map[T](f: ResultSet => T): Iterator[T] = {
    Iterator.unfold(resultSet.next()) { hasNext =>
      Option.when(hasNext)((f(resultSet), resultSet.next()))
    }
  }
}
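A quick check that the unfold-based iterator tolerates repeated hasNext calls. This is a sketch under assumptions: Scala 2.13+, and a hypothetical in-memory FakeRows cursor standing in for a real ResultSet:

```scala
// Requires Scala 2.13+ (Iterator.unfold).
// FakeRows is a hypothetical stand-in for a forward-only ResultSet.
final class FakeRows(rows: Vector[String]) {
  private var pos = -1
  def next(): Boolean = { pos += 1; pos < rows.length }
  def getString: String = rows(pos)
}

object UnfoldCheck {
  // Same read-ahead shape as in the answer: the state is
  // "is the cursor currently on a valid row?"
  def strings(c: FakeRows): Iterator[String] =
    Iterator.unfold(c.next()) { onRow =>
      if (onRow) Some((c.getString, c.next())) else None
    }

  def main(args: Array[String]): Unit = {
    val it = strings(new FakeRows(Vector("a", "b")))
    it.hasNext; it.hasNext; it.hasNext // repeated calls do not move past "a"
    println(it.toList)                 // prints List(a, b)
  }
}
```

Note that the cursor is advanced once when the iterator is created, because the initial state resultSet.next() is evaluated eagerly.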
Dundalk answered 30/10, 2019 at 9:31 Comment(0)
1

Another variant on the above, which works with Scala 2.12:

implicit class ResultSetOps(resultSet: ResultSet) {
 def map[T](f: ResultSet => T): Iterator[T] =
  Iterator.continually(resultSet).takeWhile(_.next()).map(f)
}
Bukharin answered 19/7, 2020 at 16:11 Comment(1)
I have a similar answer with slightly more elaboration: https://mcmap.net/q/373165/-scala-exposing-a-jdbc-resultset-through-a-generator-iterable - Dropkick
0

This implementation, although longer and clumsier, corresponds better to the Iterator contract. The side effect has been removed from hasNext and moved into next(): the cursor is advanced once up front and then once after each row is read.

new Iterator[String] {
  private var available = resultSet.next()
  override def hasNext: Boolean = available
  override def next(): String = {
    val string = resultSet.getString(1)
    available = resultSet.next()
    string
  }
}
Chittagong answered 22/5, 2018 at 22:15 Comment(0)
0

I think most of the implementations above have a non-idempotent hasNext method: calling it twice moves the cursor to the second row. I would advise using something like this:

  new Iterator[ResultSet] {
    // Note: JDBC makes isLast optional for TYPE_FORWARD_ONLY result sets,
    // so check that your driver supports it.
    def hasNext = {
      !resultSet.isLast
    }
    def next() = {
      resultSet.next()
      resultSet
    }
  }
Destructor answered 7/12, 2018 at 15:8 Comment(0)
