Found the answer. You can get the stats by using a SparkListener.
If your job has no input or output metrics, calling .get on the missing metrics throws a None.get exception (java.util.NoSuchElementException); you can avoid it by guarding each access, for example with an if statement.
// Running totals updated by the listener below as tasks finish.
var inputRecords = 0L
var outputWritten = 0L
sc.addSparkListener(new SparkListener() {
  /** Adds the records read and written by each finished task to the running totals. */
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // NOTE(review): taskMetrics is reportedly null for failed tasks — confirm for your Spark version.
    // Option(...) turns a null into None so such events are skipped.
    Option(taskEnd.taskMetrics).foreach { metrics =>
      // inputMetrics / outputMetrics are Options here; foreach skips them when absent,
      // so no None.get exception can be thrown.
      metrics.inputMetrics.foreach(in => inputRecords += in.recordsRead)
      metrics.outputMetrics.foreach(out => outputWritten += out.recordsWritten)
    }
  }
})
Please find a complete example below.
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import com.datastax.spark.connector._
import org.apache.spark.sql._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
// Connection and resource settings for the example application ("..." values are placeholders).
// NOTE(review): Spark's configuration guide says spark.driver.memory has no effect when set on a
// SparkConf in client mode, because the driver JVM is already running — confirm for your deployment.
val sparkSettings = Seq(
  "spark.cassandra.connection.host" -> "...",
  "spark.driver.allowMultipleContexts" -> "true",
  "spark.master" -> "spark://....:7077",
  "spark.driver.memory" -> "1g",
  "spark.executor.memory" -> "10g",
  "spark.shuffle.spill" -> "true",
  "spark.shuffle.memoryFraction" -> "0.2"
)
val conf = new SparkConf().setAll(sparkSettings).setAppName("CassandraTest")
// Stop the already-running context `sc` (presumably the one pre-created by spark-shell)
// and start a fresh one that uses `conf`.
sc.stop()
val sc = new SparkContext(conf)
// SQL entry point used below to load the source table.
val sqlcontext = new SQLContext(sc)
// Running totals updated by the listener below as tasks finish.
// inputRecords was previously used without being declared, which does not compile.
var inputRecords = 0L
var outputWritten = 0L
sc.addSparkListener(new SparkListener() {
  /** Adds the records read and written by each finished task to the running totals. */
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // NOTE(review): taskMetrics is reportedly null for failed tasks — confirm for your Spark version.
    // Option(...) turns a null into None so such events are skipped.
    Option(taskEnd.taskMetrics).foreach { metrics =>
      // inputMetrics / outputMetrics are Options here; foreach skips them when absent,
      // so no None.get exception can be thrown.
      metrics.inputMetrics.foreach(in => inputRecords += in.recordsRead)
      metrics.outputMetrics.foreach(out => outputWritten += out.recordsWritten)
    }
  }
})
// Options for the JDBC source (table bucks_payments); "..." values are placeholders.
val jdbcOptions = Map(
  "url" -> "jdbc:mysql://...:3306/...",
  "driver" -> "com.mysql.jdbc.Driver",
  "dbtable" -> "bucks_payments",
  "partitionColumn" -> "id",
  "lowerBound" -> "1",
  "upperBound" -> "14596",
  "numPartitions" -> "10",
  "fetchSize" -> "100000",
  "user" -> "hadoop",
  "password" -> "..."
)
// Load the table through the jdbc data source.
val bp = sqlcontext.read.format("jdbc").options(jdbcOptions).load()
// Write the DataFrame to Cassandra (keyspace "test", table "bucks_payments") with SaveMode.Overwrite.
// Uses the DataFrameWriter API (the counterpart of sqlcontext.read above) instead of the
// deprecated DataFrame.save(source, mode, options).
val cassandraTarget = Map("table" -> "bucks_payments", "keyspace" -> "test")
bp.write.format("org.apache.spark.sql.cassandra").mode(SaveMode.Overwrite).options(cassandraTarget).save()
// Explicit tuple: prints "(outputWritten,<count>)" exactly as before, without relying on
// the compiler adapting two arguments into a tuple.
println(("outputWritten", outputWritten))
Result:
scala> println("outputWritten",outputWritten)
(outputWritten,16383)