Conduit Broadcast
Asked Answered
A

1

8

A view days ago, I asked this question. Now I need a pure single threaded version of this function:

To repeat, I need a function that sends each received value to each sink and collects their results. The type signature of the function should be something like this:

broadcast :: [Sink a m b] -> Sink a m [b]

Best Sven


P.S. It is not sequence, I've tried that:

> C.sourceList [1..100] $$ sequence [C.fold (+) 0, C.fold (+) 0]
[5050, 0]

expected result:

[5050, 5050]

P.P.S. zipSinks gives the desired result, but it works just with tuples:

> C.sourceList [1..100] $$ C.zipSinks (C.fold (+) 0) (C.fold (+) 0)
(5050, 5050)
Angelita answered 15/8, 2013 at 11:48 Comment(0)
S
10

Basically all we need to do is to reimplement sequence, but with zipSinks instead of the original sequencing operation:

import Data.Conduit as C
import Data.Conduit.List as C
import Data.Conduit.Util as C

fromPairs
    :: (Functor f)
    => f [a]                        -- ^ an empty list to start with
    -> (f a -> f [a] -> f (a, [a])) -- ^ a combining function
    -> [f a]                        -- ^ input list
    -> f [a]                        -- ^ combined list
fromPairs empty comb = g
  where
    g []     = empty
    g (x:xs) = uncurry (:) `fmap` (x `comb` g xs)

Now creating broadcast is just applying fromPairs to zipSinks:

broadcast :: (Monad m) => [Sink a m b] -> Sink a m [b]
broadcast = fromPairs (return []) zipSinks

And we can do something like

main = C.sourceList [1..100] $$ broadcast [C.fold (+) 0, C.fold (*) 1]

Update: We can see that fromPairs looks just sequenceA and so we can push the idea even further. Let's define a zipping applicative functor on conduits similar to ZipList:

import Control.Applicative
import Control.Monad
import Data.Conduit
import Data.Conduit.Util
import Data.Traversable (Traversable(..), sequenceA)

newtype ZipSink i m r = ZipSink { getZipSink :: Sink i m r }

instance Monad m => Functor (ZipSink i m) where
    fmap f (ZipSink x) = ZipSink (liftM f x)
instance Monad m => Applicative (ZipSink i m) where
    pure  = ZipSink . return
    (ZipSink f) <*> (ZipSink x) =
         ZipSink $ liftM (uncurry ($)) $ zipSinks f x

Then broadcast becomes as simple as

broadcast :: (Traversable f, Monad m) => f (Sink i m r) -> Sink i m (f r)
broadcast = getZipSink . sequenceA . fmap ZipSink
Spoliation answered 15/8, 2013 at 13:21 Comment(2)
@Angelita Actually sinks form an applicative functor wrt zipping, which makes it even easier to combine them. I updated the answer.Spoliation
+1, this solution totally rocks!!! There are certain things in Haskell libraries I have seen, but had no clue what they are good for until I discover a use case. Now I know what ZipLists/Sinks are, thanks.Angelita

© 2022 - 2024 — McMap. All rights reserved.