Most data processing can be envisioned as a pipeline of components, the output of one feeding into the input of another. A typical processing pipeline is:
reader | handler | writer
As a foil for starting this discussion, let's consider an object-oriented implementation of this pipeline where each segment is an object. The `handler` object contains references to both the `reader` and `writer` objects and has a `run` method which looks like:
define handler.run:
    while (reader.has_next) {
        data = reader.next
        output = ...some function of data...
        writer.put(output)
    }
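For contrast with the OO loop above, here is a minimal sketch of the same pipeline in Python generators (my choice of illustration language, not the post's code; the doubling step stands in for "some function of data"):

```python
def reader():
    # A source: yields items one at a time.
    yield from [1, 2, 3]

def handler(source):
    # A transformer: pulls from any iterable, yields transformed items.
    for data in source:
        yield data * 2  # stand-in for "...some function of data..."

def writer(source):
    # A sink: drains the stream; here it just collects into a list.
    return list(source)

result = writer(handler(reader()))
```

Note that here `handler` holds no references to `reader` or `writer`; the segments are only connected at the point where the pipeline is assembled.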
Schematically the dependencies are:
reader <- handler -> writer
Now suppose I want to interpose a new pipeline segment between the reader and the handler:
reader | tweaker | handler | writer
Again, in this OO implementation, `tweaker` would be a wrapper around the `reader` object, and the `tweaker` methods might look something like this (in some pseudo-imperative code):
define tweaker.has_next:
    return reader.has_next

define tweaker.next:
    value = reader.next
    result = ...some function of value...
    return result
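Sketched as a generator instead (my assumption, not the post's code), `tweaker` consumes any iterable rather than a concrete `reader` object, so the same implementation can sit on either side of a handler:

```python
def tweaker(source):
    # Transforms any upstream iterable, not just a wrapped reader object.
    for value in source:
        yield value + 1  # stand-in for "...some function of value..."

# Works in front of a handler...
left = list(tweaker(iter([1, 2, 3])))
# ...or behind one, because both sides of it are just iterables.
right = list(tweaker(x * 2 for x in [1, 2, 3]))
```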
I'm finding that this is not a very composable abstraction. Some issues are:
- `tweaker` can only be used on the left-hand side of `handler`; i.e., I can't use the above implementation of `tweaker` to form this pipeline: reader | handler | tweaker | writer
- I'd like to exploit the associative property of pipelines, so that the pipeline reader | handler | writer could be expressed as reader | p, where `p` is the pipeline handler | writer. In this OO implementation I would have to partially instantiate the `handler` object.
- Somewhat of a restatement of (1): the objects have to know whether they "push" or "pull" data.
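To make the associativity point concrete: if each segment is a function over iterables, then the partial pipeline handler | writer is just another function, and nothing needs to be partially instantiated (a sketch in Python generators, my choice of illustration):

```python
def handler(source):
    return (x * 2 for x in source)

def writer(source):
    return list(source)

# p = handler | writer: a partial pipeline with no reader or data attached.
def p(source):
    return writer(handler(source))

# reader | p: supply the source later.
result = p(iter([1, 2, 3]))
```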
I'm looking for a framework (not necessarily OO) for creating data processing pipelines which addresses these issues.
I've tagged this with Haskell and functional programming because I feel functional programming concepts might be useful here.
As a goal, it would be nice to be able to create a pipeline like this:
handler1
/ \
reader | partition writer
\ /
handler2
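One possible reading of this diagram, sketched with Python generators (the choice function, and treating the two handlers as per-item functions, are my simplifying assumptions): `partition` routes each item through one of two handlers based on a predicate, and the two branches are merged back in input order before reaching the writer:

```python
def partition(source, choose, handler1, handler2):
    # Route each item through handler1 or handler2 based on a choice
    # function, preserving input order in the merged output stream.
    for item in source:
        branch = handler1 if choose(item) else handler2
        yield branch(item)

is_even = lambda x: x % 2 == 0
double = lambda x: x * 2   # stand-in for handler1
negate = lambda x: -x      # stand-in for handler2

out = list(partition([1, 2, 3, 4], is_even, double, negate))
```

A real design would also need to decide whether the branches see whole sub-streams rather than single items, and how the merge interleaves them; this sketch ducks both questions by keeping everything per-item.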
For some perspective, Unix shell pipes solve a lot of these problems with the following implementation decisions:
Pipeline components run asynchronously in separate processes
Pipe objects mediate passing data between "pushers" and "pullers"; i.e. they block writers that produce data faster than it can be consumed, and readers that try to read before data is available.
You use special connectors `<` and `>` to connect passive components (i.e. files) to the pipeline
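The analogue of the `<` and `>` connectors in a single-process, thread-free pull pipeline would be small adapters between passive files and streams (a sketch; `io.StringIO` stands in for real files):

```python
import io

def from_file(f):
    # Analogue of '<': adapt a passive file into a stream of lines.
    for line in f:
        yield line.rstrip("\n")

def to_file(source, f):
    # Analogue of '>': drain a stream into a passive file.
    for item in source:
        f.write(item + "\n")

src = io.StringIO("a\nb\n")
dst = io.StringIO()
to_file((line.upper() for line in from_file(src)), dst)
```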
I am especially interested in approaches which do not use threading or message-passing among agents. Maybe that's the best way to do this, but I'd like to avoid threading if possible.
Thanks!
Chan
Comments:

- I'm not 100% sure I understand what the top-level question is, though... – Test
- reader >>> partition >>> handler1 *** handler2 >>> writer, but there will probably be some requirements making it more complicated. – Hausner
- The intent for `partition` was that it would send input data to one output or the other based on a choice function. – Brandenburg
- In `ArrowChoice`, the dual of your `partition` operator (partitioning is easy just using `arr`, but it doesn't do any good if you can't rejoin) is `(|||)`. – Shankle