Spark streaming custom metrics
Asked Answered
P

5

13

I'm working on a Spark Streaming program which retrieves a Kafka stream, does very basic transformation on the stream and then inserts the data to a DB (voltdb if it's relevant). I'm trying to measure the rate in which I insert rows to the DB. I think metrics can be useful (using JMX). However I can't find how to add custom metrics to Spark. I've looked at Spark's source code and also found this thread however it doesn't work for me. I also enabled the JMX sink in the conf.metrics file. What's not working is I don't see my custom metrics with JConsole.

Could someone explain how to add custom metrics (preferably via JMX) to spark streaming? Or alternatively how to measure my insertion rate to my DB (specifically VoltDB)? I'm using spark with Java 8.

Peripteral answered 29/9, 2015 at 12:17 Comment(0)
P
17

Ok after digging through the source code I found how to add my own custom metrics. It requires 3 things:

  1. Create my own custom source. Sort of like this
  2. Enable the Jmx sink in the spark metrics.properties file. The specific line I used is: *.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink which enable JmxSink for all instances
  3. Register my custom source in the SparkEnv metrics system. An example of how to do can be seen here - I actually viewed this link before but missed the registration part which prevented me from actually seeing my custom metrics in the JVisualVM

I'm still struggling with how to actually count the number of insertions into VoltDB because the code runs on the executors but that's a subject for a different topic :)

I hope this will help others

Peripteral answered 1/10, 2015 at 9:12 Comment(7)
Did you figure out on how to count anything from executors? I have similar usecase where I write to HTTP endpoint and I want to count bunch of things from executors, but counters won't budge.Froghopper
This was actually quite a long time ago but as far as I remember I sent my metrics from the executors using codahale metrics and they have a Graphite reporter and I just summed everything in graphitePeripteral
Ah ok, thanks for the response. My use case is a little different, writing my own Source and trying to send the events to internal metrics tool.Froghopper
Writing your own metrics is exactly like my answer, the issue is with sending them from the executors?Peripteral
when register my custom metric, I find that MetricsSystem is specified by private[spark], I can't register my custom metric in the main function to MetricsSystem, how can you solve this? thanksMeri
I solved the private[spark] reference by adding a bridge class A, and A under the package org.apache.sparkMeri
@Meri its been a long time since I touched it but if I remember correctly I did the same thing. I wish they would make it a little bit more user friendlyPeripteral
U
7

Groupon have a library called spark-metrics that lets you use a simple (Codahale-like) API on your executors and have the results collated back in the driver and automatically registered in Spark's existing metrics registry. These then get automatically exported along with Spark's built-in metrics when you configure a metric sink as per the Spark docs.

Umeh answered 28/3, 2017 at 12:32 Comment(1)
I'm obviously not stuck on this problem anymore but still, great to know there are some useful libraries for that kind of stuff. Thanks for the tip :)Peripteral
G
4

Below is a working example in Java.
It's tested with StreaminQuery (Unfortunately StreaminQuery does not have ootb metrics like StreamingContext till Spark 2.3.1).

Steps:

Define a custom source in the same package of Source class

package org.apache.spark.metrics.source;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.spark.sql.streaming.StreamingQueryProgress;

/**
 * Metrics source for structured streaming query.
 */
public class StreamingQuerySource implements Source {
    private String appName;
    private MetricRegistry metricRegistry = new MetricRegistry();
    private final Progress progress = new Progress();

    public StreamingQuerySource(String appName) {
        this.appName = appName;
        registerGuage("batchId", () -> progress.batchId());
        registerGuage("numInputRows", () -> progress.numInputRows());
        registerGuage("inputRowsPerSecond", () -> progress.inputRowsPerSecond());
        registerGuage("processedRowsPerSecond", () -> progress.processedRowsPerSecond());
    }

    private <T> Gauge<T> registerGuage(String name, Gauge<T> metric) {
        return metricRegistry.register(MetricRegistry.name(name), metric);
    }

    @Override
    public String sourceName() {
        return String.format("%s.streaming", appName);
    }


    @Override
    public MetricRegistry metricRegistry() {
        return metricRegistry;
    }

    public void updateProgress(StreamingQueryProgress queryProgress) {
        progress.batchId(queryProgress.batchId())
                .numInputRows(queryProgress.numInputRows())
                .inputRowsPerSecond(queryProgress.inputRowsPerSecond())
                .processedRowsPerSecond(queryProgress.processedRowsPerSecond());
    }

    @Data
    @Accessors(fluent = true)
    private static class Progress {
        private long batchId = -1;
        private long numInputRows = 0;
        private double inputRowsPerSecond = 0;
        private double processedRowsPerSecond = 0;
    }
}

Register the source right after SparkContext is created

    querySource = new StreamingQuerySource(getSparkSession().sparkContext().appName());
    SparkEnv.get().metricsSystem().registerSource(querySource);

Update data in StreamingQueryListener.onProgress(event)

  querySource.updateProgress(event.progress());

Config metrics.properties

*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=xxx
*.sink.graphite.port=9109
*.sink.graphite.period=10
*.sink.graphite.unit=seconds

# Enable jvm source for instance master, worker, driver and executor
master.source.jvm.class=org.apache.spark.metrics.source.JvmSource
worker.source.jvm.class=org.apache.spark.metrics.source.JvmSource
driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource

Sample output in graphite exporter (mapped to prometheus format)

streaming_query{application="local-1538032184639",model="model1",qty="batchId"} 38
streaming_query{application="local-1538032184639",model="model1r",qty="inputRowsPerSecond"} 2.5
streaming_query{application="local-1538032184639",model="model1",qty="numInputRows"} 5
streaming_query{application="local-1538032184639",model=model1",qty="processedRowsPerSecond"} 0.81
Gordon answered 27/9, 2018 at 8:30 Comment(1)
This is gold! Thank you @Gordon for the code sample. This was very helpful and working fine in Spark 3.2.1Verdellverderer
O
3

to insert rows from based on inserts from VoltDB, use accumulators - and then from your driver you can create a listener - maybe something like this to get you started

sparkContext.addSparkListener(new SparkListener() {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
    stageCompleted.stageInfo.accumulables.foreach { case (_, acc) => {

here you have access to those rows combined accumulators and then you can send to your sink..

Optimum answered 7/11, 2015 at 23:55 Comment(3)
I eventually went with gathering metrics for each executor and sending it to Graphana and summing all the information there. The listener is a cool idea though :)Peripteral
@Peripteral can you expand on that? You say you ditched the normal metrics and did the work yourself or you got them to work?Geniality
I didn't ditch the normal metrics. I added some of my own custom metrics on the Spark executors. The problem is that for these custom metrics I needed aggregated results (basically summing accumulators from the different executors) so what I did is send the data from each Spark executor to Graphana and aggregate the results therePeripteral
R
2

here's an excellent tutorial which covers all the setps you need to setup Spark's MetricsSystem with Graphite. That should do the trick:

http://www.hammerlab.org/2015/02/27/monitoring-spark-with-graphite-and-grafana/

Reimpression answered 29/9, 2015 at 15:9 Comment(1)
Thanks Erik for your response, it's quite useful! but did you happen to add your own metrics in the application code? I'm not talking about stuff that are already monitored by spark but other things like rate of insertion of rows into VoltDB within each partition? (or any other custom made metrics in the code). I'm struggling with implementing custom measurements in my applicationPeripteral

© 2022 - 2024 — McMap. All rights reserved.