Handling errors with clojure core.async pipeline

I am trying to understand the correct way to handle errors when using core.async/pipeline. My pipeline is the following:

input     --> xf-run-computation --> first-out
first-out --> xf-run-computation --> last-out

Here xf-run-computation makes an http call and returns the response. However, some of these responses will be errors. What's the best way to handle these errors? My solution is to split each output channel into success-values and error-values and then merge the error channels back into a single channel:

(let [[success-values1 error-values1] (split fn-to-split first-out)
      [success-values2 error-values2] (split fn-to-split last-out)
      errors (merge [error-values1 error-values2])]
  (pipeline 4 first-out xf-run-computation input)
  (pipeline 4 last-out  xf-run-computation success-values1)
  [last-out errors])

So my function will return the last results and the errors.

Filings answered 28/12, 2016 at 17:20 Comment(3)
Thanks, that is a mistake. It should be: errors (merge [error-values error-values2]) - Filings
Your question is still a bit confused. You put all of first-out into the 2nd stage of processing, which includes both successful and unsuccessful results. Shouldn't the 2nd stage input be only success-values1? - Listel
I use first-out as an output channel, so whenever xf-run-computation publishes a result, the predicate fn-to-split is evaluated and the result is published to success-values1 or error-values1. - Filings

Generally speaking, "the" correct way probably depends on your application's needs, but given your problem description, I think there are three things you need to consider:

  1. xf-run-computation returns data that your business logic would see as errors,
  2. xf-run-computation throws an exception and
  3. given that http calls are involved, some runs of xf-run-computation might never finish (or not finish in time).

Regarding point 3., the first thing you should consider is using pipeline-blocking instead of pipeline.
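As a minimal sketch of that switch, assuming a hypothetical slow-fetch function that stands in for a blocking http call (the name, the sleep and the channel sizes are illustrative only):

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-in for a blocking http call.
(defn slow-fetch [id]
  (Thread/sleep 50)
  {:id id :status "ok"})

(def in  (async/to-chan (range 4)))
(def out (async/chan 4))

;; pipeline-blocking has the same argument order as pipeline but runs each
;; transformation on a dedicated thread, so blocking work is acceptable here.
(async/pipeline-blocking 4 out (map slow-fetch) in)

;; out is closed once in is exhausted, so into terminates.
(def results (async/<!! (async/into [] out)))
```

Results arrive on out in input order, regardless of which call finishes first.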

I think your question is mostly related to point 1. The basic idea is that xf-run-computation needs to return a data structure (say a map or a record) which clearly marks a result as an error or a success, e.g. {:title nil :body nil :status "error"}. This gives you some options for dealing with the situation:

  • all your later code simply ignores input data which has :status "error", i.e., your xf-run-computation would contain a line like (when (not (= (:status input) "error")) (run-computation input)),

  • you could run a filter on all results between the pipeline calls and filter them as needed (note that filter can also be used as a transducer in a pipeline, which makes the old, deprecated filter> and filter< functions of core.async unnecessary),

  • you use async/split, as you suggested and as Alan Thompson shows in his answer, to filter out the error values to a separate error channel. There is no real need for a second error channel for your second pipeline if you're going to merge the values anyway; you can simply reuse your error channel.
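The first two options can be sketched roughly as follows. The run-computation function, the ok? predicate and the rule that odd ids fail are all made up for illustration; only the :status "error" convention comes from the text above.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-in for the http call: odd ids come back as errors.
(defn run-computation [{:keys [id] :as input}]
  (if (odd? id)
    (assoc input :status "error")
    (assoc input :status "ok")))

(defn ok? [result]
  (not= "error" (:status result)))

(def input     (async/to-chan (map (fn [id] {:id id}) (range 6))))
(def first-out (async/chan 10))
(def last-out  (async/chan 10))

(async/pipeline 4 first-out (map run-computation) input)

;; filter is part of the second stage's transducer, so results marked as
;; errors never reach the second computation.
(async/pipeline 4 last-out (comp (filter ok?) (map run-computation)) first-out)

(def results (async/<!! (async/into [] last-out)))
```

Note that this variant drops the error results silently; if they need to be reported, the split approach keeps them on a channel of their own.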

For point 2, the problem is that any exception in xf-run-computation happens in another thread and will not simply propagate back to your calling code. But you can make use of the ex-handler argument to pipeline (and pipeline-blocking). You could simply filter out all exceptions, put them on a separate exception channel, or catch them and turn them into errors (potentially putting them back on the result channel or another error channel). The last option only makes sense if the exception gives you enough information, e.g. an id or something else that allows you to tie the exception back to the input that caused it. You could arrange for this in xf-run-computation (i.e. catch any exception thrown from a third-party library like the http call).
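A minimal sketch of the "turn exceptions into errors" variant, assuming a hypothetical run-computation that throws for one input and attaches the input id via ex-info so the handler can tie the failure back to it:

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical computation that throws for id 2. The ex-data carries the id
;; so that the handler can relate the exception to its input.
(defn run-computation [id]
  (if (= id 2)
    (throw (ex-info "request failed" {:id id}))
    {:id id :status "ok"}))

;; The ex-handler receives the Throwable. A non-nil return value is put on
;; the output channel; returning nil would drop the failed item instead.
(defn ex->error [^Throwable e]
  {:id (:id (ex-data e)) :status "error" :message (.getMessage e)})

(def in  (async/to-chan (range 4)))
(def out (async/chan 4))

;; Arguments: n, to, xf, from, close?, ex-handler.
(async/pipeline-blocking 2 out (map run-computation) in true ex->error)

(def results (async/<!! (async/into [] out)))
```

With this handler the error map takes the place of the failed result on out, so downstream code can treat it like any other result marked with :status "error".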

For point 3, the canonical answer in core.async would be to point to a timeout channel, but this doesn't make much sense in relation to pipeline. A better idea is to ensure that a timeout is set on your http calls, e.g. the :timeout option of http-kit or :socket-timeout and :conn-timeout of clj-http. Note that these options will usually result in an exception on timeout.
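If a call offers no timeout option of its own, one possible fallback is to bound it with a future and a timed deref. This is a different technique from the client-level options named above, and slow-call and call-with-timeout are hypothetical names used only for this sketch:

```clojure
;; Hypothetical stand-in for a blocking call without its own timeout option.
(defn slow-call [id ms]
  (Thread/sleep ms)
  {:id id :status "ok"})

(defn call-with-timeout
  "Runs (f) on another thread and returns its result, or an error map
  if no result is available after timeout-ms milliseconds."
  [id timeout-ms f]
  (let [fut    (future (f))
        result (deref fut timeout-ms ::timed-out)]
    (if (= result ::timed-out)
      (do (future-cancel fut) ; interrupt the abandoned call
          {:id id :status "error" :reason "timeout"})
      result)))

(def fast (call-with-timeout 1 500 #(slow-call 1 10)))
(def slow (call-with-timeout 2 50  #(slow-call 2 2000)))
```

Returning an error map instead of throwing keeps timeouts in the same shape as the business-level errors from point 1.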

Philanthropy answered 3/1, 2017 at 17:14 Comment(2)
To be fair, I don't have any application needs; this is just me trying to find out the best way to propagate errors and indicate failures. I will try to use a map/record instead of splitting the channel. Points 2 and 3 are definitely something that needs to be added as well. Thanks! - Filings
I will add to point 3: you can add a retry mechanism (diehard?) for any network hiccups. After some retries it throws the exception to your pipeline ex-handler. - Bolin

Here is an example that does what you are suggesting. Beginning with (range 10) it first filters out the multiples of 5, then the multiples of 3.

(ns tst.clj.core
  (:require
    [clojure.core.async :as async]
    [clojure.test :refer [deftest is]]))

(defn err-3
  "'fail' for multiples of 3"
  [x]
  (if (zero? (mod x 3))
    (+ x 300)       ; error case
    x))             ; non-error

(defn err-5
  "'fail' for multiples of 5"
  [x]
  (if (zero? (mod x 5))
    (+ x 500)       ; error case
    x))             ; non-error

(defn is-ok?
  "Returns true if the value is not 'in error' (>=100)"
  [x]
  (< x 100))

(def ch-0  (async/to-chan (range 10)))
(def ch-1  (async/chan 99))
(def ch-2  (async/chan 99))

(deftest t-2
  (let [
        _                         (async/pipeline 1 ch-1 (map err-5) ch-0)
        [ok-chan-1 fail-chan-1]   (async/split is-ok? ch-1 99 99)
        _                         (async/pipeline 1 ch-2 (map err-3) ok-chan-1)
        [ok-chan-2 fail-chan-2]   (async/split is-ok? ch-2 99 99)

        ok-vec-2                  (async/<!! (async/into [] ok-chan-2))
        fail-vec-1                (async/<!! (async/into [] fail-chan-1))
        fail-vec-2                (async/<!! (async/into [] fail-chan-2))
  ]
    (is (= ok-vec-2 [1 2 4 7 8]))
    (is (= fail-vec-1 [500 505]))
    (is (= fail-vec-2 [303 306 309]))))

Rather than return the errors, I would probably just log them as soon as they are detected and then forget about them.
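A rough sketch of such a logging consumer, assuming a fail-chan like the ones produced by async/split above. Here it is prefilled with two sample values, and the logged atom exists only so the effect can be inspected:

```clojure
(require '[clojure.core.async :as async])

;; Sample error channel; in the test above this would be fail-chan-1.
(def fail-chan (async/to-chan [500 505]))
(def logged (atom []))

;; Drains the error channel, logging each value as soon as it arrives.
;; The loop ends when fail-chan is closed and empty.
(def done
  (async/go-loop []
    (when-some [v (async/<! fail-chan)]
      (println "computation failed:" v)
      (swap! logged conj v)
      (recur))))

;; Block until the logger has seen every error (only needed in this demo).
(async/<!! done)
```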

Listel answered 2/1, 2017 at 2:41 Comment(0)
