Folding a subset of a stream using pipes 4.0
Asked Answered
C

2

6

I'm trying to understand pipes 4.0, and want to convert some conduit code. Suppose I have a stream of Ints, and I'd like to skip the first five, then get the sum of the following 5. Using plain lists, this would be:

sum . take 5 . drop 5

In conduit, this would be:

drop 5
isolate 5 =$ fold (+) 0

Or as a complete program:

import Data.Conduit
import Data.Conduit.List (drop, isolate, fold)
import Prelude hiding (drop)

main :: IO ()
main = do
    res <- mapM_ yield [1..20] $$ do
        drop 5
        isolate 5 =$ fold (+) 0
    print res

However, I'm not quite certain how to do this with pipes.

Carrico answered 24/9, 2013 at 5:20 Comment(2)
But Pipes have the same take, drop and foldLadyship
@SassaNF The fold type in pipes is significantly different than the type in conduit, which is where the confusion comes from.Carrico
P
5

I haven't used Pipes before, but after going through the tutorial I found it really simple:

import Pipes
import qualified Pipes.Prelude as P

nums :: Producer Int IO ()
nums = each [1..20]

process :: Producer Int IO ()
process = nums >-> (P.drop 5) >-> (P.take 5)

result :: IO Int
result = P.fold (+) 0 id process

main = result >>= print

UPDATE:

As there is no "effectful" processing in the example we can even use Identity monad as the base monad for pipe:

import Pipes
import qualified Pipes.Prelude as P
import Control.Monad.Identity

nums :: Producer Int Identity ()
nums = each [1..20]

process :: Producer Int Identity ()
process = nums >-> (P.drop 5) >-> (P.take 5)

result :: Identity Int
result = P.fold (+) 0 id process

main = print $ runIdentity result

UPDATE 1:

Below is the solution I came up with (for the gist link comment), but I feel like it can be made more elegant

fun :: Pipe Int (Int, Int) Identity ()
fun = do
  replicateM_ 5 await
  a <- replicateM 5 await
  replicateM_ 5 await
  b <- replicateM 5 await
  yield (sum a, sum b)

main = f $ runIdentity $ P.head $ nums >-> fun where
  f (Just (a,b)) = print (a,b)
  f Nothing = print "Not enough data"
Pompom answered 24/9, 2013 at 9:2 Comment(4)
Thanks, that's what I was looking for. I tried something similar to that which didn't work, though I can't seem to find what it was.Carrico
Actually, I think the thing I ended up getting really tripped up on was this slightly more complicated example: gist.github.com/snoyberg/6683033, but that's really a separate issue. If you have any ideas on it, let me know, otherwise I'll just open it as its own question.Carrico
Note that there is a sum pipe you can use, too, but this is still a very good answer.Arise
I understand that that answer gives the same result, but it's quite different than the gist. (1) It's reading the values into memory instead of doing a streaming fold. (2) For the case of sum this works fine, but if the consumer was something more complicated (e.g., zlib-compressing and saving to a file), we couldn't just pull in a Prelude function. (3) It behaves differently if there is not enough input.Carrico
A
4

To answer your comment, this still works in the general case. I also posted the same answer on reddit where you also asked a similar question there, but I'm duplicating the answer here:

import Pipes
import Pipes.Parse
import qualified Pipes.Prelude as P

main :: IO ()
main = do
    res <- (`evalStateT` (each [1..20])) $ do
        runEffect $ for (input >-> P.take 5) discard
        P.sum (input >-> P.take 5)
    print res

This will generalize to the more complicated cases that you had in mind.

Arise answered 24/9, 2013 at 22:18 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.