Adapted from https://stackoverflow.com/a/77198142
Post from a while ago, but I'm now in the same situation. In my case I'm stuck with VPC flow logs delivered to S3, where each batch of log events is a JSON object, many per file with no separators. Like you, I see a run of JSON objects like {...}{...}{...}
I agree that changing the data is the best fix. But to work with the existing data without re-writing it, you still need a way to parse it.
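The core trick, sketched in plain Python on a made-up pair of objects (it assumes "}{" and "|" never occur inside the message text, which holds for my flow-log payloads):

import json

raw = '{"logEvents":[{"message":"a"}]}{"logEvents":[{"message":"b"}]}'
parts = raw.replace('}{', '}|{').split('|')   # re-introduce a separator, then split on it
objs = [json.loads(p) for p in parts]         # two valid JSON objects again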
Here's what I did (using pyspark via AWS Glue):
from awsglue.transforms import *
from pyspark.context import SparkContext
from awsglue.context import GlueContext
import pyspark.sql.functions as F
from pyspark.sql.functions import explode, split, col, from_json
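# Standard Glue boilerplate: Spark context wrapped in a Glue context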
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# Each file is a row
df = spark.read.text(
    's3://xxxxx-omitted-cloudwatch-logs/vpcFlowLogs/2024/01/31/*/*',
    wholetext=True)
# Here we split each row (file) on "}{" by first inserting a "|" separator.
# Still one row per file, but now a single array column whose elements are
# each a valid JSON string.
df = df.select(F.split(F.regexp_replace(
    col('value'),
    r'\}\{',
    '}|{'),
    r'\|').alias('value'))
# Here we explode the array elements into rows, one row per JSON object.
df = df.withColumn("json_str", explode(df["value"]))
df = df.drop("value")
# Infer a schema by sampling the JSON strings, then parse each string into a struct
json_schema = spark.read.json(df.rdd.map(lambda row: row.json_str)).schema
df = df.withColumn('json', from_json(col('json_str'), json_schema))
# Now we explode the underlying logEvents array
df = df.select(col('json.logEvents').alias('logEvents'))
df = df.withColumn("logEvents_expr", explode(df["logEvents"]))
# Now we split each log event message into named columns.
# Fields follow the default VPC flow log format (index 0 is the version):
# version account-id interface-id srcaddr dstaddr srcport dstport protocol
# packets bytes start end action log-status
df = df.withColumn("msg", df["logEvents_expr"].getItem("message"))
spl = split(df["msg"], " ")
df = df.withColumn("account_id", spl.getItem(1))
#df = df.withColumn("interface_id", spl.getItem(2))
df = df.withColumn("srcaddr", spl.getItem(3))
df = df.withColumn("dstaddr", spl.getItem(4))
#df = df.withColumn("srcport", spl.getItem(5))
#df = df.withColumn("dstport", spl.getItem(6))
#df = df.withColumn("protocol", spl.getItem(7))
#df = df.withColumn("packets", spl.getItem(8))
df = df.withColumn("total_bytes", spl.getItem(9))
#df = df.withColumn("start", spl.getItem(10))
#df = df.withColumn("end", spl.getItem(11))
#df = df.withColumn("action", spl.getItem(12))
#df = df.withColumn("log_status", spl.getItem(13))
# Get rid of unneeded columns
df = df.drop("logEvents")
df = df.drop("logEvents_expr")
# Here I filter by account.
df = df.where(df["account_id"] == "99999999999")
# And keep only events whose destination is the NAT address
df = df.where(df["dstaddr"] == "10.123.124.125")
# And sum the total bytes
df.select(F.sum(df.total_bytes)).show()
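If you want a per-source breakdown instead of one grand total, the same columns support a groupBy; a quick sketch (same DataFrame, nothing assumed beyond the column names above):

df.groupBy("srcaddr") \
  .agg(F.sum("total_bytes").alias("bytes")) \
  .orderBy(F.desc("bytes")) \
  .show(20, truncate=False)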