What's the real benefit of conduit's upstream type parameter?

I'm trying to understand the differences between the various implementations of the pipe concept. One of the differences between conduit and pipes is how they fuse pipes together. Conduit has

(>+>) :: Monad m
      => Pipe l a b r0 m r1 -> Pipe Void b c r1 m r2 -> Pipe l a c r0 m r2

while pipes has

(>->) :: (Monad m, Proxy p)
      => (b' -> p a' a b' b m r) -> (c' -> p b' b c' c m r) -> c' -> p a' a c' c m r

If I understand it correctly, with pipes, when either of the two pipes stops, its result is returned and the other one is stopped. With conduit, if the left pipe finishes, its result is sent downstream to the right pipe.
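
For example (checking against the modern pipes API rather than the Proxy-based signature quoted above), whichever side of >-> finishes first supplies the final result:

import Control.Monad (forever)
import Pipes
import qualified Pipes.Prelude as P

main :: IO ()
main = do
  -- Upstream finishes first: P.drain never returns, so upstream's result wins.
  r1 <- runEffect $ (each [1, 2, 3 :: Int] >> return "upstream finished") >-> P.drain
  -- Downstream finishes first: the infinite producer is simply abandoned.
  r2 <- runEffect $ forever (yield (0 :: Int)) >-> (await >> return "downstream finished")
  mapM_ putStrLn [r1, r2]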

I wonder, what is the benefit of conduit's approach? I'd like to see some example (preferably real-world) which is easy to implement using conduit and >+>, but hard(er) to implement using pipes and >->.

Each answered 6/3, 2013 at 21:27 Comment(0)

In my experience, the real-world benefits of upstream terminators are very slim, which is why they're hidden from the public API at this point. I think I only used them in one piece of code ever (wai-extra's multipart parsing).

In its most general form, a Pipe allows you to produce both a stream of output values and a final result. When you fuse that Pipe with another downstream Pipe, then that stream of output values becomes downstream's input stream, and upstream's final result becomes downstream's "upstream terminator." So from that perspective, having arbitrary upstream terminators allows for a symmetric API.
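
To make that concrete, here is a simplified sketch of the constructors behind conduit's Pipe (the real definition in Data.Conduit.Internal also threads finalizers); note that NeedInput carries two continuations, one for the next input value and one for the upstream terminator:

data Pipe l i o u m r
    = HaveOutput (Pipe l i o u m r) o   -- yield a value downstream
    | NeedInput (i -> Pipe l i o u m r) -- receive the next input value ...
                (u -> Pipe l i o u m r) -- ... or the upstream terminator
    | Done r                            -- finish with a final result
    | PipeM (m (Pipe l i o u m r))      -- run an action in the base monad
    | Leftover (Pipe l i o u m r) l     -- push back unconsumed input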

However, in practice, it's very rare that such functionality is actually used, and since it just confuses the API, it was hidden in the .Internal module with the 1.0 release. One theoretical use case could be the following:

  • You have a Source which produces a stream of bytes.
  • A Conduit which consumes a stream of bytes, calculates a hash as a final result, and passes on all of the bytes downstream.
  • A Sink which consumes the stream of bytes, e.g., to store them in a file.

With upstream terminators, you could connect these three up and have the result from the Conduit returned as the final result of the pipeline. However, in most cases there's an alternate, simpler means to achieve the same goals. In this case, you could:

  1. Use conduitFile to store the bytes in a file, turn the hash Conduit into a hash Sink, and place it downstream.
  2. Use zipSinks to merge both a hash sink and a file-writing sink into a single sink (sketched below).
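
A minimal sketch of option 2, written against the modern conduit API, where the ZipSink applicative has since replaced zipSinks; sinkHash here comes from the cryptonite-conduit package:

import Conduit (runConduitRes, sourceFile, sinkFile, (.|))
import Data.Conduit (ZipSink (..), getZipSink)
import Crypto.Hash (Digest, SHA256)
import Crypto.Hash.Conduit (sinkHash)

-- Copy a file while hashing it in a single pass: both sinks see every
-- chunk, and the digest becomes the pipeline's final result.
copyWithHash :: FilePath -> FilePath -> IO (Digest SHA256)
copyWithHash src dst = runConduitRes $
    sourceFile src .| getZipSink (ZipSink sinkHash <* ZipSink (sinkFile dst))
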
Sandberg answered 7/3, 2013 at 5:15 Comment(0)

The classic example of something that is currently easier to implement with conduit is handling end of input from upstream. For example, if you want to fold a list of values and bind the result within the pipeline, you cannot do it in pipes without engineering an extra protocol on top of the library.

In fact, this is precisely what the upcoming pipes-parse library solves. It engineers a Maybe protocol on top of pipes and then defines convenient functions for drawing input from upstream that respect that protocol.

For example, you have the onlyK function, which takes a pipe and wraps all outputs in Just and then finishes with a Nothing:

onlyK :: (Monad m, Proxy p) => (q -> p a' a b' b m r) -> (q -> p a' a b' (Maybe b) m r)

You also have the justK function, which defines a functor from pipes that are Maybe-unaware to pipes that are Maybe-aware, for backwards compatibility:

justK :: (Monad m, ListT p) => (q -> p x a x b m r) -> (q -> p x (Maybe a) x (Maybe b) m r)

justK idT = idT
justK (p1 >-> p2) = justK p1 >-> justK p2
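
Those signatures are from the proxy-based pipes API of the time. As a toy sketch of the same Maybe protocol in today's pipes vocabulary (wrapK is a hypothetical stand-in for onlyK, not part of any released API):

import Control.Monad (forever)
import Pipes
import qualified Pipes.Prelude as P

-- Wrap every output in Just, then mark end of input by yielding
-- Nothing forever, so downstream can always detect exhaustion.
wrapK :: Monad m => Producer a m () -> Producer (Maybe a) m r
wrapK p = (p >-> P.map Just) >> forever (yield Nothing)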

And then once you have a Producer that respects that protocol you can use a large variety of parsers that abstract over the Nothing check for you. The simplest one is draw:

draw :: (Monad m, Proxy p) => Consumer (ParseP a p) (Maybe a) m a

It retrieves a value of type a or fails in the ParseP proxy transformer if upstream ran out of input. You can also take multiple values at once:

drawN :: (Monad m, Proxy p) => Int -> Consumer (ParseP a p) (Maybe a) m [a]

drawN n = replicateM n draw  -- except the actual implementation is faster

... and several other nice functions. The user never actually has to directly interact with the end of input signal at all.
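
To see what that abstraction saves you, here is the kind of Nothing-checking consumer you would otherwise write by hand (again in modern pipes terms; sumUntilEnd is a hypothetical example, and draw hides exactly this pattern):

import Pipes

-- Sum values until the Nothing end-of-input marker, then return the total.
sumUntilEnd :: (Monad m, Num a) => Consumer (Maybe a) m a
sumUntilEnd = go 0
  where
    go acc = do
      mx <- await
      case mx of
        Nothing -> return acc
        Just x  -> go (acc + x)

-- Using wrapK from the sketch above:
-- >>> runEffect $ wrapK (each [1..10]) >-> sumUntilEnd
-- 55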

Usually when people ask for end-of-input handling, what they really want is parsing, which is why pipes-parse frames end-of-input issues as a subset of parsing.

Showroom answered 6/3, 2013 at 22:23 Comment(2)
I'm curious, how does this protocol go together with pipe composability? Suppose I have a pipe readFileK that reads a file and then sends Nothing to signal the end. Then if I do (readFileK "file1" >> readFileK "file2") >-> otherPipeK, does otherPipeK receive Nothing twice? And on the other hand, if I have readFileK "file" >-> (pipe1K >> pipe2K) and the input from the file is exhausted while pipe1K is processing, then pipe2K never learns that the input has already been depleted. – Each
This is why onlyK is a separate combinator and the Nothing behavior is not built into the sources. This way you can combine multiple sources into one, such as onlyK (readFileS "file" >=> readSocketS socket). Your second example doesn't cause any problems: if pipe1K runs out of input it will fail in ParseP and pipe2K will never run. None of the parsing primitives are capable of going past the end-of-input marker. – Showroom
