Concurrent writing upon a standard OutputStream
I'm writing an application which involves writing considerably big chunks of data to an OutputStream (belonging to a Socket). What makes this a bit complicated is that there are usually multiple threads trying to write to the same OutputStream. Currently, I have it designed so that the OutputStream being written to runs in its own thread. The thread holds a queue (a LinkedList) from which it polls byte arrays and writes them as soon as possible.

private class OutputStreamWriter implements Runnable {

    private final LinkedList<byte[]> chunkQueue = new LinkedList<byte[]>();

    public void run() {
        OutputStream outputStream = User.this.outputStream;
        while (true) {
            try {
                if (chunkQueue.isEmpty()) {
                    Thread.sleep(100);
                    continue;
                }
                outputStream.write(chunkQueue.poll());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

The problem with this design is that as more and more writes occur, more and more data queues up and it is not getting written any faster. Initially, when data is put into the queue, it is written practically immediately. Then after about 15 seconds or so, the data begins to lag behind; a delay develops from the time the data is queued to the time the data is actually written. As time goes on, this delay becomes longer and longer. It is very noticeable.

A way to fix this would be some sort of ConcurrentOutputStream implementation that allows data to be sent without blocking so that writes do not start to get backed up (heck, the queue would be unnecessary then). I don't know if there is such an implementation -- I have been unable to find one -- and personally I don't think it's even possible to write one.

So, does anybody have any suggestions of how I can re-design this?

Nonunion answered 13/11, 2012 at 5:5 Comment(2)
That's not very constructive. What's wrong with it? – Nonunion
As an aside, are you synchronizing modifications to your LinkedList? Because it is not thread-safe by design. Also, what kind of output stream are you layering on top of the socket output, and how much data are you pushing through it? – Jonell
The throughput of the socket is limited; if it's lower than the rate at which you generate data, the excess must be buffered somewhere. There's no way around that, and writing "concurrently" won't help at all.

You may consider pausing data generation when the queued data exceeds a certain limit, to bound memory consumption.
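One way to apply such a limit is a bounded queue, so producers block (back-pressure) instead of piling up chunks in memory. A minimal sketch, assuming producers can tolerate blocking on enqueue (the class name `BoundedWriter` and capacity are illustrative):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedWriter {
  // Bounded queue: put() blocks producers once CAPACITY chunks are waiting,
  // pausing data generation instead of letting memory grow without bound.
  private static final int CAPACITY = 64;
  private final BlockingQueue<byte[]> chunkQueue = new ArrayBlockingQueue<>(CAPACITY);

  public void enqueue(final byte[] chunk) throws InterruptedException {
    chunkQueue.put(chunk);   // blocks while the queue is full
  }

  public byte[] nextChunk() throws InterruptedException {
    return chunkQueue.take(); // blocks while the queue is empty
  }
}
```

With this in place, a producer that outruns the socket simply stalls on `enqueue` until the writer thread catches up, instead of building an ever-growing backlog.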

Milone answered 13/11, 2012 at 5:27 Comment(2)
I'm just throwing things at the wall here, but what about a SocketChannel? – Nonunion
I don't think it'll help. The bottleneck is the network bandwidth. – Milone
I needed a filter to intercept slow connections, where I need to close DB connections ASAP, so I initially used Java pipes; but when I looked closer at their implementation, it is all synchronized. So I ended up creating my own QueueOutputStream, which uses a small buffer and a BlockingQueue: once the buffer is full, it is put in the queue. It is lock-free except for the lock conditions used inside LinkedBlockingQueue, which, with the aid of the small buffer, should be cheap. This class is only intended to be used by a single producer and a single consumer per instance, and you should pass an ExecutorService to start streaming your queued bytes to the final OutputStream:

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.*;

public class QueueOutputStream extends OutputStream
{
  private static final int DEFAULT_BUFFER_SIZE=1024;
  private static final byte[] END_SIGNAL=new byte[]{};

  private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>();
  private final byte[] buffer;

  private boolean closed=false;
  private int count=0;

  public QueueOutputStream()
  {
    this(DEFAULT_BUFFER_SIZE);
  }

  public QueueOutputStream(final int bufferSize)
  {
    if(bufferSize<=0){
      throw new IllegalArgumentException("Buffer size <= 0");
    }
    this.buffer=new byte[bufferSize];
  }

  private synchronized void flushBuffer()
  {
    if(count>0){
      final byte[] copy=new byte[count];
      System.arraycopy(buffer,0,copy,0,count);
      queue.offer(copy);
      count=0;
    }
  }

  @Override
  public synchronized void write(final int b) throws IOException
  {
    if(closed){
      throw new IllegalStateException("Stream is closed");
    }
    if(count>=buffer.length){
      flushBuffer();
    }
    buffer[count++]=(byte)b;
  }

  @Override
  public synchronized void write(final byte[] b, final int off, final int len) throws IOException
  {
    super.write(b,off,len);
  }

  @Override
  public synchronized void close() throws IOException
  {
    flushBuffer();
    queue.offer(END_SIGNAL);
    closed=true;
  }

  public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream)
  {
    return executor.submit(
            new Callable<Void>()
            {
              @Override
              public Void call() throws Exception
              {
                try{
                  byte[] buffer=queue.take();
                  while(buffer!=END_SIGNAL){
                    outputStream.write(buffer);
                    buffer=queue.take();
                  }
                  outputStream.flush();
                } catch(Exception e){
                  close();
                  throw e;
                } finally{
                  outputStream.close();
                }
                return null;
              }
            }
    );
  }
}
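To see the drain loop end-to-end, here is a self-contained sketch of the same sentinel-pump pattern the class above implements; the names `PumpDemo` and `END` are illustrative, and a ByteArrayOutputStream stands in for the socket stream:

```java
import java.io.ByteArrayOutputStream;
import java.util.concurrent.*;

public class PumpDemo {
  private static final byte[] END = new byte[0]; // sentinel, compared by reference

  static String run() throws Exception {
    final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
    final ByteArrayOutputStream sink = new ByteArrayOutputStream();
    final ExecutorService executor = Executors.newSingleThreadExecutor();

    // Consumer: drain the queue to the sink until the sentinel arrives.
    final Future<Void> pump = executor.submit(() -> {
      byte[] chunk = queue.take();
      while (chunk != END) {
        sink.write(chunk);
        chunk = queue.take();
      }
      return null;
    });

    // Producer side: enqueue chunks, then signal end-of-stream.
    queue.put("hello ".getBytes());
    queue.put("world".getBytes());
    queue.put(END);

    pump.get();          // wait for the pump, propagating any exception
    executor.shutdown();
    return sink.toString();
  }

  public static void main(String[] args) throws Exception {
    System.out.println(run());
  }
}
```

Note that the empty-array sentinel is compared by reference (`chunk != END`), so a legitimately empty chunk enqueued by a producer would also terminate the pump; the buffering in QueueOutputStream avoids ever enqueuing empty arrays.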
Overnice answered 18/10, 2013 at 17:50 Comment(0)
I agree with @irreputable that concurrent writing won't help in the slightest. Instead you should be looking at the producing side, i.e. at what you already have.

  1. Use a BlockingQueue instead of a LinkedList.

  2. Use the queue's blocking take() (or a timed poll()), rather than a blind 100 ms sleep, which adds an average of 50 ms of needless latency to every chunk. Over a long period that can really add up.

Proximo answered 13/11, 2012 at 7:46 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.