Parallel processing in conduit flow
I really like the concept of conduit/pipes for applying operations to a streaming IO source. I am interested in building tools that work on very large log files. One of the attractions of moving to Haskell from Python/Ruby is the easier way of writing parallel code, but I can't find any documentation of this. How could I set up a conduit flow which reads lines from a file and works on them in parallel (i.e. with 8 cores, it should read eight lines, hand them off to eight different threads to be processed, then collect them again, etc.), ideally with as little "ceremony" as possible...

Optionally, it could be noted whether or not the lines need to be rejoined in order, if that influences the speed of the process.

I am sure it would be possible to cobble together something myself using ideas from the Parallel Haskell book, but it seems to me that running a pure function in parallel (parMap etc.) in the middle of a conduit workflow should be very easy?

Freedafreedman answered 4/11, 2014 at 18:23 Comment(3)
In general, the concept of conduits (and pipes) is designed to be sequential, not parallel. In particular, when a conduit requests input, only then is its upstream conduit run to produce one value. So there is nothing to parallelize. What you could do is create a conduit that is internally parallelized: receiving inputs, scheduling tasks to process them, and yielding their output.Droughty
I just find it strange, since so many proponents of Haskell mention the increasing importance of multi-core processors, and how Haskell as a pure functional language really shines in this regard. And whenever you talk about doing things to very large amounts of data in Haskell, conduit and pipe are mentioned as the best frameworks. So I would have thought that parallelizing the processing in a pipe/conduit should be "trivial"...Multitude
There are two somewhat different concepts: concurrency and parallelism (I should have made this distinction clearer in my first comment). Parallelism is natural in pure languages and doesn't depend on any framework: you simply start several sparks to compute pure functions on multiple cores, for example inside a conduit. On the other hand, concurrency (multi-threading) is explicit and non-deterministic, and is conceptually different from conduits/pipes.Droughty
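The spark-based parallelism mentioned in the comment above can be sketched with nothing but base. The helper name parMap' below is made up for illustration; it is not part of any of the libraries discussed:

```haskell
import GHC.Conc (par)

-- Spark the evaluation of every element (to weak head normal form),
-- then return the list. The result is identical to `map f xs`;
-- when the program is run with `+RTS -N`, the sparks may be
-- evaluated on other cores.
parMap' :: (a -> b) -> [a] -> [b]
parMap' f xs = let ys = map f xs
               in foldr par ys ys
```

Because sparks only hint at parallel evaluation and never change the result, this stays deterministic regardless of how many cores actually pick up the work.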

As an example of the "internal parallelism" mentioned by Petr Pudlák in his comment, consider this function (I'm using pipes, but it could be implemented with conduit just as easily):

import Control.Monad
import Control.Lens (view)
import Control.Concurrent.Async (mapConcurrently)
import Pipes
import qualified Pipes.Group as G
import qualified Control.Foldl as L

concProd :: Int -> (a -> IO b) -> Producer a IO r -> Producer b IO r
concProd groupsize action producer = 
      L.purely G.folds L.list (view (G.chunksOf groupsize) producer)
      >->
      forever (await >>= liftIO . mapConcurrently action >>= mapM_ yield)

This function takes as parameters a group size, an action we want to run for each value of type a, and a Producer of a values.

It returns a new Producer. Internally, the producer reads a values in batches of groupsize, processes them concurrently, and yields the results one by one.

The code uses Pipes.Group to "partition" the original producer into sub-producers of size groupsize, and then Control.Foldl to "fold" each sub-producer into a list.
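The batch-process-rejoin logic behind concProd doesn't depend on pipes itself. As a rough, dependency-free sketch of the same idea over plain lists (the helpers chunksOf', mapConcurrently', and concMapIO are hypothetical names, standing in for Pipes.Group's chunksOf and async's mapConcurrently):

```haskell
import Control.Concurrent (forkIO, newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM)

-- split a list into groups of n, like Pipes.Group's chunksOf
chunksOf' :: Int -> [a] -> [[a]]
chunksOf' _ [] = []
chunksOf' n xs = take n xs : chunksOf' n (drop n xs)

-- run the action on each element of a chunk in its own thread,
-- collecting results back in the original order
mapConcurrently' :: (a -> IO b) -> [a] -> IO [b]
mapConcurrently' f xs = do
  vars <- forM xs $ \x -> do
    v <- newEmptyMVar
    _ <- forkIO (f x >>= putMVar v)
    return v
  mapM takeMVar vars

-- process the whole list in concurrent batches of the given size
concMapIO :: Int -> (a -> IO b) -> [a] -> IO [b]
concMapIO n f xs = concat <$> mapM (mapConcurrently' f) (chunksOf' n xs)
```

Unlike the real mapConcurrently from async, this toy version does not propagate exceptions from the worker threads or cancel siblings on failure, which is a good reason to use the library in practice.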

For more sophisticated tasks, you could turn to the asynchronous channels provided by pipes-concurrency or stm-conduit. But these yank you somewhat out of the "single pipeline" worldview of vanilla pipes/conduits.
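To give a taste of the channel-based style, here is a sketch using base's Control.Concurrent.Chan rather than pipes-concurrency, to keep it dependency-free (channelPool is a made-up name). Note that results come back in completion order, not input order, which is exactly the "rejoined in order or not" trade-off the question raises:

```haskell
import Control.Concurrent (forkIO, newChan, readChan, writeChan)
import Control.Monad (forever, replicateM, replicateM_)

-- n workers pull jobs from one channel and push results to another;
-- results arrive in completion order, not input order
channelPool :: Int -> (a -> IO b) -> [a] -> IO [b]
channelPool n f xs = do
  jobs    <- newChan
  results <- newChan
  mapM_ (writeChan jobs) xs
  replicateM_ n . forkIO . forever $ do
    x <- readChan jobs
    f x >>= writeChan results
  replicateM (length xs) (readChan results)
```

The workers here simply loop forever and are discarded when the program exits; a real implementation (as in pipes-concurrency, whose mailboxes shut down when their other end is garbage-collected) would handle termination and exceptions properly.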

Pomposity answered 5/11, 2014 at 7:55 Comment(1)
This is beautiful. Thanks very much! I have been thinking about this for a while now and it seems Pipes.Group gives a natural way of how to group a pipe stream into chunks and work on them in parallel. I never got the idea of using purely folds list to create the chunks. Very nice indeed!Pachalic
