exception: org.apache.spark.sql.delta.ConcurrentAppendException: Files were added to the root of the table by a concurrent update

I have a simple Spark job that streams data to a Delta table. The table is pretty small and is not partitioned.

A lot of small parquet files are created.

As recommended in the documentation (https://docs.delta.io/1.0.0/best-practices.html), I added a compaction job that runs once a day.

    val path = "..."
    val numFiles = 16
    
    spark.read
     .format("delta")
     .load(path)
     .repartition(numFiles)
     .write
     .option("dataChange", "false")
     .format("delta")
     .mode("overwrite")
     .save(path)

Every time the compaction job runs, the streaming job gets the following exception:

org.apache.spark.sql.delta.ConcurrentAppendException: Files were added to the root of the table by a concurrent update. Please try the operation again.

I tried to add the following config parameters to the streaming job:

spark.databricks.delta.retryWriteConflict.enabled = true  # would be false by default
spark.databricks.delta.retryWriteConflict.limit = 3  # optionally limit the maximum amount of retries

It doesn't help.

Any idea how to solve the problem?

Coastguardsman answered 12/8, 2021 at 13:22 Comment(3)
Which Delta Lake version are you using? – Mcfarland
scalaMajorVersion=2.12 scalaMinorVersion=11 SPARK_VERSION=3.1.1 DELTA_VERSION=1.0.0 KAFKA_VERSION=2.3.1 – Coastguardsman
As you are running a structured streaming job, one possibility is that the volume of small files is interfering with the compaction job. Could you perhaps try using .option("dataChange", "false") and then run the compaction from a different cluster to see if that works? – Neill

When you're streaming the data in, small files are being created (additive) and these files are referenced in your delta log (an update). When you run your compaction, you're trying to resolve the small-files overhead by collating the data into larger files (16 in your case). The large files are written alongside the small ones, but the actual change only takes effect when the delta log is committed.

That is, transactions 0-100 produce 100 small files, compaction runs, and the new transaction says to refer to the 16 large files instead. The problem is that transactions 101-110 have already happened from the streaming job while the compaction was running. Since you're compacting ALL of your data, you essentially have a merge conflict.

The solution is to go to the next step in the best practices and only compact selected partitions using:

.option("replaceWhere", partition)

When you compact every day, the partition variable should represent yesterday's partition of your data. No new files are being written to that partition, so the delta log can see that the concurrent changes do not conflict with the data currently arriving for today.
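For instance, if the table were partitioned by a date column (the column name `date` below is a placeholder, not something from your schema), a rough sketch of the daily compaction could look like this:

    import java.time.LocalDate

    val path = "..."        // Delta table path, elided as in the question
    val numFiles = 16

    // Assumed: the table is partitioned by a `date` column in yyyy-MM-dd format
    val yesterday = LocalDate.now().minusDays(1).toString
    val partition = s"date = '$yesterday'"

    spark.read
     .format("delta")
     .load(path)
     .where(partition)                    // read only yesterday's partition
     .repartition(numFiles)
     .write
     .option("dataChange", "false")       // mark the rewrite as a non-data change
     .option("replaceWhere", partition)   // restrict the overwrite to that partition
     .format("delta")
     .mode("overwrite")
     .save(path)

Because the overwrite is scoped to a partition the streaming job no longer writes to, the compaction commit no longer conflicts with the concurrent appends.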

Floorwalker answered 16/3, 2022 at 7:7 Comment(0)
