Based on the introduction in Spark 3.0, https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html. It should be possible to set "kafka.group.id" to track the offset. For our use case, I want to avoid the potential data loss if the streaming spark job failed and restart. Based on my previous questions, I have a feeling that kafka.group.id in Spark 3.0 is something that will help.
How to specify the group id of kafka consumer for spark structured streaming?
How to ensure no data loss for kafka data ingestion through Spark Structured Streaming?
However, I tried the settings in spark 3.0 as below.
package com.example
/**
* @author ${user.name}
*/
import scala.math.random
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, BooleanType, LongType}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
import org.apache.spark.SparkFiles
import java.util.Properties
import org.postgresql.Driver
import org.apache.spark.sql.streaming.Trigger
import java.time.Instant
import org.apache.hadoop.fs.{FileSystem, Path}
import java.net.URI
import java.sql.Connection
import java.sql.DriverManager
import java.sql.ResultSet
import java.sql.SQLException
import java.sql.Statement
//import org.apache.spark.sql.hive.HiveContext
import scala.io.Source
import java.nio.charset.StandardCharsets
import com.amazonaws.services.kms.{AWSKMS, AWSKMSClientBuilder}
import com.amazonaws.services.kms.model.DecryptRequest
import java.nio.ByteBuffer
import com.google.common.io.BaseEncoding
object App {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder()
.appName("MY-APP")
.getOrCreate()
import spark.sqlContext.implicits._
spark.catalog.clearCache()
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
spark.sparkContext.setLogLevel("ERROR")
spark.sparkContext.setCheckpointDir("/home/ec2-user/environment/spark/spark-local/checkpoint")
System.gc()
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "mybroker.io:6667")
.option("subscribe", "mytopic")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.ssl.truststore.location", "/home/ec2-user/environment/spark/spark-local/creds/cacerts")
.option("kafka.ssl.truststore.password", "changeit")
.option("kafka.ssl.truststore.type", "JKS")
.option("kafka.sasl.kerberos.service.name", "kafka")
.option("kafka.sasl.mechanism", "GSSAPI")
.option("kafka.group.id","MYID")
.load()
df.printSchema()
val schema = new StructType()
.add("id", StringType)
.add("x", StringType)
.add("eventtime", StringType)
val idservice = df.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"), schema).as("data"))
.select("data.*")
val monitoring_df = idservice
.selectExpr("cast(id as string) id",
"cast(x as string) x",
"cast(eventtime as string) eventtime")
val monitoring_stream = monitoring_df.writeStream
.trigger(Trigger.ProcessingTime("120 seconds"))
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
if(!batchDF.isEmpty)
{
batchDF.persist()
printf("At %d, the %dth microbatch has %d records and %d partitions \n", Instant.now.getEpochSecond, batchId, batchDF.count(), batchDF.rdd.partitions.size)
batchDF.show()
batchDF.write.mode(SaveMode.Overwrite).option("path", "/home/ec2-user/environment/spark/spark-local/tmp").saveAsTable("mytable")
spark.catalog.refreshTable("mytable")
batchDF.unpersist()
spark.catalog.clearCache()
}
}
.start()
.awaitTermination()
}
}
The spark job is tested in the standalone mode by using below spark-submit command, but the same problem exists when I deploy in cluster mode in AWS EMR.
spark-submit --master local[1] \
--files /home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf,/home/ec2-user/environment/spark/spark-localreds/cacerts,/home/ec2-user/environment/spark/spark-local/creds/krb5.conf,/home/ec2-user/environment/spark/spark-local/creds/my.keytab \
--driver-java-options "-Djava.security.auth.login.config=/home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf -Djava.security.krb5.conf=/home/ec2-user/environment/spark/spark-local/creds/krb5.conf" \
--conf spark.dynamicAllocation.enabled=false \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf -Djava.security.krb5.conf=/home/ec2-user/environment/spark/spark-local/creds/krb5.conf" \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf -Djava.security.krb5.conf=/home/ec2-user/environment/spark/spark-local/creds/krb5.conf" \
--conf spark.yarn.maxAppAttempts=1000 \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 \
--class com.example.App ./target/sparktest-1.0-SNAPSHOT-jar-with-dependencies.jar
Then, I started the streaming job to read the streaming data from Kafka topic. After some time, I killed the spark job. Then, I wait for 1 hour to start the job again. If I understand correctly, the new streaming data should start from the offset when I killed the spark job. However, it still starts as the latest offset, which caused data loss during the time I stopped the job.
Do I need to configure more options to avoid data loss? Or do I have some misunderstanding for the Spark 3.0? Thanks!
Problem solved
The key issue here is that the checkpoint must be added to the query specifically. To just add checkpoint for SparkContext is not enough. After adding the checkpoint, it is working. In the checkpoint folder, it will create a offset subfolder, which contains offset file, 0, 1, 2, 3.... For each file, it will show the offset information for different partition.
{"8":109904920,"2":109905750,"5":109905789,"4":109905621,"7":109905330,"1":109905746,"9":109905750,"3":109905936,"6":109905531,"0":109905583}}
One suggestion is to put the checkpoint to some external storage, such as s3. It can help recover the offset even when you need to rebuild the EMR cluster itself in case.
file://
is also "HDFS compatible", however that shouldn't be used for distributed state management – Borborygmus