Get an input stream from an output stream
P

3

4

I have a component that has given me data in an output stream (a ByteArrayOutputStream), and I need to write this data into a BLOB field of a SQL database without creating temporary buffers, hence the need to get an input stream.

Based on answers here and here, I came up with the following method to get an input stream from an output stream:

private PipedInputStream getInputStream(ByteArrayOutputStream outputStream) throws InterruptedException
{
    PipedInputStream pipedInStream = new PipedInputStream();
    Thread copyThread = new Thread(new CopyStreamHelper(outputStream, pipedInStream));
    copyThread.start();
    // Wait for copy to complete
    copyThread.join();
    return pipedInStream;
}

class CopyStreamHelper implements Runnable
{
    private ByteArrayOutputStream outStream;
    private PipedInputStream pipedInStream;

    public CopyStreamHelper (ByteArrayOutputStream _outStream, PipedInputStream _pipedInStream)
    {
        outStream = _outStream;
        pipedInStream = _pipedInStream;
    }

    public void run()
    {
        PipedOutputStream pipedOutStream = null;
        try
        {
            // write the original OutputStream to the PipedOutputStream
            pipedOutStream = new PipedOutputStream(pipedInStream);
            outStream.writeTo(pipedOutStream);
        }
        catch (IOException e) 
        {
            // logging and exception handling should go here
        }
        finally
        {
            IOUtils.closeQuietly(pipedOutStream);
        }
    }
}

Please note that the output stream already contains the written data, which can run up to 1-2 MB. However, whether I do this in two separate threads or in the same thread, PipedInputStream always hangs at the following:

Object.wait(long) line: not available [native method]   
PipedInputStream.awaitSpace() line: not available   
Poussette answered 26/3, 2015 at 18:51 Comment(7)
The component should return an InputStream. An OutputStream is a sink and not meant to be read. In this case this is possible because of the special nature of ByteArrayOutputStream. Your goal to avoid temp buffers is lost because the ByteArrayOutputStream already contains the full data when it is passed.Grown
@Sponiro, agreed but I want to prevent creating another buffer just to get an input stream. The said component generates output so I am not clear how it can return an InputStream?Poussette
One simple solution would be to write your data to a file first and open an InputStream for reading. If you really want to use PipedInputStream for a real producer-consumer scenario you want to use two threads. It is perfectly possible but a bit more complicated. Your component would live in one thread and hand out an InputStream (a PipedInputStream really) and another thread would pick that one up and read from it. Your solution above creates a thread and waits for its results which is very different.Grown
@Sponiro, I considered and dropped writing to a file since I think that is unnecessary. What I have here is not a producer-consumer scenario in the usual sense where both can happen asynchronously. The situation here is that the component has finished producing the output and has written the data to an OutputStream. Now I need to get that data and send it off to the DB via a SQL statement. Given this, I was trying to find the most efficient way of doing it. From the look of it, maybe this is not possible, which would imply that the similar answers on the linked SO questions don't work?Poussette
@Sponiro, continuing... regarding the need for two threads to get the Piped streams working properly, the code I have shown above is doing that - do you see something wrong with it since it's not working as I expected.Poussette
Yes, you have two threads, main and copyThread. But you are using them wrong. It is impossible to explain proper use of threads in a comment. But the basic problem is that your threads do not work in parallel. Your main thread starts the copy thread - which is ok. But then you wait for the copy thread with join which makes the whole thing non-parallel.Grown
Let us continue this discussion in chat.Poussette
B
2

You are overcomplicating the solution:

ByteArrayOutputStream baos = ...;
byte[] data = baos.toByteArray();
return new ByteArrayInputStream(data);
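
Note that toByteArray() copies the internal array once. If that copy matters, one possible workaround (a sketch only, and it assumes you control where the ByteArrayOutputStream is created) is a small subclass that wraps the internal array directly. The class name ExposedByteArrayOutputStream and the method toInputStream are hypothetical, not part of the JDK; buf and count are the protected fields that ByteArrayOutputStream documents.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical helper, not a JDK class: exposes the internal array without copying it.
final class ExposedByteArrayOutputStream extends ByteArrayOutputStream {

    // Wraps the live internal buffer. Do not write to this stream again
    // until the returned InputStream has been fully consumed.
    synchronized InputStream toInputStream() {
        return new ByteArrayInputStream(buf, 0, count);
    }

    public static void main(String[] args) throws IOException {
        ExposedByteArrayOutputStream out = new ExposedByteArrayOutputStream();
        out.write(new byte[] {1, 2, 3}, 0, 3);

        InputStream in = out.toInputStream();
        int b;
        while ((b = in.read()) != -1) {
            System.out.println(b); // reads the bytes straight from the shared array
        }
    }
}
```

Either kind of stream can then be passed to PreparedStatement.setBinaryStream. If the component creates the ByteArrayOutputStream itself and you cannot substitute a subclass, the single copy made by toByteArray() is probably the simplest option.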
Blackpool answered 26/3, 2015 at 18:58 Comment(2)
The OP says he wants to do it without creating temp buffers.Literacy
I am trying to avoid creating a temp buffer since the data size is quite large (1-2 MB) and there will be many parallel threads running doing this. This was my fallback solution if I can't solve the original problem.Poussette
N
0

I have worked out a very simple demo of how to use PipedInputStream and PipedOutputStream. It may or may not fit your use case.

A producing class writing into PipedOutputStream:

public class Producer implements Runnable {

    private final PipedOutputStream pipedOutputStream;
    private final PipedInputStream pipedInputStream;

    public Producer() throws IOException {
        this.pipedOutputStream = new PipedOutputStream();
        this.pipedInputStream = new PipedInputStream(pipedOutputStream);
    }

    public PipedInputStream getPipedInputStream() {
        return pipedInputStream;
    }

    @Override
    public void run() {

        try(InputStream inputStream = ByteStreams.limit(new RandomInputStream(), 100000)) {
            // guava copy function
            ByteStreams.copy(inputStream, pipedOutputStream);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                pipedOutputStream.close();
            } catch (IOException e) {
                // no-op
            }
        }
    }

    public static void main(String[] args) throws IOException {

        try {
            Producer producer = new Producer();
            Consumer consumer = new Consumer(producer);

            Thread thread1 = new Thread(producer);
            Thread thread2 = new Thread(consumer);

            thread1.start();
            thread2.start();

            thread1.join();
            thread2.join();

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

A consumer just counting the bytes:

public class Consumer implements Runnable {

    private final Producer producer;

    public Consumer(Producer producer) {
        this.producer = producer;
    }

    @Override
    public void run() {

        try (PipedInputStream pipedInputStream = producer.getPipedInputStream()) {

            int counter = 0;
            while (pipedInputStream.read() != -1) {
                counter++;
            }

            System.out.println(counter);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
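
Applied to the question's case, the same idea might look like the sketch below. The hang in the question comes from calling join() before anything reads: a PipedInputStream has a small internal buffer (1024 bytes by default), so the writer blocks in awaitSpace() as soon as it fills, and the thread waiting in join() never gets to read. Here the writer thread is started and the stream is returned right away, so the caller drains the pipe while the writer fills it. The class name PipeBridge, the method name asInputStream and the 64 KB pipe size are illustrative assumptions; only JDK classes are used.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

// Hypothetical helper: bridges an already filled ByteArrayOutputStream to an InputStream.
final class PipeBridge {

    // Returns immediately; a background thread feeds the pipe while the caller reads.
    static InputStream asInputStream(ByteArrayOutputStream source) throws IOException {
        PipedInputStream in = new PipedInputStream(64 * 1024); // pipe size is an arbitrary choice
        PipedOutputStream out = new PipedOutputStream(in);     // connect before starting the writer

        Thread writer = new Thread(() -> {
            try (PipedOutputStream o = out) {
                // Blocks whenever the pipe is full and resumes as the reader drains it.
                source.writeTo(o);
            } catch (IOException e) {
                // Reader closed early or pipe broken; real code should log this.
            }
        }, "pipe-writer");
        writer.setDaemon(true);
        writer.start();
        return in; // no join() here: the caller must read concurrently
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        baos.write(new byte[2 * 1024 * 1024]); // 2 MB of zeros as stand-in data

        long total = 0;
        byte[] chunk = new byte[8192];
        try (InputStream in = asInputStream(baos)) {
            int n;
            while ((n = in.read(chunk)) != -1) {
                total += n;
            }
        }
        System.out.println(total); // number of bytes that came through the pipe
    }
}
```

This avoids a second full-size copy of the data, but it still uses the pipe's own buffer and one extra thread per transfer, so it is worth measuring against a plain toByteArray() copy before adopting it.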
Noblenobleman answered 27/3, 2015 at 11:17 Comment(0)
H
-1

At some level there has to be a buffer. See Connecting an input stream to an outputstream.

My favorite answer from there is from Dean Hiller:

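void feedInputToOutput(InputStream in, OutputStream out) throws IOException {
   // Apache Commons IO: copies everything from in to out through an internal buffer
   IOUtils.copy(in, out);
}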

See the API for details.

Halicarnassus answered 26/3, 2015 at 20:2 Comment(3)
What I need is exactly the opposite, get an InputStream such that I can read from the OutputStream.Poussette
There is no outputstream to an inputstream. An inputstream is a source; you read things from an inputstream. An outputstream is a sink; you write things to an outputstream. If you are looking for something which you can read from as well as write to, perhaps a list or a queue would be more suitable.Halicarnassus
Andreas - I believe Santosh is trying to redirect the contents of an output stream to another input stream. I have the same problem. I get a thumbnailed image as an output stream, and I wish to serve this over HTTP through an input stream.Coefficient
