How can I optimize parallel sorting to reduce the running time?

I have an algorithm that sorts a list of a given length in parallel:

import Control.Parallel (par, pseq)
import Data.Time.Clock (diffUTCTime, getCurrentTime)
import System.Environment (getArgs)
import System.Random (StdGen, getStdGen, randoms)


parSort :: (Ord a) => [a] -> [a]
parSort (x:xs)    = force greater `par` (force lesser `pseq`
                                         (lesser ++ x:greater))
    where lesser  = parSort [y | y <- xs, y <  x]
          greater = parSort [y | y <- xs, y >= x]
parSort _         = []

sort :: (Ord a) => [a] -> [a]
sort (x:xs) = lesser ++ x:greater
    where lesser  = sort [y | y <- xs, y <  x]
          greater = sort [y | y <- xs, y >= x]
sort _ = []

parSort2 :: (Ord a) => Int -> [a] -> [a]
parSort2 d list@(x:xs)
  | d <= 0     = sort list
  | otherwise = force greater `par` (force lesser `pseq`
                                     (lesser ++ x:greater))
      where lesser      = parSort2 d' [y | y <- xs, y <  x]
            greater     = parSort2 d' [y | y <- xs, y >= x]
            d' = d - 1
parSort2 _ _              = []

force :: [a] -> ()
force xs = go xs `pseq` ()
    where go (_:xs) = go xs
          go [] = 1


randomInts :: Int -> StdGen -> [Int]
randomInts k g = let result = take k (randoms g)
                 in force result `seq` result

testFunction = parSort

main = do
  args <- getArgs
  let count | null args = 500000
            | otherwise = read (head args)
  input <- randomInts count `fmap` getStdGen
  start <- getCurrentTime
  let sorted = testFunction input
  putStrLn $ "Sort list N = " ++ show (length sorted)
  end <- getCurrentTime
  putStrLn $ show (end `diffUTCTime` start) 

I want the parallel sort on 2, 3, and 4 processor cores to take less time than on 1 core. So far I cannot achieve that. Here are my program runs:

1. SortList +RTS -N1 -RTS 10000000
   time = 41.2 s
2. SortList +RTS -N3 -RTS 10000000
   time = 39.55 s
3. SortList +RTS -N4 -RTS 10000000
   time = 54.2 s

What can I do?

Update 1:

testFunction = parSort2 60
— Rampageous, asked 23/4/2019 at 12:00

Comments (17):
Closely related: https://mcmap.net/q/867417/-is-it-possible-to-speed-up-a-quicksort-with-par-in-haskell/745903 — Penneypenni
force is not doing what you think it's doing: it only forces the spine of the list it's given, not the contents. For random number generation, forcing the contents is what actually does the work. Additionally, force result `seq` result doesn't do what you might hope it does. Try putting evaluate (last input) in main before you start timing and you'll immediately see the difference (a sketch follows these comments). I also recommend using mkStdGen 0 (or some other constant) instead of getStdGen to make your timings more consistent. (Neither of these addresses your actual problem, though.) — Leaf
As for your actual problem: you're probably creating way too many sparks, and the spark-tracking overhead is dominating the runtime. You might try a simpler strategy: split the list into some small number of chunks, say 60, or perhaps the number passed to -N, sort each chunk in parallel, then merge the results (see the second sketch after these comments). I haven't tested it myself, so I won't write this as an answer, as I can't be sure it's right, but I would bet it will behave better. — Leaf
@DanielWagner I second that. "Fine-grained parallelism" almost always implies "way too much overhead". The trick is to divide the work into reasonable chunks so that messaging and synchronization overhead doesn't kill you. And for sorting specifically, the best parallel algorithms involve merge sort. — Nonna
One of the problems with parallelizing quicksort is that it does not build balanced sublists. You might get a better result from parallelizing mergesort, which balances on the way in and sorts on the way out. — Hispaniola
@DanielWagner I think all of that should be properly discussed in an answer. I added a bounty over on the old question. — Penneypenni
@ReinHenrichs Yeah, quicksort isn't great for parallelisation, but it should still be possible to get some performance boost even despite the bad balancing, especially with a completely random input list. — Penneypenni
@Penneypenni Yeah, there are of course issues here that prevent it from parallelizing properly at all. — Hispaniola
@DanielWagner Thank you for your comments. I followed your advice and rewrote parSort as parSort2. Is that what you meant? — Rampageous
You can use Data.Map.Internal (maps representing bags) to sort in parallel. Break the list up into chunks. Use fromListWith in parallel to convert each chunk into a map. Then take unions in pairs. You'll have to parallelize the union algorithm (in the obvious way) in order to use all capabilities in each round of merges. I offer no performance guarantees, but see Blelloch et al., "Just Join for Parallel Ordered Sets". — Strangle
@Rampageous No. parSort2 60 will spawn up to 2^60 sparks, so you're almost certainly still spawning way too many sparks. — Leaf
All sorting algorithms can be parallelized. Just chunk the unordered list into as many chunks as you have threads and sort the chunks in parallel. Once you have all the sorted chunks, merge them two by two in new parallel jobs until you are left with only two chunks; then merge those in the current thread. The best base sorting algorithm available is Data.List.sort, so it's better to use that than to start from an inefficient sort. — Ami
@Redu, if you're starting and ending with Haskell lists, then there's exactly one linear-time part that's fundamentally sequential: breaking up the list into chunks. As far as I can tell, it's possible to fully parallelize everything else. Your algorithm, however, adds a sequential merge, and reduces the number of capabilities used as the merging process proceeds and the merges get more expensive. That doesn't sound like a great way to parallelize to me, but I'm no expert. — Strangle
@Redu, ah, I guess you can use a binary search to parallelize the merges, much like the tree sorting algorithm. — Strangle
@Strangle Suppose you have 64 threads at hand. Make 64 chunks from your list and assign one chunk per thread to be sorted. Once you have 64 sorted chunks, merge them two by two (in a binary-tree style, so that each chunk goes through the merging process a minimal number of times), each merge in a separate thread. Initially you will need 32 threads, then 16 of them, and so on, until you are left with 2 chunks, which you merge in the main thread. This should be efficient, but the magnificent Haskell list sort algorithm (which isn't the standard mergesort as we know it) might even render this inefficient for lists that aren't very big. — Ami
@Redu, what I'm suggesting is that the merges be parallelized too. When merging xs and ys, break xs into k roughly equal portions xs_1 to xs_k. Use binary searches in ys to break it up into corresponding (uneven) pieces ys_1 to ys_k, so that elements of xs_i are never greater than those of ys_{i+1} and vice versa. Then the corresponding pieces can be merged, in separate threads, into a fresh array. As the number of pieces to merge goes down, the number of threads used for each can go up, to keep using the same number of capabilities. — Strangle
@Strangle Yeah, well, hmm... the merging jobs are already being done in parallel, but you are left with an increasing number of free threads to utilize for parallel merges. I can't be sure about the efficiency of those binary searches for further simplifying the merging task to xs_i ++ ys_i+?, even if they are done concurrently. But... interesting. — Ami
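
A minimal sketch of the measurement fix described in the comments above, assuming the parSort from the question is in scope (evaluate comes from Control.Exception; untested, for illustration only):

import Control.Exception (evaluate)
import Data.Time.Clock (diffUTCTime, getCurrentTime)
import System.Random (mkStdGen, randoms)

main :: IO ()
main = do
  -- A fixed seed, rather than getStdGen, makes timings repeatable.
  let input = take 500000 (randoms (mkStdGen 0)) :: [Int]
  -- Reaching the last element forces the list spine, and with it the
  -- random-number generation, before the clock starts.
  _ <- evaluate (last input)
  start <- getCurrentTime
  let sorted = parSort input  -- parSort as defined in the question
  putStrLn $ "Sort list N = " ++ show (length sorted)
  end <- getCurrentTime
  print (end `diffUTCTime` start)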
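
And a minimal sketch of the chunk-sort-merge strategy discussed above, using Data.List.sort as the base sort and Control.Parallel.Strategies for sparking. The names chunkSort, chunksOf, and merge are invented here, and this is unbenchmarked:

import Control.DeepSeq (NFData)
import Control.Parallel.Strategies (parList, parMap, rdeepseq, withStrategy)
import Data.List (sort)

-- | Split a list into contiguous chunks of the given size.
chunksOf :: Int -> [a] -> [[a]]
chunksOf _ [] = []
chunksOf n xs = let (h, t) = splitAt n xs in h : chunksOf n t

-- | Merge two sorted lists into one sorted list.
merge :: Ord a => [a] -> [a] -> [a]
merge [] ys = ys
merge xs [] = xs
merge (x:xs) (y:ys)
  | x <= y    = x : merge xs (y:ys)
  | otherwise = y : merge (x:xs) ys

-- | Chunk the input, sort the chunks in parallel, then merge the
-- sorted results pairwise, sparking each round of merges as well.
chunkSort :: (NFData a, Ord a) => Int -> [a] -> [a]
chunkSort nChunks xs = mergeAll (parMap rdeepseq sort chunks)
  where
    size   = max 1 ((length xs + nChunks - 1) `div` max 1 nChunks)
    chunks = chunksOf size xs
    mergeAll []  = []
    mergeAll [c] = c
    mergeAll cs  = mergeAll (withStrategy (parList rdeepseq) (pairUp cs))
    pairUp (a:b:rest) = merge a b : pairUp rest
    pairUp rest       = rest

Splitting the input (the length and the chunking walk) is the one inherently sequential part; the per-chunk sorts run in parallel, and later merge rounds have fewer but larger merges, which is exactly the load-balancing concern raised in the comments above.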

Here's one idea you can play around with, using Data.Map. For simplicity and performance, I assume substitutivity for the element type, so we can count occurrences rather than storing lists of elements. I'm confident that you can get better results using some fancy array algorithm, but this is simple and (essentially) functional.

When writing a parallel algorithm, we want to minimize the amount of work that must be done sequentially. When sorting a list, there's one thing that we really can't avoid doing sequentially: splitting up the list into pieces for multiple threads to work on. We'd like to get that done with as little effort as possible, and then try to work mostly in parallel from then on.

Let's start with a simple sequential algorithm.

{-# language BangPatterns, TupleSections #-}
import qualified Data.Map.Strict as M
import Data.Map (Map)
import Data.List
import Control.Parallel.Strategies

type Bag a = Map a Int

ssort :: Ord a => [a] -> [a]
ssort xs =
  let m = M.fromListWith (+) $ (,1) <$> xs
  in concat [replicate c x | (x,c) <- M.toList m]
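
For example, ssort [3,1,2,1] builds the bag fromList [(1,2),(2,1),(3,1)] and expands it back out to [1,1,2,3].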

How can we parallelize this? First, let's break up the list into pieces. There are various ways to do this, none of them great. Assuming a small number of capabilities, I think it's reasonable to let each of them walk the list itself. Feel free to experiment with other approaches.

-- | Every Nth element, including the first
everyNth :: Int -> [a] -> [a]
everyNth n | n <= 0 = error "What you doing?"
everyNth n = go 0 where
  go !_ [] = []
  go 0 (x : xs) = x : go (n - 1) xs
  go k (_ : xs) = go (k - 1) xs

-- | Divide up a list into N pieces fairly. Walking each list in the
-- result will walk the original list.
splatter :: Int -> [a] -> [[a]]
splatter n = map (everyNth n) . take n . tails
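
For example, splatter 2 [1,2,3,4,5,6] == [[1,3,5],[2,4,6]]; each piece walks the original list independently, so no sequential pre-pass is needed beyond take n . tails.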

Now that we have pieces of the list, we spark threads to convert them to bags.

parMakeBags :: Ord a => [[a]] -> Eval [Bag a]
parMakeBags xs = 
  traverse (rpar . M.fromListWith (+)) $ map (,1) <$> xs

Now we can repeatedly merge pairs of bags until we have just one.

parMergeBags_ :: Ord a => [Bag a] -> Eval (Bag a)
parMergeBags_ [] = pure M.empty
parMergeBags_ [t] = pure t
parMergeBags_ q = parMergeBags_ =<< go q where
  go [] = pure []
  go [t] = pure [t]
  go (t1:t2:ts) = (:) <$> rpar (M.unionWith (+) t1 t2) <*> go ts

But ... there's a problem. In each round of merges, we use only half as many capabilities as we did in the previous one, and perform the final merge with just one capability. Ouch! To fix this, we'll need to parallelize unionWith. Fortunately, this is easy!

import Data.Map.Internal (Map (..), splitLookup, link)

parUnionWith
  :: Ord k
  => (v -> v -> v)
  -> Int -- Number of threads to spark
  -> Map k v
  -> Map k v
  -> Eval (Map k v)
parUnionWith f n t1 t2 | n <= 1 = rseq $ M.unionWith f t1 t2
parUnionWith _ !_ Tip t2 = rseq t2
parUnionWith _ !_ t1 Tip = rseq t1
parUnionWith f n (Bin _ k1 x1 l1 r1) t2 = case splitLookup k1 t2 of
  (l2, mb, r2) -> do
    l1l2 <- parEval $ parUnionWith f (n `quot` 2) l1 l2
    r1r2 <- parUnionWith f (n `quot` 2) r1 r2
    case mb of
      Nothing -> rseq $ link k1 x1 l1l2 r1r2
      Just x2 -> rseq $ link k1 fx1x2 l1l2 r1r2
        where !fx1x2 = f x1 x2
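
At each level, the union of the left subtrees is sparked off with parEval while the union of the right subtrees continues in the current thread, and the thread budget n is halved on both sides; after about log2 n levels the recursion falls through to plain sequential unionWith calls, so roughly n of them can run in parallel.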

Now we can fully parallelize bag merging:

-- Uses the given number of capabilities per merge, initially,
-- doubling for each round.
parMergeBags :: Ord a => Int -> [Bag a] -> Eval (Bag a)
parMergeBags !_ [] = pure M.empty
parMergeBags !_ [t] = pure t
parMergeBags n q = parMergeBags (n * 2) =<< go q where
  go [] = pure []
  go [t] = pure [t]
  go (t1:t2:ts) = (:) <$> parEval (parUnionWith (+) n t1 t2) <*> go ts

We can then implement a parallel merge like this:

parMerge :: Ord a => [[a]] -> Eval [a]
parMerge xs = do
  bags <- parMakeBags xs
  -- Why 2 and not one? We only have half as many
  -- pairs as we have lists (capabilities we want to use)
  -- so we double up.
  m <- parMergeBags 2 bags
  pure $ concat [replicate c x | (x,c) <- M.toList m]

Putting the pieces together,

parSort :: Ord a => Int -> [a] -> Eval [a]
parSort n = parMerge . splatter n

pSort :: Ord a => Int -> [a] -> [a]
pSort n = runEval . parMerge . splatter n
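
For example, pSort 4 xs splits xs into four interleaved pieces and sorts using up to four capabilities; run the compiled program with +RTS -N4 so the sparks actually have cores to run on.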

There's just one sequential piece remaining that we can parallelize: converting the final bag to a list. Is it worth parallelizing? I'm pretty sure that in practice it is not. But let's do it anyway, just for fun! To avoid considerable extra complexity, I'll assume that there aren't large numbers of equal elements; repeated elements in the result will lead to some work (thunks) remaining in the result list.

We'll need a basic partial list spine forcer:

-- | Force the first n conses of a list
walkList :: Int -> [a] -> ()
walkList n _ | n <= 0 = ()
walkList _ [] = ()
walkList n (_:xs) = walkList (n - 1) xs

And now we can convert the bag to a list in parallel chunks without paying for concatenation:

-- | Use up to the given number of threads to convert a bag
-- to a list, appending the final list argument.
parToListPlus :: Int -> Bag k -> [k] -> Eval [k]
parToListPlus n m lst | n <= 1 = do
  rseq (walkList (M.size m) res)
  pure res
  -- Note: the concat and ++ should fuse away when compiling with
  -- optimization.
  where res = concat [replicate c x | (x,c) <- M.toList m] ++ lst
parToListPlus _ Tip lst = pure lst
parToListPlus n (Bin _ x c l r) lst = do
  r' <- parEval $ parToListPlus (n `quot` 2) r lst
  res <- parToListPlus (n `quot` 2) l $ replicate c x ++ r'
  rseq r' -- make sure the right side is finished
  pure res

And then we modify the merger accordingly:

parMerge :: Ord a => Int -> [[a]] -> Eval [a]
parMerge n xs = do
  bags <- parMakeBags xs
  m <- parMergeBags 2 bags
  parToListPlus n m []
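
Since parMerge now takes the thread count, the wrappers above need to pass it through too; a minimal adjustment (reusing the same n for splitting and for the final listing is an arbitrary but convenient choice):

parSort :: Ord a => Int -> [a] -> Eval [a]
parSort n = parMerge n . splatter n

pSort :: Ord a => Int -> [a] -> [a]
pSort n = runEval . parMerge n . splatter n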
— Strangle, answered 26/4/2019 at 7:01

Comments (3):
Can we have some benchmarks to see if this has actually led to an increase in performance from parallelisation? — Thorner
@DanRobertson, I did a bit of informal benchmarking and it does seem to work (pSort 2 is quite a bit faster than pSort 1 on a list like [1,-1,2,-2,...,2*10^6]), but I just have a medium-wimpy laptop ill-suited to that sort of thing. ThreadScope seems to suggest a pretty well-balanced load, but I've never used it before, so I can't guarantee I'm interpreting the results correctly. — Strangle
@DanRobertson, it seems to scale well to at least 4 cores with pseudorandom input. I suspect there's more tweaking to be done (it might be better to use the finer parallelism controls available through monad-par than to rely on the built-in spark pool, for example), but the general approach seems good. — Strangle
