Why doesn't more Java code use PipedInputStream / PipedOutputStream?

I've discovered this idiom recently, and I am wondering if there is something I am missing. I've never seen it used. Nearly all Java code I've worked with in the wild favors slurping data into a string or buffer, rather than something like this example (using HttpClient and XML APIs for example):

    final LSOutput output; // XML stuff initialized elsewhere
    final LSSerializer serializer;
    final Document doc;
    // ...
    PostMethod post; // HttpClient post request
    final PipedOutputStream source = new PipedOutputStream();
    PipedInputStream sink = new PipedInputStream(source);
    // ...
    executor.execute(new Runnable() {
            public void run() {
                output.setByteStream(source);
                serializer.write(doc, output);
                try {
                    source.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }});

    post.setRequestEntity(new InputStreamRequestEntity(sink));
    int status = httpClient.executeMethod(post);

That code uses a Unix-piping style technique to prevent multiple copies of the XML data being kept in memory. It uses the HTTP Post output stream and the DOM Load/Save API to serialize an XML Document as the content of the HTTP request. As far as I can tell it minimizes the use of memory with very little extra code (just the few lines for Runnable, PipedInputStream, and PipedOutputStream).

So, what's wrong with this idiom? If there's nothing wrong with this idiom, why haven't I seen it?

EDIT: to clarify, PipedInputStream and PipedOutputStream replace the boilerplate buffer-by-buffer copy that shows up everywhere, and they also allow you to process incoming data concurrently with writing out the processed data. They don't use OS pipes.
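
For contrast, here is a minimal sketch of the buffer-everything style this replaces, reusing the variable names from the example above (ByteArrayRequestEntity is HttpClient 3.x's in-memory request entity):

    // Slurp-into-a-buffer alternative: the whole serialized document is
    // held in memory before any of it goes onto the wire.
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    output.setByteStream(buffer);
    serializer.write(doc, output);
    post.setRequestEntity(new ByteArrayRequestEntity(buffer.toByteArray(), "text/xml"));
    int status = httpClient.executeMethod(post);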

Argenteuil answered 27/1, 2009 at 16:35 Comment(5)
I saw your code and made an example for JAXB xml (pastebin.com/zsWR8Dgx). The code just seems fragile to me. It would be good to see a solid example of this.Amerce
Try Pipe4j: code.google.com/p/pipe4jBridesmaid
For what it's worth, I do use this all the time and it's not fragile in any way. You do have to store any exception thrown in the Runnable somewhere and rethrow it if you want a more meaningful stack trace. It's still way better than coding to explicit buffers.Argenteuil
What would be perfect is if Java could chain Process objects together, thus serving as a glue language for non-java programs communicating via input/output streams. Unfortunately it's not supported. https://mcmap.net/q/75998/-building-a-process-pipe-with-processbuilder-in-java-7/714112Explorer
@SridharSarnobat In JDK 11, you can now launch a single .java text file as a script with a shebang line at the top. So you can do traditional piping to and from a java program via the *NIX shell: openjdk.java.net/jeps/330#%22Shebang%22-files Not super useful, but at least some progress.Interstratify

From the Javadocs:

Typically, data is read from a PipedInputStream object by one thread and data is written to the corresponding PipedOutputStream by some other thread. Attempting to use both objects from a single thread is not recommended, as it may deadlock the thread.

This may partially explain why it is not more commonly used.

I'd assume another reason is that many developers do not understand its purpose / benefit.
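
A minimal sketch of that single-thread hazard (the class name is illustrative): the pipe's internal buffer defaults to 1024 bytes, so a write that overfills it blocks waiting for a reader that, on a single thread, never gets a chance to run.

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class SingleThreadPipeDeadlock {
    public static void main(String[] args) throws IOException {
        PipedOutputStream out = new PipedOutputStream();
        PipedInputStream in = new PipedInputStream(out); // default 1024-byte buffer

        // Fill the pipe past its buffer capacity from the only thread we have.
        // Once the buffer is full, write() waits for a reader to drain it, but
        // the read below runs on this very thread, so we never get there.
        out.write(new byte[2048]); // blocks indefinitely
        System.out.println(in.read()); // never reached
    }
}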

Intercostal answered 27/1, 2009 at 17:6 Comment(6)
Sadly concurrency is overused where it isn't needed, and underused where it was needed... oops! :)Stuffed
@iny, I'd argue that most developers aren't writing concurrent code. Maybe it's running in a concurrent environment, but I think that it is a minority of developers who deal every day with multithreading (and this is probably a good thing)Intercostal
"many developers do not understand it's purpose / benefit" probably those developers who have not previously used Unix,and therefore have not been exposed to the usefulness of the pips-and-filters design pattern.Wheels
@JohnGardner agreed. All Java/JVM developers should read Java Concurrency In Practice (the bullet train book). It is the best written book on this critical topic (Doug Lea would agree) and it explains many of the problems due to underspecified concurrency attributes in most java code - including the JDK. It helps you solve these problems and also understand what to decide on and declare in your own APIs.Morganmorgana
To add to this, a thread requires additional memory and introduces context switching. It's only worth it when you get real benefit from streaming. Handling 1kb of data this way would be a step backwards.Odor
I wrote a properly behaving replacement because I couldn't deal with the stupidity of the JDK version. See github.com/archiecobbs/dellroad-stuff/blob/master/…Grote

In your example you're creating two threads to do the work that could be done by one. And introducing I/O delays into the mix.

Do you have a better example? Or did I just answer your question?


To pull some of the comments (at least my view of them) into the main response:

  • Concurrency introduces complexity into an application. Instead of dealing with a single linear flow of data, you now have to be concerned about sequencing of independent data flows. In some cases, the added complexity may be justified, particularly if you can leverage multiple cores/CPUs to do CPU-intensive work.
  • If you are in a situation where you can benefit from concurrent operations, there's usually a better way to coordinate the flow of data between threads: for example, passing objects between threads using a concurrent queue, rather than wrapping the piped streams in object streams (see the sketch after this list).
  • Where a piped stream may be a good solution is when you have multiple threads performing text processing, a la a Unix pipeline (eg: grep | sort).
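
A minimal sketch of that queue-based hand-off (class name, record type, and sentinel are illustrative): parsed objects move from a producer thread to a consumer through a bounded BlockingQueue, with no byte-stream plumbing in between.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueHandoffSketch {
    private static final String EOF = "EOF"; // sentinel marking the end of the stream

    public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<String>(100);

        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    for (int i = 0; i < 10; i++) {
                        queue.put("record-" + i); // blocks if the consumer falls behind
                    }
                    queue.put(EOF);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        producer.start();

        String record;
        while (!EOF.equals(record = queue.take())) {
            System.out.println("processed " + record); // real consumer work goes here
        }
        producer.join();
    }
}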

In the specific example, the piped stream allows use of an existing RequestEntity implementation class provided by HttpClient. I believe that a better solution is to create a new implementation class, as below, because the example is ultimately a sequential operation that cannot benefit from the complexity and overhead of a concurrent implementation. While I show the RequestEntity as an anonymous class, reusability would indicate that it should be a first-class class.

post.setRequestEntity(new RequestEntity()
{
    public long getContentLength()
    {
        return -1; // content length is not known in advance
    }

    public String getContentType()
    {
        return "text/xml";
    }

    public boolean isRepeatable()
    {
        return false;
    }

    public void writeRequest(OutputStream out) throws IOException
    {
        output.setByteStream(out);
        serializer.write(doc, output);
    }
});
Alvera answered 27/1, 2009 at 16:48 Comment(18)
That's an example I have handy. What IO delays are being introduced? PipedInputStreams and PipedOutputStreams are memory buffers.Argenteuil
They may be memory buffers, but they use the underlying pipe implementation, which is a kernel I/O operation.Alvera
Not according to the source they don't.Argenteuil
As for your example: I haven't used HttpClient, but I would expect an alternate method to get access to the request body as an OutputStream. Perhaps not, although are you sure that PostMethod doesn't buffer its content in memory (in which case you don't gain anything)Alvera
PostMethod can buffer or not, depending on whether the method has been configured to chunk the enclosed entity. By default it chunks when the content length is not set. It'd be more helpful if you assumed I had already read the APIs and source in question when you answer.Argenteuil
Re Java piped streams: I learned something, and am somewhat disappointed. I always assumed that those classes used the pipe(2) syscall.Alvera
Sorry, it's not by default but I elided post.setContentChunked(true); in my setup. This is not really relevant to my question though, it's only details about the specific example.Argenteuil
Sorry to offend, but you asked the question "why isn't this more common," not "in this particular case, is there a reason this technique isn't used." And in the general case, you're creating a second thread to handle a sequential operation.Alvera
It's not a sequential operation. You can serialize the XML at the same time you write it to the output stream.Argenteuil
Actually, in this case it is a sequential operation. One thread is writing the XML to a stream, the other thread is writing a stream to a stream. In this specific case, there's one unnecessary stream. That's not to say that the technique is not useful in some cases (to be contd)Alvera
The case in which piping from one thread to another is useful is when there is significant text-level processing that will happen in each stage (ie, something similar to a Unix pipeline). In that case, you can (1) logically partition the operations, and (2) benefit from multi-core architectures.Alvera
However, unless you're doing text processing, there's probably a more efficient way to represent and pass the data (eg, ConcurrentLinkedQueue using defined objects).Alvera
This "unnecessary stream" is for using the HttpClient API, which requires an InputStream for request entities.Argenteuil
There is significant text-level processing that will happen in the InputStreamRequestEntity -- namely chunking.Argenteuil
Why don't you do "text-level processing" in a FilterWriter or a FilterOutputStream ?Harvison
That code is part of the HttpClient API, which requires an InputStream.Argenteuil
@kdgregory: your code appears to be an unnecessary class. Why is an unnecessary class preferable to concurrency?Argenteuil
1 - because concurrency increases complexity, and 2 - because it is a piece of reusable functionality (one that should probably get back into HttpClient)Alvera

I too only discovered the PipedInputStream/PipedOutputStream classes recently.

I am developing an Eclipse plug-in that needs to execute commands on a remote server via SSH. I am using JSch, and its Channel API reads commands from an InputStream and writes responses to an OutputStream. From my side, though, I need an OutputStream to write the commands into and an InputStream to read the responses from. That's where PipedInputStream/PipedOutputStream come in.

import java.io.PipedInputStream;
import java.io.PipedOutputStream;

import com.jcraft.jsch.Channel;

Channel channel; // obtained elsewhere (e.g. from the JSch Session)
PipedInputStream channelInputStream = new PipedInputStream();
PipedOutputStream channelOutputStream = new PipedOutputStream();

// Our output stream feeds the channel's input; the channel's output feeds our input stream.
channel.setInputStream(new PipedInputStream(channelOutputStream));
channel.setOutputStream(new PipedOutputStream(channelInputStream));
channel.connect();

// Write commands to channelOutputStream
// Read responses from channelInputStream

channel.disconnect();
Mutant answered 27/1, 2009 at 17:48 Comment(1)
javadoc said piped streams could get deadlock on one thread???? (which sucks as I want to use something exactly like this with no extra thread).....does that actually work or do you get deadlock?Poppas

Also, back to the original example: no, it does not exactly minimize memory usage either. The DOM tree still gets built and in-memory buffering is done; while that is better than keeping full byte-array replicas, it's not that much better. But buffering this way will be slower, and an extra thread is created as well, since you cannot use a PipedInputStream/PipedOutputStream pair from within a single thread.

Sometimes PipedXxxStreams are useful, but the reason they are not used more is that quite often they are not the right solution. They are fine for inter-thread communication, and that is where I have used them, for what it's worth. It's just that there aren't that many use cases for this, given how SOA pushes most such boundaries to be between services instead of between threads.

Myrica answered 27/1, 2009 at 20:50 Comment(0)

Here's a use case where pipes make sense:

Suppose you have a third-party lib, such as an XSLT mapper or a crypto lib, that has an interface like this: doSomething(inputStream, outputStream). And you do not want to buffer the result before sending it over the wire. Apache and other clients disallow direct access to the wire output stream. The closest you can get is obtaining the output stream - at an offset, after the headers are written - in a request entity object. But since this is under the hood, it's still not enough to pass an input stream and an output stream to the third-party lib. Pipes are a good solution to this problem.
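
A minimal sketch of that adapter (doSomething, the class name, and the sample payload are illustrative stand-ins): the library writes into one end of the pipe on a worker thread while the HTTP client, or here simply the main thread, reads from the other end.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PipeAdapterSketch {

    // Stand-in for the third-party call whose signature we cannot change.
    static void doSomething(InputStream in, OutputStream out) throws IOException {
        byte[] buf = new byte[8192];
        int n;
        while ((n = in.read(buf)) != -1) {
            out.write(buf, 0, n); // imagine an XSLT or crypto transform here
        }
    }

    public static void main(String[] args) throws Exception {
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        final InputStream payload = new ByteArrayInputStream("<doc/>".getBytes("UTF-8"));

        final PipedOutputStream pipeOut = new PipedOutputStream();
        final PipedInputStream pipeIn = new PipedInputStream(pipeOut);

        // The library streams into the pipe on a second thread...
        executor.execute(new Runnable() {
            public void run() {
                try {
                    doSomething(payload, pipeOut);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                } finally {
                    try { pipeOut.close(); } catch (IOException ignored) { }
                }
            }
        });

        // ...while this thread drains the other end. In real code, pipeIn would
        // back an InputStreamRequestEntity (HttpClient 3.x) or an InputStreamEntity
        // (HttpClient 4.x) instead of being printed.
        int b;
        while ((b = pipeIn.read()) != -1) {
            System.out.write(b);
        }
        System.out.flush();
        executor.shutdown();
    }
}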

Incidentally, I wrote an inversion of Apache's HTTP Client API [PipedApacheClientOutputStream] which provides an OutputStream interface for HTTP POST using Apache Commons HTTP Client 4.3.4. This is an example where Piped Streams might make sense.

Irrigation answered 11/2, 2016 at 1:21 Comment(0)

I tried using these classes a while back for something; I forget the details. But I did discover that their implementation was fatally flawed. I can't remember exactly what it was, but I have a vague memory that it may have been a race condition which meant that they occasionally deadlocked (and yes, of course I was using them in separate threads: they simply aren't usable in a single thread and weren't designed to be).

I might have a look at their source code and see if I can see what the problem might have been.

Greaten answered 27/1, 2009 at 20:27 Comment(1)
I found that both ends need to be closed.Argenteuil

java.io pipes have too much context switching (per byte read/write), and their java.nio counterpart requires some NIO background and proper use of channels and such. This is my own implementation of a pipe using a blocking queue; for a single producer/consumer it will perform fast and scale well:

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.*;

public class QueueOutputStream extends OutputStream
{
  private static final int DEFAULT_BUFFER_SIZE=1024;
  private static final byte[] END_SIGNAL=new byte[]{};

  private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>();
  private final byte[] buffer;

  private boolean closed=false;
  private int count=0;

  public QueueOutputStream()
  {
    this(DEFAULT_BUFFER_SIZE);
  }

  public QueueOutputStream(final int bufferSize)
  {
    if(bufferSize<=0){
      throw new IllegalArgumentException("Buffer size <= 0");
    }
    this.buffer=new byte[bufferSize];
  }

  private synchronized void flushBuffer()
  {
    if(count>0){
      final byte[] copy=new byte[count];
      System.arraycopy(buffer,0,copy,0,count);
      queue.offer(copy);
      count=0;
    }
  }

  @Override
  public synchronized void write(final int b) throws IOException
  {
    if(closed){
      throw new IllegalStateException("Stream is closed");
    }
    if(count>=buffer.length){
      flushBuffer();
    }
    buffer[count++]=(byte)b;
  }

  @Override
  public synchronized void write(final byte[] b, final int off, final int len) throws IOException
  {
    super.write(b,off,len);
  }

  @Override
  public synchronized void close() throws IOException
  {
    flushBuffer();
    queue.offer(END_SIGNAL);
    closed=true;
  }

  public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream)
  {
    return executor.submit(
            new Callable<Void>()
            {
              @Override
              public Void call() throws Exception
              {
                try{
                  byte[] buffer=queue.take();
                  while(buffer!=END_SIGNAL){
                    outputStream.write(buffer);
                    buffer=queue.take();
                  }
                  outputStream.flush();
                } catch(Exception e){
                  close();
                  throw e;
                } finally{
                  outputStream.close();
                }
                return null;
              }
            }
    );
  }
}
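
A possible usage sketch for the class above (the executor, target stream, and data are illustrative): write on one thread while a background task drains the queue into the target stream.

import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class QueueOutputStreamDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        OutputStream target = new FileOutputStream("out.bin"); // any OutputStream will do

        QueueOutputStream out = new QueueOutputStream();
        // Start the consumer: a background task drains queued chunks into target
        // and closes it when finished.
        Future<Void> done = out.asyncSendToOutputStream(executor, target);

        // Produce on this thread; full 1 KB chunks are handed off through the queue.
        for (int i = 0; i < 100000; i++) {
            out.write(i & 0xFF);
        }
        out.close();    // flush the last partial chunk and enqueue the end-of-stream marker

        done.get();     // surfaces any exception thrown on the consumer side
        executor.shutdown();
    }
}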
Forgetmenot answered 19/10, 2013 at 12:28 Comment(0)

So, what's wrong with this idiom? If there's nothing wrong with this idiom, why haven't I seen it?

EDIT: to clarify, PipedInputStream and PipedOutputStream replace the boilerplate buffer-by-buffer copy that shows up everywhere, and they also allow you to process incoming data concurrently with writing out the processed data. They don't use OS pipes.

You have stated what it does but haven't stated why you are doing this.

If you believe that this will either reduce the resources used (CPU/memory) or improve performance, it won't do either. However, it will make your code more complex.

Basically, you have a solution without a problem to solve.

Suter answered 27/1, 2009 at 20:2 Comment(8)
In this particular case you are right -- there is another way to code it that avoids unbounded memory consumption. In the general case, however, it requires less code to avoid unbounded memory consumption than would equivalent buffer copying code.Argenteuil
What do you see as an "unbounded memory consumption" I have been developing networking solutions for trading system for six years and I have never come across this problem.Suter
Do your trading systems handle single messages with gigs of payload without running out of space? If so then they have bounded memory consumption; otherwise they have unbounded memory consumption. (Not that I would expect trading systems to do anything but reject messages over a certain size, but believe it or not that's not the case in every domain.)Argenteuil
It is true that trading messages are typically small as latency is important. They can add up fairly quickly and we end up with 10s of gigs of data in memory. However, I am not sure how this is relevant. The solution posted will not help you deal with very large messages as far as I can see, in fact instead of having one copy of the message passed around you will end up with two copies (as the writer cannot complete serialization of a large message and discard the original until the reader has almost read/rebuilt the copy)Suter
The reader can be streaming to the server as far as I can tell, with content-encoding: chunked. There doesn't need to be a second copy constructed (in this process, anyway).Argenteuil
@stevenhuwig - re "Do your trading systems handle single messages with gigs of payload without running out of space?" I think this is a non issue as long as the client code can get a handle on an inputstream that is outside of memory eg FileInputStream.Irrigation
Here's a use case. Suppose you have a third party lib, such as an xslt mapper or crypto lib that has an interface like this: doSomething(inputStream, outputStream). And you do not want to buffer the result before sending over the wire. Apache and other clients disallow direct access to the wire outputstream. Closest you can get is obtaining the outputstream - at an offset, after headers are written - in a request entity object. But since this is under the hood, it's still not enough to pass an inputstream and outputstream to the third party lib. Pipes are a good solution to this problem.Irrigation
@RobertChristian good point, if you have to use an API which is not really fit for purpose, you need to use the API you have. +1Suter

PipedInputStream and PipedOutputStream will make the blocked thread wait in one-second intervals whenever it is blocked waiting for the other side to drain a full buffer or fill an empty one. Do not use them.

Interstratify answered 10/4, 2020 at 12:32 Comment(1)
