Reading Event Hub Archive File in C#

Posted 2019-05-10 15:31

Is there any sample code in C# for reading the Azure Event Hub Archive files (Avro format)?

I am trying to use the Microsoft.Hadoop.Avro library. I dumped the schema using the Java Avro tools, which produced this:

{
    "type": "record",
    "name": "EventData",
    "namespace": "Microsoft.ServiceBus.Messaging",
    "fields": [
        {"name": "SequenceNumber", "type": "long"},
        {"name": "Offset", "type": "string"},
        {"name": "EnqueuedTimeUtc", "type": "string"},
        {"name": "SystemProperties", "type": {"type": "map", "values": ["long", "double", "string", "bytes"]}},
        {"name": "Properties", "type": {"type": "map", "values": ["long", "double", "string", "bytes", "null"]}},
        {"name": "Body", "type": ["null", "bytes"]}
    ]
}

I then try to deserialize the file to read the data back in like this:

using (var reader = AvroContainer.CreateReader<EventData>(stream))
{
    using (var streamReader = new SequentialReader<EventData>(reader))
    {
        foreach (EventData dta in streamReader.Objects)
        {
            //stuff here
        }
    }
}

It doesn't work when I pass the actual EventData type used on the producer side, so I tried to create a special class marked up with DataContract attributes like this:

[DataContract(Namespace = "Microsoft.ServiceBus.Messaging")]
public class EventData
{
    [DataMember(Name = "SequenceNumber")]
    public long SequenceNumber { get; set; }

    [DataMember(Name = "Offset")]
    public string Offset { get; set; }

    [DataMember(Name = "EnqueuedTimeUtc")]
    public string EnqueuedTimeUtc { get; set; }

    [DataMember(Name = "Body")]
    public ArraySegment<byte> Body { get; set; }

    //[DataMember(Name = "SystemProperties")]
    //public SystemPropertiesCollection SystemProperties { get; set; }

    //[DataMember(Name = "Properties")]
    //public IDictionary<string, object> Properties { get; set; }
}

It errors with the following:

System.Runtime.Serialization.SerializationException occurred
Message=Cannot match the union schema.

Is there a reason no sample code exists from MS for this use case of reading the Avro archive files using C#?

2 Answers
chillily
#2 · 2019-05-10 16:02

If you're trying to read the Avro files using the Microsoft.Hadoop.Avro library, you can use the following class:

[DataContract(Name = "EventData", Namespace = "Microsoft.ServiceBus.Messaging")]
class EventData
{
    [DataMember(Name = "SequenceNumber")]
    public long SequenceNumber { get; set; }

    [DataMember(Name = "Offset")]
    public string Offset { get; set; }

    [DataMember(Name = "EnqueuedTimeUtc")]
    public DateTime EnqueuedTimeUtc { get; set; }

    [DataMember(Name = "SystemProperties")]
    public Dictionary<string, object> SystemProperties { get; set; }

    [DataMember(Name = "Properties")]
    public Dictionary<string, object> Properties { get; set; } 

    [DataMember(Name = "Body")]
    public byte[] Body { get; set; }

    public EventData(dynamic record)
    {
        SequenceNumber = (long)record.SequenceNumber;
        Offset = (string)record.Offset;
        DateTime.TryParse((string)record.EnqueuedTimeUtc, out var enqueuedTimeUtc);
        EnqueuedTimeUtc = enqueuedTimeUtc;
        SystemProperties = (Dictionary<string, object>)record.SystemProperties;
        Properties = (Dictionary<string, object>)record.Properties;
        Body = (byte[])record.Body;
    }

}

When you're reading your Avro file, you can read each record as a dynamic object and then map it onto this class through its constructor. Here's an example:

// The reader is disposable, so wrap it in a using block.
using (var reader = AvroContainer.CreateGenericReader(stream))
{
    // Each MoveNext() advances to the next block of records in the container.
    while (reader.MoveNext())
    {
        foreach (dynamic record in reader.Current.Objects)
        {
            var eventData = new EventData(record);
            var sequenceNumber = eventData.SequenceNumber;
            // Assumes the body is UTF-8 text.
            var bodyText = Encoding.UTF8.GetString(eventData.Body);
            var properties = eventData.Properties;
            var sysProperties = eventData.SystemProperties;
        }
    }
}

You can refer to this answer for more details.
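
One thing to watch in the code above: the schema declares Body as ["null","bytes"], so a record may carry no body, and Encoding.UTF8.GetString throws when handed null. The constructor also discards the return value of DateTime.TryParse, so an unparseable timestamp silently becomes DateTime.MinValue. Below is a minimal sketch of two defensive helpers that use only the .NET base class library. The names CapturedEventHelpers, DecodeBody and ParseEnqueuedTimeUtc are hypothetical and not part of any Avro library, and it is an assumption that the body is UTF-8 text and that the timestamp string parses under the invariant culture.

```csharp
using System;
using System.Globalization;
using System.Text;

// Hypothetical helpers; not part of Microsoft.Hadoop.Avro or Apache Avro.
static class CapturedEventHelpers
{
    // Returns null for a missing body instead of throwing.
    // Assumption: the payload is UTF-8 text.
    public static string DecodeBody(byte[] body)
    {
        return body == null ? null : Encoding.UTF8.GetString(body);
    }

    // Returns null when the text cannot be parsed, rather than
    // silently producing DateTime.MinValue.
    // Assumption: the string is parseable under the invariant culture.
    public static DateTime? ParseEnqueuedTimeUtc(string text)
    {
        DateTime parsed;
        bool ok = DateTime.TryParse(
            text,
            CultureInfo.InvariantCulture,
            DateTimeStyles.AssumeUniversal | DateTimeStyles.AdjustToUniversal,
            out parsed);
        return ok ? parsed : (DateTime?)null;
    }
}
```

Inside the loop, bodyText could then be obtained with CapturedEventHelpers.DecodeBody(eventData.Body), and a null result treated as "no body".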

虎瘦雄心在
#3 · 2019-05-10 16:07

I used both the Microsoft.Hadoop.Avro and the Apache Avro C# libraries, and they seemed to have exactly the same issue. When reading just SequenceNumber, Offset, and EnqueuedTimeUtc, both returned the same garbled data, which appeared to be the codec and schema definition data.

Here is what I found out. I was downloading the blob to a MemoryStream and then trying to deserialize from there. The deserializer was not taking the header and schema in the file into account and was trying to deserialize records from the very beginning of the stream.
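
A quick way to check what a reader is about to see is to inspect the start of the stream. According to the Avro specification, an object container file begins with the four bytes 'O', 'b', 'j', 1, followed by the metadata that holds the schema and codec. The sketch below uses only the .NET base class library. The AvroContainerCheck class and its RewindAndCheckMagic method are hypothetical names, and it is an assumption (this answer does not say) that the stream position might be part of the problem after a download into a MemoryStream.

```csharp
using System;
using System.IO;

// Hypothetical diagnostic helper; not part of any Avro library.
static class AvroContainerCheck
{
    // Avro object container files start with 'O', 'b', 'j', 1.
    private static readonly byte[] Magic = { (byte)'O', (byte)'b', (byte)'j', 1 };

    // Rewinds a seekable stream and reports whether it starts with the magic.
    // The stream is left at position 0 so that a container-aware reader
    // can consume the header and schema itself.
    public static bool RewindAndCheckMagic(Stream stream)
    {
        if (stream == null) throw new ArgumentNullException(nameof(stream));
        if (!stream.CanSeek) throw new ArgumentException("Stream must be seekable.", nameof(stream));

        stream.Position = 0;
        var header = new byte[Magic.Length];
        int read = 0;
        while (read < header.Length)
        {
            int n = stream.Read(header, read, header.Length - read);
            if (n == 0) break; // stream ended before four bytes were read
            read += n;
        }
        stream.Position = 0;

        if (read != Magic.Length) return false;
        for (int i = 0; i < Magic.Length; i++)
        {
            if (header[i] != Magic[i]) return false;
        }
        return true;
    }
}
```

If this returns true, the stream holds a container file and should be handed to a reader that understands the container format, not to a serializer that expects bare records.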

What worked was to use the Apache Avro C# library: use its gen tool to create the C# class from the dumped JSON schema, and then use a DataFileReader, which can read the container format from the stream.

using (var dataFileReader = Avro.File.DataFileReader<EventData>.OpenReader(stream, evtSample.Schema))

where evtSample is an instance of the EventData class, which exposes its schema through the Schema property.
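
For a quick look at a file without generating a class first, the same DataFileReader can also be used with GenericRecord. The following is a rough sketch, assuming the Apache Avro NuGet package is referenced. The CaptureFileDump class and ReadAll method are hypothetical names, and it is an assumption that the body is UTF-8 text.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Text;
using Avro.File;
using Avro.Generic;

// Hypothetical helper built on the Apache Avro C# library.
static class CaptureFileDump
{
    // Reads every record from an Avro container stream and returns
    // one "sequenceNumber: body" line per record.
    public static List<string> ReadAll(Stream stream)
    {
        var lines = new List<string>();

        // OpenReader parses the container header and uses the schema stored in it.
        using (IFileReader<GenericRecord> reader = DataFileReader<GenericRecord>.OpenReader(stream))
        {
            foreach (GenericRecord record in reader.NextEntries)
            {
                object value;
                long sequenceNumber = record.TryGetValue("SequenceNumber", out value) ? (long)value : -1;

                // Body is ["null","bytes"], so it may be absent.
                byte[] body = record.TryGetValue("Body", out value) ? value as byte[] : null;
                string text = body == null ? "(no body)" : Encoding.UTF8.GetString(body);

                lines.Add(sequenceNumber + ": " + text);
            }
        }
        return lines;
    }
}
```

The generated EventData class is still the better fit when you want typed properties; the generic form is mainly handy for inspecting a file.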

Now to find out if I can do the same thing with the Microsoft.Hadoop.Avro library.

BTW, here is the C# class generated by the Apache Avro gen tool:

public partial class EventData : ISpecificRecord
{
    public static Schema _SCHEMA = Avro.Schema.Parse(@"{""type"":""record"",""name"":""EventData"",""namespace"":""Microsoft.ServiceBus.Messaging"",""fields"":[{""name"":""SequenceNumber"",""type"":""long""},{""name"":""Offset"",""type"":""string""},{""name"":""EnqueuedTimeUtc"",""type"":""string""},{""name"":""SystemProperties"",""type"":{""type"":""map"",""values"":[""long"",""double"",""string"",""bytes""]}},{""name"":""Properties"",""type"":{""type"":""map"",""values"":[""long"",""double"",""string"",""bytes"",""null""]}},{""name"":""Body"",""type"":[""null"",""bytes""]}]}");
    private long _SequenceNumber;
    private string _Offset;
    private string _EnqueuedTimeUtc;
    private IDictionary<string, System.Object> _SystemProperties;
    private IDictionary<string, System.Object> _Properties;
    private byte[] _Body;
    public virtual Schema Schema
    {
        get
        {
            return EventData._SCHEMA;
        }
    }
    public long SequenceNumber
    {
        get
        {
            return this._SequenceNumber;
        }
        set
        {
            this._SequenceNumber = value;
        }
    }
    public string Offset
    {
        get
        {
            return this._Offset;
        }
        set
        {
            this._Offset = value;
        }
    }
    public string EnqueuedTimeUtc
    {
        get
        {
            return this._EnqueuedTimeUtc;
        }
        set
        {
            this._EnqueuedTimeUtc = value;
        }
    }
    public IDictionary<string, System.Object> SystemProperties
    {
        get
        {
            return this._SystemProperties;
        }
        set
        {
            this._SystemProperties = value;
        }
    }
    public IDictionary<string, System.Object> Properties
    {
        get
        {
            return this._Properties;
        }
        set
        {
            this._Properties = value;
        }
    }
    public byte[] Body
    {
        get
        {
            return this._Body;
        }
        set
        {
            this._Body = value;
        }
    }
    public virtual object Get(int fieldPos)
    {
        switch (fieldPos)
        {
            case 0: return this.SequenceNumber;
            case 1: return this.Offset;
            case 2: return this.EnqueuedTimeUtc;
            case 3: return this.SystemProperties;
            case 4: return this.Properties;
            case 5: return this.Body;
            default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
        };
    }
    public virtual void Put(int fieldPos, object fieldValue)
    {
        switch (fieldPos)
        {
            case 0: this.SequenceNumber = (System.Int64)fieldValue; break;
            case 1: this.Offset = (System.String)fieldValue; break;
            case 2: this.EnqueuedTimeUtc = (System.String)fieldValue; break;
            case 3: this.SystemProperties = (IDictionary<string, System.Object>)fieldValue; break;
            case 4: this.Properties = (IDictionary<string, System.Object>)fieldValue; break;
            case 5: this.Body = (System.Byte[])fieldValue; break;
            default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
        };
    }
}

