How to create a responding TCP listener with the System.IO.Pipelines package?
Asked Answered
P

1

7

I want to create a TCP Listener using Kestrel and the System.IO.Pipelines package. The messages I receive will always be HL7 messages. An example message could be

MSH|^~&|MegaReg|XYZHospC|SuperOE|XYZImgCtr|20060529090131-0500||ADT^A01^ADT_A01|01052901|P|2.5 EVN||200605290901||||200605290900 PID|||56782445^^^UAReg^PI||KLEINSAMPLE^BARRY^Q^JR||19620910|M||2028-9^^HL70005^RA99113^^XYZ|260 GOODWIN CREST DRIVE^^BIRMINGHAM^AL^35209^^M~NICKELL’S PICKLES^10000 W 100TH AVE^BIRMINGHAM^AL^35200^^O|||||||0105I30001^^^99DEF^AN PV1||I|W^389^1^UABH^^^^3||||12345^MORGAN^REX^J^^^MD^0010^UAMC^L||67890^GRAINGER^LUCY^X^^^MD^0010^UAMC^L|MED|||||A0||13579^POTTER^SHERMAN^T^^^MD^0010^UAMC^L|||||||||||||||||||||||||||200605290900 OBX|1|NM|^Body Height||1.80|m^Meter^ISO+|||||F OBX|2|NM|^Body Weight||79|kg^Kilogram^ISO+|||||F AL1|1||^ASPIRIN DG1|1||786.50^CHEST PAIN, UNSPECIFIED^I9|||A

The only important thing to note is that each incoming HL7 message starts with a vertical tab character so you know where the message begins. Each HL7 message contains multiple segments so I think I will have to loop through each segment. After processing the request I want to send back a HL7 message as a response. First of all I came up with this

internal class HL7Listener : ConnectionHandler
{
    public override async Task OnConnectedAsync(ConnectionContext connection)
    {
        IDuplexPipe pipe = connection.Transport;

        await FillPipe(pipe.Output);
        await ReadPipe(pipe.Input);
    }

    private async Task FillPipe(PipeWriter pipeWriter)
    {
        const int minimumBufferSize = 512;

        while (true)
        {
            Memory<byte> memory = pipeWriter.GetMemory(minimumBufferSize);
            
            try
            {
                int bytesRead = 32; // not sure what to do here
                
                if (bytesRead == 0)
                {
                    break;
                }
                
                pipeWriter.Advance(bytesRead);
            }
            catch (Exception ex)
            {
                // ... something failed ...

                break;
            }

            FlushResult result = await pipeWriter.FlushAsync();

            if (result.IsCompleted)
            {
                break;
            }
        }

        pipeWriter.Complete();
    }

    private async Task ReadPipe(PipeReader pipeReader)
    {
        while (true)
        {
            ReadResult result = await pipeReader.ReadAsync();

            ReadOnlySequence<byte> buffer = result.Buffer;
            SequencePosition? position;

            do
            {
                position = buffer.PositionOf((byte)'\v');

                if (position != null)
                {
                    ReadOnlySequence<byte> line = buffer.Slice(0, position.Value);

                    // ... Process the line ...

                    buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
                }
            }
            while (position != null);

            pipeReader.AdvanceTo(buffer.Start, buffer.End);

            if (result.IsCompleted)
            {
                break;
            }
        }

        pipeReader.Complete();
    }
}

Unfortunately I'm struggling with some things:

  • The part int bytesRead = 32;, how do I get to know how many bytes have been read? Or how to read with the writer instance?
  • Currently the debugger does not hit the code at // ... Process the line .... Basically I have to extract the whole HL7 message so I can use my HL7 parser to convert the message string.
  • Where do I have to respond? After calling await ReadPipe(pipe.Input);? By using await connection.Transport.Output.WriteAsync(/* the HL7 message to send back */);?
Pleasure answered 18/9, 2020 at 8:0 Comment(4)
Hi; I created a 3-and-a-bit blog series that covers this topic in depth, with code examples (including a full GitHub repo); kinda too deep to try and summarise here, but I suspect it will help - starts here: blog.marcgravell.com/2018/07/pipe-dreams-part-1.htmlYawmeter
@MarcGravell I tried to follow your series (part 1 and 2). Unfortunately the Github repo is really huge and too complex for me as a beginner. I was able to follow your series e.g. how does the pipe buffer work but I'm not able to transform the knowledge from this to my own problemPleasure
Have you considered github.com/Efferent-Health/HL7-dotnetcore?Forging
@AndyVaal I'm currently using NHAPI but I think this one is better and actively maintained :) thanks for thatPleasure
S
9

