I have a conduit pipeline processing a long file. I want to print a progress report for the user every 1000 records, so I've written this:
-- | Every n records, perform the IO action.
-- Used for progress reports to the user.
progress :: (MonadIO m) => Int -> (Int -> i -> IO ()) -> Conduit i m i
progress n act = skipN n 1
  where
    skipN c t = do
      mv <- await
      case mv of
        Nothing -> return ()
        Just v ->
          if c <= 1
            then do
              liftIO $ act t v
              yield v
              skipN n (succ t)
            else do
              yield v
              skipN (pred c) (succ t)
No matter what action I call this with, it leaks memory, even if I just tell it to print a full stop.
As far as I can see the function is tail recursive and both counters are regularly forced (I tried putting "seq c" and "seq t" in, to no avail). Any clue?
If I put in an "awaitForever" that prints a report for every record then it works fine.
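For comparison, the per-record variant could look roughly like the sketch below. The name "progressEvery" is made up for illustration, and the signature uses ConduitT (conduit 1.3 and later) rather than the older Conduit synonym used in the question.

```haskell
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Conduit (ConduitT, awaitForever, yield)

-- | Perform the IO action on every record, then pass the record downstream.
-- (Hypothetical helper; not part of the original program.)
progressEvery :: MonadIO m => (i -> IO ()) -> ConduitT i i m ()
progressEvery act = awaitForever $ \v -> do
  liftIO (act v)  -- report first ...
  yield v         -- ... then forward the record unchanged
```

There are no counters to carry between iterations here, which may be part of why this form does not show the leak.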
Update 1: This occurs only when compiled with -O2. Profiling indicates that the leaking memory is allocated in the recursive "skipN" function and being retained by "SYSTEM" (whatever that means).
Update 2: I've managed to cure it, at least in the context of my current program, by replacing the function above with the one below. Note that "proc" has type "Int -> Int -> Maybe i -> Conduit i m i": to use it you call "await" and pass it the result. For some reason, swapping the "await" and the "yield" solved the problem, so it now awaits the next input before yielding the previous result.
-- | Every n records, perform the monadic action.
-- Used for progress reports to the user.
progress :: (MonadIO m) => Int -> (Int -> i -> IO ()) -> Conduit i m i
progress n act = await >>= proc n 1
  where
    -- c counts down to the next report; t is the running record number.
    proc c t = seq c $ seq t $ maybe (return ()) $ \v ->
      if c <= 1
        then {-# SCC "progress.then" #-} do
          liftIO $ act t v
          v1 <- await
          yield v
          proc n (succ t) v1
        else {-# SCC "progress.else" #-} do
          v1 <- await
          yield v
          proc (pred c) (succ t) v1
So if you have a memory leak in a Conduit, try swapping the yield and await actions.
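To check whether a given stage really runs in constant space, a small driver along these lines may help. It is only a sketch: "countThrough" is a made-up name, the source is a plain list of Ints rather than a file, and the stage under test is passed in as an argument. It uses ConduitT from conduit 1.3 and later.

```haskell
import Data.Conduit (ConduitT, runConduit, (.|))
import qualified Data.Conduit.List as CL

-- | Push the Ints 1..total through the given stage and count what comes out.
-- CL.fold is a strict left fold, so the counter itself should not build up
-- thunks; growth in residency would then point at the stage under test.
-- (Hypothetical helper; not part of the original program.)
countThrough :: Int -> ConduitT Int Int IO () -> IO Int
countThrough total stage =
  runConduit $
    CL.sourceList [1 .. total]
      .| stage
      .| CL.fold (\k _ -> k + 1) 0
```

For example, a "main" that calls "countThrough" with a large total and the stage in question, compiled with -O2 -rtsopts and run with +RTS -s, reports the maximum residency, which can be compared between the two versions of "progress".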
... `skipN` but rather to `(>>) (yield v) (skipN x y)`. This is a common pitfall when writing recursive routines using monads. I'm not sure if GHC would optimize this correctly without looking at a core dump, but my initial guess is that you aren't actually using a tail-recursive function. – Belshazzar
... `m >> n` might not invoke `n` in tail position? – Reddick
... `sum (x:xs) = x + sum xs` is not tail recursive: the last function being called is not `sum` but `(+)`, as it's equivalent to `sum (x:xs) = (+) x (sum xs)`. This is why we often write recursive functions using a helper function with an accumulator argument, or just use `fold`s if the situation is simple enough, such as `sum = go 0 where { go a [] = a; go a (x:xs) = go (x + a) xs }` or `sum = foldl' (+) 0`. Since the do notation desugars to use `>>` and `>>=`, this means that the last call in the stack is to one of those, not to its second argument. – Belshazzar
... `(+)` needs to examine its second argument before it can dispose of its first. I don't know anything about conduits, but in `IO`, `m >>= f` translates to code that looks like "1. Execute `m` (non-tail). Put the result in `r`. 2. Execute `f r` (tail)." By the time `f` is inspected, nothing remains of `m` but the result it produced, and (more importantly) `>>=` doesn't have to do anything with `f r`; its job is already done. Obviously this is not the case for all monads; maybe something about conduits breaks it? – Reddick
Neither `pipes` nor `conduit` need to be tail recursive to run in constant space. The tail recursive discussion is just a red herring. – Teutonic
... `IO` compared to custom monads, since `IO` is a very special, low-level monad, but for custom ones I would imagine that it could be important to consider things like this. – Belshazzar
... `progress` that demonstrates the memory leak. – Calycine
... `t` is never forced unless `act` forces it? Printing out a full stop won't do that ... – Calycine
... `skipN c t` with `skipN !c !t` and see if it helps. – Crepe
... `Int` arithmetic is "conlike", and the initial `t` argument to `skipN` is locally known to be `1`, so the compiler could safely make it strict in `t`. Whether it does or not I'm not sure, and that would certainly need optimizations to be enabled. – Reddick