Pyspark: explode json in column to multiple columns
Asked Answered
W

8

45

The data looks like this -

+-----------+-----------+-----------------------------+
|         id|      point|                         data|
+-----------+-----------+-----------------------------+
|        abc|          6|{"key1":"124", "key2": "345"}|
|        dfl|          7|{"key1":"777", "key2": "888"}|
|        4bd|          6|{"key1":"111", "key2": "788"}|
+-----------+-----------+-----------------------------+

I am trying to break it into the following format.

+-----------+-----------+-----------+-----------+
|         id|      point|       key1|       key2|
+-----------+-----------+-----------+-----------+
|        abc|          6|        124|        345|
|        dfl|          7|        777|        888|
|        4bd|          6|        111|        788|
+-----------+-----------+-----------+-----------+

The explode function breaks the dataframe into multiple rows, but that is not the result I want.

Note: This solution does not answer my question: PySpark "explode" dict in column

Willow answered 27/6, 2018 at 19:38 Comment(0)
B
61

As long as you are using Spark version 2.1 or higher, pyspark.sql.functions.from_json should get you your desired result, but you first need to define the required schema:

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType(
    [
        StructField('key1', StringType(), True),
        StructField('key2', StringType(), True)
    ]
)

df.withColumn("data", from_json("data", schema))\
    .select(col('id'), col('point'), col('data.*'))\
    .show()

which should give you

+---+-----+----+----+
| id|point|key1|key2|
+---+-----+----+----+
|abc|    6| 124| 345|
|dfl|    7| 777| 888|
|4bd|    6| 111| 788|
+---+-----+----+----+
Burrstone answered 27/6, 2018 at 19:54 Comment(4)
You should be able to use something like the following to extract the schema of the JSON from the data field... schema = spark.read.json(df.rdd.map(lambda row: row.data)).schema (see the sketch after these comments)Salver
Is there any way to do this without supplying a schema? In the context of Spark streaming jobs, the above schema extraction is not an option @SimonPeacock; writing down the complete schema is messy (to say the least) and also quite inflexible, as I want additional fields to appear without having to adapt and restart the whole streaming jobExtern
Get the schema using df.schema, and don't forget to declare all dataTypes as StringType(), otherwise it may give null values for other data types as well as for StringTypesConlon
In case you want to select all the rest of the DF columns and also expand the json column, use the following: df2 = df.select("*", col("data.*"))Responsory
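
A minimal sketch of the schema-inference approach mentioned in the comments above (assuming the JSON column is named data, as in the question):

from pyspark.sql.functions import from_json, col

# infer the schema by reading the JSON strings as their own dataset
# (this triggers an extra pass over the data, so it has a cost)
json_schema = spark.read.json(df.rdd.map(lambda row: row.data)).schema

df.withColumn("data", from_json(col("data"), json_schema)) \
    .select("id", "point", "data.*") \
    .show()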
D
8

As suggested by @pault, the data field is a string field. Since the keys are the same (i.e. 'key1', 'key2') in the JSON string across rows, you can also use json_tuple() (this function is new in version 1.6, based on the documentation):

from pyspark.sql import functions as F

df.select('id', 'point', F.json_tuple('data', 'key1', 'key2').alias('key1', 'key2')).show()

Below is my original post, which is most likely WRONG if the original table is from df.show(truncate=False) and thus the data field is NOT a Python data structure.

Since you had exploded the data into rows, I assumed the column data is a Python data structure instead of a string:

from pyspark.sql import functions as F

df.select('id', 'point', F.col('data').getItem('key1').alias('key1'), F.col('data')['key2'].alias('key2')).show()
Destinydestitute answered 27/6, 2018 at 20:47 Comment(6)
I don't think this works in this case: you need a MapType() column to use getItem(), but it looks like it's a string here.Thermotensile
the OP mentioned the results had been exploded into multiple rows; this does not sound like a string field.Destinydestitute
"The explode function explodes the dataframe into multiple rows." sounds like OP is stating a fact, rather than what they have tried. Also, if it were a MapType() it would not display as shown in the post.Thermotensile
thanks, I think you might be right. but I think it can be much simpler when keys are constant in the JSON strings.Destinydestitute
this works. I did not know about json_tuple - it's much easier than defining the schema.Thermotensile
struggled 1 day with looping, this is really kool.Marisamariscal
S
3

In this approach you just need to give the name of the column with the JSON content. There is no need to set up the schema; it resolves everything automatically.

json_col_name = 'data'

# take the field names of the struct column and build "data.<key> <key>" expressions
keys = df.select(f"{json_col_name}.*").columns
jsonFields = [f"{json_col_name}.{key} {key}" for key in keys]

# keep every other column as-is, then expand the JSON fields next to them
main_fields = [key for key in df.columns if key != json_col_name]
df_new = df.selectExpr(main_fields + jsonFields)
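
For the question's sample data, and assuming data has already been parsed into a struct column (for example after spark.read.json or from_json), the pieces resolve to something like this:

# keys        -> ['key1', 'key2']
# jsonFields  -> ['data.key1 key1', 'data.key2 key2']   (expression plus alias pairs)
# main_fields -> ['id', 'point']
df_new = df.selectExpr(['id', 'point', 'data.key1 key1', 'data.key2 key2'])
df_new.show()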
Sparrow answered 14/7, 2022 at 13:4 Comment(1)
This should be the accepted answer IMHODepute
I
2

As mentioned by @jxc, json_tuple should work fine if you were not able to define the schema beforehand and you only need to deal with a single level of JSON string. I think it's more straightforward and easier to use. Strangely, I didn't find anyone else mentioning this function before.

In my use case, the original dataframe schema is StructType(List(StructField(a,StringType,true))), and the JSON string column looks like this:

+---------------------------------------+
|a                                      |
+---------------------------------------+
|{"k1": "v1", "k2": "2", "k3": {"m": 1}}|
|{"k1": "v11", "k3": "v33"}             |
|{"k1": "v13", "k2": "23"}              |
+---------------------------------------+

Expand json fields into new columns with json_tuple:

from pyspark.sql import functions as F

df = df.select(F.col('a'), 
    F.json_tuple(F.col('a'), 'k1', 'k2', 'k3') \
    .alias('k1', 'k2', 'k3'))

df.schema
df.show(truncate=False)

The documentation doesn't say much about it, but at least in my use case, the new columns extracted by json_tuple are StringType, and it only extracts a single depth of the JSON string (a sketch for reaching nested values follows the output below).

StructType(List(StructField(k1,StringType,true),StructField(k2,StringType,true),StructField(k3,StringType,true)))

+---------------------------------------+---+----+-------+
|a                                      |k1 |k2  |k3     |
+---------------------------------------+---+----+-------+
|{"k1": "v1", "k2": "2", "k3": {"m": 1}}|v1 |2   |{"m":1}|
|{"k1": "v11", "k3": "v33"}             |v11|null|v33    |
|{"k1": "v13", "k2": "23"}              |v13|23  |null   |
+---------------------------------------+---+----+-------+
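
If the nested value (k3.m above) is also needed, one option not covered in this answer is get_json_object with a JSON path; a minimal sketch:

from pyspark.sql import functions as F

# json_tuple only reaches the top level; a JSON path can pull out nested values
df.select(
    F.json_tuple(F.col('a'), 'k1', 'k2').alias('k1', 'k2'),
    F.get_json_object(F.col('a'), '$.k3.m').alias('k3_m')
).show(truncate=False)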
Ichabod answered 16/6, 2021 at 9:43 Comment(0)
P
0

This works for my use case

from pyspark.sql.functions import from_json

data1 = spark.read.parquet(path)

# infer the schema of the JSON column by reading its strings as a JSON dataset
json_schema = spark.read.json(data1.rdd.map(lambda row: row.json_col)).schema
data2 = data1.withColumn("data", from_json("json_col", json_schema))

# all original columns except the parsed struct...
col1 = data2.columns
col1.remove("data")

# ...plus each field of the struct, referenced as "data.<field>"
col2 = data2.select("data.*").columns
append_str = "data."
col3 = [append_str + val for val in col2]

col_list = col1 + col3
data3 = data2.select(*col_list).drop("json_col")
Predominance answered 5/6, 2021 at 18:37 Comment(0)
C
0

All credit to Shrikant Prabhu

You can simply use SQL

SELECT id, point, data.*
FROM original_table

This way the schema of the new table will adapt if the data changes, and you won't have to do anything in your pipeline.
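
A minimal sketch of wiring this up from PySpark (the view name original_table is just an assumption, and data is assumed to already be a struct column, e.g. parsed with from_json):

# register the dataframe as a view and let Spark SQL expand the struct's fields
df.createOrReplaceTempView("original_table")
spark.sql("SELECT id, point, data.* FROM original_table").show()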

Cereal answered 26/11, 2021 at 10:0 Comment(0)
F
0

Simply do this:

df.select("id", "point", "data.*").show()

It will give you the expected output shown in the question.

Explanation:

To expand struct-type data, 'data.*' can be used. Doing this expands the data column, and each key inside the data column becomes a new column.

Fermanagh answered 6/4 at 12:47 Comment(0)
P
0

You can also use the from_json function to achieve this:

from pyspark.sql.functions import from_json, col

data = [("abc", 6, '{"key1":"124", "key2": "345"}'),
        ("dfl", 7, '{"key1":"777", "key2": "888"}'),
        ("4bd", 6, '{"key1":"111", "key2": "788"}')]

df = spark.createDataFrame(data, ["id", "point", "data"])


# DDL-formatted schema string for the JSON payload
schema = "key1 string, key2 string"


df = df.withColumn("data", from_json(col("data"), schema)) \
       .select("id", "point", "data.key1", "data.key2")

df.show()
Porker answered 8/4 at 20:26 Comment(0)
