Clojure core.async for data computation

I've started using the Clojure core.async library. I found the concepts of CSP, channels and go blocks really easy to use. However, I'm not sure I'm using them correctly. Here is my code:

(require '[clojure.core.async :refer [chan go >! <! <!!]])

;; x, y, Mat/*, max-fold and gen-matrix are defined elsewhere
(def x-ch (chan))
(def y-ch (chan))
(def w1-ch (chan))
(def w2-ch (chan))

; they all return matrices
(go (>! x-ch (Mat/* x (map #(/ 1.0 %) (max-fold x)))))
(go (>! y-ch (Mat/* y (map #(/ 1.0 %) (max-fold y)))))
(go (>! w1-ch (gen-matrix 200 300)))
(go (>! w2-ch (gen-matrix 300 100)))

(let [x1 (<!! (go (<! x-ch)))
      y1 (<!! (go (<! y-ch)))
      w1 (<!! (go (<! w1-ch)))
      w2 (<!! (go (<! w2-ch)))]
  ;; do stuff w/ x1 y1 w1 w2
  )

I've got predefined (matrix) vectors bound to the symbols x and y. I need to transform both vectors before I use them, and they are quite large. I also need to generate two random matrices. Since the go macro runs its body asynchronously, I split the four computations into separate go blocks, each of which puts its result on a channel. Then, in a let block, I take the values from the channels and bind them to symbols. The takes all use the blocking <!! since they run on the main thread.

Basically, I'm trying to speed up my computation by splitting the program into async processes. Is this the right way to do it?

Ratchet answered 19/10, 2015 at 0:35
Why do you want to use async code here? Async code is effective when you have to block and wait. Otherwise it does nothing for you. When you just need to do the math, you should compute it directly or in parallel, for example in a future. (Lavonnelaw)
Thanks for that. In my code, I start 4 computations at the same time and block/wait on the values until they return something. How does that differ from async block/wait? However, as nicolas and you pointed out, I'll be using future for that. Can I use core.reducers inside go blocks, or would that be a horrible idea? (Ratchet)
You do NOT start them at the same time. Everything is executed sequentially because you have no real blocking operations in the go blocks. I defined a function "prime?" to test if a number is prime. Look here: "(time (do (count (filter true? (map prime? (range 2 20000))))))" gives 3 seconds and "(time (count (filter true? (map prime? (range 2 20000)))))" gives 3 seconds. You use only one thread and everything is sequential here. Test it! (Lavonnelaw)
Thanks. I've already switched my code to use future. (Ratchet)

go blocks return a channel with the result of the expression, so you don't need to create intermediate channels for their results. The code below lets you kick off all 4 calculations at the same time, and then block on the values until they return. If you don't need some of the results straight away, you could block on the value only when you actually use it.

(require '[clojure.core.async :refer [go <!!]])

(let [x1-ch (go (Mat/* x (map #(/ 1.0 %) (max-fold x))))
      y1-ch (go (Mat/* y (map #(/ 1.0 %) (max-fold y))))
      w1-ch (go (gen-matrix 200 300))
      w2-ch (go (gen-matrix 300 100))
      x1 (<!! x1-ch)
      y1 (<!! y1-ch)
      w1 (<!! w1-ch)
      w2 (<!! w2-ch)]
  ;; do stuff w/ x1 y1 w1 w2
  )
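A minimal, self-contained sketch of the point above, with trivial arithmetic standing in for the matrix work (the question's `Mat/*`, `max-fold` and `gen-matrix` aren't shown, so they aren't used here). Each `go` returns its own result channel, and you take from it only where you need the value. One thing worth knowing: that result channel delivers the value once and is then closed, so a second take returns `nil`.

```clojure
(require '[clojure.core.async :refer [go <!!]])

;; Each go block returns a channel that will receive the value of its body.
(def a-ch (go (reduce + (range 1000))))
(def b-ch (go (* 6 7)))

;; Block only at the point where each value is actually needed.
(def a (<!! a-ch))
(def b (<!! b-ch))

;; The result channel is closed after delivering its single value,
;; so taking from it again returns nil rather than the result.
(def a-again (<!! a-ch))
```

If you need a result in several places, bind it once (as the `let` above does) instead of taking from the channel repeatedly.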
Aboutship answered 19/10, 2015 at 2:2

For this kind of processing, future may be a slightly better fit.

The example from the link is simple to grasp:

(def f
  (future
    (Thread/sleep 10000)
    (println "done")
    100))

The future's body starts running on another thread as soon as the future is created, so the above waits 10 seconds and then prints "done".

When you need the value you can just use:

(deref f)
; or @f

This blocks until the future's body has finished and then returns its value.

In the same example, if you call deref before the 10 seconds have elapsed, the call will block until the computation is finished.
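A small sketch of that blocking behaviour, with the sleep shortened to 500 ms so it runs quickly. It also uses two standard `clojure.core` functions the answer doesn't mention: `realized?`, to check whether the future has finished, and the three-argument `deref`, which gives up after a timeout in milliseconds and returns a fallback value.

```clojure
;; A future starts running its body on another thread as soon as it is created.
(def f
  (future
    (Thread/sleep 500)
    100))

;; Checked right away, the body is still sleeping, so it is not yet realized.
(def done-at-start? (realized? f))

;; deref with a timeout (in ms) returns the fallback value instead of
;; blocking for the full duration.
(def early (deref f 10 :not-ready))

;; Plain deref (or @f) blocks until the body finishes, then returns its value.
(def result @f)
(def done-at-end? (realized? f))
```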

In your example, since you are just waiting for computations to finish and are not so concerned with messages and interactions between channel participants, future is what I would recommend. So:

(future
  (Mat/* x (map #(/ 1.0 %) (max-fold x))))
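A sketch of the question's whole `let` rewritten with futures. The question doesn't show `x`, `y`, `Mat/*`, `max-fold` or `gen-matrix`, so the hypothetical `scale-by-max` and `gen-matrix` below work on plain Clojure vectors and only stand in for them; the point is the start-all-then-deref pattern, not the matrix code.

```clojure
;; Hypothetical stand-in for (Mat/* v (map #(/ 1.0 %) (max-fold v))):
;; divide every element by the largest one.
(defn scale-by-max [v]
  (let [m (apply max v)]
    (mapv #(/ % m) v)))

;; Hypothetical stand-in for the question's gen-matrix:
;; a rows x cols vector of vectors filled with random doubles.
(defn gen-matrix [rows cols]
  (vec (repeatedly rows #(vec (repeatedly cols rand)))))

(def x [1.0 2.0 4.0])
(def y [3.0 6.0])

(def results
  ;; All four futures start running immediately, each on its own thread.
  (let [x1-f (future (scale-by-max x))
        y1-f (future (scale-by-max y))
        w1-f (future (gen-matrix 200 300))
        w2-f (future (gen-matrix 300 100))
        ;; Each deref blocks only until that particular result is ready.
        x1 @x1-f
        y1 @y1-f
        w1 @w1-f
        w2 @w2-f]
    ;; do stuff w/ x1 y1 w1 w2; here they are simply returned
    {:x1 x1 :y1 y1 :w1 w1 :w2 w2}))
```

Because the futures are created before any deref, the four computations overlap; the total wait is roughly that of the slowest one rather than the sum.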
Inequitable answered 19/10, 2015 at 0:49
Thanks, Nicolas. I'll give that a try. Still, what are your thoughts on my implementation if I had to use core.async? I'm trying to grasp CSP. (Ratchet)

If you're looking to speed up your program more generally by running code in parallel, you could look at Clojure's Reducers or Aphyr's Tesser. These split the work of a single computation into parallelisable parts and then combine the partial results, so they can run the work efficiently across as many cores as your computer has. If you run each of your computations in a future or a go block, each computation runs on a single thread; some may finish before others, leaving those cores idle.
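A minimal sketch of the Reducers approach using `clojure.core.reducers/fold` on a plain vector. The sum of squares is only an illustration, not the question's matrix code. Note that `fold` runs in parallel only on foldable collections such as vectors and maps; on a lazy seq it falls back to an ordinary sequential reduce.

```clojure
(require '[clojure.core.reducers :as r])

(def v (vec (range 1000000)))

;; r/fold splits the vector into chunks (512 elements by default), reduces
;; the chunks in parallel on a fork/join pool, and merges the partial sums
;; with +. Calling (+) with no arguments supplies the identity 0 for each chunk.
(def parallel-sum (r/fold + (r/map #(* % %) v)))

;; The same computation done sequentially, for comparison.
(def sequential-sum (reduce + (map #(* % %) v)))
```

Unlike one future per task, a single `fold` keeps all cores busy until the whole computation is done.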

Aboutship answered 19/10, 2015 at 3:8
