How can I update a broadcast variable in spark streaming?

I have, I believe, a relatively common use case for Spark Streaming:

I have a stream of objects that I would like to filter based on some reference data

Initially, I thought that this would be a very simple thing to achieve using a Broadcast Variable:

public void startSparkEngine() {
    Broadcast<ReferenceData> refdataBroadcast
      = sparkContext.broadcast(getRefData());

    final JavaDStream<MyObject> filteredStream = objectStream.filter(obj -> {
        final ReferenceData refData = refdataBroadcast.getValue();
        return obj.getField().equals(refData.getField());
    });

    filteredStream.foreachRDD(rdd -> {
        rdd.foreach(obj -> {
            // Final processing of filtered objects
        });
        return null;
    });
}

However, my reference data will change periodically, albeit infrequently.

I was under the impression that I could modify and re-broadcast my variable on the driver and it would be propagated to each of the workers; however, the Broadcast object is not Serializable and needs to be final.

What alternatives do I have? The three solutions I can think of are:

  1. Move the reference data lookup into a foreachPartition or foreachRDD so that it resides entirely on the workers. However, the reference data lives behind a REST API, so I would also need to somehow store a timer / counter to stop the remote API from being accessed for every element in the stream.

  2. Restart the Spark Context every time the refdata changes, with a new Broadcast Variable.

  3. Convert the Reference Data to an RDD, then join the streams in such a way that I am now streaming Pair<MyObject, RefData>, though this will ship the reference data with every object (a rough sketch of that join is below).
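
For illustration, a minimal, untested sketch of option 3 (reusing getRefData() from above and assuming getField() returns a String that can serve as the join key) might look something like this:

import java.util.Collections;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.streaming.api.java.JavaPairDStream;

import scala.Tuple2;

// Sketch only: build a keyed reference-data RDD on the driver and join it
// against every micro-batch of the stream.
JavaPairRDD<String, ReferenceData> refRdd = sparkContext
    .parallelize(Collections.singletonList(getRefData()))
    .mapToPair(rd -> new Tuple2<>(rd.getField(), rd));

JavaPairDStream<String, Tuple2<MyObject, ReferenceData>> joined = objectStream
    .mapToPair(obj -> new Tuple2<>(obj.getField(), obj))
    .transformToPair(rdd -> rdd.join(refRdd));

When the reference data changes, refRdd could be rebuilt on the driver (for example, held in a mutable field) before the next batch, but as noted above every joined record then carries its own copy of the reference data.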

Tiruchirapalli answered 27/10, 2015 at 15:38 Comment(0)

Extending the answer by @Rohan Aletty, here is sample code for a BroadcastWrapper that refreshes the broadcast variable based on a TTL:

import java.util.Calendar;
import java.util.Date;

import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastWrapper {

    private Broadcast<ReferenceData> broadcastVar;
    private Date lastUpdatedAt = Calendar.getInstance().getTime();

    private static BroadcastWrapper obj = new BroadcastWrapper();

    private BroadcastWrapper(){}

    public static BroadcastWrapper getInstance() {
        return obj;
    }

    public JavaSparkContext getSparkContext(SparkContext sc) {
       JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);
       return jsc;
    }

    public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext){
        Date currentDate = Calendar.getInstance().getTime();
        long diff = currentDate.getTime()-lastUpdatedAt.getTime();
        if (broadcastVar == null || diff > 60000) { // Let's say we want to refresh every 1 min = 60000 ms
            if (broadcastVar != null)
                broadcastVar.unpersist();
            lastUpdatedAt = new Date(System.currentTimeMillis());

            // Your logic to refresh
            ReferenceData data = getRefData();

            broadcastVar = getSparkContext(sparkContext).broadcast(data);
        }
        return broadcastVar;
    }
}

Your code would look like:

public void startSparkEngine() {

    final JavaDStream<MyObject> filteredStream = objectStream.transform(rdd -> {
        Broadcast<ReferenceData> refdataBroadcast = BroadcastWrapper.getInstance().updateAndGet(rdd.context());

        return rdd.filter(obj -> obj.getField().equals(refdataBroadcast.getValue().getField()));
    });

    filteredStream.foreachRDD(rdd -> {
        rdd.foreach(obj -> {
        // Final processing of filtered objects
        });
        return null;
    });
}

This worked for me on a multi-node cluster as well. Hope this helps.

Tetherball answered 21/12, 2016 at 9:31 Comment(5)
Thanks for the solution. Do you know if updateAndGet will be executed on the Driver node or on the Worker nodes? The wrapper itself doesn't seem to be broadcast, so I assume it is not available on each Worker node. And if it's executed on the Driver node, does that mean each Worker has to ask the Driver every time it tries to access the value? (That would contradict the idea of using a broadcast variable in the first place.) – Dracaena
This function returns a reference to a Broadcast object. A Broadcast object holds the identifier of the broadcast variable and the number of blocks. When refdataBroadcast.getValue() is called, if the broadcast identifier is already present in executor memory then it will not be recalculated. All of this happens on the executor; however, when sparkContext.broadcast is called, the driver comes into the picture. So updateAndGet will be executed on the driver node only when the variable is refreshed and re-broadcast (which only the driver can take care of). – Tetherball
Any idea how the same would work in the new Structured Streaming API? – Perfectible
Since I am facing similar problems, I was wondering if there is any chance that anyone has made a Python implementation of the code above? I think this could be a good way to overcome some of the difficulties I am encountering at the moment. All help is appreciated, thanks. – Aztec
@Tetherball The transform function will run on executors and it will not have access to sparkContext there. What I understand is that updating of the broadcast variable can happen only from foreachRDD or foreachBatch functions, and the updated reference to the broadcast is applicable only within the scope of these functions. Is this understanding correct? – Symons

I recently faced an issue with this. Thought it might be helpful for Scala users.

The Scala way of doing a BroadcastWrapper is like the example below.

import java.io.{ ObjectInputStream, ObjectOutputStream }
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.StreamingContext
import scala.reflect.ClassTag

/* This wrapper lets us update broadcast variables within DStreams' foreachRDD
   without running into serialization issues */
case class BroadcastWrapper[T: ClassTag](
    @transient private val ssc: StreamingContext,
    @transient private val _v: T) {

  @transient private var v = ssc.sparkContext.broadcast(_v)

  def update(newValue: T, blocking: Boolean = false): Unit = {

    v.unpersist(blocking)
    v = ssc.sparkContext.broadcast(newValue)
  }

  def value: T = v.value

  private def writeObject(out: ObjectOutputStream): Unit = {
    out.writeObject(v)
  }

  private def readObject(in: ObjectInputStream): Unit = {
    v = in.readObject().asInstanceOf[Broadcast[T]]
  }
}

Every time you need the broadcast variable refreshed, call the update function to get a new broadcast variable.

Spriggs answered 23/8, 2018 at 7:1 Comment(3)
For those wondering, value exposes the underlying broadcast object for reading only; writeObject and readObject are required for some special serialization cases. See: Java Serialization – Deirdra
@Deirdra Just wondering, do you need to call the close() method inside writeObject and readObject? – Stilton
@PetrFedosov No; close is not necessary, nor desirable. – Deirdra

Almost everyone dealing with streaming applications needs a way to weave (filter, look up, etc.) reference data (from a DB, files, etc.) into the streaming data. We have a partial solution covering one of the two parts:

  1. Lookup reference data to be used in streaming operations

    • create CacheLookup object with desired cache TTL
    • wrap that in Broadcast
    • use CacheLookup as part of streaming logic (a rough sketch follows below)
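
A rough, untested sketch of such a broadcast-wrapped CacheLookup (the loadFromSource() helper and the TTL are placeholders, not the actual implementation) could look like:

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Sketch only: the lookup object is tiny and broadcast once; the cached data is
// transient, so each executor loads it lazily and refreshes it after the TTL expires.
public class CacheLookup implements Serializable {

    private final long ttlMs;
    private transient Map<String, ReferenceData> cache;
    private transient long loadedAtMs;

    public CacheLookup(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    public synchronized ReferenceData lookup(String key) {
        long now = System.currentTimeMillis();
        if (cache == null || now - loadedAtMs > ttlMs) {
            cache = loadFromSource();   // placeholder for the real DB / file / REST fetch
            loadedAtMs = now;
        }
        return cache.get(key);
    }

    private Map<String, ReferenceData> loadFromSource() {
        return new HashMap<>();         // placeholder
    }
}

The driver would wrap it once, e.g. Broadcast<CacheLookup> lookup = sparkContext.broadcast(new CacheLookup(60000L)), and the streaming logic would call lookup.getValue().lookup(key). The unsolved part described below is telling the executors to invalidate that transient cache when the reference data actually changes, rather than waiting for the TTL.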

For the most part this works fine, except for the following:

  1. Update the reference data

    There is no definitive way to achieve this despite the suggestions in these threads, i.e. kill the previous broadcast variable and create a new one. There are multiple unknowns, such as what to expect between these operations.

This is such a common need that it would help if there were a way to send information to the broadcast variable signalling an update. With that, it would be possible to invalidate the local caches in "CacheLookup".

The second portion of the problem is still not solved. I would be interested to hear if there is any viable approach to this.

Disremember answered 29/2, 2016 at 19:18 Comment(0)

Not sure if you've tried this already, but I think an update to a broadcast variable may be achieved without shutting down the SparkContext. Through use of the unpersist() method, copies of the broadcast variable are deleted on each executor, and the variable would need to be rebroadcast in order to be accessed again. For your use case, when you want to update your broadcast, you can:

  1. Wait for your executors to finish on a current series of data

  2. Unpersist the broadcast variable

  3. Update the broadcast variable

  4. Rebroadcast to send the new reference data to the executors

I'm drawing pretty heavily from this post but the person who made the last reply claimed to have gotten it working locally. It's important to note that you probably want to set blocking to true on the unpersist so that you can be sure executors are rid of the old data (so the stale values won't be read again on the next iteration).
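
A minimal, untested sketch of steps 2-4 (step 1 is covered by doing the refresh on the driver between batches, e.g. from inside foreachRDD), reusing getRefData() from the question:

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

// Sketch only: driver-side unpersist-and-rebroadcast of the reference data.
public class RefDataRebroadcaster {

    private Broadcast<ReferenceData> current;

    public Broadcast<ReferenceData> refresh(JavaSparkContext jsc) {
        if (current != null) {
            // blocking = true so executors are guaranteed to have dropped the stale copies (step 2)
            current.unpersist(true);
        }
        ReferenceData fresh = getRefData();   // step 3: fetch the updated reference data
        current = jsc.broadcast(fresh);       // step 4: rebroadcast to the executors
        return current;
    }

    private ReferenceData getRefData() {
        return new ReferenceData();           // placeholder, as in the question
    }
}

Whatever closure reads the broadcast then has to pick up the new Broadcast reference (for example by fetching it through a wrapper, as in the answers above) rather than holding on to the old one.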

Remorseless answered 28/10, 2015 at 5:48 Comment(0)

I did it in a different way.

I created a broadcast variable and update it from a different thread on the driver every 5 minutes.

  import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}
  import org.apache.spark.broadcast.Broadcast

  var broadcastValue: Broadcast[Set[String]] = spark.sparkContext.broadcast(calculateValue())

  def runScheduledThreadToUpdateBroadcastVariable(): Unit = {
    val updateTask = new Runnable {
      def run() = {
        broadcastValue.unpersist(blocking = false)
        broadcastValue = spark.sparkContext.broadcast(calculateValue())
      }
    }

    val executor = new ScheduledThreadPoolExecutor(1)
    executor.scheduleAtFixedRate(updateTask, 1, 5, TimeUnit.MINUTES)
  }
Gateway answered 26/7, 2021 at 8:37 Comment(0)

The simplest way to achieve this: the code below re-reads the dimension data folder on every batch, but do keep in mind that new dimension data values (country names in my case) have to arrive as a new file.

package com.databroccoli.streaming.dimensionupateinstreaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.functions.{broadcast, expr}
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}

object RefreshDimensionInStreaming {

  def main(args: Array[String]) = {

    @transient lazy val logger: Logger = Logger.getLogger(getClass.getName)

    Logger.getLogger("akka").setLevel(Level.WARN)
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("com.amazonaws").setLevel(Level.ERROR)
    Logger.getLogger("com.amazon.ws").setLevel(Level.ERROR)
    Logger.getLogger("io.netty").setLevel(Level.ERROR)

    val spark = SparkSession
      .builder()
      .master("local")
      .getOrCreate()

    val schemaUntyped1 = StructType(
      Array(
        StructField("id", StringType),
        StructField("customrid", StringType),
        StructField("customername", StringType),
        StructField("countrycode", StringType),
        StructField("timestamp_column_fin_1", TimestampType)
      ))

    val schemaUntyped2 = StructType(
      Array(
        StructField("id", StringType),
        StructField("countrycode", StringType),
        StructField("countryname", StringType),
        StructField("timestamp_column_fin_2", TimestampType)
      ))

    val factDf1 = spark.readStream
      .schema(schemaUntyped1)
      .option("header", "true")
      .csv("src/main/resources/broadcasttest/fact")

    var countryDf: Option[DataFrame] = None: Option[DataFrame]

    def updateDimensionDf() = {
      val dimDf2 = spark.read
        .schema(schemaUntyped2)
        .option("header", "true")
        .csv("src/main/resources/broadcasttest/dimension")

      if (countryDf != None) {
        countryDf.get.unpersist()
      }

      countryDf = Some(
        dimDf2
          .withColumnRenamed("id", "id_2")
          .withColumnRenamed("countrycode", "countrycode_2"))

      countryDf.get.show()
    }

    factDf1.writeStream
      .outputMode("append")
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        batchDF.show(10)

        updateDimensionDf()

        batchDF
          .join(
            countryDf.get,
            expr(
              """
      countrycode_2 = countrycode 
      """
            ),
            "leftOuter"
          )
          .show

      }
      .start()
      .awaitTermination()

  }

}

Microdont answered 19/3, 2021 at 16:49 Comment(0)
