Spark: how to get the number of written rows?
E

3

16

I'm wondering if there is a way to know the number of rows written by a Spark save operation. I know I can run a count on the RDD before writing it, but I'd like to know whether the same information is available without that extra pass.

Thank you, Marco

Edgerton answered 28/5, 2016 at 7:56 Comment(3)
This may be a duplicate of #28413923. – Robinet
@amit_kumar I don't think it's a duplicate. I think he wants to count the data and save it without having to pass over it twice. – Gerek
This is a question about RDDs, but if you're using DataFrames on Spark 3.3+, check out this answer (Python) or this answer (Scala). – Roselleroselyn
F
18

If you really want to, you can add a custom listener and extract the number of written rows from outputMetrics. A very simple example could look like this:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Running total, updated by the listener each time a task finishes
var recordsWrittenCount = 0L

sc.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    synchronized {
      recordsWrittenCount += taskEnd.taskMetrics.outputMetrics.recordsWritten
    }
  }
})

sc.parallelize(1 to 10, 2).saveAsTextFile("/tmp/foobar")
recordsWrittenCount
// Long = 10

Note, however, that this part of the API is intended for internal use.
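
As a variation on the listener above, here is a minimal sketch that keeps the total in an AtomicLong instead of a synchronized var. The class name RecordsWrittenListener and its total method are hypothetical, not part of Spark. It assumes Spark 2.0 or later, where outputMetrics is a plain object rather than an Option.

```scala
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical helper class; the name is not part of Spark.
class RecordsWrittenListener extends SparkListener {
  private val counter = new AtomicLong(0L)

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // taskMetrics may be null for a failed task, so guard before reading it.
    Option(taskEnd.taskMetrics).foreach { metrics =>
      counter.addAndGet(metrics.outputMetrics.recordsWritten)
    }
  }

  // Total records written by all tasks seen so far.
  def total: Long = counter.get
}
```

It would be registered with sc.addSparkListener(new RecordsWrittenListener) before the save, and total read afterwards. Listener events appear to be delivered asynchronously, so the value may lag slightly behind the end of the job.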

Flaming answered 29/5, 2016 at 12:24 Comment(5)
Thank you, but this doesn't work on Spark 1.5.2. There you have to write recordsWrittenCount += taskEnd.taskMetrics.outputMetrics.get.recordsWritten instead. – Edgerton
As mentioned, it is an internal API, so there is no guarantee it will be stable. – Flaming
Wouldn't it be better to use an atomic recordsWrittenCount instead of a synchronized block? – Microdont
Is there a way to do the same thing when writing records using spark.write.avro(...)? – Bronchiole
For me, recordsWritten is always 0 in Structured Streaming. – Theatrical
G
8

The accepted answer more closely matches the OP's specific needs (as made explicit in various comments); nevertheless, this answer will suit the majority.

The most efficient approach is to use an Accumulator: http://spark.apache.org/docs/latest/programming-guide.html#accumulators

val accum = sc.accumulator(0L)

data.map { x =>
  accum += 1
  x
}
.saveAsTextFile(path)

val count = accum.value

You can then wrap this in an implicit class (the "pimp my library" pattern):

import org.apache.spark.rdd.RDD

implicit class PimpedStringRDD(rdd: RDD[String]) {
  def saveAsTextFileAndCount(p: String): Long = {
    val accum = rdd.sparkContext.accumulator(0L)

    rdd.map { x =>
      accum += 1
      x
    }
    .saveAsTextFile(p)

    accum.value
  }
}

So you can do

val count = data.saveAsTextFileAndCount(path)
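
On Spark 2.0 and later, sc.accumulator is deprecated in favour of longAccumulator. A minimal sketch of the same idea with the newer API follows. The function name mirrors the one above, and the accumulator name "recordsWritten" is an arbitrary label chosen for illustration.

```scala
import org.apache.spark.rdd.RDD

// Sketch assuming Spark 2.0+; counts elements as they flow into the save.
def saveAsTextFileAndCount(rdd: RDD[String], path: String): Long = {
  // "recordsWritten" is just a display name for the accumulator.
  val written = rdd.sparkContext.longAccumulator("recordsWritten")

  rdd.map { x =>
    written.add(1L) // incremented on the executors, merged on the driver
    x
  }.saveAsTextFile(path)

  // Read only after the action has completed.
  written.value
}
```

The Spark documentation notes that accumulator updates made inside a transformation may be applied more than once if tasks or stages are re-executed, so the result is best treated as approximate when failures or speculation occur.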
Gerek answered 29/5, 2016 at 11:9 Comment(6)
I know this kind of approach, but I'd like to avoid it for two main reasons: using it in a transformation means the result cannot be trusted if there are failures, and there is in any case a (small) overhead. I was just wondering whether there is a counter accessible somehow, as there is in MapReduce, since the web UI shows the number of rows written... – Edgerton
Well, thank you for your answer... though I still wonder how they can show this info in the web UI if there is no internal counter... – Edgerton
@mark91 Ah, well, you could clone the UI code and dig through it, I guess. Having read the documentation, the code I've given is fine (Spark says it protects against restarted tasks). It seems what you want to protect against is an RDD being transformed multiple times, but in the code I've given the RDD isn't accessible outside the implicit class's scope. It will only accumulate before writing, and only accumulate once. – Gerek
count = rdd.count(); rdd.saveAsTextFile(p); Is this any better? – Robinet
@amit_kumar If the RDD is not cached, this should be more efficient than a separate count because the data will be materialized only once. – Flaming
As far as I understand, modifying an accumulator in a transformation (i.e. map in your case) might result in an invalid value. – Microdont
I
2

If you look at

taskEnd.taskInfo.accumulables

you will see that it holds the following AccumulableInfo entries in a ListBuffer, in sequential order:

AccumulableInfo(1,Some(internal.metrics.executorDeserializeTime),Some(33),Some(33),true,true,None),
AccumulableInfo(2,Some(internal.metrics.executorDeserializeCpuTime),Some(32067956),Some(32067956),true,true,None),
AccumulableInfo(3,Some(internal.metrics.executorRunTime),Some(325),Some(325),true,true,None),
AccumulableInfo(4,Some(internal.metrics.executorCpuTime),Some(320581946),Some(320581946),true,true,None),
AccumulableInfo(5,Some(internal.metrics.resultSize),Some(1459),Some(1459),true,true,None),
AccumulableInfo(7,Some(internal.metrics.resultSerializationTime),Some(1),Some(1),true,true,None),
AccumulableInfo(0,Some(number of output rows),Some(3),Some(3),true,true,Some(sql))

In this output, the number of output rows is in the 7th position of the ListBuffer (index 6), so the count of written rows can be read with

taskEnd.taskInfo.accumulables(6).value.get

We can get the number of rows written in the following way (I just modified @zero323's answer):

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

var recordsWrittenCount = 0L

sc.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    synchronized {
      recordsWrittenCount += taskEnd.taskInfo.accumulables(6).value.get.asInstanceOf[Long]
    }
  }
})

sc.parallelize(1 to 10, 2).saveAsTextFile("/tmp/foobar")
recordsWrittenCount
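
Because the position of an entry in the list can differ between jobs and Spark versions, a slightly more defensive sketch looks the metric up by name instead of by index. The class name OutputRowsListener is hypothetical. The sketch also assumes that the update field of AccumulableInfo holds the per-task contribution (the value field appears to be the running total), which is an assumption worth checking against your Spark version.

```scala
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical helper class; the name is not part of Spark.
class OutputRowsListener(metricName: String = "number of output rows")
    extends SparkListener {
  private val counter = new AtomicLong(0L)

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    taskEnd.taskInfo.accumulables
      .filter(_.name.contains(metricName)) // name is an Option[String]
      .foreach { acc =>
        // Assumption: `update` is this task's own contribution.
        acc.update match {
          case Some(n: Long) => counter.addAndGet(n)
          case _             => () // ignore missing or non-numeric values
        }
      }
  }

  // Sum of the matching metric over all tasks seen so far.
  def total: Long = counter.get
}
```

This still adds up every accumulator with that name, so when several stages or operators each report "number of output rows" the total will cover all of them, not just the final write.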
Idelia answered 17/6, 2017 at 12:1 Comment(1)
Unfortunately, it is not that simple. I had several tasks, e.g. stage 1's task output 100 rows and stage 2's task (which writes to the DB) output 30 rows. So naively adding them up would give 130 rows instead of 30. – Theatrical
