The Problem
Hello! I'm writing a logging library and I would like to create a logger that runs in a separate thread, while all application threads just send messages to it. I want to find the most performant solution for this problem. I need a simple unbounded queue here.
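To make the setup concrete, here is a minimal sketch of the architecture described above, using only Control.Concurrent.Chan from base. The names LogEvent, loggerLoop and runLogger are hypothetical and exist only for this illustration; they are not part of the benchmark below.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM_)

-- Hypothetical message type for this sketch.
data LogEvent = LogMsg String | Stop

-- The logger thread: drains the queue until it sees Stop, then
-- reports how many messages it handled.
loggerLoop :: Chan LogEvent -> MVar Int -> IO ()
loggerLoop queue done = go 0
  where
    go n = do
      event <- readChan queue
      case event of
        LogMsg _ -> go $! n + 1      -- a real logger would write the message out here
        Stop     -> putMVar done n

-- Start one logger thread and several producer threads, wait for all
-- of them, and return the number of messages the logger saw.
runLogger :: Int -> Int -> IO Int
runLogger producers perProducer = do
  queue    <- newChan
  done     <- newEmptyMVar
  finished <- newEmptyMVar
  _ <- forkIO (loggerLoop queue done)
  forM_ [1 .. producers] $ \p -> forkIO $ do
    forM_ [1 .. perProducer] $ \i ->
      writeChan queue (LogMsg ("producer " ++ show p ++ ", message " ++ show i))
    putMVar finished ()
  -- Wait for every producer before asking the logger to stop.
  forM_ [1 .. producers] $ \_ -> takeMVar finished
  writeChan queue Stop
  takeMVar done

main :: IO ()
main = runLogger 4 1000 >>= print   -- prints 4000
```

Producers never wait for the logger here because writeChan on an unbounded Chan does not block; the benchmark below swaps the queue implementation while keeping this overall shape.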
Approaches
I've created some tests to see how available solutions perform and I get very strange results here. I tested 4 implementations (source code provided below) based on:
- pipes-concurrency
- Control.Concurrent.Chan
- Control.Concurrent.Chan.Unagi
- MVar based, as described in the book "Parallel and Concurrent Programming in Haskell". Please note that this technique gives us a bounded queue of capacity 1; it is used only for comparison in the tests.
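The note on the MVar approach can be checked with a small sketch. It uses tryPutMVar (the non-blocking variant of putMVar from base) so that the "queue is full" case is visible instead of blocking the thread. The name mvarCapacityDemo is hypothetical.

```haskell
import Control.Concurrent.MVar (newEmptyMVar, takeMVar, tryPutMVar)

-- Try two writes in a row, then one read, then a third write.
-- Returns whether each of the three writes succeeded.
mvarCapacityDemo :: IO (Bool, Bool, Bool)
mvarCapacityDemo = do
  slot   <- newEmptyMVar
  first  <- tryPutMVar slot "a"   -- the slot was empty, so this succeeds
  second <- tryPutMVar slot "b"   -- the slot is full; putMVar would block here
  _      <- takeMVar slot         -- the consumer frees the slot
  third  <- tryPutMVar slot "c"   -- succeeds again
  pure (first, second, third)

main :: IO ()
main = mvarCapacityDemo >>= print   -- prints (True,False,True)
```

In the benchmark the producers use the blocking putMVar, so each producer waits whenever the consumer has not yet taken the previous message.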
Tests
Here is the source code used for testing:
{-# LANGUAGE NoMonomorphismRestriction #-}
import Control.Applicative
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.Chan
import qualified Control.Concurrent.Chan.Unagi as U
import Control.Concurrent.MVar
import Control.Monad (forever, replicateM_)
import Criterion.Main
import Pipes
import qualified Pipes.Concurrent as Pipes
import System.Environment (getArgs)

data Event = Msg String | Status | Quit deriving (Show)
----------------------------------------------------------------------
-- Pipes
----------------------------------------------------------------------
pipesLogMsg = yield (Msg "hello")
pipesManyLogs num = replicateM_ num pipesLogMsg
pipesAddProducer num o = Pipes.forkIO $ do
    runEffect $ pipesManyLogs num >-> Pipes.toOutput o
    Pipes.performGC
pipesHandler max = loop 0
  where
    loop mnum = do
      if mnum == max
        then lift $ pure ()
        else do
          event <- await
          case event of
            Msg _  -> loop (mnum + 1)
            Status -> (lift $ putStrLn (show mnum)) *> loop mnum
            Quit   -> return ()
----------------------------------------------------------------------
-- Chan
----------------------------------------------------------------------
chanAddProducer num ch = forkIO $ chanManyLogs num ch
chanManyLogs num ch = replicateM_ num (writeChan ch (Msg "hello"))
chanHandler ch max = handlerIO (readChan ch) max
----------------------------------------------------------------------
-- Unagi-Chan
----------------------------------------------------------------------
uchanAddProducer num ch = forkIO $ uchanManyLogs num ch
uchanManyLogs num ch = replicateM_ num (U.writeChan ch (Msg "hello"))
uchanHandler ch max = handlerIO (U.readChan ch) max
----------------------------------------------------------------------
-- MVars
----------------------------------------------------------------------
mvarAddProducer num m = forkIO $ mvarManyLogs num m
mvarManyLogs num m = replicateM_ num (putMVar m (Msg "hello"))
mvarHandler m max = handlerIO (takeMVar m) max
----------------------------------------------------------------------
-- Utils
----------------------------------------------------------------------
-- Shared consumer loop: reads events with f until max messages have been seen.
handlerIO f max = loop 0
  where
    loop mnum = do
      if mnum == max
        then pure ()
        else do
          event <- f
          case event of
            Msg _  -> loop (mnum + 1)
            Status -> putStrLn (show mnum) *> loop mnum
            Quit   -> return ()
----------------------------------------------------------------------
-- Main
----------------------------------------------------------------------
main = defaultMain
    [ bench "pipes" $ nfIO $ do
        (output, input) <- Pipes.spawn Pipes.Unbounded
        replicateM_ prodNum (pipesAddProducer msgNum output)
        runEffect $ Pipes.fromInput input >-> pipesHandler totalMsg
    , bench "Chan" $ nfIO $ do
        ch <- newChan
        replicateM_ prodNum (chanAddProducer msgNum ch)
        chanHandler ch totalMsg
    , bench "Unagi-Chan" $ nfIO $ do
        (inCh, outCh) <- U.newChan
        replicateM_ prodNum (uchanAddProducer msgNum inCh)
        uchanHandler outCh totalMsg
    , bench "MVar" $ nfIO $ do
        m <- newEmptyMVar
        replicateM_ prodNum (mvarAddProducer msgNum m)
        mvarHandler m totalMsg
    ]
  where
    prodNum = 20
    msgNum = 1000
    totalMsg = msgNum * prodNum
You can compile it with ghc -O2 Main.hs and just run it.
The tests create 20 message producers, each producing 1000 messages (msgNum in the code above), so 20000 messages per benchmark run.
Results
benchmarking pipes
time 46.68 ms (46.19 ms .. 47.31 ms)
0.999 R² (0.999 R² .. 1.000 R²)
mean 47.59 ms (47.20 ms .. 47.95 ms)
std dev 708.3 μs (558.4 μs .. 906.1 μs)
benchmarking Chan
time 4.252 ms (4.171 ms .. 4.351 ms)
0.995 R² (0.991 R² .. 0.998 R²)
mean 4.233 ms (4.154 ms .. 4.314 ms)
std dev 244.8 μs (186.3 μs .. 333.5 μs)
variance introduced by outliers: 35% (moderately inflated)
benchmarking Unagi-Chan
time 1.209 ms (1.198 ms .. 1.224 ms)
0.996 R² (0.993 R² .. 0.999 R²)
mean 1.267 ms (1.244 ms .. 1.308 ms)
std dev 102.4 μs (61.70 μs .. 169.3 μs)
variance introduced by outliers: 62% (severely inflated)
benchmarking MVar
time 1.746 ms (1.714 ms .. 1.774 ms)
0.997 R² (0.995 R² .. 0.998 R²)
mean 1.716 ms (1.694 ms .. 1.739 ms)
std dev 73.99 μs (65.32 μs .. 85.48 μs)
variance introduced by outliers: 29% (moderately inflated)
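For an easier comparison, the timings above can be converted into a rough per-message cost. This is only a sketch: it assumes the constants from main (20 producers, 1000 messages each) are what was actually run, and the measured time also includes forking the producer threads. The names messagesPerRun, timesMs and nsPerMessage are hypothetical.

```haskell
-- Messages per benchmark run, taken from the constants in main
-- (20 producers x 1000 messages). This is an assumption about what was run.
messagesPerRun :: Double
messagesPerRun = 20 * 1000

-- Criterion "time" estimates copied from the results above, in milliseconds.
timesMs :: [(String, Double)]
timesMs =
  [ ("pipes",      46.68)
  , ("Chan",       4.252)
  , ("Unagi-Chan", 1.209)
  , ("MVar",       1.746)
  ]

-- Convert a per-run time in milliseconds to nanoseconds per message.
nsPerMessage :: Double -> Double
nsPerMessage ms = ms * 1e6 / messagesPerRun

main :: IO ()
main = mapM_ report timesMs
  where
    report (name, ms) =
      putStrLn (name ++ ": " ++ show (nsPerMessage ms) ++ " ns/message")
```

Under these assumptions the cost is roughly 2.3 μs per message for pipes, about 0.21 μs for Chan, about 0.06 μs for Unagi-Chan and about 0.09 μs for MVar.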
Question
Why does the pipes-concurrency version perform so slowly, roughly ten times slower than even the Chan-based one? I'm also very surprised by how fast the MVar version is: in the results above it is second only to Unagi-Chan and clearly ahead of Chan. Could anybody explain why we get these results, and whether we can do better in any of these cases?
Comments
[...] -threaded and run with -N? And how many cores do you have in your machine? – Oberammergau
[...] -threaded and running with +RTS -N8. The results are funny - the pipes version becomes much slower still (the whole example goes from 1s to 4s!), Chan is a lot slower as well, and unagi is about 20% faster. I've got 8 cores here (modern i7) – Adelladella
TChan version performs better than MVar when threaded (as it should!): gist.github.com/phadej/ca603306992cee39ce9d – Challenging
MVar version behaviour is different from chans. Writing to a chan is non-blocking (assuming there is space in the buffer), whereas writing to a non-empty MVar is blocking. In your benchmark writes and reads are aligned nicely, so MVar isn't bottlenecking anything. – Challenging
[...] performGC are not needed in this particular case sprunge.us/JQYP Try test to see if it is doing anything different. (I'm not sure I remember all the relevant bits; I went through this several times.) For more complex cases you would need spawn' in place of spawn; for still others, you would have needed performGC. 'performGC' was nice because it covered all cases, and a case that can no longer be replicated: in its distinctive use case it doesn't work with recent ghcs anyway. – Murcia