How to get the number of records written (using DataFrameWriter's save operation)?
Is there any way to get the number of records written when using Spark to save records? While I know it isn't in the API currently, I'd like to be able to do something like:

val count = df.write.csv(path)

Alternatively, being able to do an inline count (preferably without just using a standard accumulator) of the results of a step would be (almost) as effective. i.e.:

dataset.countTo(count_var).filter({function}).countTo(filtered_count_var).collect()

Any ideas?

Ulita answered 12/5, 2017 at 9:30 Comment(0)

I'd use a SparkListener that intercepts onTaskEnd or onStageCompleted events, which give you access to task metrics.

Task metrics give you the accumulators Spark uses to display metrics in the web UI's SQL tab (in Details for Query).


For example, the following query:

spark.
  read.
  option("header", true).
  csv("../datasets/people.csv").
  limit(10).
  write.
  csv("people")

gives exactly 10 output rows, so Spark knows the count (and you can access it, too).

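A minimal sketch of such a listener, assuming Spark 2.3+ (where task-level output metrics are populated reliably for file writes; see the comments) and an active `spark` session; the class name is made up:

```scala
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical listener that sums recordsWritten across all finished tasks.
class RecordsWrittenListener extends SparkListener {
  private val written = new AtomicLong(0L)

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // taskMetrics can be null for failed tasks, so guard against it.
    Option(taskEnd.taskMetrics).foreach { m =>
      written.addAndGet(m.outputMetrics.recordsWritten)
    }
  }

  def recordsWritten: Long = written.get
}

val listener = new RecordsWrittenListener
spark.sparkContext.addListener(listener)

spark.
  read.
  option("header", true).
  csv("../datasets/people.csv").
  limit(10).
  write.
  csv("people")

// After the write completes, the listener has accumulated the total.
println(s"records written: ${listener.recordsWritten}")
```

Listener events are delivered asynchronously on the listener bus, so the total may lag the action by a moment; it counts everything written on the SparkContext, not just one query.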


You could also explore Spark SQL's QueryExecutionListener:

The interface of query execution listener that can be used to analyze execution metrics.

You can register a QueryExecutionListener using ExecutionListenerManager that's available as spark.listenerManager.

scala> :type spark.listenerManager
org.apache.spark.sql.util.ExecutionListenerManager

scala> spark.listenerManager.
clear   clone   register   unregister

I think it's closer to the "bare metal", but I haven't used it before.
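As a rough sketch (I haven't battle-tested this), registration could look like the following; whether numOutputRows shows up on the root executedPlan node depends on the plan, so treat the metric lookup as an assumption:

```scala
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Hypothetical listener that prints numOutputRows (if present on the
// root physical operator) after each successful query execution.
val metricsListener = new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    qe.executedPlan.metrics.get("numOutputRows").foreach { metric =>
      println(s"$funcName produced ${metric.value} output rows")
    }
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
    // Nothing to report for failed executions in this sketch.
  }
}

spark.listenerManager.register(metricsListener)
```

funcName is the name of the action that triggered the execution (e.g. "save" or "collect"), which helps when several queries run in the same session.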


@D3V (in the comments section) mentioned accessing the numOutputRows SQL metrics using QueryExecution of a structured query. Something worth considering.

scala> :type q
org.apache.spark.sql.DataFrame

scala> :type q.queryExecution.executedPlan.metrics
Map[String,org.apache.spark.sql.execution.metric.SQLMetric]

q.queryExecution.executedPlan.metrics("numOutputRows").value
Jennajenne answered 12/5, 2017 at 11:58 Comment(5)
Have you tested SparkListener with SQL? I've done some crude testing before, and it didn't capture SQL write metrics. ExecutionListenerManager is the way to go, and has a nice example in the tests.Throes
No, I have not used either interface with SQL. Something I only know in theory...still.Jennajenne
If you're using SparkListenerTaskEnd.taskMetrics.outputMetrics.recordsWritten, keep in mind there was a bug there that was only fixed in Spark 2.3.0.Encourage
Doesn't work. q.queryExecution.executedPlan.metrics is totally empty.Moreno
@Moreno I remember you have to execute the query first to access the metrics. They're accumulators, so they're filled in after the tasks (of the q query) are done.Jennajenne

If you are using DataFrames on Spark 3.3+, then the modern way to do this is with an Observation.

import org.apache.spark.sql.functions.{count, max}
import org.apache.spark.sql.Observation

val data = spark.range(1000)
val observation = Observation("write-metrics")

data
  .observe(
    observation,
    count("*").alias("count"),
    max("id").alias("max_id"))
  .write.parquet("output")

// Call this only after an action on `data` (here, the write) has run;
// `get` blocks until the metrics are available.
observation.get

You can compute multiple metrics at once as part of an observation. In this example, we're counting the number of rows written as well as tracking the maximum value for id.

Here's what the observation results look like:

scala> observation.get
res3: scala.collection.immutable.Map[String,Any] =
        Map(count -> 1000, max_id -> 999)
Chuipek answered 14/2 at 18:39 Comment(0)

You could use an accumulator to count the rows as they are written out:

val count = df.sparkSession.sparkContext.longAccumulator("row count")
val counted = df.map(row => { count.add(1); row })(df.encoder)
counted.write.parquet("my file")
count.value

Since it has to decode/encode each row for us, I'm not sure this is faster than just checking the output:

df.sparkSession.read.parquet("my file").count

Parquet stores the row counts as metadata, so it might be fast enough to check.

Apt answered 11/11, 2021 at 8:41 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.