Conduit: Multiple Stream Consumers

I am writing a program that counts the frequencies of n-grams in a corpus. I already have a function that consumes a stream of tokens and produces n-grams of a single order:

ngram :: Monad m => Int -> Conduit t m [t]
trigrams = ngram 3
countFreq :: (Ord t, Monad m) => Consumer [t] m (Map [t] Int)
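To pin down what these two stages compute, here is a minimal sketch over plain lists instead of conduits. ngramsList and countFreqList are hypothetical stand-ins, not the asker's implementation: the first yields every window of n consecutive tokens, and the second builds a frequency map from those windows.

```haskell
import Data.List (foldl', tails)
import qualified Data.Map.Strict as Map

-- Hypothetical list version of 'ngram n': every run of n consecutive
-- tokens, in input order. Non-positive orders yield no n-grams.
ngramsList :: Int -> [t] -> [[t]]
ngramsList n xs
  | n <= 0    = []
  | otherwise = [w | w <- map (take n) (tails xs), length w == n]

-- Hypothetical list version of 'countFreq': occurrences of each n-gram.
countFreqList :: Ord t => [[t]] -> Map.Map [t] Int
countFreqList = foldl' (\m g -> Map.insertWith (+) g 1 m) Map.empty
```

In GHCi, countFreqList (ngramsList 2 "abab") gives fromList [("ab",2),("ba",1)]. The streaming versions compute the same thing without holding the whole token list in memory.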

At the moment I can only connect one stream consumer to a stream source:

tokens --- trigrams --- countFreq

How do I connect multiple stream consumers to the same stream source? I would like to have something like this:

           .--- unigrams --- countFreq
           |--- bigrams  --- countFreq
tokens ----|--- trigrams --- countFreq
           '--- ...      --- countFreq

Ideally, each consumer would also run in parallel.

EDIT: Thanks to Petr, I came up with this solution:

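import Control.Concurrent (forkIO, newEmptyMVar, putMVar, readMVar)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TMChan (dupTMChan, newBroadcastTMChan)
import Control.Monad (forM)
import Control.Monad.Trans.Resource (runResourceT)
import Data.Conduit (($$), (=$))
import Data.Conduit.Binary (sourceFile)
import Data.Conduit.TMChan (sinkTMChan, sourceTMChan)

-- ngram, frequencies and javascriptTokenizer are defined elsewhere.
spawnMultiple orders = do
    chan <- atomically newBroadcastTMChan

    -- Dup the channel for each consumer *before* the producer starts:
    -- a broadcast channel only delivers items written after the dup.
    results <- forM orders $ \n -> do
        chan'  <- atomically $ dupTMChan chan
        result <- newEmptyMVar
        _ <- forkIO $ sink chan' result n
        return result

    -- Producer: tokenize the file and broadcast every token.
    _ <- forkIO . runResourceT $ sourceFile "test.txt"
                              $$ javascriptTokenizer
                              =$ sinkTMChan chan

    -- Block until every consumer has put its frequency map.
    forM results readMVar

    where
        -- Consumer: count n-grams of order n from its own channel copy.
        sink chan' result n = do
            freqs <- runResourceT $ sourceTMChan chan'
                                 $$ ngram n
                                 =$ frequencies
            putMVar result freqs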
Perlis asked 29/7, 2013 at 18:3
Comment (Patriliny): Do you want every ...grams consumer to receive each value that tokens yields?

I'm assuming you want all your sinks to receive all values.

I'd suggest:

  1. Use newBroadcastTMChan from Control.Concurrent.STM.TMChan (stm-chans) to create a new channel.
  2. Use this channel to build a sink for your main producer with sinkTMChan from Data.Conduit.TMChan (stm-conduit).
  3. For each client, use dupTMChan to create its own copy for reading, and start a new thread that reads this copy with sourceTMChan. Duplicate the channel before the producer starts writing, because a broadcast channel only delivers items written after the dup.
  4. Collect the results from your threads.
  5. Make sure your clients can consume the data as fast as it is produced. TMChan is unbounded, so otherwise unread items pile up and you can run out of heap.

(I haven't tried it; let us know how it works.)
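The steps above can be sketched without conduit, using only the stm package: newBroadcastTChanIO and dupTChan from Control.Concurrent.STM stand in for newBroadcastTMChan and dupTMChan. A plain TChan cannot be closed, so this sketch assumes Nothing as an end-of-stream marker (that is not how stm-chans signals closing). fanOut and drain are hypothetical names, and each consumer is modelled as a simple fold rather than an n-gram counter.

```haskell
import Control.Concurrent (forkIO, newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM
import Control.Monad (forM, forM_)

-- Read a duplicated channel until the Nothing end-of-stream marker,
-- folding each item into a strictly evaluated accumulator.
drain :: TChan (Maybe a) -> (b -> a -> b) -> b -> IO b
drain ch step = go
  where
    go acc = do
      next <- atomically (readTChan ch)
      case next of
        Nothing -> return acc
        Just x  -> let acc' = step acc x in acc' `seq` go acc'

-- Feed every item of 'input' to every consumer, one thread per consumer.
fanOut :: [a] -> [(b -> a -> b, b)] -> IO [b]
fanOut input consumers = do
  bcast <- newBroadcastTChanIO              -- step 1: write-only broadcast channel
  vars <- forM consumers $ \(step, z) -> do
    copy <- atomically (dupTChan bcast)     -- step 3: dup BEFORE producing
    var  <- newEmptyMVar
    _ <- forkIO (drain copy step z >>= putMVar var)
    return var
  forM_ input $ \x -> atomically (writeTChan bcast (Just x))  -- step 2: producer
  atomically (writeTChan bcast Nothing)     -- end-of-stream marker
  mapM takeMVar vars                        -- step 4: collect results
```

For example, fanOut [1 .. 100 :: Int] [((+), 0), (\n _ -> n + 1, 0)] returns [5050, 100]: a sum and a count computed by separate threads from one broadcast stream. Because TChan is unbounded, a slow consumer lets its copy grow without limit, which is the risk step 5 warns about.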


Update: One way to collect the results is to create an MVar for each consumer thread. Each thread would putMVar its result when it finishes, and the main thread would takeMVar on all of these MVars, thus waiting for every thread to finish. For example, if vars is a list of your MVars, the main thread would issue mapM takeMVar vars to collect all the results.
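A minimal sketch of this MVar pattern, with hypothetical helper names spawn and runAll. One detail worth adding: because Haskell is lazy, a thread doing putMVar var result may store an unevaluated thunk and leave the real work to whichever thread reads it. Forcing the result with evaluate before putMVar keeps the work on the worker thread. evaluate only reaches weak head normal form, but for a Data.Map.Strict map that already means the whole tree has been built.

```haskell
import Control.Concurrent (MVar, forkIO, newEmptyMVar, putMVar, takeMVar)
import Control.Exception (evaluate)

-- Run an action on its own thread. The result is forced to weak head
-- normal form on that thread before being stored in the MVar.
spawn :: IO a -> IO (MVar a)
spawn act = do
  var <- newEmptyMVar
  _ <- forkIO (act >>= evaluate >>= putMVar var)
  return var

-- Start every action first, then wait for each result in order
-- (the "mapM takeMVar vars" step).
runAll :: [IO a] -> IO [a]
runAll acts = do
  vars <- mapM spawn acts
  mapM takeMVar vars
```

For example, runAll [return (sum [1 .. 1000 :: Int]), return 42] returns [500500, 42]. To actually spread the threads over several cores, compile with -threaded and run with +RTS -N.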

Patriliny answered 29/7, 2013 at 19:8
Comment (Perlis): Thanks for the answer. How do I collect the results if I spawn the threads with forkIO?
Comment (Patriliny): @Perlis I updated the answer with an idea of how to collect the results.
Comment (Sparkman): Why does TMChan have a broadcast version while TBMChan doesn't? Where can I find a newBroadcastTBMChan?
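If no bounded broadcast channel is available, one workaround is to give each consumer its own bounded queue and have the producer write every item to all of them. The sketch below uses TBQueue from the stm package rather than stm-chans; fanOutBounded is a hypothetical name, the capacity of 16 is arbitrary, and Nothing is again an assumed end-of-stream marker. A full queue blocks the producer, so a slow consumer throttles the pipeline instead of letting memory grow.

```haskell
import Control.Concurrent (forkIO, newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM
import Control.Monad (forM, forM_)

-- Each consumer gets its own bounded queue; the producer writes every
-- item to every queue, blocking while any queue is full (backpressure).
fanOutBounded :: [a] -> [(b -> a -> b, b)] -> IO [b]
fanOutBounded input consumers = do
  setups <- forM consumers $ \(step, z) -> do
    q   <- newTBQueueIO 16            -- capacity per consumer (arbitrary)
    var <- newEmptyMVar
    let loop acc = do
          next <- atomically (readTBQueue q)
          case next of
            Nothing -> putMVar var acc
            Just x  -> let acc' = step acc x in acc' `seq` loop acc'
    _ <- forkIO (loop z)
    return (q, var)
  let queues = map fst setups
  -- Producer: broadcast by writing each item to every queue in turn.
  forM_ input $ \x -> forM_ queues $ \q -> atomically (writeTBQueue q (Just x))
  forM_ queues $ \q -> atomically (writeTBQueue q Nothing)
  mapM (takeMVar . snd) setups
```

For example, fanOutBounded [1 .. 1000 :: Int] [((+), 0), (max, 0), (\n _ -> n + 1, 0)] returns [500500, 1000, 1000], with at most 16 entries buffered per consumer at any time. The trade-off is that the producer now moves at the pace of the slowest consumer.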
