I've used Dask for large telemetry JSON-Lines files (newline delimited)...
The nice thing about Dask is that it does a lot of the work for you: you can read the data, process it, and write it back to disk without ever loading the whole dataset into memory.
Dask will also parallelize the work for you across multiple cores (using threads by default)...
More info on Dask bags here:
https://examples.dask.org/bag.html
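For context, a JSON-Lines file is just one JSON object per line, so a (made-up) telemetry file might look like this:

{"device": "sensor-1", "a": 3, "b": 4}
{"device": "sensor-2", "a": 5, "b": 6}

Each line is parsed independently, which is what makes the format easy to split across partitions.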
import ujson as json  # ujson for speed and for handling NaNs, which are not covered by the JSON spec
import dask.bag as db
def update_dict(d):
    """Add/overwrite a few example keys, then compute 'c' from 'a' and 'b'."""
    d.update({'new_key': 'new_value', 'a': 1, 'b': 2, 'c': 0})
    d['c'] = d['a'] + d['b']
    return d
def read_jsonl(filepaths):
    """Reads JSON-Lines files into a Dask Bag.

    :param filepaths: list of filepath strings OR a string with a wildcard
    :returns: a Dask Bag of dictionaries, one per JSON object
    """
    return db.read_text(filepaths).map(json.loads)
filepaths = ['file1.jsonl.gz', 'file2.jsonl.gz']
# OR
filepaths = 'file*.jsonl.gz'  # wildcard to match multiple files
# (optional) if you want Dask to use multiple processes instead of threads:
# from dask.distributed import Client, progress
# client = Client(threads_per_worker=1, n_workers=6)  # 6 workers for 6 cores
# print(client)
# define a bag containing our data with the JSON parser
dask_bag = read_jsonl(filepaths)
#modify our data
# note: this doesn't execute anything yet, it just adds the step to Dask's task graph
dask_bag = dask_bag.map(update_dict)  # reassign: Bag.map returns a new bag rather than modifying in place
# (optional) if you're only reading one huge file but want to split the output into multiple files, you can repartition the bag
# dask_bag = dask_bag.repartition(10)
# write our modified data back to disk; this is when Dask actually executes the task graph
dask_bag.map(json.dumps).to_textfiles('file_mod*.jsonl.gz')  # Dask will automatically apply compression if you use .gz
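As a quick sanity check, you can also pull a couple of records out of the bag before (or instead of) writing everything; Bag.take() only computes what it needs. A minimal sketch, assuming the dask_bag defined above:

# preview the first two transformed records (this triggers computation for just those records)
print(dask_bag.take(2))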