Fusing conduits with multiple inputs

I am trying to create a conduit that can consume multiple input streams. I need to be able to await on one or the other of the input streams in no particular order (e.g., not alternating) making zip useless. There is nothing parallel or non-deterministic going on here: I await on one stream or the other. I want to be able to write code similar to the following (where awaitA and awaitB await on the first or second input stream respectively):

do
  _ <- awaitA
  x <- awaitA
  y <- awaitB
  yield (x,y)
  _ <- awaitB
  _ <- awaitB
  y' <- awaitB
  yield (x,y')

The best solution I have is to make the inner monad another conduit, e.g.

foo :: Sink i1 (ConduitM i2 o m) ()

Which then allows

awaitA = await
awaitB = lift await

And this mostly works. Unfortunately, this seems to make it very difficult to fuse to the inner conduit before the outer conduit is fully connected. The first thing I tried was:

fuseInner :: Monad m =>
                Conduit i2' m i2 -> 
                Sink i1 (ConduitM i2 o m) () -> 
                Sink i1 (ConduitM i2' o m) ()
fuseInner x = transPipe (x =$=)

But this doesn't work, at least when x is stateful since (x =$=) is run multiple times, effectively restarting x each time.
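
The restart problem can be seen in a toy model that uses only base and no conduit types. This is a sketch by analogy, not conduit's actual behaviour: `Mealy`, `runningSum`, `restarted` and `threaded` are hypothetical names introduced for illustration. `restarted` plays the role of a pure function applied afresh to every inner action, while `threaded` carries the transformer's state from one action to the next.

```haskell
-- A toy stateful stream transformer: consuming one input produces one
-- output together with the transformer to use for the next input.
newtype Mealy a b = Mealy { step :: a -> (b, Mealy a b) }

-- Running sum, starting from the given accumulator.
runningSum :: Int -> Mealy Int Int
runningSum acc = Mealy $ \x -> let s = acc + x in (s, runningSum s)

-- Analogue of applying the same pure function to every action:
-- each input is handled by the initial transformer, so state is lost.
restarted :: Mealy a b -> [a] -> [b]
restarted m = map (fst . step m)

-- State is threaded: each input is handled by the transformer
-- returned by the previous step.
threaded :: Mealy a b -> [a] -> [b]
threaded _ []       = []
threaded m (x : xs) = let (y, m') = step m x in y : threaded m' xs
```

In GHCi, `restarted (runningSum 0) [1,2,3]` gives `[1,2,3]`, while `threaded (runningSum 0) [1,2,3]` gives `[1,3,6]`.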

Is there any way to write fuseInner, short of breaking into the internals of conduit (which looks like it would be pretty messy)? Is there some better way to handle multiple input streams? Am I just way too far beyond what conduit was designed for?

Thanks!

Lette answered 24/3, 2013 at 2:37 Comment(5)
I assume that you mean that you want to receive elements as they are generated from the two IO input streams. Use stm-conduit to do this. - Nusku
I've read your updated question. Is this closer to what you had in mind? If so, I can modify it to the equivalent conduit version and submit it as an answer. - Nusku
I think that link describes exactly what I tried (foo is a Sink over a ConduitM monad). The problem is, I can't figure out how to fuse to the inner conduit with this strategy. - Lette
You can't, at least not usefully. Think of it this way: the inner conduit cannot even know what its commands are until the outer conduit runs. You have to unwrap the outer one before you can fuse the inner one. - Nusku
One option is to use ResumableSources. For each of the sources, start with (rSource, value) <- source $$+ head, followed by (rSource', value) <- rSource $$++ head each time you want to read from that stream. When a stream is exhausted or no longer needed, use rSource $$+- return () (or head if you know this is the last value) to finalize it (perform cleanup operations). Unfortunately, this means that you can't use it with arbitrary conduits; only sources. - Melisandra
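
The resumable-source approach from the last comment might look roughly like the following. This is a minimal sketch, assuming the conduit package is installed and that `$$+`, `$$++` and `$$+-` are exported from `Data.Conduit` as in the comment. The name `example` and the two list-backed sources are made up for illustration, and the reads follow the order in the question's pseudocode.

```haskell
import Data.Conduit (($$+), ($$++), ($$+-))
import qualified Data.Conduit.List as CL

-- Read from two sources in an order chosen by the consumer.
example :: IO [(Maybe Int, Maybe Char)]
example = do
  (a1, _)  <- CL.sourceList [1 :: Int, 2, 3] $$+ CL.head  -- _  <- awaitA
  (a2, x)  <- a1 $$++ CL.head                              -- x  <- awaitA
  (b1, y)  <- CL.sourceList "pqrst" $$+ CL.head            -- y  <- awaitB
  (b2, _)  <- b1 $$++ CL.head                              -- _  <- awaitB
  (b3, _)  <- b2 $$++ CL.head                              -- _  <- awaitB
  (b4, y') <- b3 $$++ CL.head                              -- y' <- awaitB
  -- Finalize both sources once they are no longer needed.
  a2 $$+- return ()
  b4 $$+- return ()
  return [(x, y), (x, y')]
```

As the comment notes, this only works when both inputs are plain sources, and the logic runs in the base monad rather than inside a conduit.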

If you want to combine two IO-generated streams, then Gabriel's comment is the solution.

Otherwise, you can't wait on both streams to see which one produces a value first. Conduit is single-threaded and deterministic: it processes only one pipe at a time. But you could create a function that interleaves two streams, letting them decide when to switch:

{-# OPTIONS_GHC -fwarn-incomplete-patterns #-}
import Control.Monad (liftM)
import Data.Conduit.Internal (
    Pipe (..), Source, Sink,
    injectLeftovers, ConduitM (..),
    mapOutput, mapOutputMaybe
  )

-- | Alternate two given sources, running one until it yields `Nothing`,
-- then switching to the other one.
merge :: Monad m
      => Source m (Maybe a)
      -> Source m (Maybe b)
      -> Source m (Either a b)
merge (ConduitM l) (ConduitM r) = ConduitM $ goL l r
  where
    goL :: Monad m => Pipe () () (Maybe a) () m () 
                   -> Pipe () () (Maybe b) () m ()
                   -> Pipe () () (Either a b) () m ()
    goL (Leftover l ()) r           = goL l r
    goL (NeedInput _ c) r           = goL (c ()) r
    goL (PipeM mx) r                = PipeM $ liftM (`goL` r) mx
    goL (Done _) r                  = mapOutputMaybe (liftM Right) r
    goL (HaveOutput c f (Just o)) r = HaveOutput (goL c r) f (Left o)
    goL (HaveOutput c f Nothing) r  = goR c r
    -- This is just a mirror copy of goL. We should combine them together to
    -- avoid code repetition.
    goR :: Monad m => Pipe () () (Maybe a) () m ()
                   -> Pipe () () (Maybe b) () m ()
                   -> Pipe () () (Either a b) () m ()
    goR l (Leftover r ())           = goR l r
    goR l (NeedInput _ c)           = goR l (c ())
    goR l (PipeM mx)                = PipeM $ liftM (goR l) mx
    goR l (Done _)                  = mapOutputMaybe (liftM Left) l
    goR l (HaveOutput c f (Just o)) = HaveOutput (goR l c) f (Right o)
    goR l (HaveOutput c f Nothing)  = goL l c

It processes one source until it returns Nothing, then switches to another, etc. If one source finishes, the other one is processed to the end.

As an example, we can combine and interleave two lists:

import Control.Monad.Trans
import Data.Conduit (($$), awaitForever)
import Data.Conduit.List (sourceList)

main =  (merge (sourceList $ concatMap (\x -> [Just x, Just x, Nothing]) [  1..10])
               (sourceList $ concatMap (\x -> [Just x, Nothing]) [101..110]) )
         $$ awaitForever (\x -> lift $ print x)
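
The interleaving rule can also be checked on plain lists. The following is a sketch of the same switching logic using only base; `mergeL` and `mergeR` are hypothetical names that mirror `goL` and `goR`, and lists stand in for sources.

```haskell
-- Take from the left list until it emits Nothing, then switch to the right.
-- When the left list is exhausted, drain the right one, dropping Nothings.
mergeL :: [Maybe a] -> [Maybe b] -> [Either a b]
mergeL []            r = [Right y | Just y <- r]
mergeL (Just x  : l) r = Left x : mergeL l r
mergeL (Nothing : l) r = mergeR l r

-- Mirror image of mergeL: take from the right list until it emits Nothing.
mergeR :: [Maybe a] -> [Maybe b] -> [Either a b]
mergeR l []            = [Left x | Just x <- l]
mergeR l (Just y  : r) = Right y : mergeR l r
mergeR l (Nothing : r) = mergeL l r
```

For example, `mergeL [Just 1, Just 1, Nothing, Just 2, Just 2, Nothing] [Just 101, Nothing, Just 102, Nothing]` evaluates to `[Left 1, Left 1, Right 101, Left 2, Left 2, Right 102]`.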

If you need multiple sources, merge could be adapted to something like

mergeList :: Monad m => [Source m (Maybe a)] -> Source m a

which would cycle through the given list of sources until all of them are finished.

Stoplight answered 24/3, 2013 at 9:45 Comment(2)
I think both your solution and Gabriel's comment assume that I want the upstream streams to decide what happens next, which is not what I'm going for. I've updated my question to try to clarify. - Lette
@Lette I'm not sure if this is possible, because conduit's await (or more specifically NeedInput) doesn't pass any information upstream that could be used to decide which stream to read. The order of values received from upstream can't be influenced by a conduit. However, this seems to be possible with pipes. They are bidirectional, and request allows information to be sent upstream, which could be used to select one of two streams. - Stoplight
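
To make the limitation concrete, here is a sketch of what a consumer-driven, two-input step type could look like. This is not conduit's `Pipe` type: `TwoIn`, `runTwoIn` and `example` are hypothetical names, and the inputs are plain lists. The point is only that the consumer names the stream it wants, which `NeedInput` has no way to express.

```haskell
-- A consumer that chooses which of two inputs to read next.
data TwoIn a b o r
  = AwaitA (Maybe a -> TwoIn a b o r)  -- ask for the next value of stream A
  | AwaitB (Maybe b -> TwoIn a b o r)  -- ask for the next value of stream B
  | Yield o (TwoIn a b o r)            -- emit an output
  | Done r                             -- finish with a result

-- Run a consumer against two lists, feeding Nothing once a list is empty.
runTwoIn :: [a] -> [b] -> TwoIn a b o r -> ([o], r)
runTwoIn xs ys p = case p of
  AwaitA k -> case xs of
    []       -> runTwoIn [] ys (k Nothing)
    (x : xt) -> runTwoIn xt ys (k (Just x))
  AwaitB k -> case ys of
    []       -> runTwoIn xs [] (k Nothing)
    (y : yt) -> runTwoIn xs yt (k (Just y))
  Yield o k -> let (os, r) = runTwoIn xs ys k in (o : os, r)
  Done r    -> ([], r)

-- The read pattern from the question, written in continuation style.
example :: TwoIn a b (a, b) ()
example =
  AwaitA $ \_ ->
  AwaitA $ \mx ->
  AwaitB $ \my ->
  case (mx, my) of
    (Just x, Just y) -> Yield (x, y) $
      AwaitB $ \_ -> AwaitB $ \_ -> AwaitB $ \my' ->
        case my' of
          Just y' -> Yield (x, y') (Done ())
          Nothing -> Done ()
    _ -> Done ()
```

With these definitions, `fst (runTwoIn [1,2,3] "pqrst" example)` is `[(2,'p'),(2,'s')]`.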

This can be done by diving into the internals of conduit. I wanted to avoid this because it looked extremely messy. Based on the responses here, it sounds like there is no way around it (but I would really appreciate a cleaner solution).

The key difficulty is that (x =$=) is a pure function, but to make transPipe give the correct answer, it needs a kind of stateful, function-like thing:

data StatefulMorph m n = StatefulMorph
    { stepStatefulMorph :: forall a. m a -> n (StatefulMorph m n, a)
    , finalizeStatefulMorph :: n () }

Stepping a StatefulMorph m n takes a value in m and returns, in n, both that value and the next StatefulMorph, which should be used to transform the next m value. The last StatefulMorph should be finalized (which, in the case of the "stateful (x =$=)", finalizes the x conduit).
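
The stepping and finalizing protocol can be tried out on a small example that needs only base. This is an illustrative sketch, not the fusion code: `numbered` and `runAll` are hypothetical names, `Identity` stands in for m, and the pair `(,) [String]` is used as a simple logging monad for n.

```haskell
{-# LANGUAGE RankNTypes #-}
import Data.Functor.Identity (Identity (..))

data StatefulMorph m n = StatefulMorph
  { stepStatefulMorph     :: forall a. m a -> n (StatefulMorph m n, a)
  , finalizeStatefulMorph :: n ()
  }

-- A morph whose state is the number of actions stepped so far.
numbered :: Int -> StatefulMorph Identity ((,) [String])
numbered k = StatefulMorph
  { stepStatefulMorph     = \(Identity a) ->
      (["step " ++ show k], (numbered (k + 1), a))
  , finalizeStatefulMorph = (["finalized after " ++ show k ++ " steps"], ())
  }

-- Thread the morph through a list of actions, always using the morph
-- returned by the previous step, and finalize the last one.
runAll :: Monad n => StatefulMorph m n -> [m a] -> n [a]
runAll sm []       = finalizeStatefulMorph sm >> return []
runAll sm (x : xs) = do
  (sm', a) <- stepStatefulMorph sm x
  rest <- runAll sm' xs
  return (a : rest)
```

Here `runAll (numbered 0) [Identity 'a', Identity 'b']` evaluates to `(["step 0","step 1","finalized after 2 steps"],"ab")`, which shows the state advancing between steps instead of restarting.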

Conduit fusion can be implemented as a StatefulMorph, using the code for pipeL with minor changes. The signature is:

fuseStateful :: Monad m
             => Conduit a m b
             -> StatefulMorph (ConduitM b c m) (ConduitM a c m)

I also need a replacement for transPipe (a special case of hoist) that uses StatefulMorph values instead of functions.

class StatefulHoist t where
    statefulHoist :: (Monad m, Monad n)
                  => StatefulMorph m n
                  -> t m r -> t n r

A StatefulHoist instance for ConduitM i o can be written using the code for transPipe with some minor changes.

fuseInner is then easy to implement.

fuseInner :: Monad m
          => Conduit a m b
          -> ConduitM i o (ConduitM b c m) r
          -> ConduitM i o (ConduitM a c m) r
fuseInner left = statefulHoist (fuseStateful left)

I've written a more detailed explanation here and posted the full code here. If someone can come up with a cleaner solution, or one that uses the conduit public API, please post it.

Thanks for all the suggestions and input!

Lette answered 25/3, 2013 at 1:33 Comment(0)
