Select columns from a highly nested data
For the dataframe below, which was generated from an Avro file, I'm trying to get the column names as a list (or some other format) so that I can use them in a select statement. node1 and node2 have the same elements. For example, I understand that we could do df.select(col('data.node1.name')), but I'm not sure

  1. how to select all columns at once without hardcoding all the column names, and
  2. how to handle the nested part. I think that, for readability, the productvalues and porders should be selected into separate individual dataframes/tables?

Input schema:

root
 |-- metadata: struct
 |   ...
 |-- data: struct
 |    |-- node1: struct
 |    |    |-- name: string
 |    |    |-- productlist: array
 |    |    |    |-- element: struct
 |    |    |    |    |-- productvalues: array
 |    |    |    |    |    |-- element: struct
 |    |    |    |    |    |    |-- pname: string
 |    |    |    |    |    |    |-- porders: array
 |    |    |    |    |    |    |    |-- element: struct
 |    |    |    |    |    |    |    |    |-- ordernum: int
 |    |    |    |    |    |    |    |    |-- field: string
 |    |-- node2: struct
 |    |    |-- name: string
 |    |    |-- productlist: array
 |    |    |    |-- element: struct
 |    |    |    |    |-- productvalues: array
 |    |    |    |    |    |-- element: struct
 |    |    |    |    |    |    |-- pname: string
 |    |    |    |    |    |    |-- porders: array
 |    |    |    |    |    |    |    |-- element: struct
 |    |    |    |    |    |    |    |    |-- ordernum: int
 |    |    |    |    |    |    |    |    |-- field: string
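
For the first question, one option is to walk the schema recursively and collect dotted field paths instead of hardcoding them. The sketch below does this over a plain nested dict standing in for the schema (a simplification for illustration; with PySpark you would recurse over the StructType/ArrayType/StructField objects of df.schema in the same way):

```python
# Recursively collect dotted field paths from a nested schema description.
# The dict below is a simplified stand-in for df.schema, not the real
# StructType API; arrays are flattened to their element structs for brevity.
schema = {
    "node1": {
        "name": "string",
        "productlist": {
            "productvalues": {
                "pname": "string",
                "porders": {"ordernum": "int", "field": "string"},
            },
        },
    },
}

def field_paths(node, prefix=""):
    """Return dotted paths to every leaf field under `node`."""
    paths = []
    for name, child in node.items():
        full = f"{prefix}.{name}" if prefix else name
        if isinstance(child, dict) and child:
            paths.extend(field_paths(child, full))   # recurse into structs
        else:
            paths.append(full)                       # leaf field
    return paths

print(field_paths(schema))
# e.g. ['node1.name', 'node1.productlist.productvalues.pname', ...]
```

The resulting paths can then feed a select statement, though (as the comments below note) paths that cross an array level still need an explode/inline step first.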
Calamus answered 16/9, 2022 at 3:26 Comment(3)
Since you have plenty of array structures in your schema, how do you plan to select them in your select statement? Would that be a very long and cryptic query like productlist[12].productvalues[7].porders[0].ordernum? – Hoiden
@Hoiden That's why I'm wondering how to handle the nested part, e.g. whether the arrays should be separated into two other tables, selecting only the outer level in the select statement. – Calamus
So what exactly is the desired result here? I don't think there is an easy way to simplify your select statement for such a structure, so you will first need to transform your data, i.e. explode the arrays. – Kevel

With the following approach, you will not need to hardcode any of the struct field names. But you will need to provide the list of columns/fields whose type is array of struct. You have 3 such fields; we will add one more such column, so in total there will be 4.

First of all, the dataframe, similar to yours:

from pyspark.sql import functions as F

df = spark.createDataFrame(
    [(
        ('a', 'b'),
        (
            (
                'name_1',
                [
                    ([
                        (
                            'pname_111',
                            [
                                (1111, 'field_1111'),
                                (1112, 'field_1112')
                            ]
                        ),
                        (
                            'pname_112',
                            [
                                (1121, 'field_1121'),
                                (1122, 'field_1122')
                            ]
                        )
                    ],),
                    ([
                        (
                            'pname_121',
                            [
                                (1211, 'field_1211'),
                                (1212, 'field_1212')
                            ]
                        ),
                        (
                            'pname_122',
                            [
                                (1221, 'field_1221'),
                                (1222, 'field_1222')
                            ]
                        )
                    ],)
                ]
            ),
            (
                'name_2',
                [
                    ([
                        (
                            'pname_211',
                            [
                                (2111, 'field_2111'),
                                (2112, 'field_2112')
                            ]
                        ),
                        (
                            'pname_212',
                            [
                                (2121, 'field_2121'),
                                (2122, 'field_2122')
                            ]
                        )
                    ],),
                    ([
                        (
                            'pname_221',
                            [
                                (2211, 'field_2211'),
                                (2212, 'field_2212')
                            ]
                        ),
                        (
                            'pname_222',
                            [
                                (2221, 'field_2221'),
                                (2222, 'field_2222')
                            ]
                        )
                    ],)
                ]
            )
        ),
    )],
    'metadata:struct<fld1:string,fld2:string>, data:struct<node1:struct<name:string, productlist:array<struct<productvalues:array<struct<pname:string, porders:array<struct<ordernum:int, field:string>>>>>>>, node2:struct<name:string, productlist:array<struct<productvalues:array<struct<pname:string, porders:array<struct<ordernum:int, field:string>>>>>>>>'
)
# df.printSchema()
# root
#  |-- metadata: struct (nullable = true)
#  |    |-- fld1: string (nullable = true)
#  |    |-- fld2: string (nullable = true)
#  |-- data: struct (nullable = true)
#  |    |-- node1: struct (nullable = true)
#  |    |    |-- name: string (nullable = true)
#  |    |    |-- productlist: array (nullable = true)
#  |    |    |    |-- element: struct (containsNull = true)
#  |    |    |    |    |-- productvalues: array (nullable = true)
#  |    |    |    |    |    |-- element: struct (containsNull = true)
#  |    |    |    |    |    |    |-- pname: string (nullable = true)
#  |    |    |    |    |    |    |-- porders: array (nullable = true)
#  |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
#  |    |    |    |    |    |    |    |    |-- ordernum: integer (nullable = true)
#  |    |    |    |    |    |    |    |    |-- field: string (nullable = true)
#  |    |-- node2: struct (nullable = true)
#  |    |    |-- name: string (nullable = true)
#  |    |    |-- productlist: array (nullable = true)
#  |    |    |    |-- element: struct (containsNull = true)
#  |    |    |    |    |-- productvalues: array (nullable = true)
#  |    |    |    |    |    |-- element: struct (containsNull = true)
#  |    |    |    |    |    |    |-- pname: string (nullable = true)
#  |    |    |    |    |    |    |-- porders: array (nullable = true)
#  |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
#  |    |    |    |    |    |    |    |    |-- ordernum: integer (nullable = true)
#  |    |    |    |    |    |    |    |    |-- field: string (nullable = true)

The answer

  • Spark 3.1+

    nodes = df.select("data.*").columns
    for n in nodes:
        df = df.withColumn("data", F.col("data").withField(n, F.struct(F.lit(n).alias("node"), f"data.{n}.*")))
    df = df.withColumn("data", F.array("data.*"))
    
    for arr_of_struct in ["data", "productlist", "productvalues", "porders"]:
        df = df.select(
            *[c for c in df.columns if c != arr_of_struct],
            F.expr(f"inline({arr_of_struct})")
        )
    
  • Lower Spark versions:

    nodes = df.select("data.*").columns
    for n in nodes:
        df = df.withColumn(
            "data",
            F.struct(
                F.struct(F.lit(n).alias("node"), f"data.{n}.*").alias(n),
                *[f"data.{c}" for c in df.select("data.*").columns if c != n]
            )
        )
    df = df.withColumn("data", F.array("data.*"))
    
    for arr_of_struct in ["data", "productlist", "productvalues", "porders"]:
        df = df.select(
            *[c for c in df.columns if c != arr_of_struct],
            F.expr(f"inline({arr_of_struct})")
        )
    

Results:

df.printSchema()
# root
#  |-- metadata: struct (nullable = true)
#  |    |-- fld1: string (nullable = true)
#  |    |-- fld2: string (nullable = true)
#  |-- node: string (nullable = false)
#  |-- name: string (nullable = true)
#  |-- pname: string (nullable = true)
#  |-- ordernum: integer (nullable = true)
#  |-- field: string (nullable = true)

df.show()
# +--------+-----+------+---------+--------+----------+
# |metadata| node|  name|    pname|ordernum|     field|
# +--------+-----+------+---------+--------+----------+
# |  {a, b}|node1|name_1|pname_111|    1111|field_1111|
# |  {a, b}|node1|name_1|pname_111|    1112|field_1112|
# |  {a, b}|node1|name_1|pname_112|    1121|field_1121|
# |  {a, b}|node1|name_1|pname_112|    1122|field_1122|
# |  {a, b}|node1|name_1|pname_121|    1211|field_1211|
# |  {a, b}|node1|name_1|pname_121|    1212|field_1212|
# |  {a, b}|node1|name_1|pname_122|    1221|field_1221|
# |  {a, b}|node1|name_1|pname_122|    1222|field_1222|
# |  {a, b}|node2|name_2|pname_211|    2111|field_2111|
# |  {a, b}|node2|name_2|pname_211|    2112|field_2112|
# |  {a, b}|node2|name_2|pname_212|    2121|field_2121|
# |  {a, b}|node2|name_2|pname_212|    2122|field_2122|
# |  {a, b}|node2|name_2|pname_221|    2211|field_2211|
# |  {a, b}|node2|name_2|pname_221|    2212|field_2212|
# |  {a, b}|node2|name_2|pname_222|    2221|field_2221|
# |  {a, b}|node2|name_2|pname_222|    2222|field_2222|
# +--------+-----+------+---------+--------+----------+

Explanation

nodes = df.select("data.*").columns
for n in nodes:
    df = df.withColumn("data", F.col("data").withField(n, F.struct(F.lit(n).alias("node"), f"data.{n}.*")))

The above preserves the node title in case you need it. It first gets the list of nodes from the "data" column's fields. Using that list, the for loop adds one more field inside every node struct holding the node's title.

df = df.withColumn("data", F.array("data.*"))

The above converts the "data" column type from struct to array so that in the next step we could easily explode it into columns.

for arr_of_struct in ["data", "productlist", "productvalues", "porders"]:
    df = df.select(
        *[c for c in df.columns if c != arr_of_struct],
        F.expr(f"inline({arr_of_struct})")
    )

In the above, the main line is F.expr(f"inline({arr_of_struct})"). It must be applied inside a loop, because inline is a generator function and generators cannot be nested within a single select in Spark. inline explodes an array of structs into columns, producing one row per array element. At this step you have 4 array-of-struct columns, so 4 inline expressions are created.
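
The effect of each inline pass can be illustrated without Spark. The sketch below is a pure-Python analog (plain dicts instead of Rows; inline_once is an invented helper name): each pass flattens exactly one array-of-struct column, copying the parent row's remaining columns into every child row, which is what the loop above does once per array level.

```python
def inline_once(rows, arr_col):
    """Flatten one array-of-dicts column: each element becomes its own row,
    inheriting all other columns of its parent row (like Spark's inline)."""
    out = []
    for row in rows:
        parent = {k: v for k, v in row.items() if k != arr_col}
        for elem in row[arr_col]:
            out.append({**parent, **elem})
    return out

rows = [{"metadata": "m",
         "porders": [{"ordernum": 1, "field": "f1"},
                     {"ordernum": 2, "field": "f2"}]}]

# One pass per array-of-struct column, mirroring the PySpark loop above.
for arr_of_struct in ["porders"]:
    rows = inline_once(rows, arr_of_struct)

print(rows)
# [{'metadata': 'm', 'ordernum': 1, 'field': 'f1'},
#  {'metadata': 'm', 'ordernum': 2, 'field': 'f2'}]
```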

Multure answered 20/9, 2022 at 10:2 Comment(0)

Instead of collecting all the data into one table, I would recommend creating a separate table for each list. To get values out of a list you can use the explode function. For instance, to build the productlist table:

from pyspark.sql.functions import col, explode

productlist = df.select(col('data.node1.name').alias("name"), explode(col('data.node1.productlist')).alias("first_explode"))

In the next step, you can use the productlist dataframe and run the snippet below:

productValue = productlist.select(col('name'), explode(col('first_explode.productvalues')).alias("second_explode"))

and so on. You can also get some help from this link as well.
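
If you take the separate-tables route, each level needs a join key back to its parent. Below is a minimal pure-Python sketch of that normalization, assuming an invented surrogate key (product_id is not a field in the source data, it is generated here purely to link the two tables):

```python
# Normalize one nested record into two flat tables linked by a surrogate key.
# product_id is an invented key for illustration, not part of the source data.
record = {"name": "node1",
          "productvalues": [
              {"pname": "p1", "porders": [{"ordernum": 1, "field": "a"}]},
              {"pname": "p2", "porders": [{"ordernum": 2, "field": "b"}]},
          ]}

products, orders = [], []
for pid, pv in enumerate(record["productvalues"]):
    # One row per product, carrying the parent's name plus the key.
    products.append({"product_id": pid,
                     "name": record["name"],
                     "pname": pv["pname"]})
    # One row per order, carrying only the key back to its product.
    for po in pv["porders"]:
        orders.append({"product_id": pid, **po})

print(products)
print(orders)
```

Joining products and orders on product_id then reproduces the fully flattened view when needed.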

Protocol answered 19/9, 2022 at 11:55 Comment(0)

So, this is not a perfect answer for you, but I hope it might give you some ideas to solve your problem. I know you said you don't want to hardcode your column names but I'm unable to handle that part at this moment.

First things first, I created this sample JSON for testing:

{
    "metadata": {},
    "data": {
        "node1": {
            "name": "Node001",
            "productlist": [
                {
                    "productvalues": [
                        {
                            "pname": "Node001-P001",
                            "porders": [
                                {"ordernum":  1, "field": "Node001-P001-001"},
                                {"ordernum":  2, "field": "Node001-P001-002"}
                            ]
                        },
                        {
                            "pname": "Node001-P002",
                            "porders": [
                                {"ordernum":  3, "field": "Node001-P002-003"},
                                {"ordernum":  4, "field": "Node001-P002-004"},
                                {"ordernum":  5, "field": "Node001-P002-005"},
                                {"ordernum":  6, "field": "Node001-P002-006"}
                            ]
                        },
                        {
                            "pname": "Node001-P003",
                            "porders": [
                                {"ordernum":  7, "field": "Node001-P003-007"}
                            ]
                        }
                    ]
                },
                {
                    "productvalues": [
                        {
                            "pname": "Node001-P004",
                            "porders": [
                                {"ordernum":  8, "field": "Node001-P004-008"},
                                {"ordernum":  9, "field": "Node001-P004-009"},
                                {"ordernum": 10, "field": "Node001-P004-010"}
                            ]
                        },
                        {
                            "pname": "Node001-P005",
                            "porders": [
                                
                                {"ordernum": 11, "field": "Node001-P005-011"},
                                {"ordernum": 12, "field": "Node001-P005-012"},
                                {"ordernum": 13, "field": "Node001-P005-013"}
                            ]
                        }
                    ]
                }
            ]
        },
        "node2": {
            "name": "Node002",
            "productlist": [
                {
                    "productvalues": [
                        {
                            "pname": "Node002-P001",
                            "porders": [
                                {"ordernum": 14, "field": "Node002-P001-014"}
                            ]
                        },
                        {
                            "pname": "Node002-P002",
                            "porders": [
                                {"ordernum": 15, "field": "Node002-P002-015"}
                            ]
                        },
                        {
                            "pname": "Node002-P003",
                            "porders": [
                                {"ordernum": 16, "field": "Node002-P003-016"}
                            ]
                        }
                    ]
                },
                {
                    "productvalues": [
                        {
                            "pname": "Node002-P004",
                            "porders": [
                                {"ordernum": 17, "field": "Node002-P004-017"}
                            ]
                        },
                        {
                            "pname": "Node002-P005",
                            "porders": [
                                
                                {"ordernum": 18, "field": "Node002-P005-018"}
                            ]
                        }
                    ]
                }
            ]
        }
    }
}

Now, here is a list of dicts describing, level by level, which columns to select and which to explode; you will need it later:

cols_dict = [
    {
        'col': ['data.node1.name'],
        'exp': ['data.node1.productlist'],
    },
    {
        'exp': ['productlist.productvalues'],
    },
    {
        'col': ['productvalues.pname'],
        'exp': ['productvalues.porders'],
    },
    {
        'col': ['porders.ordernum', 'porders.field']
    }
]

And finally, loop through this list, applying the transformations to get your final result:

dfx = df
select_col = []
for i in cols_dict:
    select_col = [c.split('.')[-1] for c in select_col]
    if i.get('col'):
        select_col += i['col']
    
    select_exp = []
    if i.get('exp'):
        select_exp += i['exp']

    dfx = dfx.select([F.col(c) for c in select_col] + [F.explode(c).alias(c.split('.')[-1]) for c in select_exp])

+-------+------------+--------+----------------+
|   name|       pname|ordernum|           field|
+-------+------------+--------+----------------+
|Node001|Node001-P001|       1|Node001-P001-001|
|Node001|Node001-P001|       2|Node001-P001-002|
|Node001|Node001-P002|       3|Node001-P002-003|
|Node001|Node001-P002|       4|Node001-P002-004|
|Node001|Node001-P002|       5|Node001-P002-005|
|Node001|Node001-P002|       6|Node001-P002-006|
|Node001|Node001-P003|       7|Node001-P003-007|
|Node001|Node001-P004|       8|Node001-P004-008|
|Node001|Node001-P004|       9|Node001-P004-009|
|Node001|Node001-P004|      10|Node001-P004-010|
|Node001|Node001-P005|      11|Node001-P005-011|
|Node001|Node001-P005|      12|Node001-P005-012|
|Node001|Node001-P005|      13|Node001-P005-013|
+-------+------------+--------+----------------+
Hoiden answered 20/9, 2022 at 9:33 Comment(0)
