How can I stream elements from inside a JSON array using serde_json?
I have a 5GB JSON file which is an array of objects with fixed structure:

[
  {
    "first": "John",
    "last": "Doe",
    "email": "[email protected]"
  },
  {
    "first": "Anne",
    "last": "Ortha",
    "email": "[email protected]"
  },
  ....
]

I know that I can try to parse this file using the code shown in How can I deserialize JSON with a top-level array using Serde?:

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
struct User {
    first: String,
    last: String,
    email: String,
}

let users: Vec<User> = serde_json::from_str(file)?;

There are multiple problems:

  1. The whole file is first read into a single string
  2. That string is then parsed into a complete Vec<User>, holding every element in memory at once (I don't want that)

I tried How can I lazily read multiple JSON values from a file/stream in Rust?, but it reads the whole file before printing anything, and it prints the whole structure at once inside the loop. I was expecting one object at a time in the loop:

(screenshot: the entire array is printed in one piece on the first loop iteration)

Ideally, parsing and processing of the (parsed) User objects would happen simultaneously, in two separate threads/tasks/routines or by making use of a channel.

Gar answered 3/8, 2021 at 18:31 Comment(4)
How do you verify that "it reads the whole file before printing anything"? (Outdare)
Screenshot added. (Gar)
Let's say it printed two items from the loop: how would you know whether it had read in the whole file or not? (Outdare)
OK, so there is no issue with this line, no matter how big the file is: println!("Before reader"); let iterator = deserializer.into_iter::<serde_json::Value>(); println!("after reader"); The problem now is that the whole file's contents are printed at once on the first loop iteration, so I can't get each object individually, and I can't find the whole file being loaded anywhere except the place mentioned above. (Gar)

Streaming elements from a JSON array is possible, but requires some legwork. You must skip the leading [ and the separating , yourself, as well as detect the final ]. To parse individual array elements you need to use StreamDeserializer and extract a single item from it (so you can drop it and regain control of the IO reader). For example:

use serde::de::DeserializeOwned;
use serde_json::{self, Deserializer};
use std::io::{self, Read};

fn read_skipping_ws(mut reader: impl Read) -> io::Result<u8> {
    loop {
        let mut byte = 0u8;
        reader.read_exact(std::slice::from_mut(&mut byte))?;
        if !byte.is_ascii_whitespace() {
            return Ok(byte);
        }
    }
}

fn invalid_data(msg: &str) -> io::Error {
    io::Error::new(io::ErrorKind::InvalidData, msg)
}

fn deserialize_single<T: DeserializeOwned, R: Read>(reader: R) -> io::Result<T> {
    let next_obj = Deserializer::from_reader(reader).into_iter::<T>().next();
    match next_obj {
        Some(result) => result.map_err(Into::into),
        None => Err(invalid_data("premature EOF")),
    }
}

fn yield_next_obj<T: DeserializeOwned, R: Read>(
    mut reader: R,
    at_start: &mut bool,
) -> io::Result<Option<T>> {
    if !*at_start {
        *at_start = true;
        if read_skipping_ws(&mut reader)? == b'[' {
            // read the next char to see if the array is empty
            let peek = read_skipping_ws(&mut reader)?;
            if peek == b']' {
                Ok(None)
            } else {
                deserialize_single(io::Cursor::new([peek]).chain(reader)).map(Some)
            }
        } else {
            Err(invalid_data("`[` not found"))
        }
    } else {
        match read_skipping_ws(&mut reader)? {
            b',' => deserialize_single(reader).map(Some),
            b']' => Ok(None),
            _ => Err(invalid_data("`,` or `]` not found")),
        }
    }
}

pub fn iter_json_array<T: DeserializeOwned, R: Read>(
    mut reader: R,
) -> impl Iterator<Item = Result<T, io::Error>> {
    let mut at_start = false;
    std::iter::from_fn(move || yield_next_obj(&mut reader, &mut at_start).transpose())
}

Example usage:

fn main() {
    let data = r#"[
  {
    "first": "John",
    "last": "Doe",
    "email": "[email protected]"
  },
  {
    "first": "Anne",
    "last": "Ortha",
    "email": "[email protected]"
  }
]"#;
    use serde::{Deserialize, Serialize};

    #[derive(Serialize, Deserialize, Debug)]
    struct User {
        first: String,
        last: String,
        email: String,
    }

    for user in iter_json_array(io::Cursor::new(&data)) {
        let user: User = user.unwrap();
        println!("{:?}", user);
    }
}

