Deserializing to AsyncEnumerable using Newtonsoft.Json

System.Text.Json.JsonSerializer.DeserializeAsyncEnumerable<T> is a System.Text.Json method that takes a Stream and produces an IAsyncEnumerable<T>, so the elements can be enumerated asynchronously. This is useful, for example, when deserializing an array whose elements are streamed over a network connection, because each element can be output before the end of the stream is reached.
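
For reference, a minimal sketch of the System.Text.Json call in question, assuming .NET 6 or later. The Demo class and the ReadAllAsync helper are hypothetical names used only for illustration, and a MemoryStream stands in for the network stream.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

public static class Demo
{
    // Hypothetical helper: collects the elements of a root-level JSON array of integers.
    public static async Task<List<int>> ReadAllAsync(Stream stream)
    {
        var results = new List<int>();
        // Each element becomes available as soon as enough of the stream has been read.
        await foreach (var item in JsonSerializer.DeserializeAsyncEnumerable<int>(stream))
            results.Add(item);
        return results;
    }

    public static async Task Main()
    {
        // A MemoryStream stands in for a network stream here.
        using var stream = new MemoryStream(Encoding.UTF8.GetBytes("[1, 2, 3]"));
        foreach (var item in await ReadAllAsync(stream))
            Console.WriteLine(item);
    }
}
```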

Is there any way to achieve equivalent functionality using the Newtonsoft.Json library?

Pruitt answered 31/5, 2022 at 17:4 Comment(4)
I think it was answered here: #8158136 - Myriagram
There is no direct equivalent. Json.NET's serializer doesn't support async deserialization. See "Feature request: async serialization to/from the System.IO.Pipelines API #1795": "Making the entire serializer async would be a huge amount of work and require a lot of duplication of code to have sync and async paths. I wouldn't really want to maintain that." See also "DeserializeAsync #1193", which is still open. - Nickname
JsonTextReader does claim to support async reading, though, so it might be possible to asynchronously copy each enumerable item into a MemoryStream or JToken, then deserialize from that. Would that be good enough? - Nickname
Will use this method as a workaround. Thanks. - Pruitt

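Json.NET's JsonSerializer class does not support asynchronous deserialization, and no such support is planned. For confirmation, see e.g. the following Json.NET issues: "Feature request: async serialization to/from the System.IO.Pipelines API #1795" and "DeserializeAsync #1193".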

JsonTextReader does support async reading, however, and LINQ to JSON supports asynchronous loading via JToken.LoadAsync(). Thus you should be able to create an IAsyncEnumerable<JToken> that iterates through a JSON stream whose root value is an array and asynchronously returns each array element as a JToken. You can then deserialize each token to your final model using JToken.ToObject<T>().

First, create the following extension methods:

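using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public static partial class JsonExtensions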
{
    /// <summary>
    /// Asynchronously load and synchronously deserialize values from a stream containing a JSON array.  The root value of the JSON stream must be an array, or an exception is thrown.
    /// </summary>
    public static async IAsyncEnumerable<T?> DeserializeAsyncEnumerable<T>(Stream stream, JsonSerializerSettings? settings = default, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var serializer = JsonSerializer.CreateDefault(settings);
        var loadSettings = new JsonLoadSettings { LineInfoHandling = LineInfoHandling.Ignore }; // For performance do not load line info.
        // StreamReader and JsonTextReader do not implement IAsyncDisposable so let the caller dispose the stream.
        using (var textReader = new StreamReader(stream, leaveOpen : true))
        using (var reader = new JsonTextReader(textReader) { CloseInput = false })
        {
            await foreach (var token in LoadAsyncEnumerable(reader, loadSettings, cancellationToken ).ConfigureAwait(false))
                yield return token.ToObject<T>(serializer);
        }
    }
    
    /// <summary>
    /// Asynchronously load and return JToken values from a stream containing a JSON array.  The root value of the JSON stream must be an array, or an exception is thrown.
    /// </summary>
    public static async IAsyncEnumerable<JToken> LoadJTokenAsyncEnumerable(Stream stream, JsonLoadSettings? settings = default, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // StreamReader and JsonTextReader do not implement IAsyncDisposable so let the caller dispose the stream.
        using (var textReader = new StreamReader(stream, leaveOpen : true))
        using (var reader = new JsonTextReader(textReader) { CloseInput = false })
        {
            await foreach (var token in LoadAsyncEnumerable(reader, settings, cancellationToken).ConfigureAwait(false))
                yield return token;
        }
    }
    
    /// <summary>
    /// Asynchronously load and return JToken values from a JsonTextReader positioned at or before a JSON array.  The next content token must be the start of an array, or an exception is thrown.
    /// </summary>
    public static async IAsyncEnumerable<JToken> LoadAsyncEnumerable(JsonTextReader reader, JsonLoadSettings? settings = default, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Pass the token along so that the initial read can also be cancelled.
        (await reader.MoveToContentAndAssertAsync(cancellationToken).ConfigureAwait(false)).AssertTokenType(JsonToken.StartArray);
        cancellationToken.ThrowIfCancellationRequested();
        while ((await reader.ReadToContentAndAssert(cancellationToken).ConfigureAwait(false)).TokenType != JsonToken.EndArray)
        {
            cancellationToken.ThrowIfCancellationRequested();
            yield return await JToken.LoadAsync(reader, settings, cancellationToken).ConfigureAwait(false);
        }
        cancellationToken.ThrowIfCancellationRequested();
    }
    
    public static JsonReader AssertTokenType(this JsonReader reader, JsonToken tokenType) => 
        reader.TokenType == tokenType ? reader : throw new JsonSerializationException(string.Format("Unexpected token {0}, expected {1}", reader.TokenType, tokenType));
    
    public static async Task<JsonReader> ReadToContentAndAssert(this JsonReader reader, CancellationToken cancellationToken = default) =>
        await (await reader.ReadAndAssertAsync(cancellationToken).ConfigureAwait(false)).MoveToContentAndAssertAsync(cancellationToken).ConfigureAwait(false);

    public static async Task<JsonReader> MoveToContentAndAssertAsync(this JsonReader reader, CancellationToken cancellationToken = default)
    {
        if (reader == null)
            throw new ArgumentNullException(nameof(reader));
        if (reader.TokenType == JsonToken.None)       // Skip past beginning of stream.
            await reader.ReadAndAssertAsync(cancellationToken).ConfigureAwait(false);
        while (reader.TokenType == JsonToken.Comment) // Skip past comments.
            await reader.ReadAndAssertAsync(cancellationToken).ConfigureAwait(false);
        return reader;
    }

    public static async Task<JsonReader> ReadAndAssertAsync(this JsonReader reader, CancellationToken cancellationToken = default)
    {
        if (reader == null)
            throw new ArgumentNullException(nameof(reader));
        if (!await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
            throw new JsonReaderException("Unexpected end of JSON stream.");
        return reader;
    }
}

And now you will be able to do something like:

await foreach (var token in JsonExtensions.LoadJTokenAsyncEnumerable(stream, cancellationToken : cancellationToken))
{
    Console.WriteLine(token.ToString(Formatting.None));
}
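
To deserialize directly to a model type instead, something like the following sketch should work. It assumes the JsonExtensions class above is compiled into the same project and that Newtonsoft.Json is referenced. The Item class, the TypedDemo class and the ReadNamesAsync helper are hypothetical and exist only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical model type, used only for illustration.
public class Item
{
    public int Id { get; set; }
    public string? Name { get; set; }
}

public static class TypedDemo
{
    // Hypothetical helper: collects the Name of every array element.
    // Assumes the JsonExtensions class shown above is available in the same project.
    public static async Task<List<string?>> ReadNamesAsync(Stream stream, CancellationToken cancellationToken = default)
    {
        var names = new List<string?>();
        await foreach (var item in JsonExtensions.DeserializeAsyncEnumerable<Item>(stream, cancellationToken: cancellationToken))
            names.Add(item?.Name); // item is null when the array element is a JSON null
        return names;
    }

    public static async Task Main()
    {
        var json = "[{\"Id\":1,\"Name\":\"first\"},{\"Id\":2,\"Name\":\"second\"}]";
        // A MemoryStream stands in for a network stream here.
        using var stream = new MemoryStream(Encoding.UTF8.GetBytes(json));
        foreach (var name in await ReadNamesAsync(stream))
            Console.WriteLine(name);
    }
}
```

Each element is still loaded asynchronously as a JToken first. Only the final ToObject<T>() conversion is synchronous, and it works on data that is already in memory.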

Notes:

  • Cancellation is not tested. The above implementation should throw OperationCanceledException if cancelled.

  • System.Text.Json was designed from scratch to support asynchronous deserialization with good performance. If you need asynchronous deserialization, you should consider rewriting your code to use it.

  • StreamReader and JsonTextReader do not implement IAsyncDisposable so the extension methods leave the underlying stream open for the caller to dispose.

  • I'm not sure all those .ConfigureAwait(false) calls are still needed in .NET 6.
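
As a sketch of the cancellation behavior expected in the first note, the following stand-alone example uses the same pattern ([EnumeratorCancellation] plus ThrowIfCancellationRequested()) on a trivial async iterator. CancellationDemo, CountAsync and WasCancelledAsync are hypothetical names. CountAsync is only a stand-in for LoadAsyncEnumerable and reads no JSON.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationDemo
{
    // Hypothetical stand-in for LoadAsyncEnumerable: yields integers and checks the token before each item.
    public static async IAsyncEnumerable<int> CountAsync(int count, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        for (var i = 0; i < count; i++)
        {
            cancellationToken.ThrowIfCancellationRequested();
            await Task.Yield(); // simulate an asynchronous read
            yield return i;
        }
    }

    // Returns true if the enumeration ended with an OperationCanceledException.
    public static async Task<bool> WasCancelledAsync(CancellationToken cancellationToken)
    {
        try
        {
            await foreach (var item in CountAsync(3, cancellationToken))
            {
                // Items are ignored; only the cancellation behavior matters here.
            }
            return false;
        }
        catch (OperationCanceledException)
        {
            return true;
        }
    }

    public static async Task Main()
    {
        using var cts = new CancellationTokenSource();
        Console.WriteLine(await WasCancelledAsync(cts.Token)); // token not yet cancelled
        cts.Cancel();
        Console.WriteLine(await WasCancelledAsync(cts.Token)); // token already cancelled
    }
}
```

The exception surfaces at the consumer's await foreach, so callers of the extension methods above would presumably need a similar try/catch around their loop.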

Lightly tested demo fiddle here.

Nickname answered 4/6, 2022 at 18:57 Comment(0)
