NIO Pipe throws "Broken Pipe"when I write to the Sink for no reason! How to debug?
Asked Answered
Q

1

0

I believe I have done everything correctly. I create a pipe, pass the sink to a writer thread, register the source on my selector with OP_READ, start my selector. Everything works but as soon as I write something to the sink I get a broken pipe exception. Why !!!??? There is no broken pipe here. I am frustrated. How can I debug / understand what is happening here? Does anyone have a simple pipe example that I can run to test if this is working. A thread writing on the sink and the selector reading it.

EDIT: I pretty much followed the suggestion here. It is hard to find concrete examples of NIO pipes in the Internet.

import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;

public class SystemOutPipe extends Thread {

  public static void main(String[] args)
  {
    try {
      SystemOutPipe sop = new SystemOutPipe();
      sop.start();
      System.out.println("This message should be redirected to System.err\nNow waiting 5 seconds ...");
      Thread.sleep(5000L);
      sop.setStopped(true);
      sop.join();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  private Selector selector;
  private Pipe pipe;
  private boolean stopped = false;

  public SystemOutPipe() throws IOException {
    super("SystemOutPipe");
    pipe = Pipe.open();
    System.setOut(new PrintStream(new PipeOutputStream(pipe)));
    selector = Selector.open();
    pipe.source().configureBlocking(false);
    pipe.source().register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
  }

  @Override
  public void run() {
    try {
      while (!isStopped()) {
        int n = selector.select(1L);
        if (n > 0) {
          Iterator<SelectionKey> it = selector.selectedKeys().iterator();
          while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove();
            if (key.isReadable()) {
              new ReadHandler(key).run();
            }
          }
        }
      }
    } catch (Exception e) {
      e.printStackTrace(); // writes to System.err !
    }
  }

  public synchronized boolean isStopped() {
    return stopped;
  }

  public synchronized void setStopped(final boolean stopped) {
    this.stopped = stopped;
  }

  public class ReadHandler implements Runnable {
    private final SelectionKey key;

    public ReadHandler(final SelectionKey key) {
      this.key = key;
    }

    @Override
    public void run() {
      ByteBuffer bbuf = (ByteBuffer) key.attachment();
      ReadableByteChannel channel = (ReadableByteChannel) key.channel();
      try
      {
        int count = 0;
        do {
          bbuf.clear();
          count = channel.read(bbuf);
          if (count > 0) System.err.write(bbuf.array(), 0, count);
        } while(count > 0);
      } catch (IOException e) {
        e.printStackTrace();
        key.cancel();
      }
    }
  }

  public class PipeOutputStream extends OutputStream {
    private final Pipe pipe;

    public PipeOutputStream(final Pipe pipe) {
      this.pipe = pipe;
    }

    @Override
    public void write(final int b) throws IOException {
      write(new byte[] { (byte) b });
    }

    @Override
    public void write(final byte[] b) throws IOException {
      write(b, 0, b.length);
    }

    @Override
    public void write(final byte[] b, final int off, final int len) throws IOException {
      ByteBuffer bbuf = ByteBuffer.wrap(b, off, len);
      bbuf.position(len);
      bbuf.flip();
      int count = 0;
      while (count < len) {
        int n = pipe.sink().write(bbuf);
        if (n == 0) {
          // let's wait a bit and not consume cpu
          try {
            Thread.sleep(1L);
          } catch (InterruptedException e) {
            throw new IOException(e);
          }
        }
        else count += n;
      }
    }
  }
}

EXCEPTION:

java.io.IOException: Broken pipe
    at sun.nio.ch.FileDispatcher.write0(Native Method)
    at sun.nio.ch.FileDispatcher.write(FileDispatcher.java:39)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:72)
    at sun.nio.ch.IOUtil.write(IOUtil.java:43)
    at sun.nio.ch.SinkChannelImpl.write(SinkChannelImpl.java:149)
    at com.niostuff.util.GCLogInterceptor.fileModified(GCLogInterceptor.java:180)
    at net.contentobjects.jnotify.linux.JNotifyAdapterLinux$WatchData.notifyFileModified(Unknown Source)
    at net.contentobjects.jnotify.linux.JNotifyAdapterLinux.notifyChangeEvent(Unknown Source)
    at net.contentobjects.jnotify.linux.JNotifyAdapterLinux$1.notify(Unknown Source)
    at net.contentobjects.jnotify.linux.JNotify_linux.callbackProcessEvent(Unknown Source)
    at net.contentobjects.jnotify.linux.JNotify_linux.nativeNotifyLoop(Native Method)
    at net.contentobjects.jnotify.linux.JNotify_linux.access$000(Unknown Source)
    at net.contentobjects.jnotify.linux.JNotify_linux$1.run(Unknown Source)
