If you look at
taskEnd.taskInfo.accumulables
you will see that it is bundled with the following AccumulableInfo entries in a ListBuffer, in sequential order:
AccumulableInfo(1,Some(internal.metrics.executorDeserializeTime),Some(33),Some(33),true,true,None),
AccumulableInfo(2,Some(internal.metrics.executorDeserializeCpuTime),Some(32067956),Some(32067956),true,true,None),
AccumulableInfo(3,Some(internal.metrics.executorRunTime),Some(325),Some(325),true,true,None),
AccumulableInfo(4,Some(internal.metrics.executorCpuTime),Some(320581946),Some(320581946),true,true,None),
AccumulableInfo(5,Some(internal.metrics.resultSize),Some(1459),Some(1459),true,true,None),
AccumulableInfo(7,Some(internal.metrics.resultSerializationTime),Some(1),Some(1),true,true,None),
AccumulableInfo(0,Some(number of output rows),Some(3),Some(3),true,true,Some(sql))
You can clearly see that the number of output rows accumulator sits at the 7th position (index 6) of the ListBuffer, so the correct way to get the count of rows written is
taskEnd.taskInfo.accumulables(6).value.get
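The exact position can vary with the Spark version and with which metrics happen to be registered, so it is worth confirming the index on your own setup first. Here is a minimal sketch that just prints each accumulable with its index so you can check where "number of output rows" lands:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

sc.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // print every accumulable with its index to see where
    // "number of output rows" sits on this Spark version
    taskEnd.taskInfo.accumulables.zipWithIndex.foreach { case (acc, i) =>
      println(s"$i -> ${acc.name.getOrElse("")} = ${acc.value.getOrElse("")}")
    }
  }
})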
We can get the rows written in the following way (I just slightly modified @zero323's answer):
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

var recordsWrittenCount = 0L

sc.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    synchronized {
      // index 6 holds the "number of output rows" accumulable (see the listing above)
      recordsWrittenCount += taskEnd.taskInfo.accumulables(6).value.get.asInstanceOf[Long]
    }
  }
})
sc.parallelize(1 to 10, 2).saveAsTextFile("/tmp/foobar")
recordsWrittenCount
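If you would rather not depend on the position at all, a small variation of the same listener (just a sketch, assuming the accumulable keeps the name "number of output rows" shown in the listing above) looks it up by name instead:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

var rowsWritten = 0L

sc.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
    // find the accumulable by its name rather than its position
    taskEnd.taskInfo.accumulables
      .find(_.name.contains("number of output rows"))
      .flatMap(_.value)
      .foreach(v => rowsWritten += v.asInstanceOf[Long])
  }
})

This way the listener keeps working even if additional internal metrics shift the index.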