Playground

When using this in production, you'd open the input as a File instead of reading it into a string. As always, don't forget to wrap the File in a BufReader.

Latricialatrina answered 3/8, 2021 at 21:52 Comment(7)
Thank you! That's exactly what I wanted. I just parsed that 1.03 GB file with 10,000,000 objects in about 5 minutes. For reading the file I used let str = fs::read_to_string(path)?; (it doesn't have overhead) followed by for recipe in iter_json_array(io::Cursor::new(str)) { (Gar)
Here is a similar approach using Golang: github.com/sitetester/recipe-stats-calculator/blob/master/… (Gar)
@RSun Reading everything into a string seems suboptimal in terms of memory usage; why not use a buffered reader, as in Golang? let reader = BufReader::new(File::open(path)?); for recipe in iter_json_array(reader) { ... } (Latricialatrina)
Agreed. I'm still new to Rust :) (Gar)
@RSun I'm curious whether it affects the run time (in either direction); sometimes the most elegant code is not also the most performant! (And 5 minutes to parse a 5 GiB file is not exactly stellar performance; perhaps there is still low-hanging fruit in terms of ways to speed things up.) (Latricialatrina)
So I pushed the Rust version here: github.com/sitetester/recipe-stats-calculator-rs Golang parses 10,000,000 JSON objects in about 25 seconds, while Rust takes about 5 to 6 minutes for the same number :D I wonder what I'm doing wrong? Could you please check it in your free time with a big file of 10,000,000 JSON objects? I want to learn more. It's also about Golang vs Rust speed comparison ;) (Gar)
@RSun The speed comparison probably belongs in a separate question (if it's Stack Overflow material at all). I don't see anything obviously wrong with the code that does deserialization. You might want to consider using crossbeam channels instead of the ones in std::sync, as they are generally faster, but I don't expect the gain to be huge. (Latricialatrina)

This is not directly possible as of serde_json 1.0.66.

One suggested workaround is to implement your own Visitor that uses a channel. As deserialization of the array progresses, each element is pushed down the channel; the receiving side can then grab each element and process it, freeing up space for the deserializer to push in the next value.

Outdare answered 3/8, 2021 at 20:37 Comment(0)

Disclaimer: The following uses a library other than serde_json, I am the author of that library and it is currently still experimental (but feedback is highly appreciated!).


You can use Struson and its serde feature to achieve this:

First, start reading the JSON array with begin_array and use has_next as the loop condition to check whether the array has more elements. Inside the loop you can then use deserialize_next to deserialize your User values.

use serde::{Deserialize, Serialize};
// Struson reader types (import path assumed; requires the crate's `serde` feature):
use struson::reader::{JsonReader, JsonStreamReader};

let json = r#"
[
    {
        "first": "John",
        "last": "Doe",
        "email": "[email protected]"
    },
    {
        "first": "Anne",
        "last": "Ortha",
        "email": "[email protected]"
    }
]       
"#;
// `std::io::Read` providing the JSON data; in this example the str bytes
let reader = json.as_bytes();

#[derive(Serialize, Deserialize, Debug)]
struct User {
    first: String,
    last: String,
    email: String,
}

let mut json_reader = JsonStreamReader::new(reader);

json_reader.begin_array()?;

while json_reader.has_next()? {
    let user: User = json_reader.deserialize_next()?;
    // ... use deserialized value in some way
    println!("deserialized: {user:?}")
}

// Optionally consume the remainder of the JSON document
json_reader.end_array()?;
json_reader.consume_trailing_whitespace()?;

This is all performed in a streaming way by Struson, so it will only read as much data as is necessary to deserialize each User value (and possibly buffer a few more bytes internally for the next call).

Dacia answered 25/8, 2023 at 22:12 Comment(0)
