How best to shut down a Clojure core.async pipeline of processes

I have a Clojure processing app that is a pipeline of channels. Each processing step does its computation asynchronously (i.e. makes an HTTP request using http-kit or similar) and puts its result on an output channel, so the next step can read from that channel and do its own computation.

My main function looks like this:

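(defn -main [& args]
  ;; each step takes the previous step's output channel (the scheduler
  ;; takes the tmp dir) and returns its own output channel
  (-> file/tmp-dir
      (schedule/scheduler)
      (search/searcher)
      (process/resultprocessor)
      (buy/buyer)
      (report/reporter)))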
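To make the shutdown question concrete, here is a minimal sketch of one step in this shape, assuming each step is a function from an input channel to a new output channel. `stage` is a hypothetical helper (not part of core.async), and the synchronous `f` stands in for the real work such as an http-kit request. The property that matters is that a step closes its output once its input is closed and drained, so closing the first channel shuts the chain down step by step.

```clojure
(ns pipeline.stage
  (:require [clojure.core.async :as async]))

;; Hypothetical helper: apply `f` to every value taken from `in` and put
;; the results on a fresh output channel, which is returned. The argument
;; order lets steps be chained with -> as in -main.
;; `f` must not return nil, because nil cannot be put on a channel.
(defn stage [in f]
  (let [out (async/chan)]
    (async/go-loop []
      (if-some [v (async/<! in)]
        (do (async/>! out (f v))
            (recur))
        ;; `in` is closed and drained: propagate the shutdown downstream.
        (async/close! out)))
    out))
```

core.async's own `pipeline`, `pipeline-blocking` and `pipeline-async` behave the same way by default: they close the `to` channel when the `from` channel closes.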

Currently the scheduler step drives the pipeline (it has no input channel) and supplies the chain with work.

When I run this in the REPL:

(-main "some args")

It runs forever because the scheduler loops indefinitely. What is the best way to change this architecture so that I can shut down the whole system from the REPL? Does closing each channel mean the system terminates?

Would a broadcast channel help?

Haveman, 10/10/2015 at 7:55
Comment (Haveman): That also kills the REPL, unfortunately. I'll try the Component approach.

You could have your scheduler use alts! / alts!! to choose between a kill channel and a put onto the input channel of your pipeline:

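(require '[clojure.core.async :as async])

(def kill-channel (async/chan))

;; `preprocess` stands for whatever work your scheduler does to produce
;; the next job for the pipeline.
(defn scheduler [input output-ch kill-ch]
  (loop []
    ;; :priority true makes alts!! try the ops in order, so a pending
    ;; kill signal always wins over another put on output-ch.
    (let [[_ port] (async/alts!! [kill-ch [output-ch (preprocess input)]]
                                 :priority true)]
      (when-not (= port kill-ch)
        (recur)))))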

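Putting a value on kill-channel, or closing it, will then terminate the loop. Since alts!! blocks the calling thread, you would normally run this loop on its own thread (for example with async/thread) rather than on the REPL thread.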
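A minimal sketch of driving that loop from the REPL, assuming it runs under async/thread. `make-job` is a hypothetical stand-in for `preprocess`, and the `"/tmp/jobs"` input and `out-ch` buffer size are illustrative only.

```clojure
(ns pipeline.kill-switch
  (:require [clojure.core.async :as async]))

;; Hypothetical stand-in for the scheduler's per-iteration work.
(defn make-job [input] {:input input})

;; Same shape as the scheduler loop above, with make-job in place of preprocess.
(defn scheduler [input output-ch kill-ch]
  (loop []
    (let [[_ port] (async/alts!! [kill-ch [output-ch (make-job input)]]
                                 :priority true)]
      (when-not (= port kill-ch)
        (recur)))))

(def kill-channel (async/chan))
(def out-ch (async/chan 10))

;; Run the blocking loop on its own thread; async/thread returns a channel
;; that receives :stopped once the loop has exited.
(def done (async/thread (scheduler "/tmp/jobs" out-ch kill-channel) :stopped))

;; Later, from the REPL:
;; (async/close! kill-channel)   ; ends the loop
;; (async/<!! done)              ; blocks until the loop has exited
```

Closing rather than putting has the advantage that it is idempotent and never blocks, even if the loop has already exited.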

Technically you could also use output-ch to control the process (puts to closed channels return false), but I normally find explicit kill channels cleaner, at least for top-level pipelines.
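For comparison, a sketch of the output-channel variant, assuming the scheduler simply stops when a put fails. `scheduler-until-closed` and its `make-job` argument are hypothetical. One subtlety worth knowing: according to the close! docstring, a put that is already blocked when the channel is closed stays blocked until a taker releases it, so closing output-ch alone may not stop the loop immediately.

```clojure
(ns pipeline.closed-output
  (:require [clojure.core.async :as async]))

;; Hypothetical alternative to a kill channel: keep putting jobs until a
;; put fails. >!! returns true on success and false once output-ch is closed.
;; Caveat: a put already blocked at close time stays blocked until taken.
(defn scheduler-until-closed [make-job output-ch]
  (loop [n 0]
    (if (async/>!! output-ch (make-job n))
      (recur (inc n))
      n)))  ; number of jobs delivered before shutdown
```

This caveat is one reason an explicit kill channel, which alts!! can select even while a put is pending, tends to be easier to reason about.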

To make this both more elegant and more convenient to use (at the REPL and in production), you could use Stuart Sierra's Component library: in the component's start method, start the scheduler loop on a separate thread and assoc the kill channel onto the component; in its stop method, close! the kill channel, which terminates the loop.
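A minimal sketch of that component, assuming com.stuartsierra.component is on the classpath. The record name, its fields, `make-job` and `new-scheduler` are hypothetical; `make-job` stands in for the real scheduling work.

```clojure
(ns pipeline.scheduler
  (:require [clojure.core.async :as async]
            [com.stuartsierra.component :as component]))

;; Hypothetical stand-in for the real per-iteration scheduling work.
(defn make-job [dir] {:dir dir})

(defrecord Scheduler [dir out kill done]
  component/Lifecycle
  (start [this]
    (let [out  (async/chan 10)
          kill (async/chan)
          ;; run the blocking loop on its own thread, not the REPL thread
          done (async/thread
                 (loop []
                   (let [[_ port] (async/alts!! [kill [out (make-job dir)]]
                                                :priority true)]
                     (when-not (= port kill)
                       (recur)))))]
      (assoc this :out out :kill kill :done done)))
  (stop [this]
    (when kill
      (async/close! kill)   ; ends the loop
      (async/<!! done)      ; wait for the thread to finish
      (async/close! out))   ; let downstream steps see the shutdown
    (assoc this :out nil :kill nil :done nil)))

(defn new-scheduler [dir]
  (map->Scheduler {:dir dir}))
```

At the REPL, (component/start (new-scheduler "/tmp")) starts the loop and component/stop on the result ends it without touching the REPL process itself.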

Violist answered 10/10/2015 at 12:34

I would suggest using something like https://github.com/stuartsierra/component to handle system setup. It makes it easy to start and stop your whole system from the REPL. With that library, each processing step becomes a component that sets up and tears down its own channels in its start and stop methods (the component/Lifecycle protocol). You could also define an IStream protocol for the components to implement and have each component depend on a component implementing that protocol. That buys you some very easy modularity.
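One way the IStream idea might look, as a sketch assuming Component is available. `IStream`, `out-chan`, the `Step` record and its `f` work function are all hypothetical names; Component assocs the upstream dependency under `:in` before calling start, which is what `component/using` with `{:in ...}` arranges.

```clojure
(ns pipeline.step
  (:require [clojure.core.async :as async]
            [com.stuartsierra.component :as component]))

;; Hypothetical protocol: anything that exposes an output channel.
(defprotocol IStream
  (out-chan [this]))

;; A generic processing step; `in` is the upstream IStream injected by
;; Component, `f` is the step's (hypothetical) work function.
(defrecord Step [f in out]
  component/Lifecycle
  (start [this]
    (let [src (out-chan in)
          out (async/chan 10)]
      (async/go-loop []
        (if-some [v (async/<! src)]
          ;; >! returns false once `out` has been closed by stop
          (when (async/>! out (f v))
            (recur))
          ;; upstream closed and drained: propagate the shutdown
          (async/close! out)))
      (assoc this :out out)))
  (stop [this]
    ;; a put attempted after this returns false, which ends the go-loop
    (when out (async/close! out))
    (assoc this :out nil))
  IStream
  (out-chan [_] out))
```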

You'd end up with a system that looks like the following:

(component/system-map
 :scheduler (schedule/new-scheduler file/tmp-dir)
 :searcher  (component/using (search/searcher)
                             {:in :scheduler})
 :processor (component/using (process/resultprocessor)
                             {:in :searcher})
 :buyer     (component/using (buy/buyer)
                             {:in :processor})
 :report    (component/using (report/reporter)
                             {:in :buyer}))

One nice thing about this approach is that you can easily add components that consume an existing channel. For example, if each component creates its output channel as a tap on an internal mult, you could add a logger for the processor just by adding a logging component that takes the processor as a dependency:

 :processor (component/using (process/resultprocessor)
                             {:in :searcher})
 :processor-logger (component/using (log/logger)
                                    {:in :processor})
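The mult/tap mechanics behind that logger can be sketched in plain core.async. The channel names and `start-logger!` are hypothetical; the point is that every tap receives every value, so adding a consumer does not steal values from the buyer.

```clojure
(ns pipeline.fan-out
  (:require [clojure.core.async :as async]))

;; Hypothetical processor internals: the step writes every result to one
;; internal channel, and a mult fans each value out to all taps.
(def processor-internal (async/chan))
(def processor-mult (async/mult processor-internal))

;; The buyer and a logger each get their own tap. By default tap closes the
;; tapped channel when the source channel closes.
(def to-buyer  (async/tap processor-mult (async/chan 10)))
(def to-logger (async/tap processor-mult (async/chan 10)))

;; Hypothetical logging consumer: prints each value and, once its tap is
;; closed, delivers the number of values it logged on the returned channel.
(defn start-logger! [ch]
  (async/go-loop [n 0]
    (if-some [v (async/<! ch)]
      (do (println "processor produced:" v)
          (recur (inc n)))
      n)))
```

Note that a mult drops values while it has no taps, so the taps should be in place before the processor starts producing.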

I'd also recommend watching Stuart Sierra's talk on Component to get an idea of how it works.

Destructible answered 10/10/2015 at 12:57

You should consider Stuart Sierra's reloaded workflow, which relies on modelling your pipeline elements as components. That way you can treat your logical singletons like 'classes' and control the construction and destruction (start/stop) logic of each one.
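A minimal sketch of the REPL side of that workflow, assuming Component is on the classpath. `new-system`, the `Noop` placeholder and the `start`/`stop`/`reset` helpers are hypothetical names; in the question's app `new-system` would return the system-map of scheduler, searcher, processor, buyer and reporter. The full reloaded workflow also reloads changed namespaces with tools.namespace, which is left out here.

```clojure
(ns user
  (:require [com.stuartsierra.component :as component]))

;; Hypothetical placeholder component; swap in the real pipeline steps.
(defrecord Noop [started?]
  component/Lifecycle
  (start [this] (assoc this :started? true))
  (stop  [this] (assoc this :started? false)))

(defn new-system []
  (component/system-map :noop (map->Noop {})))

;; The single running system, held in a var so the REPL can replace it.
(def system nil)

(defn start []
  (alter-var-root #'system (fn [_] (component/start (new-system)))))

(defn stop []
  (alter-var-root #'system (fn [s] (when s (component/stop s)) nil)))

(defn reset []
  (stop)
  (start))
```

Because stop tears down every component in reverse dependency order, the REPL process itself keeps running, which avoids the problem of killing the REPL mentioned in the comment on the question.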

Hill answered 10/10/2015 at 13:29
