Extending the excellent example by Sairam Krish:
from typing import Optional
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, coalesce
def diff_action(
    current_name: Optional[str],
    current_type: Optional[str],
    future_name: Optional[str],
    future_type: Optional[str],
):
    """Classify the schema change for one column name/type pairing.

    Args:
        current_name: Column name on the current side, or None if absent.
        current_type: Column type on the current side, or None if absent.
        future_name: Column name on the future side, or None if absent.
        future_type: Column type on the future side, or None if absent.

    Returns:
        "add_column" when only the future name is set, "drop_column" when
        only the current name is set, "alter_column" when both names are
        equal but the types differ, and None otherwise.
    """
    has_current = current_name is not None
    has_future = future_name is not None

    if has_future and not has_current:
        return "add_column"
    if has_current and not has_future:
        return "drop_column"

    # Names compare equal here also when both are None; only the type
    # mismatch then decides whether an alteration is reported.
    names_match = current_name == future_name
    types_differ = current_type != future_type
    return "alter_column" if names_match and types_differ else None
# Spark UDF wrapper around diff_action; the result column is a string.
diff_action_udf = udf(f=diff_action, returnType=StringType())
def schema_diff(
    spark: SparkSession, current: DataFrame, future: DataFrame
) -> DataFrame:
    """List the column changes that separate ``current``'s schema from ``future``'s.

    Args:
        spark: Session used to build the intermediate name/type DataFrames.
        current: DataFrame whose ``dtypes`` describe the existing schema.
        future: DataFrame whose ``dtypes`` describe the target schema.

    Returns:
        A DataFrame with three columns:
        ``column`` - the column name (current name, else future name),
        ``type`` - the future type, falling back to the current type,
        ``action`` - the value produced by ``diff_action_udf``.
        Only columns missing on one side, or whose type differs between the
        two sides, produce a row.
    """
    # Explicit DDL schemas instead of bare column names: createDataFrame
    # cannot infer a schema from an empty list, which is what ``dtypes`` is
    # for a DataFrame without any columns.
    current_types = spark.createDataFrame(
        current.dtypes, "current_name string, current_type string"
    )
    future_types = spark.createDataFrame(
        future.dtypes, "future_name string, future_type string"
    )

    # Full outer join on the column name so that columns existing on only one
    # side survive with nulls on the other side.
    joined = current_types.join(
        future_types,
        current_types.current_name == future_types.future_name,
        how="outer",
    )

    # Keep only columns that were added, dropped, or changed type.
    changed = joined.where(
        current_types.current_type.isNull()
        | future_types.future_type.isNull()
        | (current_types.current_type != future_types.future_type)
    ).select(
        current_types.current_name,
        current_types.current_type,
        future_types.future_name,
        future_types.future_type,
    )

    difference = (
        changed.withColumn("column", coalesce("current_name", "future_name"))
        .withColumn("type", coalesce("future_type", "current_type"))
        .withColumn(
            "action",
            diff_action_udf(
                "current_name", "current_type", "future_name", "future_type"
            ),
        )
        .drop("current_name", "current_type", "future_name", "future_type")
    )
    return difference
The corresponding unit tests are below:
import chispa
from pyspark.sql.types import StructType, StructField, StringType
# Three string fields, used as the schema of the expected DataFrames.
schema = StructType(
    [
        StructField(field_name, StringType())
        for field_name in ("column", "type", "action")
    ]
)
def test_schema_diff_equal(spark):
    """Two DataFrames with the same columns and types give an empty diff."""
    columns = ["id", "name", "active"]
    before = spark.createDataFrame([(1, "name", "address")], columns)
    after = spark.createDataFrame([(1, "name", "address")], columns)

    actual = schema_diff(spark, before, after)

    # No rows expected, only the three-column result schema.
    expected = spark.createDataFrame([], schema)
    chispa.assert_df_equality(actual, expected, ignore_row_order=True)
def test_schema_diff_add_column(spark):
    """A column present only in the future frame is reported as add_column."""
    before = spark.createDataFrame([(1, "name")], ["id", "name"])
    after = spark.createDataFrame([(1, "name", "yes")], ["id", "name", "active"])

    actual = schema_diff(spark, before, after)

    expected_rows = [("active", "string", "add_column")]
    expected = spark.createDataFrame(expected_rows, schema)
    chispa.assert_df_equality(actual, expected, ignore_row_order=True)
def test_schema_diff_drop_column(spark):
    """A column present only in the current frame is reported as drop_column."""
    before = spark.createDataFrame([(1, "name", "yes")], ["id", "name", "active"])
    after = spark.createDataFrame([(1, "name")], ["id", "name"])

    actual = schema_diff(spark, before, after)

    expected_rows = [("active", "string", "drop_column")]
    expected = spark.createDataFrame(expected_rows, schema)
    chispa.assert_df_equality(actual, expected, ignore_row_order=True)
def test_schema_diff_name_change(spark):
    """A renamed column shows up as one drop_column plus one add_column."""
    row = [(1, "name", "yes")]
    before = spark.createDataFrame(row, ["id", "name", "active"])
    after = spark.createDataFrame(row, ["id", "name", "isActive"])

    actual = schema_diff(spark, before, after)

    expected_rows = [
        ("active", "string", "drop_column"),
        ("isActive", "string", "add_column"),
    ]
    expected = spark.createDataFrame(expected_rows, schema)
    chispa.assert_df_equality(actual, expected, ignore_row_order=True)
def test_schema_diff_type_change(spark):
    """A column whose type changes is reported as alter_column with the new type."""
    columns = ["id", "name", "active"]
    before = spark.createDataFrame([(1, "name", "yes")], columns)
    after = spark.createDataFrame([(1, "name", True)], columns)

    actual = schema_diff(spark, before, after)

    expected_rows = [("active", "boolean", "alter_column")]
    expected = spark.createDataFrame(expected_rows, schema)
    chispa.assert_df_equality(actual, expected, ignore_row_order=True)