What is the right way to work with slick's 3.0.0 streaming results and Postgresql?
I am trying to figure out how to work with Slick streaming. I use Slick 3.0.0 with the PostgreSQL driver.

The situation is the following: the server has to give the client sequences of data, split into chunks limited by size (in bytes). So I wrote the following Slick query:

val sequences = TableQuery[Sequences]
def find(userId: Long, timestamp: Long) = sequences.filter(s ⇒ s.userId === userId && s.timestamp > timestamp).sortBy(_.timestamp.asc).result
val seq = db.stream(find(0L, 0L))

I combined seq with an akka-streams Source and wrote a custom PushPullStage that limits the size of the data (in bytes) and finishes the upstream when it reaches the size limit. It works just fine. The problem is that when I look into the PostgreSQL logs, I see a query like this: select * from sequences where user_id = 0 and timestamp > 0 order by timestamp;

So at first glance there appears to be much (and unnecessary) database querying going on, only to use a few bytes of each query's result. What is the right way to do streaming with Slick so as to minimize database querying and make the best use of the data transferred in each query?
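For context, the consuming side looks roughly like this. This is a simplified sketch: `SequenceRow`, `payload`, and `maxChunkBytes` stand in for my actual types and values, and the scan/takeWhile combination stands in for my custom PushPullStage:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

implicit val system = ActorSystem("sequences")
implicit val materializer = ActorMaterializer()

// Wrap the Slick DatabasePublisher in an akka-streams Source ...
val source: Source[SequenceRow, _] = Source.fromPublisher(db.stream(find(0L, 0L)))

// ... and complete the stream once the accumulated payload size
// (in bytes) exceeds the chunk limit.
source
  .scan((0L, Option.empty[SequenceRow])) { case ((bytes, _), row) =>
    (bytes + row.payload.length, Some(row))
  }
  .takeWhile { case (bytes, _) => bytes <= maxChunkBytes }
  .collect { case (_, Some(row)) => row }
  .runForeach(println)
```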

Tumid answered 10/7, 2015 at 12:13 Comment(0)
The "right way" to do streaming with Slick and Postgres includes three things:

  1. Must use db.stream()

  2. Must disable autoCommit in the JDBC driver. One way is to make the query run in a transaction by suffixing it with .transactionally.

  3. Must set fetchSize to something other than 0, or else PostgreSQL will push the whole ResultSet to the client in one go.

Ex:

db.stream(
  find(0L, 0L)
    .transactionally
    .withStatementParameters(fetchSize = 1000)
).foreach(println)

Useful links:

https://github.com/slick/slick/issues/1038

https://github.com/slick/slick/issues/809

Lucknow answered 12/7, 2015 at 18:58 Comment(4)
Thanks for the answer, very helpful. I was confused about the "addendum": isn't the execution context which handles the db maintained separately in AsyncExecutor? – Raynard
Good to hear! About the addendum: yes, you are right. It should be the default. I'm removing the addendum since it confuses more than it helps, and in hindsight I guess it really had to do with the back-pressure mechanics. My consumer was much faster than the network, so forking off processing futures as fast as the results arrived was a more suitable solution. – Lucknow
Is there an equivalent for the case of MySQL? – Safier
Why actually disable auto commit? – Safier
The correct way to stream in Slick, as described in the documentation, is:

import java.sql.Blob
import slick.basic.DatabasePublisher
import slick.jdbc.{ResultSetConcurrency, ResultSetType}

val q = for (c <- coffees) yield c.image
val a = q.result
val p1: DatabasePublisher[Blob] = db.stream(
  a.withStatementParameters(
    rsType = ResultSetType.ForwardOnly,
    rsConcurrency = ResultSetConcurrency.ReadOnly,
    fetchSize = 1000 // your fetch size
  ).transactionally
)
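The resulting `DatabasePublisher` can then be consumed directly, or bridged into akka-streams; the blob handling below is just an illustrative sketch:

```scala
// Consume the publisher directly; Slick back-pressures the
// fetchSize-limited cursor as elements are requested.
p1.foreach(blob => println(blob.length()))

// Or bridge it into an akka-streams Source:
// import akka.stream.scaladsl.Source
// val src = Source.fromPublisher(p1)
```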
Passel answered 3/1, 2020 at 14:34 Comment(0)
