Generally speaking, what "the" correct way is probably depends on your application's needs, but given your problem description, I think there are three things you need to consider:
1. xf-run-computation returns data that your business logic would see as errors,
2. xf-run-computation throws an exception, and
3. given that http calls are involved, some runs of xf-run-computation might never finish (or not finish in time).
Regarding point 3, the first thing you should consider is using pipeline-blocking instead of pipeline.
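A minimal sketch of what that swap could look like. Here slow-fetch is a hypothetical stand-in for your blocking http call (it just sleeps), and the channel sizes and parallelism of 4 are arbitrary. It assumes a core.async version that has to-chan! (older versions call it to-chan).

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-in for a blocking http call.
(defn slow-fetch [id]
  (Thread/sleep 50)
  {:id id :status "ok"})

(def results
  (let [in  (async/to-chan! (range 5))
        out (async/chan 5)]
    ;; Same argument order as pipeline: parallelism, output channel,
    ;; transducer, input channel. out is closed once in is drained.
    (async/pipeline-blocking 4 out (map slow-fetch) in)
    ;; Collect everything from out into a vector (blocks until out closes).
    (async/<!! (async/into [] out))))
```

The results arrive in input order, even though up to four calls run at the same time.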
I think your question is mostly related to point 1. The basic idea is that xf-run-computation needs to return a data structure (say a map or a record) that clearly marks a result as an error or a success, e.g. {:title nil :body nil :status "error"}. This gives you some options for dealing with the situation:
- All your later code simply ignores input data which has :status "error". I.e., your xf-run-computation would contain a line like (when (not (= (:status input) "error")) (run-computation input)).
- You could run a filter on all results between the pipeline calls and filter them as needed (note that filter can also be used as a transducer in a pipeline, which makes the old filter> and filter< functions of core.async obsolete).
- You use async/split, like you suggested and as Alan Thompson shows in his answer, to filter out the error values to a separate error channel. There is no real need to have a second error channel for your second pipeline if you're going to merge the values anyway; you can simply re-use your error channel.
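The three options can be sketched roughly as follows. Everything here is made up for illustration: run-computation fails on odd ids, error? and run-second-step are hypothetical helpers, and the buffer sizes are just large enough for four inputs. It assumes a core.async version that has to-chan!.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical computation: odd ids fail, even ids succeed.
(defn run-computation [{:keys [id] :as input}]
  (if (odd? id)
    (assoc input :status "error" :body nil)
    (assoc input :status "ok" :body (str "result-" id))))

(defn error? [m] (= (:status m) "error"))

;; Option 1: a later stage passes error values through untouched.
(defn run-second-step [m]
  (if (error? m)
    m
    (update m :body str "!")))

;; Option 2: drop errors with a transducer composed into the pipeline.
(def ok-only
  (let [in  (async/to-chan! (map #(hash-map :id %) (range 4)))
        out (async/chan 4)]
    (async/pipeline 2 out (comp (map run-computation) (remove error?)) in)
    (async/<!! (async/into [] out))))

;; Option 3: split one stage's output into an error and a success channel.
(def split-result
  (let [in        (async/to-chan! (map #(hash-map :id %) (range 4)))
        first-out (async/chan 4)
        _         (async/pipeline 2 first-out (map run-computation) in)
        ;; split returns [channel-for-true channel-for-false]
        [errors successes] (async/split error? first-out 4 4)]
    {:errors    (async/<!! (async/into [] errors))
     :successes (async/<!! (async/into [] successes))}))
```

In the split variant, successes would be the input of your second pipeline, while errors can go straight to whatever collects the failures.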
For point 2, the problem is that any exception in xf-run-computation happens in another thread and will not simply propagate back to your calling code. But you can make use of the ex-handler argument to pipeline (and pipeline-blocking). You could simply filter out all exceptions, put the result on a separate exception channel, or try to catch them and turn them into errors (potentially putting them back on the result channel or another error channel). The latter only makes sense if the exception gives you enough information, e.g. an id or something else that lets you tie the exception back to the input which caused it. You could arrange for this in xf-run-computation (i.e. catch any exception thrown from a third-party library like the http call).
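As a rough sketch of the ex-handler route: the handler is called with the Throwable, and any non-nil value it returns is put on the output channel in place of a result. The run-computation below is a hypothetical stand-in that throws an ex-info carrying the id, which is what makes it possible to tie the error back to its input; ex->error is likewise just an illustrative name. It assumes a core.async version that has to-chan!.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical computation that throws for one particular input.
(defn run-computation [{:keys [id] :as input}]
  (if (= id 2)
    (throw (ex-info "computation failed" {:id id}))
    (assoc input :status "ok")))

;; Turns the Throwable into an error value; returning nil instead
;; would simply drop the failed item.
(defn ex->error [t]
  {:id (:id (ex-data t)) :status "error" :message (ex-message t)})

(def results
  (let [in  (async/to-chan! (map #(hash-map :id %) (range 4)))
        out (async/chan 4)]
    ;; Arguments: n, to, xf, from, close?, ex-handler
    (async/pipeline 2 out (map run-computation) in true ex->error)
    (async/<!! (async/into [] out))))
```

If the exception comes from a library and carries no such data, the handler has nothing to identify the input with, which is the case where catching inside xf-run-computation is the better fit.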
For point 3, the canonical answer in core.async would be to point to a timeout channel, but this doesn't make much sense in relation to pipeline. A better idea is to ensure that a timeout is set on your http calls, e.g. the :timeout option of http-kit or :socket-timeout and :conn-timeout of clj-http. Note that these options will usually result in an exception on timeout.
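Since a timeout then typically shows up as an exception, one way to handle it is to catch it right next to the call, where the input is still at hand. The sketch below does not use a real http client: fetch is a hypothetical stand-in that throws a java.net.SocketTimeoutException for one input, and safe-fetch is an illustrative wrapper. Which exception your client actually throws is something you would need to check. It assumes a core.async version that has to-chan!.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-in for an http call with a client-side timeout set;
;; it throws the way a timed-out socket read typically would.
(defn fetch [{:keys [id]}]
  (if (= id 1)
    (throw (java.net.SocketTimeoutException. "Read timed out"))
    {:id id :title (str "title-" id) :status "ok"}))

;; Catch the exception next to the call and turn it into an error value
;; that still carries the id of the input.
(defn safe-fetch [input]
  (try
    (fetch input)
    (catch java.net.SocketTimeoutException e
      {:id (:id input) :title nil :status "error" :message (.getMessage e)})))

(def results
  (let [in  (async/to-chan! [{:id 0} {:id 1} {:id 2}])
        out (async/chan 3)]
    (async/pipeline-blocking 3 out (map safe-fetch) in)
    (async/<!! (async/into [] out))))
```

The timed-out call ends up as an ordinary :status "error" value, so it can be handled by the same filtering or splitting as any other error.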
errors (merge [error-values error-values2] – Filings
first-out into the 2nd stage of processing, which includes both successful & unsuccessful results. Shouldn't the 2nd stage input be only success-values1? – Listel
first-out as an output channel so whenever xf-run-computation publish some result the predicate fn-to-split will be evaluated and publish the result into success-values1 or error-values1 – Filings