Spark - How to add an element to an array of structs

Having this schema:

root
 |-- Elems: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Elem: integer (nullable = true)
 |    |    |-- Desc: string (nullable = true)

How can we add a new field so that the schema becomes the following?

root
 |-- Elems: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- New_field: integer (nullable = true)
 |    |    |-- Elem: integer (nullable = true)
 |    |    |-- Desc: string (nullable = true)

I've already done this with a simple struct (more detail at the bottom of this post), but I can't manage to do it with an array of structs.

This is the code to reproduce the test:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._

val schema = new StructType()
    .add("Elems", ArrayType(new StructType()
        .add("Elem", IntegerType)
        .add("Desc", StringType)
    ))

val dataDS = Seq("""
{
  "Elems": [ {"Elem":1, "Desc": "d1"}, {"Elem":2, "Desc": "d2"}, {"Elem":3, "Desc": "d3"} ]
}
""").toDS()

val df = spark.read.schema(schema).json(dataDS)

df.show(false)
+---------------------------+
|Elems                      |
+---------------------------+
|[[1, d1], [2, d2], [3, d3]]|
+---------------------------+

Once we have the DF, the best approach I have found is to create a struct of arrays, one array per field:

val mod_df = df.withColumn("modif_elems", 
    struct(
        array(lit("")).as("New_field"),
        col("Elems.Elem"),
        col("Elems.Desc")
    ))

mod_df.show(false)
+---------------------------+-----------------------------+
|Elems                      |modif_elems                  |
+---------------------------+-----------------------------+
|[[1, d1], [2, d2], [3, d3]]|[[], [1, 2, 3], [d1, d2, d3]]|
+---------------------------+-----------------------------+


mod_df.printSchema
root
 |-- Elems: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Elem: integer (nullable = true)
 |    |    |-- Desc: string (nullable = true)
 |-- modif_elems: struct (nullable = false)
 |    |-- New_field: array (nullable = false)
 |    |    |-- element: string (containsNull = false)
 |    |-- Elem: array (nullable = true)
 |    |    |-- element: integer (containsNull = true)
 |    |-- Desc: array (nullable = true)
 |    |    |-- element: string (containsNull = true)

We don't lose any data but this is not exactly what I want.
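
For completeness, one way to keep the array-of-structs shape without arrays_zip (so it also works before Spark 2.4) is to explode, rebuild each struct, and collect the array back together. A minimal sketch, assuming a synthetic row id added with monotonically_increasing_id; note that collect_list does not guarantee element order:

import org.apache.spark.sql.functions._

val rebuilt = df
    .withColumn("row_id", monotonically_increasing_id()) // key to regroup the array afterwards
    .withColumn("e", explode(col("Elems")))              // one row per struct element
    .withColumn("e", struct(                             // rebuild the struct with the new field
        lit("").as("New_field"),
        col("e.Elem").as("Elem"),
        col("e.Desc").as("Desc")))
    .groupBy("row_id")
    .agg(collect_list("e").as("Elems"))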

Update: Workaround in PD1.


Bonus track: Modifying a struct (not in an array)

The code is almost the same, but now we don't have an array of structs, so it's easier to modify the struct:

val schema = new StructType()
    .add("Elems", new StructType()
        .add("Elem", IntegerType)
        .add("Desc", StringType)
    )


val dataDS = Seq("""
{
  "Elems": {"Elem":1, "Desc": "d1"}
}
""").toDS()    


val df = spark.read.schema(schema).json(dataDS)
df.show(false)
+-------+
|Elems  |
+-------+
|[1, d1]|
+-------+

df.printSchema
root
 |-- Elems: struct (nullable = true)
 |    |-- Elem: integer (nullable = true)
 |    |-- Desc: string (nullable = true)

In this case, in order to add the field we need to create another struct:

val mod_df = df
    .withColumn("modif_elems", 
                struct(
                    lit("").alias("New_field"),
                    col("Elems.Elem"),
                    col("Elems.Desc")
                    )
               )

mod_df.show
+-------+-----------+
|  Elems|modif_elems|
+-------+-----------+
|[1, d1]|  [, 1, d1]|
+-------+-----------+

mod_df.printSchema
root
 |-- Elems: struct (nullable = true)
 |    |-- Elem: integer (nullable = true)
 |    |-- Desc: string (nullable = true)
 |-- modif_elems: struct (nullable = false)
 |    |-- New_field: string (nullable = false)
 |    |-- Elem: integer (nullable = true)
 |    |-- Desc: string (nullable = true)
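
As a side note, on Spark 3.1+ this plain-struct case needs no rebuilding at all: Column.withField appends the field directly. A minimal sketch:

val mod_df = df.withColumn("modif_elems",
    col("Elems").withField("New_field", lit("")))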


PD1:

OK, I have used the arrays_zip Spark SQL function (new in version 2.4.0) and it's nearly what I want, but I can't see how we can change the element names (as or alias doesn't work here):

val mod_df = df.withColumn("modif_elems", 
        arrays_zip(
            array(lit("")).as("New_field"),
            col("Elems.Elem").as("Elem"),
            col("Elems.Desc").alias("Desc")
                    )
        )

mod_df.show(false)
+---------------------------+---------------------------------+
|Elems                      |modif_elems                      |
+---------------------------+---------------------------------+
|[[1, d1], [2, d2], [3, d3]]|[[, 1, d1], [, 2, d2], [, 3, d3]]|
+---------------------------+---------------------------------+

mod_df.printSchema
root
 |-- Elems: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Elem: integer (nullable = true)
 |    |    |-- Desc: string (nullable = true)
 |-- modif_elems: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- 0: string (nullable = true)
 |    |    |-- 1: integer (nullable = true)
 |    |    |-- 2: string (nullable = true)

The struct modif_elems should contain 3 fields named New_field, Elem and Desc, not 0, 1 and 2.

Holdall answered 14/1, 2019 at 19:57

Spark 3.1+

withField can be used (together with transform, which applies it to every element of the array):

  • Scala

    Input:

    val df = spark.createDataFrame(Seq((1, "2")))
        .select(
            array(struct(
                col("_1").as("Elem"),
                col("_2").as("Desc")
            )).as("Elems")
        )
    df.printSchema()
    // root
    //  |-- Elems: array (nullable = true)
    //  |    |-- element: struct (containsNull = true)
    //  |    |    |-- Elem: integer (nullable = true)
    //  |    |    |-- Desc: string (nullable = true)
    

    Script:

    val df2 = df.withColumn(
        "Elems",
        transform(
            $"Elems",
            x => x.withField("New_field", lit(3))
        )
    )
    df2.printSchema()
    // root
    //  |-- Elems: array (nullable = false)
    //  |    |-- element: struct (containsNull = false)
    //  |    |    |-- Elem: integer (nullable = true)
    //  |    |    |-- Desc: string (nullable = true)
    //  |    |    |-- New_field: integer (nullable = false)
    
  • PySpark

    Input:

    from pyspark.sql import functions as F
    df = spark.createDataFrame([(1, "2",)]) \
        .select(
            F.array(F.struct(
                F.col("_1").alias("Elem"),
                F.col("_2").alias("Desc")
            )).alias("Elems")
        )
    df.printSchema()
    # root
    #  |-- Elems: array (nullable = true)
    #  |    |-- element: struct (containsNull = true)
    #  |    |    |-- Elem: long (nullable = true)
    #  |    |    |-- Desc: string (nullable = true)
    

    Script:

    df = df.withColumn(
        "Elems",
        F.transform(
            F.col("Elems"),
            lambda x: x.withField("New_field", F.lit(3))
        )
    )
    df.printSchema()
    # root
    #  |-- Elems: array (nullable = false)
    #  |    |-- element: struct (containsNull = false)
    #  |    |    |-- Elem: long (nullable = true)
    #  |    |    |-- Desc: string (nullable = true)
    #  |    |    |-- New_field: integer (nullable = false)
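
For Spark 2.4 and 3.0, where Column.withField is not yet available, a sketch of the same idea in Scala: rebuild each struct with the SQL higher-order function transform and named_struct (the field names and the literal 3 follow the example above):

import org.apache.spark.sql.functions.expr

// rebuild every struct inside the array, appending the new field
val df24 = df.withColumn("Elems",
    expr("transform(Elems, x -> named_struct('New_field', 3, 'Elem', x.Elem, 'Desc', x.Desc))"))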
    
Polynesian answered 18/7, 2022 at 7:18

Solution here: we need to use arrays_zip and then cast the resulting column to the renamed schema of the struct (elem_struct_recomposed):


val elem_struct_recomposed = new StructType()
  .add("New_field", StringType)
  .add("ElemRenamed", IntegerType)
  .add("DescRenamed", StringType)


val mod_df = df
    .withColumn("modif_elems_NOT_renamed", 
        arrays_zip(
            array(lit("")).as("New_field"),   // the .as()/.alias() have no effect here...
            col("Elems.Elem").as("ElemRenamed"),
            col("Elems.Desc").alias("DescRenamed")
        ))
    .withColumn("modif_elems_renamed", 
        $"modif_elems_NOT_renamed".cast(ArrayType(elem_struct_recomposed))) // ...the cast sets the names


mod_df.show(false)
mod_df.printSchema

+---------------------------+---------------------------------+---------------------------------+
|Elems                      |modif_elems_NOT_renamed          |modif_elems_renamed              |
+---------------------------+---------------------------------+---------------------------------+
|[[1, d1], [2, d2], [3, d3]]|[[, 1, d1], [, 2, d2], [, 3, d3]]|[[, 1, d1], [, 2, d2], [, 3, d3]]|
+---------------------------+---------------------------------+---------------------------------+

root
 |-- Elems: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Elem: integer (nullable = true)
 |    |    |-- Desc: string (nullable = true)
 |-- modif_elems_NOT_renamed: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- 0: string (nullable = true)
 |    |    |-- 1: integer (nullable = true)
 |    |    |-- 2: string (nullable = true)
 |-- modif_elems_renamed: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- New_field: string (nullable = true)
 |    |    |-- ElemRenamed: integer (nullable = true)
 |    |    |-- DescRenamed: string (nullable = true)
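
Note that array(lit("")) has length 1 while Elems has three elements, so arrays_zip pads the missing New_field slots with nulls (see the comments below). A sketch, assuming Spark 2.4+, that builds a same-length New_field array with the SQL transform function and writes the target type as a DDL string instead of a StructType:

val mod_df2 = df.withColumn("modif_elems",
    arrays_zip(
        expr("transform(Elems, x -> '')"),  // one empty string per element, same length as Elems
        col("Elems.Elem"),
        col("Elems.Desc")
    ).cast("array<struct<New_field:string,ElemRenamed:int,DescRenamed:string>>"))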

Holdall answered 24/1, 2019 at 16:07. Comments (6):
Interplanetary: Hello @rvilla, how can we use arrays_zip on Spark 2.3? Any help would be much appreciated.
Holdall: Sorry, I did not try it, but you could implement the function yourself, as in the GitHub source code: github.com/apache/spark/blob/master/sql/core/src/main/scala/org/… Note that ArraysZip is a case class from org/apache/spark/sql/catalyst/expressions/collectionOperations.scala. You can see it properly by opening the arrays_zip function source code in your favourite code editor.
Newland: @Holdall for Spark 2.3 please see my answer to #61920472
Sledge: This method produces nulls in "New_field" if the "Elems" column has more than one element.
Parental: What is elem_struct_recomposed?
Holdall: Sorry sohil, I forgot to specify the renamed schema of the struct. Fixed (I just tested it on a local Spark 3.1.2 with no problems). Thanks!

You can try this (here schema is assumed to be the target schema that already includes New_field):

df2 = spark.createDataFrame(df.rdd, schema).limit(0).unionByName(df, allowMissingColumns=True)
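
The limit(0) DataFrame contributes only the (wider) target schema, and unionByName with allowMissingColumns=True fills the field that df lacks with nulls for every row. Note that allowMissingColumns requires Spark 3.1+, and, as far as I can tell, filling missing fields of structs nested inside arrays only works on more recent versions.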
Arkwright answered 22/5, 2024 at 9:51
