Using pyspark, how do I read multiple JSON documents on a single line in a file into a dataframe?

Using Spark 2.3, I know I can read a file of JSON documents like this:

{'key': 'val1'}
{'key': 'val2'}

With this:

spark.read.json('filename')

How can I read the following into a dataframe when there aren't newlines between the JSON documents?

The following would be an example input.

{'key': 'val1'}{'key': 'val2'}

To be clear, I expect a dataframe with two rows (frame.count() == 2).

Midgett answered 12/7, 2018 at 20:52 Comment(1)
I would suggest fixing your input file rather than fighting how Spark reads the files, because that is neither a valid JSON object nor valid JSON Lines formatting.Romans

Please try:

df = spark.read.json(["fileName1","fileName2"])

You can also read all the JSON files in a folder:

df = spark.read.json("data/*.json")
Phloem answered 13/7, 2018 at 15:14 Comment(2)
The problem isn't that I have multiple files - the problem is that, in a single file, I have multiple JSON documents without newlines between them.Midgett
This helped me out in my case. Thanks.Ultraviolet

As @cricket_007 suggested above, you'd be better off fixing the input file.

If you're sure you have no '}' characters inside your JSON keys or values, you could do the following:

with open('myfilename', 'r') as f:
    txt = f.read()

# Put a newline after every closing brace so each document ends up on its own line
txt = txt.replace('}', '}\n')

with open('mynewfilename', 'w') as f:
    f.write(txt)

If you do have '}' within keys or values, the task becomes harder but not impossible with regex. It seems unlikely though.
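
Once the rewritten file is newline-delimited, it can be read back as in the question. A minimal sketch, assuming mynewfilename is the file written above:

df = spark.read.json('mynewfilename')
df.count()  # == 2 for the example input in the question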

Sello answered 6/6, 2019 at 16:2 Comment(0)

We solved this using the RDD API, as we couldn't find any way to use the DataFrame API in a memory-efficient way (we kept hitting executor OOM errors).

The following function incrementally parses the JSON and yields the successive JSON documents from your file (from this post):

from functools import partial
from json import JSONDecoder
from io import StringIO

def generate_from_buffer(buffer: str, chunk: str, decoder: JSONDecoder):
    buffer += chunk
    while buffer:
        try:
            result, index = decoder.raw_decode(buffer)
            yield result
            buffer = buffer[index:].lstrip()
        except ValueError:
            # Not enough data to decode, read more
            break
    return buffer


def parse_jsons_file(jsons_str: str, buffer_size: int = 1024):
    decoder = JSONDecoder()
    buffer = ''
    file_obj = StringIO(jsons_str)
    for chunk in iter(partial(file_obj.read, buffer_size), ''):
        buffer = yield from generate_from_buffer(buffer, chunk, decoder)
    if buffer:
        raise ValueError("Invalid input: should be concatenation of json strings")
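
As a quick sanity check outside Spark, the parser can be run directly on the question's example input (a sketch, not part of the Spark job itself):

docs = list(parse_jsons_file('{"key": "val1"}{"key": "val2"}'))
print(docs)  # [{'key': 'val1'}, {'key': 'val2'}]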

We first read the JSON with .format("text"):

from pyspark.sql import DataFrame

df: DataFrame = (
    spark
    .read
    .format("text")
    .option("wholetext", True)
    .load(data_source_path)
)

Then convert it to an RDD, flatMap it using the function above, and finally convert it back to a Spark dataframe. For this you have to define the json_schema for the individual JSON documents in your file, which is good practice anyway.
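
For the question's example documents, the json_schema mentioned above could look like this (a sketch; adapt it to the shape of your own documents):

from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical schema for documents of the form {"key": "..."}
json_schema = StructType([StructField("key", StringType(), True)])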

rdd_df = (df.rdd.map(lambda row: row["value"])
          .flatMap(lambda jsons_string: parse_jsons_file(jsons_string))
          .toDF(json_schema))
Templar answered 19/3, 2021 at 8:13 Comment(0)

Adapted from https://stackoverflow.com/a/77198142

Post from a while ago, but I'm now in the same situation. In my case I'm stuck with VPC flow logs, where each set of logEvents is a JSON object, many per file with no separators. Like you, I see a set of JSON objects like {...}{...}{...}...

I agree that changing the data is best. But to work with existing data without rewriting it, I still need a solution.

Here's what I did (using pyspark via AWS Glue):

from awsglue.transforms import *
from pyspark.context import SparkContext
from awsglue.context import GlueContext
import pyspark.sql.functions as F
from pyspark.sql.functions import explode, split, col, from_json

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Each file is a row

df = spark.read.text(
    's3://xxxxx-omitted-cloudwatch-logs/vpcFlowLogs/2024/01/31/*/*',
    wholetext = True)

# Here we split each row (file) on "}{".
# Still one row, but the value column is now an array of valid JSON strings

df = df.select(F.split(F.regexp_replace(
    col('value'),
    r'\}\{',
    r'\}\|\{'),
    r'\|').alias('value'))

# Here we explode the array elements into rows.

df = df.withColumn("json_str", explode(df["value"]))
df = df.drop("value")

# Here we parse each JSON string into a struct column (inferring the schema first)

json_schema = spark.read.json(df.rdd.map(lambda row: row.json_str)).schema
df = df.withColumn('json', from_json(col('json_str'), json_schema))

# Now we explode the underlying logEvents array

df = df.select(col('json.logEvents').alias('logEvents'))
df = df.withColumn("logEvents_expr", explode(df["logEvents"]))

# Now we split each log event message into named columns

df = df.withColumn("msg", df["logEvents_expr"].getItem("message"))
spl = split(df["msg"], " ")
df = df.withColumn("account_id", spl.getItem(1))
#df = df.withColumn("interface_id", spl.getItem(2))
df = df.withColumn("srcaddr", spl.getItem(3))
df = df.withColumn("dstaddr", spl.getItem(4))
#df = df.withColumn("srcport", spl.getItem(5))
#df = df.withColumn("dstport", spl.getItem(6))
#df = df.withColumn("protocol", spl.getItem(7))
#df = df.withColumn("packets", spl.getItem(8))
df = df.withColumn("total_bytes", spl.getItem(9))
#df = df.withColumn("start", spl.getItem(10))
#df = df.withColumn("end", spl.getItem(11))
#df = df.withColumn("action", spl.getItem(12))
#df = df.withColumn("log_status", spl.getItem(13))

# Get rid of unneeded columns

df = df.drop("logEvents")
df = df.drop("logEvents_expr")

# Here I filter by account.

df = df.where(df["account_id"] == "99999999999")

# And those events downloaded to NAT

df = df.where((df["dstaddr"] == "10.123.124.125"))

# And sum the total bytes

df.select(F.sum(df.total_bytes)).show()
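
Stripped of the Glue and VPC-flow-log specifics, the same split-and-explode idea applied to the question's two-document example might look roughly like this (a sketch; 'concat.json' is a hypothetical file containing {"key": "val1"}{"key": "val2"} on one line, and it assumes '|' never occurs in the data):

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField("key", StringType(), True)])

# Read each file as one row, insert a '|' separator between documents,
# split on it, explode into one row per document, then parse with from_json.
df = spark.read.text("concat.json", wholetext=True)
df = df.select(F.explode(F.split(
    F.regexp_replace("value", r"\}\{", r"\}\|\{"),
    r"\|")).alias("json_str"))
df = df.select(F.from_json("json_str", schema).alias("json")).select("json.*")
df.count()  # == 2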
Corpse answered 13/2 at 20:40 Comment(2)
See if this answer [https://mcmap.net/q/337191/-how-to-extract-multiple-json-objects-from-one-file] simplifies your problem, i.e. convert the series of JSON elements in a line into an array of JSON elements by wrapping each line with []. This will avoid the need to parse the line yourself.Fanni
No @Fanni, because it is not that simple. There are no commas separating the JSON objects.Corpse

The following approach worked for me. Give a pre-defined schema to help Spark:

from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("key", StringType(), True)
])


df = spark.read.schema(schema).json(path_to_json_like_file)
df.show()

outputs:

+----+
| key|
+----+
|val1|
|val2|
|val3|
|val4|
+----+

for a file like:

{"key": "val1"}
{"key": "val2"}
{"key": "val3"}
{"key": "val4"}
Balmoral answered 13/2 at 21:50 Comment(0)
