How can I turn a Sink into a Conduit?

I'm trying to write a Conduit using an attoparsec parser. Specifically, given parseOne :: Parser T, I'd like to construct a Conduit ByteString m T that repeatedly applies the parser to the input and streams the results.

attoparsec-conduit offers sinkParser to turn a Parser into a Sink, but how can I turn this Sink into a Conduit? What I'm looking for is a function like:

conduitSink :: (Resource m) => Sink a m b -> Conduit a m b

which repeatedly feeds data into the Sink, producing each result as it goes. It seems like it could be written fairly easily as a manual loop, but I'm wondering if there's a better way.

The lack of this seemingly-obvious function in the conduit library makes me think I might be doing something wrong; is there a better way to accomplish this? The use case is turning raw bytes into the parsed form of a message-based network protocol, to be processed by later stages of the pipeline. I already have the opposite direction (i.e. Conduit T m ByteString) thanks to blaze-builder-conduit, so this seemed like the most natural way to structure things.

Inkstand answered 28/1, 2012 at 4:10

You need to use the SequencedSink system for this; it uses a sink and a tracked state to produce a conduit from the repeated application of a sink producer.

The sink you have created is optimized for incrementally parsing one value, which will be the result at the end of a conduit sequence.

However, you want this to be part of a conduit pipeline, and each chunk of the incoming ByteString might match your parser zero, one, or several times. You therefore need finer-grained control over the parsing process, passing the state of an incomplete parse from one application of the sink to the next.

Assuming, for example, that your parser parses [--] or [----] etc., and T is Int denoting the number of dashes parsed, you need to track the state of the parser as demonstrated by this:

Input chunk    Sink result - Data.Conduit.SequencedSinkResponse
[--][---]      Emit Nothing [2, 3]
[---][---      Emit (Just #func) [3]
---------      Emit (Just #func) []
]              Emit Nothing [12]
               Stop

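In this case, I use Maybe (ByteString -> Data.Attoparsec.ByteString.Result Int) as the passed-on state: Nothing when no parse is in progress, and Just the continuation of an unfinished parse otherwise. A different data type might be more suitable depending on the situation.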
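To make the state passing concrete, here is a minimal sketch of the chunk-by-chunk stepping in plain attoparsec, without any conduit machinery. The names dashes, Pending, and step are hypothetical and chosen only for this illustration; the sketch assumes the bracketed-dashes parser from the example and is not the SequencedSink implementation itself.

```haskell
import qualified Data.Attoparsec.ByteString.Char8 as A
import qualified Data.ByteString.Char8 as B
import Data.Maybe (fromMaybe)

-- Hypothetical parser for the example: "[--]" yields 2, "[----]" yields 4.
dashes :: A.Parser Int
dashes = A.char '[' *> (B.length <$> A.takeWhile1 (== '-')) <* A.char ']'

-- State carried between chunks: the continuation of an unfinished parse.
type Pending = Maybe (B.ByteString -> A.Result Int)

-- Feed one chunk; return the new state and every value completed by it.
step :: Pending -> B.ByteString -> Either String (Pending, [Int])
step pending chunk
  -- attoparsec treats an empty chunk as end of input, so never feed one.
  | B.null chunk = Right (pending, [])
  | otherwise    = go (fromMaybe (A.parse dashes) pending chunk) []
  where
    go (A.Done rest x) acc
      | B.null rest = Right (Nothing, reverse (x : acc))
      | otherwise   = go (A.parse dashes rest) (x : acc)  -- more input in this chunk
    go (A.Partial k) acc  = Right (Just k, reverse acc)   -- wait for the next chunk
    go (A.Fail _ _ err) _ = Left err

-- Run the four chunks from the table and print what each one emits.
main :: IO ()
main = loop Nothing (map B.pack ["[--][---]", "[---][---", "---------", "]"])
  where
    loop _ [] = pure ()
    loop st (c : cs) = case step st c of
      Left err        -> putStrLn ("parse error: " ++ err)
      Right (st', xs) -> print xs >> loop st' cs
```

Each call to step corresponds to one row of the table: the list is what gets emitted, and the Pending value is the state handed to the next application.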

This explicit stream treatment is needed to maintain the pipeline nature of the conduit; having the parser conduit be a "bottleneck", always waiting for enough data to chunk-wise satisfy a parser, would be a major performance sink.

The implementation of the needed sink should be fairly trivial with the available ResourceT monad interface.

EDIT: Simply applying your sink in a loop would indeed be the simplest solution, but it will have slightly different performance characteristics if your parser parses short snippets that often end up on the borders of byte chunks.

Ritualize answered 28/1, 2012 at 4:52 Comment(3)
Thanks, I'll give that a try. Does this mean that I won't be using attoparsec-conduit at all? If so, would there be any obstacle to adding a generic conduitParser :: (AttoparsecInput a, ResourceThrow m) => Parser a b -> Conduit a m b to its interface using this technique, or is it a simple omission? - Inkstand
@ehird, I believe that it's simply an omission; the current sinkParser code suggests that it could easily be adapted to parse the input stream multiple times, since it uses a technique similar to the one I described above, except that it stops consuming input after the first parse. - Ritualize
Indeed, it seems there's already a pull request adding conduitParser to attoparsec-enumerator; I'll probably use that implementation. Thanks for letting me know about SequencedSink, by the way; I passed over it when reading the documentation. - Inkstand
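
For readers on a current toolchain: a function named conduitParser is available in Data.Conduit.Attoparsec (conduit-extra package). The sketch below assumes that modern API, which differs from the 2012 interface discussed in this thread: it yields (PositionRange, b) pairs rather than bare results, and there is no SequencedSink. The dashes parser and the parsed action are hypothetical names used only for illustration.

```haskell
import Conduit (mapC, runConduit, sinkList, yieldMany, (.|))
import qualified Data.Attoparsec.ByteString.Char8 as A
import qualified Data.ByteString.Char8 as B
import Data.Conduit.Attoparsec (conduitParser)

-- Hypothetical parser for the example: "[--]" yields 2, "[----]" yields 4.
dashes :: A.Parser Int
dashes = A.char '[' *> (B.length <$> A.takeWhile1 (== '-')) <* A.char ']'

-- Stream chunks through the parser repeatedly and collect every result.
-- conduitParser carries an unfinished parse across chunk boundaries itself.
parsed :: IO [Int]
parsed = runConduit $
     yieldMany (map B.pack ["[--][---]", "[---][---", "---------", "]"])
  .| conduitParser dashes
  .| mapC snd          -- drop the PositionRange, keep the parsed value
  .| sinkList

main :: IO ()
main = parsed >>= print
```

In a real pipeline, sinkList would be replaced by the later processing stages, and a parse failure surfaces as an exception thrown in the underlying monad.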
