I believe I have done everything correctly. I create a pipe, pass the sink to a writer thread, register the source on my selector with OP_READ, start my selector. Everything works but as soon as I write something to the sink I get a broken pipe exception. Why !!!??? There is no broken pipe here. I am frustrated. How can I debug / understand what is happening here? Does anyone have a simple pipe example that I can run to test if this is working. A thread writing on the sink and the selector reading it.
EDIT: I pretty much followed the suggestion here. It is hard to find concrete examples of NIO pipes in the Internet.
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
public class SystemOutPipe extends Thread {
public static void main(String[] args)
try {
SystemOutPipe sop = new SystemOutPipe();
System.out.println("This message should be redirected to System.err\nNow waiting 5 seconds ...");
} catch (Exception e) {
private Selector selector;
private Pipe pipe;
private boolean stopped = false;
public SystemOutPipe() throws IOException {
pipe = Pipe.open();
System.setOut(new PrintStream(new PipeOutputStream(pipe)));
selector = Selector.open();
pipe.source().register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
public void run() {
try {
while (!isStopped()) {
int n = selector.select(1L);
if (n > 0) {
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
if (key.isReadable()) {
new ReadHandler(key).run();
} catch (Exception e) {
e.printStackTrace(); // writes to System.err !
public synchronized boolean isStopped() {
return stopped;
public synchronized void setStopped(final boolean stopped) {
this.stopped = stopped;
public class ReadHandler implements Runnable {
private final SelectionKey key;
public ReadHandler(final SelectionKey key) {
this.key = key;
public void run() {
ByteBuffer bbuf = (ByteBuffer) key.attachment();
ReadableByteChannel channel = (ReadableByteChannel) key.channel();
int count = 0;
do {
count = channel.read(bbuf);
if (count > 0) System.err.write(bbuf.array(), 0, count);
} while(count > 0);
} catch (IOException e) {
public class PipeOutputStream extends OutputStream {
private final Pipe pipe;
public PipeOutputStream(final Pipe pipe) {
this.pipe = pipe;
public void write(final int b) throws IOException {
write(new byte[] { (byte) b });
public void write(final byte[] b) throws IOException {
write(b, 0, b.length);
public void write(final byte[] b, final int off, final int len) throws IOException {
ByteBuffer bbuf = ByteBuffer.wrap(b, off, len);
int count = 0;
while (count < len) {
int n = pipe.sink().write(bbuf);
if (n == 0) {
// let's wait a bit and not consume cpu
try {
} catch (InterruptedException e) {
throw new IOException(e);
else count += n;
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcher.write0(Native Method)
at sun.nio.ch.FileDispatcher.write(FileDispatcher.java:39)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:72)
at sun.nio.ch.IOUtil.write(IOUtil.java:43)
at sun.nio.ch.SinkChannelImpl.write(SinkChannelImpl.java:149)
at com.niostuff.util.GCLogInterceptor.fileModified(GCLogInterceptor.java:180)
at net.contentobjects.jnotify.linux.JNotifyAdapterLinux$WatchData.notifyFileModified(Unknown Source)
at net.contentobjects.jnotify.linux.JNotifyAdapterLinux.notifyChangeEvent(Unknown Source)
at net.contentobjects.jnotify.linux.JNotifyAdapterLinux$1.notify(Unknown Source)
at net.contentobjects.jnotify.linux.JNotify_linux.callbackProcessEvent(Unknown Source)
at net.contentobjects.jnotify.linux.JNotify_linux.nativeNotifyLoop(Native Method)
at net.contentobjects.jnotify.linux.JNotify_linux.access$000(Unknown Source)
at net.contentobjects.jnotify.linux.JNotify_linux$1.run(Unknown Source)