What is the preferred way to combine two sinks?

I've used zipSinks :: Monad m => Sink i m r -> Sink i m r' -> Sink i m (r, r') for this but it is considered deprecated.

Interlocutory answered 7/8, 2012 at 11:2 Comment(3)
What behavior, exactly, do you want the "combined" sinks to have? I tried looking at the old documentation and implementation of zipSinks, but the behavior is not easily discernible at a glance. Glycerinate
@DanBurton: zipSinks takes two Sinks and returns a Sink that produces a pair with the results of the corresponding Sinks. For example, sizeCrc32Sink = zipSinks sizeSink crc32Sink will compute both the size and the checksum. I want the same behavior as described by Oleg here. Interlocutory
Ok I see; it basically hooks up the awaits and feeds the upstream output to both sinks simultaneously, sort of forking the input stream in two. The docs for Data.Conduit.Util state that "there are now easier ways to handle their use cases" but I see no easier way for this use case, as it requires delving into conduit internals to implement.Glycerinate

Edit

After considering this, I don't think it is possible with the current version of Data.Conduit. Pipes aren't Categories, so &&& is out of the question. And there's no way that I can think of to pull results from upstream, feed them incrementally to both sinks, and short-circuit when the first sink finishes. (Although I don't think that Data.Conduit.Util.zipSinks short-circuits this way, it seems like it would be very desirable.) Except of course, to pattern match on both Sinks (like zipSinks in the package does), but that's what we're trying to avoid here.

That said, I would love to be proven wrong here.


It's not pretty, but you can do this in a kind-of obvious way.

First imports:

module Main where

import Control.Monad.Trans
import Data.Conduit
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.List as CL
import qualified Data.Conduit.Text as CT
import qualified Data.Conduit.Util as CU
import Data.Maybe
import Data.Text (unpack)

Now for zipSinks. Basically, you want to create a sink that pulls the input from upstream and sends it to each child sink separately. In this case, I've used CL.sourceList to do this. If await returns Nothing, maybeToList returns an empty list, so the child sinks are also run with no input. Finally, the output of each child sink is then fed into the tuple.

zipSinks :: Monad m => Sink i m r -> Sink i m r' -> Sink i m (r, r')
zipSinks s1 s2 = do
    l  <- fmap maybeToList await
    o1 <- lift $ CL.sourceList l $$ s1
    o2 <- lift $ CL.sourceList l $$ s2
    return (o1, o2)

Here are some examples of using zipSinks. It appears to work fine both inside of IO and outside of it, and in the few tests I did, the output matches the output of zipped', created using the old zipSinks.

doubleHead :: Monad m => Sink Int m (Maybe Int)
doubleHead = await >>= return . fmap (2*)

-- old version
zipped' :: Monad m => Sink Int m (Maybe Int, Maybe Int)
zipped' = CU.zipSinks CL.head doubleHead

-- new version
zipped :: Monad m => Sink Int m (Maybe Int, Maybe Int)
zipped = zipSinks CL.head doubleHead

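-- explicit signature: without it the monomorphism restriction leaves the monad ambiguous
fromList :: Monad m => m (Maybe Int, Maybe Int)
fromList = CL.sourceList [7, 8, 9] $$ zipped
-- (Just 7, Just 14)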

fromFile :: String -> IO (Maybe Int, Maybe Int)
fromFile filename = runResourceT $
       CB.sourceFile filename
    $= CB.lines
    $= CT.decode CT.utf8
    $= CL.map (read . unpack)
    $$ zipped

-- for a file with the lines:
--
-- 1
-- 2
-- 3
--
-- returns (Just 1, Just 2)
Woods answered 15/8, 2012 at 19:14 Comment(4)
Nice! (NB, you could write await >>= return . fmap (2*) for doubleHead, and similarly, l <- fmap maybeToList await instead of using input in zipSinks. Also, is Data.Conduit.Internals an extraneous import?)Trogon
Yeah, I realized that I probably could have used functors in some places. I'm still enough of a n00b at Haskell that that's usually a second-pass edit for me, unfortunately. And yes, Data.Conduits.Internals is extraneous. Originally, I was looking at using sinkToPipe from it. Thanks for pointing these out. I'll update the answer.Woods
Your version of zipSinks combines only the first elements. For example, runResourceT $ CL.sourceList [1,2,3] $$ zipSinks (CL.take 2) (CL.take 2) will return ([1],[1]) but should return ([1,2],[1,2]). Interlocutory
Hmm. That's a problem. I think there might be a way to do it with the arrow fan-out operator (&&&), but I haven't teased it out yet.Woods

(The package is conduit-0.5.2.3. The whole module is just for backwards compatibility.)


[edit]

So, my straightforward monadic guess (see below) seems to be wrong, even though the types are correct. Now, I can only guess that the answer is:

The replacing features are still in development, pretty much like all Pipe/Conduit and similar concepts and libraries.

I'd wait for the next API to solve this question and keep using zipSinks until then. (Maybe it was just misplaced.)

[/edit]

I'm not that familiar with this package, but wouldn't this do just the same?

zipSinks :: Monad m => Sink i m r -> Sink i m r' -> Sink i m (r, r')
zipSinks s1 s2 = (,) <$> s1 <*> s2

It is a Monad after all. (Functor, Applicative)

zipSinks :: Monad sink => sink r -> sink r' -> sink (r, r')
zipSinks s1 s2 = liftM2 (,) s1 s2
Hygienic answered 14/8, 2012 at 16:42 Comment(1)
The types are fine, but the semantics are not: your version of zipSinks runs the sinks consecutively, so the first sink can fully consume the source before the second one sees any input. Interlocutory
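
To make the difference in semantics concrete, here is a minimal sketch. It is not written against the conduit-0.5.2.3 API discussed in this thread: it assumes a later conduit release (1.3 or newer) in which Data.Conduit exports the ZipSink newtype along with ConduitT, runConduitPure and .|. The names sequentialPair and zippedPair are made up for illustration.

```haskell
module Main where

-- Assumption: conduit >= 1.3, where Data.Conduit exports ZipSink.
import Data.Conduit (ConduitT, ZipSink (..), runConduitPure, (.|))
import qualified Data.Conduit.List as CL
import Data.Void (Void)

-- Plain Applicative composition: the sinks run one after the other,
-- so the second sink only sees what the first one left unconsumed.
sequentialPair :: Monad m => ConduitT Int Void m ([Int], [Int])
sequentialPair = (,) <$> CL.take 2 <*> CL.take 2

-- ZipSink's Applicative instance feeds every input to both sinks
-- and pairs up their results.
zippedPair :: Monad m => ConduitT Int Void m ([Int], [Int])
zippedPair =
    getZipSink ((,) <$> ZipSink (CL.take 2) <*> ZipSink (CL.take 2))

main :: IO ()
main = do
    -- prints ([1,2],[3])
    print (runConduitPure (CL.sourceList [1, 2, 3] .| sequentialPair))
    -- prints ([1,2],[1,2])
    print (runConduitPure (CL.sourceList [1, 2, 3] .| zippedPair))
```

The second result is the ([1,2],[1,2]) that the question asks for. Whether ZipSink also short-circuits in the way discussed in the other answer is not something this sketch tries to show.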
