frameworks for representing data processing as a pipeline

Most data processing can be envisioned as a pipeline of components, the output of one feeding into the input of another. A typical processing pipeline is:

reader | handler | writer

As a foil for starting this discussion, let's consider an object-oriented implementation of this pipeline where each segment is an object. The handler object contains references to both the reader and writer objects and has a run method which looks like:

define handler.run:
  while (reader.has_next) {
    data = reader.next
    output = ...some function of data...
    writer.put(output)
  }

Schematically the dependencies are:

reader <- handler -> writer

Now suppose I want to interpose a new pipeline segment between the reader and the handler:

reader | tweaker | handler | writer

Again, in this OO implementation, tweaker would be a wrapper around the reader object, and the tweaker methods might look something like (in some pseudo-imperative code):

define tweaker.has_next:
  return reader.has_next

define tweaker.next:
  value = reader.next
  result = ...some function of value...
  return result

I'm finding that this is not a very composable abstraction. Some issues are:

  1. tweaker can only be used on the left hand side of handler, i.e. I can't use the above implementation of tweaker to form this pipeline:

    reader | handler | tweaker | writer

  2. I'd like to exploit the associative property of pipelines, so that this pipeline:

    reader | handler | writer

could be expressed as:

reader | p

where p is the pipeline handler | writer. In this OO implementation I would have to partially instantiate the handler object.

  3. Somewhat of a restatement of (1), the objects have to know if they "push" or "pull" data.

I'm looking for a framework (not necessarily OO) for creating data processing pipelines which addresses these issues.

I've tagged this with Haskell and functional programming because I feel functional programming concepts might be useful here.

As a goal, it would be nice to be able to create a pipeline like this:

                     handler1
                   /          \
reader | partition              writer
                   \          /
                     handler2

For some perspective, Unix shell pipes solve a lot of these problems with the following implementation decisions:

  1. Pipeline components run asynchronously in separate processes

  2. Pipe objects mediate passing data between "pushers" and "pullers"; i.e. they block writers that write data too fast and readers that try to read too fast.

  3. You use special connectors < and > to connect passive components (i.e. files) to the pipeline

I am especially interested in approaches which do not use threading or message-passing among agents. Maybe that's the best way to do this, but I'd like to avoid threading if possible.

Thanks!

Brandenburg answered 15/11, 2011 at 20:23 Comment(5)
Take a look at haskell.org/arrows (Discobolus)
Perhaps you'd like to spawn a few threads, one for each reader, tweaker, handler, and writer, and communicate via Chans? I'm not 100% sure I understand what the top-level question is, though... (Test)
So far, the last diagram looks like reader >>> partition >>> handler1 *** handler2 >>> writer, but there will probably be some requirements making it more complicated. (Hausner)
If it helps, my idea for partition was that it would send input data to one output or the other based on a choice function. (Brandenburg)
@user5402, arrows that can do that are instances of ArrowChoice; the dual of your partition operator (partitioning is easy just using arr, but it doesn't do any good if you can't rejoin) is (|||). (Shankle)

Yeah, arrows are almost surely your man.

I suspect that you are fairly new to Haskell, just based on the kinds of things you are saying in your question. Arrows will probably seem fairly abstract, especially if what you are looking for is a "framework". I know it took me a while to really grok what was going on with arrows.

So you may look at that page and say "yes, that looks like what I want", and then find yourself rather lost as to how to begin to use arrows to solve the problem. So here is a little bit of guidance so you know what you are looking at.

Arrows will not solve your problem. Instead, they give you a language in which you can phrase your problem. You may find that some predefined arrow will do the job -- some Kleisli arrow, maybe -- but at the end of the day you are going to want to implement an arrow (the predefined ones just give you easy ways to implement them) which expresses what you mean by a "data processor". As an almost trivial example, let's say you want to implement your data processors as simple functions. You would write:

import Prelude hiding (id, (.))
import Control.Category
import Control.Arrow

newtype Proc a b = Proc { unProc :: a -> b }

-- I believe Arrow has recently become a subclass of Category, so assuming that.

instance Category Proc where
    id = Proc (\x -> x)
    Proc f . Proc g = Proc (\x -> f (g x))

instance Arrow Proc where
    arr f = Proc f
    first (Proc f) = Proc (\(x,y) -> (f x, y))

This gives you the machinery to use the various arrow combinators (***), (&&&), (>>>), etc., as well as arrow notation, which is rather nice if you are doing complex things. So, as Daniel Fischer points out in the comments, the pipeline you described in your question could be composed as:

reader >>> partition >>> (handler1 *** handler2) >>> writer
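
To make that concrete, here is a hypothetical end-to-end use of the Proc arrow defined above; the stage names and their bodies are made up for illustration, not taken from the question:

-- Assumes the Proc newtype and its Category/Arrow instances from above.
pipeline :: Proc [Int] ([Int], [Int])
pipeline = partition >>> (handler1 *** handler2)
  where
    partition = arr (\xs -> (filter even xs, filter odd xs))  -- split the stream
    handler1  = arr (map (* 2))                               -- process one branch
    handler2  = arr (map negate)                              -- process the other

main :: IO ()
main = print (unProc pipeline [1 .. 10])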

But the cool thing is that it is up to you what you mean by a processor. It is possible to implement what you mentioned about each processor forking a thread in a similar way, using a different processor type:

newtype Proc' a b = Proc' (Source a -> Sink b -> IO ())

And then implementing the combinators appropriately.
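
For instance, here is a sketch of what composition of that threaded variant might look like, under the hypothetical assumption that Source a pulls the next item and Sink a pushes one; Source, Sink, and compose are made-up names for illustration, not from any library:

import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (newChan, readChan, writeChan)

type Source a = IO (Maybe a)      -- pull the next item; Nothing marks end of stream
type Sink a   = Maybe a -> IO ()  -- push an item downstream

newtype Proc' a b = Proc' (Source a -> Sink b -> IO ())

-- Hypothetical composition: a Chan carries items from the upstream
-- stage (g), running in its own thread, to the downstream stage (f).
compose :: Proc' b c -> Proc' a b -> Proc' a c
compose (Proc' f) (Proc' g) = Proc' $ \src snk -> do
    chan <- newChan
    _ <- forkIO (g src (writeChan chan))
    f (readChan chan) snk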

So that is what you are looking at: a vocabulary for talking about composing processes, which has a little bit of code to reuse, but primarily will help guide your thinking as you implement these combinators for the definition of processor that is useful in your domain.

One of my first nontrivial Haskell projects was to implement an arrow for quantum entanglement; that project was the one that caused me to really start to understand the Haskell way of thinking, a major turning point in my programming career. Maybe this project of yours will do the same for you? :-)

Shankle answered 15/11, 2011 at 23:30 Comment(0)

Thanks to lazy evaluation, we can express pipelines in terms of ordinary function composition in Haskell. Here is an example that computes the maximum length of a line in its input:

main = interact (show . maximum . map length . lines)

Everything in here is an ordinary function, like for instance

lines :: String -> [String]

but thanks to lazy evaluation, these functions process input incrementally and only as much as needed, just as a UNIX pipe would.
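
A quick way to see the incrementality is to run a composed pipeline over an infinite input; it still terminates, because each stage demands only as much input as the stage after it consumes (a small illustrative sketch, not from the answer):

import Data.Char (toUpper)

-- take 3 bounds the demand, so lines, map, and unlines each do only
-- a finite amount of work even though the input never ends.
main :: IO ()
main = putStr (unlines . take 3 . map (map toUpper) . lines $ cycle "hello world\n")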

Collinsia answered 16/11, 2011 at 10:30 Comment(0)

The enumerator package for Haskell is a good framework for this. It defines three types of objects:

  1. Enumerators, which produce data in chunks.
  2. Iteratees, which consume chunks of data and return a value after consuming enough.
  3. Enumeratees, which sit in the middle of the pipeline: they consume chunks and produce chunks, possibly with side effects.

These three types of objects are composed into a stream processing pipeline, and you can even have multiple enumerators and iteratees in one pipeline (when one is finished, the next takes its place). It can be complicated to write one of these objects from scratch, but there are a lot of combinators that can be used to turn regular functions into data stream processors. For example, this pipeline reads all the characters from stdin, converts them to upper case with the function toUpper, then writes them to stdout:

ET.enumHandle stdin $$ ET.map toUpper =$ ET.iterHandle stdout

where the module Data.Enumerator.Text has been imported as ET.
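
For context, wrapping that fragment into a complete program might look like the sketch below; it assumes run_, ($$), and (=$) are in scope from Data.Enumerator (as in the enumerator 0.4 series) and uses ET.map exactly as the answer does:

import Data.Char (toUpper)
import Data.Enumerator (run_, ($$), (=$))
import qualified Data.Enumerator.Text as ET
import System.IO (stdin, stdout)

-- run_ drives the Iteratee built by wiring source, transformer, and sink.
main :: IO ()
main = run_ (ET.enumHandle stdin $$ ET.map toUpper =$ ET.iterHandle stdout)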

Coachwhip answered 16/11, 2011 at 15:33 Comment(1)
There are several enumeration-style packages on Hackage; the OP might be interested in iter-io (hackage.haskell.org/package/iterIO), which is explicitly based on Unix shell pipelines. (Antinucleon)

The Yesod framework makes use of a Haskell streaming library of this kind, the conduit package.
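
As a small taste of what a conduit pipeline looks like, here is a minimal sketch using the package's modern API (the operators have changed since this answer was written in 2012):

import Conduit

-- Source, transformer, and sink are all stages of one pipeline,
-- composed with (.|) and executed by runConduit.
main :: IO ()
main = runConduit (yieldMany [1 .. 10 :: Int] .| mapC (* 2) .| mapM_C print)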

Interfaith answered 10/10, 2012 at 7:34 Comment(0)

In Java, Spring Reactor and Apache Camel do what you want. Since others have covered Haskell, let me introduce some non-Haskell options.

In Apache Camel, data is processed in pipeline fashion and carried by an Exchange (the term comes from Camel's roots in message processing). Properties that you need to access throughout the process are stored in the Exchange's properties (a Map<String, Object>). The overall process is a Route, within which you can define sub-routes that group several steps together and give them names.

Reactor is a reactive programming model for Java; by default it handles tasks on a single thread. The common return types are Mono (a single object) and Flux (a stream of objects), and both are processed in pipeline fashion.

At first it is not easy to move from stack-based coding to the pipeline style, but after some time you may find it hard to go back.

Brightwork answered 13/2, 2023 at 22:01 Comment(0)
