Rx.Net Message Parser

I'm trying to parse an incoming stream of bytes that represents messages. I need to split the stream and create a message structure for each part.

A message always starts with a 0x81 (BOM) and ends with a 0x82 (EOM).

start:  0x81
header: 3 bytes
data:   arbitrary length
stop:   0x82

The data part is escaped using an escape byte 0x1B (ESC): Anytime a byte in the data part contains one of the control bytes {ESC, BOM, EOM}, it is prefixed with ESC.

The header part is not escaped, and may contain control bytes.
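To make the framing rule concrete, here is a minimal sketch of the sending side, assuming the layout above. The names `Framing` and `Encode` are hypothetical and exist only for this illustration; the real sender may differ.

```csharp
using System;
using System.Collections.Generic;

public static class Framing
{
    public const byte BOM = 0x81, EOM = 0x82, ESC = 0x1B;

    // Hypothetical helper: builds the wire bytes for one message.
    public static byte[] Encode(byte[] header, byte[] data)
    {
        if (header == null || header.Length != 3)
            throw new ArgumentException("header must be exactly 3 bytes", nameof(header));
        if (data == null)
            throw new ArgumentNullException(nameof(data));

        var wire = new List<byte> { BOM };
        wire.AddRange(header);          // the header is copied verbatim, never escaped
        foreach (var b in data)
        {
            if (b == ESC || b == BOM || b == EOM)
                wire.Add(ESC);          // control bytes in the data part get an ESC prefix
            wire.Add(b);
        }
        wire.Add(EOM);
        return wire.ToArray();
    }
}
```

For example, `BitConverter.ToString(Framing.Encode(new byte[] { 0x01, 0x02, 0x03 }, new byte[] { 0x82 }))` yields `81-01-02-03-1B-82-82`, which is the fourth example below.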

I would like to code this in a functional reactive style using Rx.Net, by consuming an IObservable<byte> and transforming it into an IObservable<Message>.

What is the most idiomatic way to do this?

Some examples:

[81 01 02 03 82] single message
[81 82 81 82 82] single message, header = [82 81 82]
[81 01 02 1B 82] single message, header = [01 02 1B].
[81 01 02 03 1B 82 82] single message, header = [01 02 03], (unescaped) data = [82]
[81 01 02 03 1B 1B 82 82] single message + dangling [82] which should be ignored.
                          header = [01 02 03], (unescaped) data = [1B]

Here's a state machine drawing for this: [image: state machine diagram]

Reexamine answered 7/6, 2016 at 19:18 Comment(0)

If you are looking for something that is "more functional", then this may help; however, the answer by @Evk passes these tests too.

First, a suggestion: for complex questions like this, providing a test suite takes the legwork out of writing a verifiable answer.

Something like this would have been very helpful.

var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable<byte>(
    ReactiveTest.OnNext<byte>(01,0x81), //BOM m1
    ReactiveTest.OnNext<byte>(02,0x01), 
    ReactiveTest.OnNext<byte>(03,0x02), 
    ReactiveTest.OnNext<byte>(04,0x03), 
    ReactiveTest.OnNext<byte>(05,0x82), //EOM m1
    ReactiveTest.OnNext<byte>(06,0x81), //BOM m2
    ReactiveTest.OnNext<byte>(07,0x82), 
    ReactiveTest.OnNext<byte>(08,0x81), 
    ReactiveTest.OnNext<byte>(09,0x82), 
    ReactiveTest.OnNext<byte>(10,0x82), //EOM m2
    ReactiveTest.OnNext<byte>(11,0x81), //BOM m3
    ReactiveTest.OnNext<byte>(12,0x01),     
    ReactiveTest.OnNext<byte>(13,0x02), 
    ReactiveTest.OnNext<byte>(14,0x1B), 
    ReactiveTest.OnNext<byte>(15,0x82), //EOM m3
    ReactiveTest.OnNext<byte>(16,0x81), //BOM m4
    ReactiveTest.OnNext<byte>(17,0x01), 
    ReactiveTest.OnNext<byte>(18,0x02), 
    ReactiveTest.OnNext<byte>(19,0x03), 
    ReactiveTest.OnNext<byte>(20,0x1B), //Control character 
    ReactiveTest.OnNext<byte>(21,0x82), //Data
    ReactiveTest.OnNext<byte>(22,0x82), //EOM m4
    ReactiveTest.OnNext<byte>(23,0x81), //BOM m5
    ReactiveTest.OnNext<byte>(24,0x01), 
    ReactiveTest.OnNext<byte>(25,0x02), 
    ReactiveTest.OnNext<byte>(26,0x03), 
    ReactiveTest.OnNext<byte>(27,0x1B), //Control character 
    ReactiveTest.OnNext<byte>(28,0x1B), //Data
    ReactiveTest.OnNext<byte>(29,0x82), //EOM m5
    ReactiveTest.OnNext<byte>(30,0x82));//Ignored (expected 0x81)

var observer = scheduler.CreateObserver<Message>();

//CurrentAnswer(source)
MyAnswer(source)
    .Subscribe(observer);

scheduler.Start();

ReactiveAssert.AreElementsEqual(
    new[] {
        ReactiveTest.OnNext(05, new Message(){Header=new byte[]{0x01, 0x02, 0x03}, Data=new byte[0]{}}),
        ReactiveTest.OnNext(10, new Message(){Header=new byte[]{0x82, 0x81, 0x82}, Data=new byte[0]{}}),
        ReactiveTest.OnNext(15, new Message(){Header=new byte[]{0x01, 0x02, 0x1B}, Data=new byte[0]{}}),
        ReactiveTest.OnNext(22, new Message(){Header=new byte[]{0x01, 0x02, 0x03}, Data=new byte[]{ 0x82}}),
        ReactiveTest.OnNext(29, new Message(){Header=new byte[]{0x01, 0x02, 0x03}, Data=new byte[]{ 0x1B}}),
    },                                                  
    observer.Messages); 

I have also written a version of Message that allows me to verify the code

public class Message
{
    public static readonly byte BOM = 0x81;
    public static readonly byte EOM = 0x82;
    public static readonly byte Control = 0x1B;

    public byte[] Header { get; set; }
    public byte[] Data { get; set; }

    public static Message Create(byte[] bytes)
    {   
        if(bytes==null)
            throw new ArgumentNullException(nameof(bytes));
        if(bytes.Length<3)
            throw new ArgumentException("bytes<3"); // the original called LINQPad's .Dump() here, which does not compile elsewhere


        var header = new byte[3];
        Array.Copy(bytes, header, 3);

        var body = new List<byte>();
        var escapeNext = false;
        for (int i = 3; i < bytes.Length; i++)
        {
            var b = bytes[i];

            if (b == Control && !escapeNext)
            {
                escapeNext = true;
            }
            else
            {
                body.Add(b);
                escapeNext = false;
            }
        }
        var msg = new Message { Header = header, Data = body.ToArray()};
        return msg;
    }

    public override string ToString()
    {
        return string.Format("Message(Header=[{0}], Data=[{1}])", ByteArrayString(Header), ByteArrayString(Data));
    }

    private static string ByteArrayString(byte[] bytes)
    {
        return string.Join(",", bytes.Select(b => b.ToString("X")));
    }

    public override bool Equals(object obj)
    {
        var other = obj as Message;
        if(other==null) // check the cast result, so a non-Message argument returns false instead of throwing
            return false;
        return Equals(other);
    }

    protected bool Equals(Message other)
    {
        return IsSequenceEqual(Header, other.Header) 
            && IsSequenceEqual(Data, other.Data);
    }

    private bool IsSequenceEqual<T>(IEnumerable<T> expected, IEnumerable<T> other)
    {
        if(expected==null && other==null)
            return true;
        if(expected==null || other==null)
            return false;
        return expected.SequenceEqual(other);
    }

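    public override int GetHashCode()
    {
        unchecked
        {
            // Hash the byte contents (not the array references) so that
            // messages that are Equals also share a hash code.
            var hash = 17;
            foreach (var b in Header ?? new byte[0]) hash = hash * 31 + b;
            hash *= 397;
            foreach (var b in Data ?? new byte[0]) hash = hash * 31 + b;
            return hash;
        }
    }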
}

Now that I have all the plumbing, I can focus on the actual problem.

public static IObservable<Message> MyAnswer(IObservable<byte> source)
{
    return source.Publish(s =>
        {

            return 
                Observable.Defer(()=>
                    //Start consuming once we see a BOM
                    s.SkipWhile(b => b != Message.BOM)
                     .Scan(new Accumulator(), (acc, cur)=>acc.Accumulate(cur))
                )
                .TakeWhile(acc=>!acc.IsEndOfMessage())
                .Where(acc=>!acc.IsBeginingOfMessage())
                .Select(acc=>acc.Value())
                .ToArray()
                .Where(buffer=>buffer.Any())
                .Select(buffer => Message.Create(buffer))
                .Repeat();
        }); 

}
public class Accumulator
{
    private int _index = 0;
    private byte _current =0;
    private bool _isCurrentEscaped = false;
    private bool _isNextEscaped = false;

    public Accumulator Accumulate(byte b)
    {
        _index++;
        _current = b;
        _isCurrentEscaped = _isNextEscaped;
        _isNextEscaped = (!IsHeader() && !_isCurrentEscaped && b==Message.Control);
        return this;
    }
    public byte Value()
    {
        return _current;
    }

    private bool IsHeader()
    {
        return _index < 5;
    }
    public bool IsBeginingOfMessage()
    {
        return _index == 1 && _current == Message.BOM;
    }
    public bool IsEndOfMessage()
    {
        return !IsHeader()
            && _current == Message.EOM 
            && !_isCurrentEscaped;
    }
}

For completeness, here are the guts of @Evk's answer, so you can easily swap implementations in and out.

public static IObservable<Message> CurrentAnswer(IObservable<byte> source)
{
    return Observable.Create<Message>(o =>
    {
        // some crude parsing code for the sake of example
        bool nextIsEscaped = false;
        bool readingHeader = false;
        bool readingBody = false;
        List<byte> body = new List<byte>();
        List<byte> header = new List<byte>();
        return source.Subscribe(b =>
        {
            if (b == 0x81 && !nextIsEscaped && !readingHeader)
            {
                // start
                readingHeader = true;
                readingBody = false;
                nextIsEscaped = false;
            }
            else if (b == 0x82 && !nextIsEscaped && !readingHeader)
            {
                // end
                readingHeader = false;
                readingBody = false;
                if (header.Count > 0 || body.Count > 0)
                {
                    o.OnNext(new Message()
                    {
                        Header = header.ToArray(),
                        Data = body.ToArray()
                    });
                    header.Clear();
                    body.Clear();
                }
                nextIsEscaped = false;
            }
            else if (b == 0x1B && !nextIsEscaped && !readingHeader)
            {
                nextIsEscaped = true;
            }
            else
            {
                if (readingHeader)
                {
                    header.Add(b);
                    if (header.Count == 3)
                    {
                        readingHeader = false;
                        readingBody = true;
                    }
                }
                else if (readingBody)
                    body.Add(b);
                nextIsEscaped = false;
            }

        });
    });

}
Inaudible answered 8/6, 2016 at 4:1 Comment(5)
Thanks for the elaborate answer! You are correct in asserting that this is in essence a state machine. I even added a diagram to my question. I understand your solution, but I kinda wish there was a more intuitive way to describe a state machine in Rx. - Reexamine
It seems that something like Stateless for your state machine would be a great fit (replacing my Accumulator class). Then you just put some Rx around that. I was able to play around and get a nice solution with 7 Rx operators and a state machine. Testable, clean, encapsulated. Not sure what else you are chasing here. - Inaudible
I just wish there'd be a way to express a state machine in a functional way. I'd also like it if the main pipeline could be written like stream.SplitOnMessageBoundaries().Select(UnescapeMessageBuffer).Select(CreateMessage); - Reexamine
So why not write that? - Inaudible
Maybe you are looking for something that is more declarative? If you open up the Rx operator source code, there are lots of queues and state machines in there. - Inaudible
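One way to get closer to the functional style discussed in these comments is to write the state machine as a pure transition function over an immutable state. The sketch below is only an illustration: `FrameParser`, `ParserState`, `Phase`, `Step` and `Parse` are hypothetical names, and it assumes that an unescaped BOM inside the data part is treated as ordinary data, which the question does not specify. It uses plain LINQ types so it runs without Rx; with Rx, the same `Step` function should plug into `source.Scan(ParserState.Initial, FrameParser.Step)` followed by `Where(s => s.Completed != null)`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public enum Phase { Idle, Header, Data, Escaped }

// Immutable parser state; every transition returns a new instance.
public sealed class ParserState
{
    public static readonly ParserState Initial = new ParserState(Phase.Idle, new byte[0], null);

    public Phase Phase { get; }
    public byte[] Buffer { get; }     // header plus unescaped data collected so far
    public byte[] Completed { get; }  // non-null only on the step that consumed the closing EOM

    public ParserState(Phase phase, byte[] buffer, byte[] completed)
    {
        Phase = phase; Buffer = buffer; Completed = completed;
    }
}

public static class FrameParser
{
    private const byte BOM = 0x81, EOM = 0x82, ESC = 0x1B;

    // Pure transition function: (state, byte) -> state.
    public static ParserState Step(ParserState s, byte b)
    {
        switch (s.Phase)
        {
            case Phase.Idle:     // bytes outside a message are ignored until a BOM arrives
                return b == BOM ? new ParserState(Phase.Header, new byte[0], null)
                                : ParserState.Initial;
            case Phase.Header:   // the three header bytes are taken verbatim
                var buf = Append(s.Buffer, b);
                return new ParserState(buf.Length == 3 ? Phase.Data : Phase.Header, buf, null);
            case Phase.Data:
                if (b == ESC) return new ParserState(Phase.Escaped, s.Buffer, null);
                if (b == EOM) return new ParserState(Phase.Idle, new byte[0], s.Buffer);
                return new ParserState(Phase.Data, Append(s.Buffer, b), null);
            case Phase.Escaped:  // the byte after ESC is always data
                return new ParserState(Phase.Data, Append(s.Buffer, b), null);
            default:
                throw new InvalidOperationException("unknown phase");
        }
    }

    // Enumerable stand-in for Scan + Where + Select: yields header+data per message.
    public static IEnumerable<byte[]> Parse(IEnumerable<byte> bytes)
    {
        var state = ParserState.Initial;
        foreach (var b in bytes)
        {
            state = Step(state, b);
            if (state.Completed != null) yield return state.Completed;
        }
    }

    // Copies on every append to keep the state immutable; simple, but quadratic for long bodies.
    private static byte[] Append(byte[] source, byte b)
    {
        var result = new byte[source.Length + 1];
        Array.Copy(source, result, source.Length);
        result[source.Length] = b;
        return result;
    }
}
```

Each yielded array holds the three header bytes followed by the already unescaped data, so splitting it into a message is a matter of taking the first three bytes and the rest. Because the data is unescaped during parsing, the result should not be passed through `Message.Create` above, which would unescape it a second time.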

You can just use the basic building blocks: Observable.Create and Subscribe. First, let's grab some code that reads a stream as an observable of byte[] (there are many different examples of how to do that):

static class Extensions {
    public static IObservable<byte[]> AsyncRead(this Stream stream, int bufferSize) {           
        var buffer = new byte[bufferSize];            
        var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
            stream.BeginRead,
            stream.EndRead);
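        return Observable.While(
            () => stream.CanRead,
            Observable.Defer(() => asyncRead(buffer, 0, bufferSize))
                .Select(readBytes => buffer.Take(readBytes).ToArray()))
            // A zero-length read signals end of stream; CanRead stays true, so stop here explicitly.
            .TakeWhile(chunk => chunk.Length > 0);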
    }
}

Then define a message class:

class Message {
    public byte[] Header { get; set; }
    public byte[] Body { get; set; }
}

And then put it all into a small console app:

    static void Main(string[] args) {
        // original stream
        var stream = new MemoryStream(new byte[] { 0x81, 0x01,0x02,0x03,0x1B,0x1B,0x82,0x82});
        // your initial IObservable<byte[]>
        IObservable<byte[]> bytes = stream.AsyncRead(128); // or any other buffer size
        // your IObservable<Message>
        var observable = Observable.Create<Message>(observer => {
            // some crude parsing code for the sake of example
            bool nextIsEscaped = false;
            bool readingHeader = false;
            bool readingBody = false;
            List<byte> body = new List<byte>();
            List<byte> header = new List<byte>();
            return bytes.Subscribe(buffer => {
                foreach (var b in buffer) {
                    if (b == 0x81 && !nextIsEscaped && !readingHeader) {
                        // start
                        readingHeader = true;
                        readingBody = false;
                        nextIsEscaped = false;
                    }
                    else if (b == 0x82 && !nextIsEscaped && !readingHeader) {
                        // end
                        readingHeader = false;
                        readingBody = false;
                        if (header.Count > 0 || body.Count > 0) {
                            observer.OnNext(new Message() {
                                Header = header.ToArray(),
                                Body = body.ToArray()
                            });
                            header.Clear();
                            body.Clear();
                        }
                        nextIsEscaped = false;
                    }
                    else if (b == 0x1B && !nextIsEscaped && !readingHeader) {
                        nextIsEscaped = true;
                    }
                    else {
                        if (readingHeader) {
                            header.Add(b);
                            if (header.Count == 3) {
                                readingHeader = false;
                                readingBody = true;
                            }
                        }
                        else if (readingBody)
                            body.Add(b);
                        nextIsEscaped = false;
                    }
                }
            });
        });
        observable.Subscribe(msg =>
        {
            Console.WriteLine("Header: " + BitConverter.ToString(msg.Header));
            Console.WriteLine("Body: " + BitConverter.ToString(msg.Body));
        });
        Console.ReadKey();
    }  
Rhythmics answered 7/6, 2016 at 20:43 Comment(2)
Thanks for answering! I understand your approach, but it feels very procedural to me: it's basically a state-machine reader wrapped in Rx. I would like to know if there's another, more functional approach to this. - Reexamine
You do have the elements of a state machine in your problem. You need to know what the current character is, what the character preceding it was, and potentially the character before that (was the escape character itself escaped?). In my tests, this code meets your requirements. - Inaudible
