With the following approach, you will not need to hardcode all the struct fields. You will, however, need to provide a list of the columns/fields whose type is array of struct. You have 3 such fields; we will add one more column, so in total there will be 4.
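If you prefer not to hardcode that list, a small helper along these lines could collect the names of all array-of-struct fields by walking the schema. The helper and its name are just an illustration, not part of the approach itself; df is the dataframe created below.
from pyspark.sql.types import ArrayType, StructType

def array_of_struct_fields(dtype):
    # Recursively collect names of fields whose type is array<struct<...>>.
    names = set()
    if isinstance(dtype, StructType):
        for f in dtype.fields:
            if isinstance(f.dataType, ArrayType) and isinstance(f.dataType.elementType, StructType):
                names.add(f.name)
            names |= array_of_struct_fields(f.dataType)
    elif isinstance(dtype, ArrayType):
        names |= array_of_struct_fields(dtype.elementType)
    return names

# array_of_struct_fields(df.schema)  ->  {'productlist', 'productvalues', 'porders'}
# "data" still has to be added manually, because it only becomes an array of struct
# during the transformation below.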
First, a dataframe similar to yours:
from pyspark.sql import functions as F
df = spark.createDataFrame(
[(
('a', 'b'),
(
(
'name_1',
[
([
(
'pname_111',
[
(1111, 'field_1111'),
(1112, 'field_1112')
]
),
(
'pname_112',
[
(1121, 'field_1121'),
(1122, 'field_1122')
]
)
],),
([
(
'pname_121',
[
(1211, 'field_1211'),
(1212, 'field_1212')
]
),
(
'pname_122',
[
(1221, 'field_1221'),
(1222, 'field_1222')
]
)
],)
]
),
(
'name_2',
[
([
(
'pname_211',
[
(2111, 'field_2111'),
(2112, 'field_2112')
]
),
(
'pname_212',
[
(2121, 'field_2121'),
(2122, 'field_2122')
]
)
],),
([
(
'pname_221',
[
(2211, 'field_2211'),
(2212, 'field_2212')
]
),
(
'pname_222',
[
(2221, 'field_2221'),
(2222, 'field_2222')
]
)
],)
]
)
),
)],
'metadata:struct<fld1:string,fld2:string>, data:struct<node1:struct<name:string, productlist:array<struct<productvalues:array<struct<pname:string, porders:array<struct<ordernum:int, field:string>>>>>>>, node2:struct<name:string, productlist:array<struct<productvalues:array<struct<pname:string, porders:array<struct<ordernum:int, field:string>>>>>>>>'
)
# df.printSchema()
# root
# |-- metadata: struct (nullable = true)
# | |-- fld1: string (nullable = true)
# | |-- fld2: string (nullable = true)
# |-- data: struct (nullable = true)
# | |-- node1: struct (nullable = true)
# | | |-- name: string (nullable = true)
# | | |-- productlist: array (nullable = true)
# | | | |-- element: struct (containsNull = true)
# | | | | |-- productvalues: array (nullable = true)
# | | | | | |-- element: struct (containsNull = true)
# | | | | | | |-- pname: string (nullable = true)
# | | | | | | |-- porders: array (nullable = true)
# | | | | | | | |-- element: struct (containsNull = true)
# | | | | | | | | |-- ordernum: integer (nullable = true)
# | | | | | | | | |-- field: string (nullable = true)
# | |-- node2: struct (nullable = true)
# | | |-- name: string (nullable = true)
# | | |-- productlist: array (nullable = true)
# | | | |-- element: struct (containsNull = true)
# | | | | |-- productvalues: array (nullable = true)
# | | | | | |-- element: struct (containsNull = true)
# | | | | | | |-- pname: string (nullable = true)
# | | | | | | |-- porders: array (nullable = true)
# | | | | | | | |-- element: struct (containsNull = true)
# | | | | | | | | |-- ordernum: integer (nullable = true)
# | | | | | | | | |-- field: string (nullable = true)
The answer
Spark 3.1+
nodes = df.select("data.*").columns
for n in nodes:
df = df.withColumn("data", F.col("data").withField(n, F.struct(F.lit(n).alias("node"), f"data.{n}.*")))
df = df.withColumn("data", F.array("data.*"))
for arr_of_struct in ["data", "productlist", "productvalues", "porders"]:
df = df.select(
*[c for c in df.columns if c != arr_of_struct],
F.expr(f"inline({arr_of_struct})")
)
Lower Spark versions (Column.withField is only available from Spark 3.1, so the node struct is rebuilt with F.struct instead):
nodes = df.select("data.*").columns
for n in nodes:
df = df.withColumn(
"data",
F.struct(
F.struct(F.lit(n).alias("node"), f"data.{n}.*").alias(n),
*[f"data.{c}" for c in df.select("data.*").columns if c != n]
)
)
df = df.withColumn("data", F.array("data.*"))
for arr_of_struct in ["data", "productlist", "productvalues", "porders"]:
df = df.select(
*[c for c in df.columns if c != arr_of_struct],
F.expr(f"inline({arr_of_struct})")
)
Results:
df.printSchema()
# root
# |-- metadata: struct (nullable = true)
# | |-- fld1: string (nullable = true)
# | |-- fld2: string (nullable = true)
# |-- node: string (nullable = false)
# |-- name: string (nullable = true)
# |-- pname: string (nullable = true)
# |-- ordernum: integer (nullable = true)
# |-- field: string (nullable = true)
df.show()
# +--------+-----+------+---------+--------+----------+
# |metadata| node| name| pname|ordernum| field|
# +--------+-----+------+---------+--------+----------+
# | {a, b}|node1|name_1|pname_111| 1111|field_1111|
# | {a, b}|node1|name_1|pname_111| 1112|field_1112|
# | {a, b}|node1|name_1|pname_112| 1121|field_1121|
# | {a, b}|node1|name_1|pname_112| 1122|field_1122|
# | {a, b}|node1|name_1|pname_121| 1211|field_1211|
# | {a, b}|node1|name_1|pname_121| 1212|field_1212|
# | {a, b}|node1|name_1|pname_122| 1221|field_1221|
# | {a, b}|node1|name_1|pname_122| 1222|field_1222|
# | {a, b}|node2|name_2|pname_211| 2111|field_2111|
# | {a, b}|node2|name_2|pname_211| 2112|field_2112|
# | {a, b}|node2|name_2|pname_212| 2121|field_2121|
# | {a, b}|node2|name_2|pname_212| 2122|field_2122|
# | {a, b}|node2|name_2|pname_221| 2211|field_2211|
# | {a, b}|node2|name_2|pname_221| 2212|field_2212|
# | {a, b}|node2|name_2|pname_222| 2221|field_2221|
# | {a, b}|node2|name_2|pname_222| 2222|field_2222|
# +--------+-----+------+---------+--------+----------+
Explanation
nodes = df.select("data.*").columns
for n in nodes:
df = df.withColumn("data", F.col("data").withField(n, F.struct(F.lit(n).alias("node"), f"data.{n}.*")))
In the code above, I decided to also save the node title (node1, node2), in case you need it.
It first gets the list of node names from the fields of the "data" column. Using that list, the for
loop adds one more field, holding the node title, inside every node struct.
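To see this step in isolation, here is a minimal sketch of Column.withField (Spark 3.1+) on a simple struct; the column and field names are made up for illustration:
tiny = spark.createDataFrame([((1, 2),)], 's:struct<a:int,b:int>')
tiny = tiny.withColumn("s", F.col("s").withField("label", F.lit("x")))
# s is now struct<a:int, b:int, label:string>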
df = df.withColumn("data", F.array("data.*"))
The above converts the "data" column from a struct into an array of structs, so that in the next step it can easily be exploded into rows and columns.
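A minimal sketch of the same idea (made-up names): a struct whose fields are all structs of one type can be collected into an array of those structs, which is what makes the later inline possible:
tiny = spark.createDataFrame([(((1,), (2,)),)], 'd:struct<n1:struct<v:int>, n2:struct<v:int>>')
tiny = tiny.withColumn("d", F.array("d.*"))
# d is now array<struct<v:int>> with two elements: [{1}, {2}]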
for arr_of_struct in ["data", "productlist", "productvalues", "porders"]:
df = df.select(
*[c for c in df.columns if c != arr_of_struct],
F.expr(f"inline({arr_of_struct})")
)
In the above, the main line is F.expr(f"inline({arr_of_struct})"). It must be used inside a loop, because inline is a generator function and generators cannot be nested within one another in Spark. inline explodes an array of structs into columns (one per struct field) and rows (one per array element). At this step you have 4 columns of type array of struct, so 4 inline expressions will be created.
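For reference, a minimal sketch of inline on its own (made-up names): every array element becomes a row and every struct field becomes a column.
tiny = spark.createDataFrame([(1, [(10, 'x'), (20, 'y')])], 'id:int, arr:array<struct<num:int, lbl:string>>')
tiny.select('id', F.expr('inline(arr)')).show()
# +--+---+---+
# |id|num|lbl|
# +--+---+---+
# | 1| 10|  x|
# | 1| 20|  y|
# +--+---+---+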