Spark 2.2 Structured Streaming foreach writer JDBC sink lag
I'm working on a project that uses Spark 2.2 Structured Streaming to read Kafka messages into an Oracle database. The message flow into Kafka is about 4,000-6,000 messages per second.


When using the HDFS file system as the sink destination, it works fine. When using the foreach JDBC writer, a huge delay builds up over time. I think the lag is caused by the foreach loop.

The JDBC sink class (a standalone class file):

class JDBCSink(url: String, user: String, pwd: String) extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {
  val driver = "oracle.jdbc.driver.OracleDriver"
  var connection: java.sql.Connection = _
  var statement: java.sql.PreparedStatement = _
  val v_sql = "insert INTO sparkdb.t_cf(EntityId,clientmac,stime,flag,id) values(?,?,to_date(?,'YYYY-MM-DD HH24:MI:SS'),?,stream_seq.nextval)"

  // called once per partition per trigger: open a fresh connection
  def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = java.sql.DriverManager.getConnection(url, user, pwd)
    connection.setAutoCommit(false)
    statement = connection.prepareStatement(v_sql)
    true
  }

  // called once per row: bind the values and execute a single-row insert
  def process(value: org.apache.spark.sql.Row): Unit = {
    statement.setString(1, value(0).toString)
    statement.setString(2, value(1).toString)
    statement.setString(3, value(2).toString)
    statement.setString(4, value(3).toString)
    statement.executeUpdate()
  }

  // called when the partition is done: commit and close the connection
  def close(errorOrNull: Throwable): Unit = {
    connection.commit()
    connection.close()
  }
}

The sink part:

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "namenode:9092")
  .option("fetch.message.max.bytes", "50000000")
  .option("kafka.max.partition.fetch.bytes", "50000000")
  .option("subscribe", "rawdb.raw_data")
  .option("startingOffsets", "latest")
  .load()
  .select($"value".as[Array[Byte]])
  .map(avroDeserialize(_))
  .filter(some logic).select(some logic)
  .writeStream.format("csv")
  .option("checkpointLocation", "/user/root/chk")
  .option("path", "/user/root/testdir")
  .start()

If I replace the last part,

.writeStream.format("csv")...

with the JDBC foreach sink as follows:

val url = "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=x.x.x.x)(PORT=1521)))(CONNECT_DATA=(SERVICE_NAME=fastdb)))"
val user = "user";
val pwd = "password";
val writer = new JDBCSink(url, user, pwd)
.writeStream.foreach(writer).outputMode("append").start()

the lag shows up.

I guess the problem is most likely caused by the foreach loop mechanics: it doesn't work in batch mode, inserting, say, several thousand rows per batch. As an Oracle DBA as well, I have fine-tuned the database side; mostly the database is waiting on idle events. I already try to avoid excessive commits by setting connection.setAutoCommit(false). Any suggestion will be much appreciated.
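
What I mean by "batch mode" is something like the following change to process() and close(), buffering rows with JDBC addBatch() and flushing them once per partition with executeBatch(). This is just a sketch of the idea, not something I have verified against this workload:

  // sketch only: buffer rows with addBatch() and flush once per partition
  def process(value: org.apache.spark.sql.Row): Unit = {
    statement.setString(1, value(0).toString)
    statement.setString(2, value(1).toString)
    statement.setString(3, value(2).toString)
    statement.setString(4, value(3).toString)
    statement.addBatch()          // buffer instead of executeUpdate()
  }

  def close(errorOrNull: Throwable): Unit = {
    if (errorOrNull == null) {
      statement.executeBatch()    // one round trip per partition
      connection.commit()         // one commit per partition
    }
    connection.close()
  }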

Bridwell answered 6/11, 2017 at 5:24 Comment(0)

Although I don't have an actual profile of what's taking the longest time in your application, I would assume it is because using ForeachWriter will effectively close and re-open your JDBC connection for every partition on every trigger, because that's how ForeachWriter works.

Instead of using it, I would advise writing a custom Sink for JDBC, where you control how the connection is opened and closed.

There is an open pull request to add a JDBC driver to Spark which you can take a peek at to see a possible approach to the implementation.
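
To give a rough idea of the shape of such a sink, here is a sketch against Spark 2.2's internal Sink/StreamSinkProvider API. The class names are made up, and it collects each micro-batch to the driver for simplicity; treat it as an illustration, not a drop-in implementation.

import java.sql.DriverManager

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.StreamSinkProvider
import org.apache.spark.sql.streaming.OutputMode

// Sketch: one connection, one JDBC batch and one commit per micro-batch,
// instead of ForeachWriter's open/process/close cycle per partition.
class JdbcBatchSink(url: String, user: String, pwd: String) extends Sink {
  private val insertSql =
    "insert into sparkdb.t_cf(EntityId,clientmac,stime,flag,id) " +
      "values(?,?,to_date(?,'YYYY-MM-DD HH24:MI:SS'),?,stream_seq.nextval)"

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // Collecting to the driver keeps the sketch short; for large batches
    // you would write per partition on the executors instead.
    val rows = data.collect()
    val conn = DriverManager.getConnection(url, user, pwd)
    conn.setAutoCommit(false)
    val ps = conn.prepareStatement(insertSql)
    try {
      rows.foreach { r =>
        ps.setString(1, r(0).toString)
        ps.setString(2, r(1).toString)
        ps.setString(3, r(2).toString)
        ps.setString(4, r(3).toString)
        ps.addBatch()
      }
      ps.executeBatch()
      conn.commit()
    } finally {
      ps.close()
      conn.close()
    }
  }
}

// Provider so the sink can be registered through .writeStream.format(...)
class JdbcBatchSinkProvider extends StreamSinkProvider {
  override def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink =
    new JdbcBatchSink(parameters("url"), parameters("user"), parameters("password"))
}

You would then start the query with .writeStream.format("your.package.JdbcBatchSinkProvider") and pass url, user and password as options.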

Basalt answered 6/11, 2017 at 7:29 Comment(5)
You are quite right. I just checked the Oracle listener log: heavy connect and disconnect activity there. So obviously Spark isn't slow and Oracle isn't bad; the foreach writer just doesn't work for this business need. And since under the hood Spark is using RDDs (immutable), I guess there is no way to set an internal buffer or something to cache the "foreach" data and submit it in batches. I'm thinking of rewriting the calculated result into another Kafka topic and using other tools in batch mode to connect to the RDBMS.Bridwell
@dalinqin Note that using a custom Sink would solve this issue, as you'd have no problem keeping the connection open.Basalt
Could you share some sample script? If using Structured Streaming, we have to go through a foreach writer to write to a JDBC destination, right? Correct me if I'm wrong. It sounds like we have to implement the ForeachWriter interface.Bridwell
@dalinqin No, you can implement a custom sink, see the link I provided in my answer.Basalt
I see your point now: rather than using the foreach writer, I write my own sink :). Pretty hard for me, but I will give it a try.Bridwell

Problem solved by injecting the result into another Kafka topic, then writing another program that reads from the new topic and writes the rows into the database in batches.

I think in a future Spark release they might provide a JDBC sink with a parameter for setting the batch size.

The main code is as follows:

write to another topic:

  .writeStream.format("kafka")
  .option("kafka.bootstrap.servers", "x.x.x.x:9092")
  .option("topic", "fastdbtest")
  .option("checkpointLocation", "/user/root/chk")
  .start()

Read the topic and write to the database; I'm using a c3p0 connection pool:

lines.foreachRDD(rdd => {
  if (!rdd.isEmpty) {
    rdd.foreachPartition(partitionRecords => {
      // get a connection from the connection pool
      val conn = ConnManager.getManager.getConnection
      val ps = conn.prepareStatement("insert into sparkdb.t_cf(ENTITYID,CLIENTMAC,STIME,FLAG) values(?,?,?,?)")
      try {
        conn.setAutoCommit(false)
        // insertIntoDB presumably binds the four columns and calls
        // ps.addBatch(), so nothing hits Oracle until executeBatch()
        partitionRecords.foreach(record => insertIntoDB(ps, record))
        ps.executeBatch()   // one round trip per partition
        conn.commit()       // one commit per partition
      } catch {
        case e: Exception => {} // do some logging here
      } finally {
        ps.close()
        conn.close()
      }
    })
  }
})
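
ConnManager isn't shown above; it's just a thin singleton wrapping a c3p0 ComboPooledDataSource, roughly like the sketch below (the connection settings here are placeholders, not my real ones):

import java.sql.Connection
import com.mchange.v2.c3p0.ComboPooledDataSource

// Singleton wrapper around a c3p0 pool: each executor JVM builds the pool
// once and reuses physical connections across partitions and batches.
object ConnManager {
  private lazy val pool: ComboPooledDataSource = {
    val ds = new ComboPooledDataSource()
    ds.setDriverClass("oracle.jdbc.driver.OracleDriver")
    ds.setJdbcUrl("jdbc:oracle:thin:@x.x.x.x:1521/fastdb")  // placeholder URL
    ds.setUser("user")                                      // placeholder credentials
    ds.setPassword("password")
    ds.setMaxPoolSize(20)
    ds
  }

  def getManager: ConnManager.type = this
  def getConnection: Connection = pool.getConnection
}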
Bridwell answered 10/11, 2017 at 16:22 Comment(0)

Have you tried using a trigger?

I noticed that when I didn't use a trigger, my foreach sink opened and closed the connection to the database several times.

writeStream.foreach(writer).start()

But when I used a trigger, the foreach sink only opened and closed the connection once, processing for example 200 queries, and when the micro-batch ended it closed the connection until a new micro-batch was received.

writeStream.trigger(Trigger.ProcessingTime("3 seconds")).foreach(writer).start()

My use case is reading from a Kafka topic with only one partition, so I think Spark is using one partition. I don't know whether this solution behaves the same with multiple Spark partitions, but my conclusion here is that the foreach sink processes the whole micro-batch at a time (row by row) in the process method and doesn't call open() and close() for every row, as a lot of people think.

Selfsown answered 8/2, 2019 at 10:8 Comment(0)
