Haskell fast concurrent queue

The Problem

Hello! I'm writing a logging library. I would like to create a logger that runs in a separate thread, while all application threads just send messages to it, and I want to find the most performant solution for this problem. All I need is a simple unbounded queue.
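
To make the intended pattern concrete, here is a minimal sketch of such a logger (using Chan purely for illustration; which queue to put in its place is exactly what I'm asking below):

import Control.Concurrent (forkIO)
import Control.Concurrent.Chan
import Control.Monad (forever)

-- Returns an action that any application thread can call to enqueue a
-- message; a single background thread drains the queue and writes it out.
startLogger :: IO (String -> IO ())
startLogger = do
    ch <- newChan
    _  <- forkIO $ forever (readChan ch >>= putStrLn)
    return (writeChan ch)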

Approaches

I've created some tests to see how the available solutions perform, and I'm getting very strange results. I tested four implementations (source code below), based on:

  1. pipes-concurrency
  2. Control.Concurrent.Chan
  3. Control.Concurrent.Chan.Unagi
  4. MVar, as described in the book "Parallel and Concurrent Programming in Haskell". Please note that this technique gives us a bounded queue of capacity 1; it's included only for comparison (see the toy demo below)
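
A quick toy demo (mine, not the book's code) of why approach 4 is bounded at capacity 1: an MVar is a single slot, and putMVar blocks while the slot is full.

import Control.Concurrent.MVar

main :: IO ()
main = do
    m <- newEmptyMVar
    putMVar m (1 :: Int)    -- fills the single slot
    -- putMVar m 2          -- a second put here would block until the slot empties
    print =<< takeMVar m    -- prints 1 and empties the slot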

Tests

Here is the source code used for testing:

{-# LANGUAGE NoMonomorphismRestriction #-}

import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Pipes
import qualified Pipes.Concurrent as Pipes
import Control.Applicative
import Control.Monad (replicateM_)
import System.Environment (getArgs)

import Control.Concurrent.Chan
import Control.Concurrent (forkIO)
import qualified Control.Concurrent.Chan.Unagi as U
import Control.Concurrent.MVar
import Criterion.Main

data Event = Msg String | Status | Quit deriving (Show)

----------------------------------------------------------------------
-- Pipes
----------------------------------------------------------------------

pipesLogMsg = yield (Msg "hello")
pipesManyLogs num = replicateM_ num pipesLogMsg

pipesAddProducer num o = Pipes.forkIO $ do runEffect $ (pipesManyLogs num) >-> Pipes.toOutput o
                                           Pipes.performGC

pipesHandler max = loop 0
  where
    loop mnum = do
        if mnum == max
            then lift $ pure ()
            else do event <- await
                    case event of
                        Msg _  -> loop (mnum + 1)
                        Status -> (lift $ putStrLn (show mnum)) *> loop mnum
                        Quit   -> return ()

----------------------------------------------------------------------
-- Chan
----------------------------------------------------------------------

chanAddProducer num ch = forkIO $ chanManyLogs num ch
chanManyLogs num ch = replicateM_ num (writeChan ch (Msg "hello"))
chanHandler ch max = handlerIO (readChan ch) max

----------------------------------------------------------------------
-- Unagi-Chan
----------------------------------------------------------------------

uchanAddProducer num ch = forkIO $ uchanManyLogs num ch
uchanManyLogs num ch = replicateM_ num (U.writeChan ch (Msg "hello"))
uchanHandler ch max = handlerIO (U.readChan ch) max

----------------------------------------------------------------------
-- MVars
----------------------------------------------------------------------

mvarAddProducer num m = forkIO $ mvarManyLogs num m
mvarManyLogs num m = replicateM_ num (putMVar m (Msg "hello"))
mvarHandler m max = handlerIO (takeMVar m) max

----------------------------------------------------------------------
-- Utils
----------------------------------------------------------------------

handlerIO f max = loop 0 where
    loop mnum = do
        if mnum == max 
            then pure ()
            else do event <- f
                    case event of
                         Msg _  -> loop (mnum + 1)
                         Status -> putStrLn (show mnum) *> loop mnum
                         Quit   -> return ()

----------------------------------------------------------------------
-- Main
----------------------------------------------------------------------

main = defaultMain [
      bench "pipes" $ nfIO $ do
        (output, input) <- Pipes.spawn Pipes.Unbounded
        replicateM_ prodNum (pipesAddProducer msgNum output)
        runEffect $ Pipes.fromInput input >-> pipesHandler totalMsg
    , bench "Chan" $ nfIO $ do
        ch <- newChan
        replicateM_ prodNum (chanAddProducer msgNum ch)
        chanHandler ch totalMsg
    , bench "Unagi-Chan" $ nfIO $ do
        (inCh, outCh) <- U.newChan
        replicateM_ prodNum (uchanAddProducer msgNum inCh)
        uchanHandler outCh totalMsg
    , bench "MVar" $ nfIO $ do
        m <- newEmptyMVar
        replicateM_ prodNum (mvarAddProducer msgNum m)
        mvarHandler m totalMsg
    ]
  where
    prodNum  = 20
    msgNum   = 1000
    totalMsg = msgNum * prodNum

You can compile it with ghc -O2 Main.hs and just run it. As written, the tests create 20 message producers, each producing 1000 messages (msgNum above), i.e. 20000 messages per benchmark run.

Results

benchmarking pipes
time                 46.68 ms   (46.19 ms .. 47.31 ms)
                     0.999 R²   (0.999 R² .. 1.000 R²)
mean                 47.59 ms   (47.20 ms .. 47.95 ms)
std dev              708.3 μs   (558.4 μs .. 906.1 μs)

benchmarking Chan
time                 4.252 ms   (4.171 ms .. 4.351 ms)
                     0.995 R²   (0.991 R² .. 0.998 R²)
mean                 4.233 ms   (4.154 ms .. 4.314 ms)
std dev              244.8 μs   (186.3 μs .. 333.5 μs)
variance introduced by outliers: 35% (moderately inflated)

benchmarking Unagi-Chan
time                 1.209 ms   (1.198 ms .. 1.224 ms)
                     0.996 R²   (0.993 R² .. 0.999 R²)
mean                 1.267 ms   (1.244 ms .. 1.308 ms)
std dev              102.4 μs   (61.70 μs .. 169.3 μs)
variance introduced by outliers: 62% (severely inflated)

benchmarking MVar
time                 1.746 ms   (1.714 ms .. 1.774 ms)
                     0.997 R²   (0.995 R² .. 0.998 R²)
mean                 1.716 ms   (1.694 ms .. 1.739 ms)
std dev              73.99 μs   (65.32 μs .. 85.48 μs)
variance introduced by outliers: 29% (moderately inflated)

Question

I would love to know why the pipes-concurrency version performs so slowly, and why it is so much slower than even the Chan-based one. I'm very surprised that the MVar one is the fastest of all the versions - could anybody explain why we get these results, and whether we can do better in any case?

Adelladella answered 14/1, 2015 at 0:23 - Comments (8):
How many OS threads are you giving it? - Laritalariviere
Yes, please: did you compile with -threaded and run with -N? And how many cores does your machine have? - Oberammergau
I tried both - with and without -threaded - running with +RTS -N8. The results are funny: the pipes version gets much slower still (the whole example goes from 1s to 4s!), Chan is a lot slower too, and unagi is about 20% faster. I've got 8 cores here (a modern i7). - Adelladella
It seems a TChan version performs better than MVar when threaded (as it should!): gist.github.com/phadej/ca603306992cee39ce9d - Challenging
The MVar version's behaviour is different from the chans'. Writing to a chan is non-blocking (assuming there is space in the buffer), whereas writing to a non-empty MVar blocks. In your benchmark writes and reads line up nicely, so the MVar isn't bottlenecking anything. - Challenging
The benchmark for pipes is wrong because it pointlessly introduces 20 calls for a major GC into each sampled pipes program. - Murcia
@Murcia could you point out where the error is? - Adelladella
The calls to performGC are not needed in this particular case: sprunge.us/JQYP Try the test and see if it does anything different. (I'm not sure I remember all the relevant bits; I went through this several times.) For more complex cases you would need spawn' in place of spawn; for still others you would have needed performGC. performGC was nice because it covered all cases, even one that can no longer be replicated: in its distinctive use case it doesn't work with recent GHCs anyway. - Murcia

So I can give you a little overview of the analysis of Chan and TQueue (which pipes-concurrency is using internally here) that motivated some of the design decisions that went into unagi-chan. I'm not sure it will answer your question. I recommend forking the different queues and playing with variations while benchmarking to get a really good sense of what is going on.

Chan

Chan looks like:

data Chan a
 = Chan (MVar (Stream a)) -- pointer to "head", where we read from
        (MVar (Stream a)) -- pointer to "tail", where values are written

type Stream a = MVar (ChItem a)
data ChItem a = ChItem a (Stream a)

It's a linked-list of MVars. The two MVars in the Chan type act as pointers to the current head and tail of the list, respectively. This is what a write looks like:

writeChan :: Chan a -> a -> IO ()
writeChan (Chan _ writeVar) val = do
  new_hole <- newEmptyMVar
  mask_ $ do
    old_hole <- takeMVar writeVar           -- [1]
    putMVar old_hole (ChItem val new_hole)  -- [2]
    putMVar writeVar new_hole               -- [3]

At [1] the writer takes a lock on the write end, at [2] our item is made available to the reader, and at [3] the write end is unlocked for other writers.
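
For contrast, here is roughly what the read side looks like (sketched from memory of the same module; details vary between base versions). It only touches the read-end MVar, never the write end:

readChan :: Chan a -> IO a
readChan (Chan readVar _) = modifyMVar readVar $ \read_end -> do
    (ChItem val new_read_end) <- readMVar read_end  -- blocks until a writer fills this hole
    return (new_read_end, val)                      -- advance the read pointer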

This actually performs pretty well in a single-consumer/single-producer scenario (see the graph here) because reads and writes don't contend. But once you have multiple concurrent writers you can start having trouble:

  • a writer that hits [1] while another writer is at [2] will block and be descheduled (the fastest I've been able to measure a context switch is ~150ns (pretty darn fast); there are probably situations where it is much slower). So when you get many writers contending, you're basically making a big round-trip through the scheduler, onto a wait queue for the MVar, before the write can finally complete.

  • When a writer gets descheduled (because it timed out) while at [2], it holds onto a lock and no writes will be allowed to complete until it can be rescheduled again; this becomes more of an issue when we're over-subscribed, i.e. when our threads/core ratio is high.

Finally, using an MVar per item carries some allocation overhead, and, more importantly, accumulating many mutable objects can cause a lot of GC pressure.

TQueue

TQueue is great because STM makes it super simple to reason about its correctness. It's a classic two-stack functional queue, and a write consists simply of reading the writer stack, consing our element onto it, and writing it back:

data TQueue a = TQueue (TVar [a])
                       (TVar [a])

writeTQueue :: TQueue a -> a -> STM ()
writeTQueue (TQueue _ write) a = do  
  listend <- readTVar write   -- a transaction with a consistent 
  writeTVar write (a:listend) -- view of memory

If, after a writeTQueue writes its new stack back, another interleaved write does the same, one of the writes will be retried. As more writeTQueues are interleaved, the effect of contention worsens. Performance degrades much more slowly than in Chan, however, because there is only a single writeTVar operation that can invalidate competing writeTQueues, and the transaction is very small (just a read and a (:)).

A read works by "dequeuing" the stack from the write side, reversing it, and storing the reversed stack in its own variable for easy "popping" (altogether this gives us amortized O(1) push and pop):

readTQueue :: TQueue a -> STM a
readTQueue (TQueue read write) = do
  xs <- readTVar read
  case xs of
    (x:xs') -> do writeTVar read xs'
                  return x
    [] -> do ys <- readTVar write
             case ys of
               [] -> retry
               _  -> case reverse ys of
                       [] -> error "readTQueue"
                       (z:zs) -> do writeTVar write []
                                    writeTVar read zs
                                    return z

Readers have a moderate contention issue symmetrical to the writers'. In the general case readers and writers don't contend; however, when the reader stack is depleted, readers contend with other readers and with writers. I suspect that if you pre-loaded a TQueue with enough values and then launched 4 readers and 4 writers, you might be able to induce livelock as the reverse struggled to complete before the next write. It's also interesting to note that, unlike with MVar, a write to a TVar on which many readers are waiting wakes them all simultaneously (this might be more or less efficient, depending on the scenario).

I suspect you don't see much of TQueue's weaknesses in your test; primarily you're seeing the moderate effects of write contention and the overhead of allocating and GC'ing a lot of mutable objects.

unagi-chan

unagi-chan was designed first and foremost to handle contention well. It's conceptually very simple, but the implementation has some complexities:

data ChanEnd a = ChanEnd AtomicCounter (IORef (Int , Stream a))

data Stream a = Stream (Array (Cell a)) (IORef (Maybe (Stream a)))

data Cell a = Empty | Written a | Blocking (MVar a)

Read and write sides of the queue share the Stream on which they coordinate passing values (from writer to reader) and indications of blocking (from reader to writer), and read and write sides each have an independent atomic counter. A write works like:

  1. a writer calls the atomic incrCounter on the write counter to receive its unique index on which to coordinate with its (single) reader

  2. the writer finds its cell and performs a CAS of Written a

  3. if successful it exits; otherwise it sees that a reader has beaten it and is blocking (or proceeding to block), so it does a (\(Blocking v) -> putMVar v a) and exits.

A read works in a similar, symmetric way; a toy model of both paths is sketched below.

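To ground steps 1-3, here is a deliberately tiny, fixed-capacity toy that models the protocol using atomic-primops. It is emphatically not unagi-chan's real implementation (which uses a growable stream of array segments, handles async exceptions, and much more); ToyChan and all its helpers are invented for illustration:

import Control.Concurrent.MVar
import Control.Monad (unless)
import Data.Atomics (casIORef, peekTicket, readForCAS)
import Data.Atomics.Counter (AtomicCounter, incrCounter, newCounter)
import Data.IORef
import qualified Data.Vector as V

data Cell a = Empty | Written a | Blocking (MVar a)

-- One flat, fixed-capacity array instead of unagi's growable stream of segments.
data ToyChan a = ToyChan
  { writeCtr :: AtomicCounter
  , readCtr  :: AtomicCounter
  , cells    :: V.Vector (IORef (Cell a))
  }

newToyChan :: Int -> IO (ToyChan a)
newToyChan cap =
  ToyChan <$> newCounter (-1)   -- first incrCounter yields index 0
          <*> newCounter (-1)
          <*> V.replicateM cap (newIORef Empty)

writeToy :: ToyChan a -> a -> IO ()
writeToy ch a = do
  ix <- incrCounter 1 (writeCtr ch)               -- (1) claim a unique index
  let ref = cells ch V.! ix
  tik <- readForCAS ref
  case peekTicket tik of
    Blocking v -> putMVar v a                     -- reader is already waiting
    Empty -> do
      (won, _) <- casIORef ref tik (Written a)    -- (2) try to publish the value
      unless won $ do                             -- (3) a reader raced in meanwhile
        Blocking v <- readIORef ref
        putMVar v a
    Written _ -> error "impossible: one writer per index"

readToy :: ToyChan a -> IO a
readToy ch = do
  ix <- incrCounter 1 (readCtr ch)                -- claim the matching index
  let ref = cells ch V.! ix
  tik <- readForCAS ref
  case peekTicket tik of
    Written a -> return a                         -- fast path: plain read, no CAS
    Empty -> do
      v <- newEmptyMVar
      (won, _) <- casIORef ref tik (Blocking v)   -- announce that we're blocking
      if won then takeMVar v                      -- wait for the writer to fill v
             else do Written a <- readIORef ref   -- the writer won the race instead
                     return a
    Blocking _ -> error "impossible: one reader per index"
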
The first innovation is to make the point of contention an atomic operation that doesn't degrade under contention (as a CAS/retry loop or a Chan-like lock would). Based on simple benchmarking and experimenting, the fetch-and-add primop, exposed by the atomic-primops library, works best.

Then in step 2, both reader and writer need to perform only one compare-and-swap (the fast path for the reader is a simple non-atomic read) to finish coordination.

So, to try to make unagi-chan good, we:

  • use fetch-and-add to handle the point of contention

  • use lockfree techniques such that when we're oversubscribed a thread being descheduled at inopportune times doesn't block progress for other threads (a blocked writer may block at most the reader "assigned" to it by the counter; read caveats re. async exceptions in unagi-chan docs, and note that Chan has nicer semantics here)

  • use an array to store our elements, which has better locality (but see below), lower overhead per element, and less pressure on the GC

A final note re. using an array: concurrent writes to an array are generally a bad idea for scaling, because you cause a lot of cache-coherence traffic as cachelines are invalidated back and forth across your writer threads. The general term is "false sharing". But there are also cache-wise upsides, and the alternative designs I can think of (which would stripe writes across cachelines, or something similar) have downsides of their own; I've been experimenting with this a bit but don't have anything conclusive at the moment.

One place where we are legitimately concerned about false sharing is our counter, which we align and pad to 64 bytes; this did indeed show up in benchmarks, and the only downside is the increased memory usage.

Oberammergau answered 14/1, 2015 at 18:0 - Comments (3):
Thanks for the detailed answer. As a more general concern: is there a good implementation of a fast logger library for Haskell used in production? By logging I mean the same as @Danilo (not some trace messages that you might want to enable in development). - Gaea
@pierreR - we are using a fast logging library internally at our company and are developing its second version right now. It is just in the finishing stage, is very extensible, and allows for multi-threaded logging. You can expect it to be released open source on Hackage in a day or two. I'll write about it here :) - Adelladella
@PierreR: here it is! A fast and extensible logging library for you! hackage.haskell.org/package/logger (or on GitHub: github.com/wdanilo/haskell-logger) - Adelladella

If I had to guess why pipes-concurrency performs more poorly, it's because every read and write is wrapped in an STM transaction, whereas the other libraries use more efficient low-level concurrency primitives.
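
To make that concrete, here is the shape of a send through an STM-backed mailbox (a simplified sketch; pipes-concurrency's actual mailbox code does a bit more): every message pays for a full transaction (read, validate, commit), whereas Chan's writeChan is just a couple of MVar operations.

import Control.Concurrent.STM

-- One complete STM transaction per message sent:
sendSTM :: TQueue a -> a -> IO ()
sendSTM q x = atomically (writeTQueue q x)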

Lishalishe answered 14/1, 2015 at 5:9 - Comments (5):
So is there a reason for that, or can we expect a fix? - Laritalariviere
Ugh, the performance difference is huge. If I change some values (100 producers, 10000 messages each), unagi finishes in 0.64s while pipes-concurrency takes about 70s! That is over 100 times slower. - Adelladella
The API is unlikely to change for pipes-concurrency. The library emphasizes ease of use and correctness over performance. - Lishalishe
In this particular case it seems the trouble is all with performGC, as the results I put up suggest. - Murcia
@WojciechDanilo It is now clear to me that the correct statement is something like "unagi-chan is about 4 times as fast as pipes-concurrency". - Murcia