Have you seen David Fowler's TcpEcho example? I'd say that's fairly canonical as he's the one that posted the devblogs System.IO.Pipelines announcement.

His example deals with raw sockets. I've adapted it to the ConnectionHandler API and HL7 messages (however, I know very little about HL7):

internal class HL7Listener : ConnectionHandler
{
    public override async Task OnConnectedAsync(ConnectionContext connection)
    {
        while (true)
        {
            var result = await connection.Transport.Input.ReadAsync();
            var buffer = result.Buffer;

            while (TryReadMessage(ref buffer, out ReadOnlySequence<byte> hl7Message))
            {
                // Process the line.
                var response = ProcessMessage(hl7Message);
                await connection.Transport.Output.WriteAsync(response);
            }

            if (result.IsCompleted)
            {
                break;
            }

            connection.Transport.Input.AdvanceTo(buffer.Start, buffer.End);
        }
    }

    public static bool TryReadMessage(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> hl7Message)
    {
        var endOfMessage = buffer.PositionOf((byte)0x1C);

        if (endOfMessage == null || !TryMatchNextByte(ref buffer, endOfMessage.Value, 0x0D, out var lastBytePosition))
        {
            hl7Message = default;
            return false;
        }

        var messageBounds = buffer.GetPosition(1, lastBytePosition.Value); // Slice() is exclusive on the upper bound
        hl7Message = buffer.Slice(0, messageBounds);
        buffer = buffer.Slice(messageBounds); // remove message from buffer
        return true;
    }

    /// <summary>
    /// Does the next byte after currentPosition match the provided value?
    /// </summary>
    private static bool TryMatchNextByte(ref ReadOnlySequence<byte> buffer, SequencePosition currentPosition, byte value, out SequencePosition? nextPosition)
    {
        nextPosition = buffer.Slice(currentPosition).PositionOf(value);
        if(nextPosition == null || !nextPosition.Value.Equals(buffer.GetPosition(1, currentPosition)))
        {
            nextPosition = null;
            return false;
        }
        return true;
    }

    private ReadOnlyMemory<byte> ProcessMessage(ReadOnlySequence<byte> hl7Message)
    {
        var incomingMessage = Encoding.UTF8.GetString(hl7Message.ToArray());
        // do something with the message and generate your response. I'm using UTF8 here
        // but not sure if that's valid for HL7.
        return Encoding.UTF8.GetBytes("Response message: OK!");
    }
}

Update: Added the most recent information about HL7 messages structure.

Subvention answered 22/9, 2020 at 14:47 Comment(5)
hey Will, thanks for your reply! I found a package for HL7 which is able to parse the whole TCP message string to multiple HL7 messages. I tried to modify your code and would like to know if this should be fine and where to slice the buffer .. ? pastebin.com/McLiQjQVPleasure
Hi @OlafSvenson I would still recommend using the HL7 package to parse a single message, and let the pipeline care about extracting messages from the incoming bytes. This is because you're not guaranteed, when reading the incoming bytes, to have a complete HL7 message or segment -- maybe you just get a single byte or two, and need to wait for the next ReadAsync to return the remaining bytes. The HL7 package can process multiple messages, but can it handle partial messages? My guess is no, but the pipeline code can (it's handling it in the buffer = buffer.Slice line).Subvention
I tested your code and appreciate that :) 1 problem: When TryReadMessage gets called the slicing seems to be wrong. When assigning hl7Message you will only cut off the character so the message will only contain the vertical tab. I think the best way is to check for the last character, I created a separate question for that #64044332Pleasure
What I mean by this: Currently I'm trying to extract 1 message with your code. Which doesn't work properly as mentioned above. When checking for the last occuring HL7 suffix '\r' I can extract all valid messages from the buffer at oncePleasure
Ah! I didn't understand the HL7 message structure, I thought \v was the delimiter. I updated the code to work with 0x1c, 0x0d. It adds a TryMatchNextByte function that can peek at the byte after 0x1c and see if it's 0x0d.Subvention

© 2022 - 2024 — McMap. All rights reserved.