Quintic answered 25/3, 2012 at 4:18 Comment(10)
How do you expect any help without posting your code?Nodus
@JimGarrison Sorry but the code is huge with many ramifications. That's the problem, but the basic operation as I described is there. A broken pipe is a terrible error because it tells you nothing about the problem. I guess a good reason to stay away pipes and use non-blocking queues.Quintic
@Quintic - if the code is too large, cut it down into something smaller that still has the problem.Jaws
@StephenC I pretty much followed what was described here: #9853274Quintic
You mention passing the "sink" to a writer thread. Is that a typo for a reader thread? If not, then of course a write into the pipe will get an error if there are no readers.Relativistic
@Relativistic This is correct, Jim. The SinkChannel is passed to the WRITER thread and the source is passed to the READER thread, in our case the NIO selector. Why did you think it was wrong, Jim? You can read about it here: docs.oracle.com/javase/1.5.0/docs/api/java/nio/channels/…Quintic
Is the example code supposed to be throwing the exception, because it runs just fine for me (using OpenJDK Runtime Environment (IcedTea6 1.9.13) (6b20-1.9.13-0ubuntu1~10.10.1) OpenJDK 64-Bit Server VM (build 19.0-b09, mixed mode) )Buttermilk
@Buttermilk Can you double check that pipe source is not being selected continuously on a OP_READ and returning 0 bytes read when you try to read it. That was my problem here! Is it a bug or a "feature" of pipes? The pipe source must only be selected when there is stuff to read.Quintic
Check if this sounds correct, as I'm not 100% sure: I placed System.err.println("New ReadHandler.run()"); inside the if (key.isReadable())-block in SystemOutPipe.run(), and another err-printout System.err.println("End of ReadHandler run()"); at the end of ReadHandler.run. The output is: New ReadHandler.run() This message should be redirected to System.err Now waiting 5 seconds ... End of ReadHandler run(). I also checked using debugger, and it seems ReadHandler.run is called only once (prints out the stuff written into System.out in the main and then gets count 0 and exits).Buttermilk
@Buttermilk So my problem is, my pipe source channel is CONTINUOUSLY selected but read returns 0 bytes read which is an ERROR condition. Not sure why this is happening here. :( :( :( Any ideas?Quintic
Q
0

Ok, so I found the problem. First thanks for everyone trying to help. Hopefully you will learn from my mistake. The chain of events were:

1 - I was not draining the receiving buffer (the one the source channel reads into) and it eventually got full.

2 - Now that it is full, pipeSourceChannel.read(readBuffer) returns 0 bytes. There is data to be read but It can't read on a full buffer.

3 - That caused the channel to be closed (i was doing that myself on bytesRead == 0) and the BrokenPipe.

One lesson I learned here: PIPES are tricky. I would think that non-blocking concurrent queues are much simpler to use like this guy here once mentioned: Java NIO Pipe vs BlockingQueue

Quintic answered 25/3, 2012 at 18:48 Comment(4)
Sounds like I was debugging the wrong parts all the time =D. Good to hear you got it solved, I'll delete my answer so any future visitors in this question won't get confused.Buttermilk
That's not a problem with NIO pipes specifically, it is just incorrect usage of NIO in general. You always have to compact or clear the read buffer.Unfaithful
@EJP I agree. But I think non-blocking queues are simpler to implement with the same results. Not sure about performance comparison between them, but you can check what I did here: https://mcmap.net/q/993400/-java-nio-pipe-vs-blockingqueueQuintic
I agree. I've used pipes exactly once since 1997, and I took them out for a Queue. I really cannot see the point.Unfaithful

© 2022 - 2024 — McMap. All rights reserved.