Below is a working example in Java.
It's tested with StreamingQuery
(unfortunately, StreamingQuery
does not have out-of-the-box metrics like StreamingContext
up to Spark 2.3.1).
Steps:
Define a custom source in the same package as the Source
class
package org.apache.spark.metrics.source;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.spark.sql.streaming.StreamingQueryProgress;
/**
 * Metrics source for a structured streaming query.
 *
 * <p>Exposes four gauges ({@code batchId}, {@code numInputRows},
 * {@code inputRowsPerSecond}, {@code processedRowsPerSecond}) whose values are
 * pushed in through {@link #updateProgress(StreamingQueryProgress)}. Until the
 * first update the gauges report the defaults: {@code -1} for {@code batchId}
 * and {@code 0} for the others.
 */
public class StreamingQuerySource implements Source {

    /** Application name used as the prefix of {@link #sourceName()}. */
    private final String appName;

    /** Registry holding the gauges of this source. */
    private final MetricRegistry metricRegistry = new MetricRegistry();

    /** Holder of the most recently reported values; read by the gauges. */
    private final Progress progress = new Progress();

    /**
     * Creates the source and registers its gauges.
     *
     * @param appName application name used to build the source name
     */
    public StreamingQuerySource(String appName) {
        this.appName = appName;
        registerGauge("batchId", () -> progress.batchId());
        registerGauge("numInputRows", () -> progress.numInputRows());
        registerGauge("inputRowsPerSecond", () -> progress.inputRowsPerSecond());
        registerGauge("processedRowsPerSecond", () -> progress.processedRowsPerSecond());
    }

    /**
     * Registers a gauge under the given name in this source's registry.
     *
     * @param name   metric name
     * @param metric gauge supplying the current value
     * @return the registered gauge
     */
    private <T> Gauge<T> registerGauge(String name, Gauge<T> metric) {
        return metricRegistry.register(MetricRegistry.name(name), metric);
    }

    /**
     * @return the source name, formatted as {@code <appName>.streaming}
     */
    @Override
    public String sourceName() {
        return String.format("%s.streaming", appName);
    }

    /**
     * @return the registry containing this source's gauges
     */
    @Override
    public MetricRegistry metricRegistry() {
        return metricRegistry;
    }

    /**
     * Copies the values of the given progress report into this source so that
     * subsequent gauge reads return them.
     *
     * @param queryProgress progress report to take the values from
     */
    public void updateProgress(StreamingQueryProgress queryProgress) {
        progress.batchId(queryProgress.batchId())
                .numInputRows(queryProgress.numInputRows())
                .inputRowsPerSecond(queryProgress.inputRowsPerSecond())
                .processedRowsPerSecond(queryProgress.processedRowsPerSecond());
    }

    /**
     * Mutable holder of the latest reported values.
     *
     * <p>Fields are {@code volatile} because updates and gauge reads may happen
     * on different threads (for example a listener writing while a sink
     * reporter reads); this guarantees each gauge sees the latest written
     * value. The four fields are not updated atomically as a group, so a
     * reader may briefly observe values from two consecutive reports.
     */
    @Data
    @Accessors(fluent = true)
    private static class Progress {
        private volatile long batchId = -1;
        private volatile long numInputRows = 0;
        private volatile double inputRowsPerSecond = 0;
        private volatile double processedRowsPerSecond = 0;
    }
}
Register the source right after SparkContext is created
// Create the source from the application name and register it with the
// metrics system of the current SparkEnv.
querySource = new StreamingQuerySource(getSparkSession().sparkContext().appName());
SparkEnv.get().metricsSystem().registerSource(querySource);
Update data in StreamingQueryListener.onQueryProgress(event)
// Hand the progress carried by the event to the source.
querySource.updateProgress(event.progress());
Configure metrics.properties
# Graphite sink for all instances ("*" prefix), reporting every 10 seconds
*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=xxx
*.sink.graphite.port=9109
*.sink.graphite.period=10
*.sink.graphite.unit=seconds
# Enable jvm source for instance master, worker, driver and executor
master.source.jvm.class=org.apache.spark.metrics.source.JvmSource
worker.source.jvm.class=org.apache.spark.metrics.source.JvmSource
driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource
Sample output in graphite exporter (mapped to prometheus format)
streaming_query{application="local-1538032184639",model="model1",qty="batchId"} 38
streaming_query{application="local-1538032184639",model="model1",qty="inputRowsPerSecond"} 2.5
streaming_query{application="local-1538032184639",model="model1",qty="numInputRows"} 5
streaming_query{application="local-1538032184639",model="model1",qty="processedRowsPerSecond"} 0.81