Deserialize an Avro file with C#

I can't find a way to deserialize an Apache Avro file with C#. The Avro file is a file generated by the Archive feature in Microsoft Azure Event Hubs.

With Java I can use Avro Tools from Apache to convert the file to JSON:

java -jar avro-tools-1.8.1.jar tojson --pretty inputfile > output.json

Using the NuGet package Microsoft.Hadoop.Avro, I am able to extract SequenceNumber, Offset and EnqueuedTimeUtc, but because I don't know what type to use for Body, an exception is thrown. I've tried Dictionary<string, object> and other types.

static void Main(string[] args)
{
    var fileName = "...";

    using (Stream stream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read))
    {
        using (var reader = AvroContainer.CreateReader<EventData>(stream))
        {
            using (var streamReader = new SequentialReader<EventData>(reader))
            {
                var record = streamReader.Objects.FirstOrDefault();
            }
        }
    }
}

[DataContract(Namespace = "Microsoft.ServiceBus.Messaging")]
public class EventData
{
    [DataMember(Name = "SequenceNumber")]
    public long SequenceNumber { get; set; }

    [DataMember(Name = "Offset")]
    public string Offset { get; set; }

    [DataMember(Name = "EnqueuedTimeUtc")]
    public string EnqueuedTimeUtc { get; set; }

    [DataMember(Name = "Body")]
    public foo Body { get; set; }

    // More properties...
}

The schema looks like this:

{
  "type": "record",
  "name": "EventData",
  "namespace": "Microsoft.ServiceBus.Messaging",
  "fields": [
    {
      "name": "SequenceNumber",
      "type": "long"
    },
    {
      "name": "Offset",
      "type": "string"
    },
    {
      "name": "EnqueuedTimeUtc",
      "type": "string"
    },
    {
      "name": "SystemProperties",
      "type": {
        "type": "map",
        "values": [ "long", "double", "string", "bytes" ]
      }
    },
    {
      "name": "Properties",
      "type": {
        "type": "map",
        "values": [ "long", "double", "string", "bytes" ]
      }
    },
    {
      "name": "Body",
      "type": [ "null", "bytes" ]
    }
  ]
}    
Azoic answered 4/10, 2016 at 7:44 Comment(2)
This may help: https://mcmap.net/q/666485/-reading-event-hub-archive-file-in-c - Electrophone
What about Snappy coding? Anyone having issues? - Io
T
11

I was able to get full data access working using dynamic. Here's the code for accessing the raw body data, which is stored as an array of bytes. In my case, those bytes contain UTF-8-encoded JSON, but of course that depends on how you originally created the EventData instances that you published to the Event Hub:

using (var reader = AvroContainer.CreateGenericReader(stream))
{
    while (reader.MoveNext())
    {
        foreach (dynamic record in reader.Current.Objects)
        {
            var sequenceNumber = record.SequenceNumber;
            var bodyText = Encoding.UTF8.GetString(record.Body);
            Console.WriteLine($"{sequenceNumber}: {bodyText}");
        }
    }
}

If someone can post a statically typed solution, I'll upvote it, but given that the biggest latency in any system will almost certainly be the connection to the Event Hub Archive blobs, I wouldn't worry about parsing performance. :)
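
Since the schema declares Body as a union of null and bytes, a null body seems possible, and Encoding.UTF8.GetString throws on a null argument. A minimal sketch of a null-safe decode, assuming the body really is UTF-8 text as in the case above (BodyDecoder and DecodeUtf8OrNull are hypothetical names, not part of any Avro library):

```csharp
using System;
using System.Text;

// Hypothetical helper; not part of Microsoft.Hadoop.Avro.
public static class BodyDecoder
{
    // Returns null when the Avro union held null; otherwise decodes the bytes as UTF-8.
    public static string DecodeUtf8OrNull(byte[] body)
    {
        return body == null ? null : Encoding.UTF8.GetString(body);
    }
}
```

Inside the loop above it could be called as BodyDecoder.DecodeUtf8OrNull((byte[])record.Body); the explicit cast keeps the call statically bound even though record is dynamic.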

Teahan answered 27/5, 2017 at 19:9 Comment(3)
Great, nice one. I want to do something similar; my detailed question is here, any help? #48462811 - Tenner
As a footnote: when Event Hub Capture gives you an "empty" blob (no events received in a given partition during that capture interval; such blobs are currently 508 B in my case), deserialization fails with a mysterious complaint about a "size" parameter (System.ArgumentOutOfRangeException: 'Specified argument was out of the range of valid values. Parameter name: size'). The same logic works again as soon as at least one event is present in the capture blob. - Teahan
I get "Invalid Avro object container in a stream. The header cannot be recognized." - Sturtevant
D
11

This Gist shows how to deserialize an event hub capture with C# using Microsoft.Hadoop.Avro2, which has the advantage of being both .NET Framework 4.5 and .NET Standard 1.6 compliant:

 var connectionString = "<Azure event hub capture storage account connection string>";
 var containerName = "<Azure event hub capture container name>";
 var blobName = "<Azure event hub capture BLOB name (ends in .avro)>";

 var storageAccount = CloudStorageAccount.Parse(connectionString);
 var blobClient = storageAccount.CreateCloudBlobClient();
 var container = blobClient.GetContainerReference(containerName);
 var blob = container.GetBlockBlobReference(blobName);
 using (var stream = blob.OpenRead())
 using (var reader = AvroContainer.CreateGenericReader(stream))
     while (reader.MoveNext())
         foreach (dynamic result in reader.Current.Objects)
         {
             var record = new AvroEventData(result);
             record.Dump(); // Dump() is a LINQPad extension method; outside LINQPad, print the fields instead
         }

 public struct AvroEventData
 {
     public AvroEventData(dynamic record)
     {
         SequenceNumber = (long) record.SequenceNumber;
         Offset = (string) record.Offset;
         DateTime.TryParse((string) record.EnqueuedTimeUtc, out var enqueuedTimeUtc);
         EnqueuedTimeUtc = enqueuedTimeUtc;
         SystemProperties = (Dictionary<string, object>) record.SystemProperties;
         Properties = (Dictionary<string, object>) record.Properties;
         Body = (byte[]) record.Body;
     }
     public long SequenceNumber { get; set; }
     public string Offset { get; set; }
     public DateTime EnqueuedTimeUtc { get; set; }
     public Dictionary<string, object> SystemProperties { get; set; }
     public Dictionary<string, object> Properties { get; set; }
     public byte[] Body { get; set; }
 }
  • NuGet references:

    • Microsoft.Hadoop.Avro2 (1.2.1 works)
    • WindowsAzure.Storage (8.3.0 works)
  • Namespaces:

    • Microsoft.Hadoop.Avro.Container
    • Microsoft.WindowsAzure.Storage
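
One caveat about the constructor above: DateTime.TryParse is called with the current culture and its return value is ignored, so a string that fails to parse silently yields default(DateTime). A hedged sketch of a stricter parse follows. It assumes the captured EnqueuedTimeUtc string is in a US-style, culture-invariant format such as "8/19/2017 4:18:00 PM" (an illustrative value, not taken from a real capture file), and EnqueuedTime.ParseUtc is a hypothetical helper name:

```csharp
using System;
using System.Globalization;

// Hypothetical helper; the input format is an assumption, check it against your own files.
public static class EnqueuedTime
{
    // Parses the text as a UTC timestamp using the invariant culture.
    // Returns null instead of default(DateTime) when the text cannot be parsed.
    public static DateTime? ParseUtc(string text)
    {
        const DateTimeStyles styles =
            DateTimeStyles.AssumeUniversal | DateTimeStyles.AdjustToUniversal;

        return DateTime.TryParse(text, CultureInfo.InvariantCulture, styles, out var parsed)
            ? parsed
            : (DateTime?)null;
    }
}
```

Returning a nullable value makes a failed parse visible to the caller, and the two DateTimeStyles flags make the result's Kind UTC rather than Unspecified.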
Djambi answered 19/8, 2017 at 16:18 Comment(2)
Please avoid "link-only answers". Consider that the link could break in the future; the answer should be useful without it. - Lowermost
Great, nice one. I want to do something similar; my detailed question is here, any help? #48462811 - Tenner
R
10

I was finally able to get this to work with the Apache C# library.
I was stuck for a while because the Capture feature of Azure Event Hubs sometimes outputs a file without any message content. I may also have had an issue with how the messages were originally serialized into the EventData object.
The code below is for a file saved to disk from a capture blob container.

var dataFileReader = DataFileReader<EventData>.OpenReader(file);
foreach (var record in dataFileReader.NextEntries)
{
   // Do work on EventData object
}

This also works using the GenericRecord object.

var dataFileReader = DataFileReader<GenericRecord>.OpenReader(file);

This took some effort to figure out. However, I now agree that the Azure Event Hubs Capture feature is a great way to back up all events. I still feel they should make the format optional, as they did with Stream Analytics job output, but maybe I will get used to Avro.

Rapine answered 16/7, 2017 at 3:4 Comment(2)
How exactly have you used this code? Which NuGet package is DataFileReader in? - Tenner
@neo One version of DataFileReader that works with the above code can be found in the Confluent.Apache.Avro NuGet package. - Elishaelision
E
1

I would recommend using https://github.com/AdrianStrugala/AvroConvert

And simply:

byte[] avroFileContent = File.ReadAllBytes(fileName);
var result = AvroConvert.Deserialize<EventData>(avroFileContent);

The library is meant to improve the development workflow when using the Avro format. You don't even need the schema or attributes on your model. (I am a contributor to this library.)

Elfish answered 30/7, 2019 at 8:10 Comment(1)
Be aware that the license for AvroConvert does not permit its use in commercial projects, which will make this solution unviable for some. - Ontiveros
M
0

I suspect your remaining types should be defined as:

[DataContract(Namespace = "Microsoft.ServiceBus.Messaging")]
[KnownType(typeof(Dictionary<string, object>))]
public class EventData
{
    [DataMember]
    public IDictionary<string, object> SystemProperties { get; set; }

    [DataMember]
    public IDictionary<string, object> Properties { get; set; }

    [DataMember]
    public byte[] Body { get; set; }
}

Even though Body is a union of null and bytes, it maps to a plain byte[].

In C#, arrays are always reference types, so the property can be null and the contract is still fulfilled.
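
The map fields are a different case: the schema declares their values as a union of long, double, string and bytes, so each value in a Dictionary<string, object> would be expected to arrive boxed as one of those runtime types. A minimal sketch of handling them with a type switch, assuming that mapping holds for the reader you use (PropertyFormatter and Describe are hypothetical names):

```csharp
using System;
using System.Globalization;

// Hypothetical helper for values read from SystemProperties / Properties.
public static class PropertyFormatter
{
    // Turns one union value into display text based on its runtime type.
    public static string Describe(object value)
    {
        switch (value)
        {
            case null:
                return "null";
            case long l:
                return l.ToString(CultureInfo.InvariantCulture);
            case double d:
                return d.ToString(CultureInfo.InvariantCulture);
            case string s:
                return s;
            case byte[] bytes:
                // Raw bytes are shown as Base64 because their encoding is unknown.
                return Convert.ToBase64String(bytes);
            default:
                // Any type the schema does not list falls back to ToString().
                return value.ToString();
        }
    }
}
```

For example, iterating over Properties and printing $"{pair.Key} = {PropertyFormatter.Describe(pair.Value)}" avoids casting each value by hand.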

Mims answered 9/12, 2016 at 11:15 Comment(4)
Thanks, but it didn't work: Could not find any matching known type for 'System.Collections.Generic.IDictionary`2[System.String,System.Object]'. - Gesundheit
@KristofferJälén Is that exception specifically for the Body property? - Mims
No, that exception was for the SystemProperties property. - Gesundheit
Try annotating the class with something like [KnownType(typeof(Dictionary<string, object>))]. Otherwise, you could try using the concrete type instead of the interface. - Mims
L
0

You can also use the NullableSchema attribute to mark Body as a union of bytes and null. This allows you to use the strongly typed interface.

[DataContract(Namespace = "Microsoft.ServiceBus.Messaging")]
public class EventData
{
    [DataMember(Name = "SequenceNumber")]
    public long SequenceNumber { get; set; }

    [DataMember(Name = "Offset")]
    public string Offset { get; set; }

    [DataMember(Name = "EnqueuedTimeUtc")]
    public string EnqueuedTimeUtc { get; set; }

    [DataMember(Name = "Body")]
    [NullableSchema]
    public byte[] Body { get; set; } // "bytes" in the schema maps to byte[]
}
Lodging answered 29/3, 2019 at 19:50 Comment(0)