Java NIO Pipe vs BlockingQueue

I just discovered that Java has an NIO facility, Java NIO Pipe, that's designed for passing data between threads. Is there any advantage to using this mechanism over the more conventional message passing over a queue, such as an ArrayBlockingQueue?

Artel asked 26/9, 2011 at 16:36. Comments:
Rudolph: Pipes go through the kernel, so they are rarely useful; the Selector has wakeup(), and that is itself implemented via a pipe on Linux.
Willemstad: @Rudolph, care to elaborate? You can register pipes with selectors to receive notifications; what's the issue?
Rudolph: @raffian, put simply: you can't really use pipes for IPC, and within a process there are much more efficient ways to pass information.
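Since Rudolph mentions Selector.wakeup(), here is a sketch of the memory-only pattern as I understand it: producers offer() plain objects onto a ConcurrentLinkedQueue and then call selector.wakeup(), so a selector thread blocked in select() returns and drains the queue. The class and method names are hypothetical, and in a real event loop the selector would also be servicing registered channels.

```java
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

class QueueWakeupLoop {
    // Hypothetical helper: a producer thread hands objects to the selector thread via a queue + wakeup().
    static List<String> drainAfterWakeup(List<String> messages) throws Exception {
        ConcurrentLinkedQueue<String> inbox = new ConcurrentLinkedQueue<>();
        List<String> received = new ArrayList<>();
        try (Selector selector = Selector.open()) {
            Thread producer = new Thread(() -> {
                for (String msg : messages) {
                    inbox.offer(msg);   // hands over the object itself; no byte encoding
                    selector.wakeup();  // makes a blocked (or the next) select() return immediately
                }
            });
            producer.start();

            while (received.size() < messages.size()) {
                // A real loop would also process ready keys here; the timeout is only a safety net.
                selector.select(1000);
                String next;
                while ((next = inbox.poll()) != null) {
                    received.add(next);
                }
            }
            producer.join();
        }
        return received;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(drainAfterWakeup(Arrays.asList("one", "two", "three")));
    }
}
```

The queue carries the data and wakeup() only carries the "something is waiting" signal, which is the split the comment seems to be pointing at.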

Usually the simplest way to pass data to another thread for processing is to use an ExecutorService. This wraps up both a queue and a thread pool (which can have just one thread).
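A minimal sketch of that approach, assuming Java 8+. The names ExecutorHandoff and handOff are made up for illustration; the single-thread executor plays the role of both the queue and the consumer thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class ExecutorHandoff {
    // Hypothetical helper: hands text to a single background worker and waits for its result.
    static int handOff(String text) throws Exception {
        // One worker thread plus the executor's internal task queue.
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> text.length(); // runs on the worker thread
            Future<Integer> result = worker.submit(task); // enqueues the task and returns immediately
            return result.get();                          // blocks until the worker has run it
        } finally {
            worker.shutdown();
            worker.awaitTermination(1, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("processed length = " + handOff("hello from the producer"));
    }
}
```

If the producer does not need a result, it can simply call submit() and move on without calling get().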

You can use a Pipe when you have a library which supports NIO channels. It is also useful if you want to pass ByteBuffers of data between threads.
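A rough sketch of passing bytes from one thread to another through a Pipe (class and method names are hypothetical). The producer writes ByteBuffers into pipe.sink(), the consumer reads from pipe.source(), and closing the sink is what lets the reader see end-of-stream (-1).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

class PipeHandoff {
    // Hypothetical helper: a producer thread writes the bytes; the calling thread reads them back.
    static String roundTrip(String message) throws Exception {
        Pipe pipe = Pipe.open();
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);

        Thread producer = new Thread(() -> {
            // Closing the sink at the end of this block signals end-of-stream to the reader.
            try (Pipe.SinkChannel sink = pipe.sink()) {
                ByteBuffer out = ByteBuffer.wrap(payload);
                while (out.hasRemaining()) {
                    sink.write(out); // blocking mode: waits while the pipe's buffer is full
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        producer.start();

        ByteArrayOutputStream received = new ByteArrayOutputStream();
        try (Pipe.SourceChannel source = pipe.source()) {
            ByteBuffer in = ByteBuffer.allocate(256);
            while (source.read(in) != -1) { // -1 once the sink is closed and fully drained
                in.flip();
                received.write(in.array(), 0, in.limit());
                in.clear();
            }
        }
        producer.join();
        return new String(received.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("hello through the pipe"));
    }
}
```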

Otherwise it's usually simpler and faster to use an ArrayBlockingQueue.
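For comparison, a sketch of the ArrayBlockingQueue version. The names are hypothetical, and the "poison pill" sentinel used to stop the consumer is just one common convention, not something from the answer. Note that the objects themselves are handed over, with no byte encoding.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class QueueHandoff {
    // Hypothetical end-of-stream marker, compared by identity so a real "<end>" message is still delivered.
    private static final String POISON = new String("<end>");

    static List<String> transfer(List<String> messages) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(16); // bounded: put() blocks when full
        List<String> received = new ArrayList<>(); // only the consumer touches it until join()

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String msg = queue.take(); // blocks while the queue is empty
                    if (msg == POISON) {
                        break;
                    }
                    received.add(msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        for (String m : messages) {
            queue.put(m); // the reference itself is passed; nothing is serialized
        }
        queue.put(POISON);
        consumer.join(); // join() also makes the consumer's additions visible to this thread
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(transfer(Arrays.asList("a", "b", "c")));
    }
}
```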

If you want a faster way to exchange data between threads, I suggest you look at the Exchanger; however, it is not as general-purpose as an ArrayBlockingQueue.

The Exchanger and GC-less Java
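A sketch of the double-buffering style that Exchanger is typically used for, assuming fixed-size int[] batches (the names and sizes are illustrative). The producer fills one array while the consumer processes the other, and they swap at exchange(). After start-up the same two arrays just travel back and forth, so no new buffers are allocated per batch.

```java
import java.util.concurrent.Exchanger;

class ExchangerDoubleBuffer {
    // Hypothetical helper: sends batches of 1, 2, 3, ... to a consumer that sums them.
    static long sumViaExchanger(int batches, int batchSize) throws InterruptedException {
        Exchanger<int[]> exchanger = new Exchanger<>();
        long[] total = new long[1]; // written only by the consumer, read after join()

        Thread consumer = new Thread(() -> {
            int[] spare = new int[batchSize]; // the consumer's initially empty buffer
            try {
                for (int b = 0; b < batches; b++) {
                    spare = exchanger.exchange(spare); // give back the used array, receive a full one
                    for (int v : spare) {
                        total[0] += v;                 // "process" the batch
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        int[] filling = new int[batchSize]; // the producer's buffer
        int next = 1;
        for (int b = 0; b < batches; b++) {
            for (int i = 0; i < batchSize; i++) {
                filling[i] = next++;                 // fill while the consumer works on the other array
            }
            filling = exchanger.exchange(filling);   // blocks until the consumer is ready to swap
        }
        consumer.join();
        return total[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sumViaExchanger(100, 10));
    }
}
```

Both sides block at exchange(), which is the synchronous behaviour discussed in the comments; in exchange, the buffers are reused instead of being allocated per message.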

Effloresce answered 26/9, 2011 at 16:42. Comments:
Artel: Thanks, I never considered the fact that using the Exchanger minimizes GC overhead. However, the downside of the Exchanger is that it's synchronous. Usually you just want to pump data into another thread without having to wait for it to be picked up.
Effloresce: A pipe has a fixed size, so the problem is the same: if the producer is producing too fast, it has to stop. If the producer never fills the buffer before the consumer finishes, it doesn't have to stop (in either case).
Rudolph: Pipes are used to implement Selector.wakeup(); beyond that they are not very useful, as memory-only solutions are more effective and don't go through the kernel.
Churning: @Artel Technically, Exchanger isn't really synchronous. It can be used to implement a double-buffered solution: each side of the exchange can work on its own buffer in parallel, but they do block at the exchange point. If either the producer or the consumer is always faster, then Exchanger is most likely the better option. If the producer and consumer performance varies, then using a queue may allow the other end to continue and catch up, effectively evening out latencies. In the end, it is best to base these kinds of choices on empirical data rather than on guessing the optimal solution.
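To illustrate the point that a pipe is a fixed-size buffer, here is a sketch that fills one in non-blocking mode while nobody reads (the class name is hypothetical). write() returns 0 once the buffer is full; in blocking mode, that is exactly where a fast producer would stall. The number of bytes accepted is platform-dependent.

```java
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

class PipeCapacity {
    // Hypothetical helper: returns how many bytes the pipe accepted before it was full.
    static long bytesUntilFull() throws Exception {
        Pipe pipe = Pipe.open();
        // Keep the source open but unread; closing it would make writes fail instead of filling up.
        try (Pipe.SinkChannel sink = pipe.sink(); Pipe.SourceChannel source = pipe.source()) {
            sink.configureBlocking(false); // write() now returns 0 instead of blocking when full
            ByteBuffer chunk = ByteBuffer.allocate(1024);
            long total = 0;
            while (true) {
                chunk.clear();
                int written = sink.write(chunk);
                if (written == 0) {
                    return total; // buffer full: a blocking-mode producer would stall here
                }
                total += written;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("pipe accepted " + bytesUntilFull() + " bytes before write() returned 0");
    }
}
```

A bounded ArrayBlockingQueue behaves the same way when full (put() blocks, offer() returns false), which is the "the problem is the same" point above.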

I believe a NIO Pipe was designed so that you can send data to a channel inside the selector loop in a thread-safe way. In other words, any thread can write to the pipe, and the data will be handled at the other end of the pipe, inside the selector loop. When you write to a pipe, you make the channel on the other side readable.
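A sketch of that pattern, with hypothetical names: the pipe's source end is registered with a Selector for OP_READ, another thread writes into the sink, and select() returns once the source becomes readable.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

class PipeSelectorLoop {
    // Hypothetical helper: a writer thread sends the message, a selector loop receives it.
    static String receiveOnSelector(String message) throws Exception {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SourceChannel source = pipe.source();
             Pipe.SinkChannel sink = pipe.sink()) {
            source.configureBlocking(false);                 // required before register()
            source.register(selector, SelectionKey.OP_READ); // the loop is notified when data arrives

            // Any other thread can write into the sink (left in blocking mode here).
            Thread writer = new Thread(() -> {
                try {
                    ByteBuffer out = ByteBuffer.wrap(payload);
                    while (out.hasRemaining()) {
                        sink.write(out);
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            writer.start();

            // Selector loop: wait for readability, then do a non-blocking read.
            ByteBuffer in = ByteBuffer.allocate(payload.length);
            while (in.hasRemaining()) {
                selector.select();               // blocks until the source is readable
                selector.selectedKeys().clear();
                source.read(in);                 // reads whatever is available right now
            }
            writer.join();
            return new String(in.array(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(receiveOnSelector("wake up, selector"));
    }
}
```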

Caty answered 26/9, 2011 at 17:37. Comments:
Artel: I wonder about the performance characteristics of passing data between threads using a selector loop versus a simple queue poll. Also, passing data through a pipe seems to carry the inconvenience of having to pass bytes rather than objects to the other threads. In other words, it forces you to develop a wire protocol for inter-thread data exchange.
Caty: You mean over a ConcurrentLinkedQueue, right? That's a great question. I bet my chips on the ConcurrentLinkedQueue. :) But one advantage I see of pipes is that you send a message like everybody else is doing; in other words, you read from a channel instead of fetching an object from the queue.
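To make the "wire protocol" point concrete: a pipe only carries bytes, so even plain strings need framing. A sketch using DataOutputStream.writeUTF (a 2-byte length prefix followed by modified UTF-8) over Channels.newOutputStream. The frame-count header and all names are my own assumptions, not anything the thread prescribes.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.Pipe;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PipeFraming {
    // Hypothetical helper: encodes strings as length-prefixed frames on one thread, decodes on another.
    static List<String> sendAll(List<String> messages) throws Exception {
        Pipe pipe = Pipe.open();

        Thread writer = new Thread(() -> {
            // Closing the stream also closes the sink channel.
            try (DataOutputStream out = new DataOutputStream(Channels.newOutputStream(pipe.sink()))) {
                out.writeInt(messages.size()); // header: number of frames that follow
                for (String msg : messages) {
                    out.writeUTF(msg);         // frame: 2-byte length + modified UTF-8 bytes
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        writer.start();

        List<String> received = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(Channels.newInputStream(pipe.source()))) {
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                received.add(in.readUTF()); // decode each frame back into a String
            }
        }
        writer.join();
        return received;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sendAll(Arrays.asList("alpha", "beta", "gamma")));
    }
}
```

With a queue, none of this encoding is needed, since the String references are passed directly.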

After having a lot of trouble with pipes (check here), I decided to favor non-blocking concurrent queues over NIO pipes, so I ran some benchmarks on Java's ConcurrentLinkedQueue. See below:

import java.util.concurrent.ConcurrentLinkedQueue;

// Benchmarker is the author's own timing helper and is not shown in the post. From its use here,
// mark() appears to record a start time, measure() the time elapsed since mark(), and results()
// a count/min/max/average summary in nanoseconds.
public class QueueBenchmark { // placeholder class name; the post shows only main()

    public static void main(String[] args) throws Exception {

        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();

        // first, test nothing (measures the mark/measure overhead alone):

        for (int j = 0; j < 20; j++) {

            Benchmarker bench = new Benchmarker();

            String s = "asd";

            for (int i = 0; i < 1000000; i++) {
                bench.mark();
                // s = queue.poll();
                bench.measure();
            }

            System.out.println(bench.results());

            Thread.sleep(100);
        }

        System.out.println();

        // then test polling an empty queue:

        for (int j = 0; j < 20; j++) {

            Benchmarker bench = new Benchmarker();

            String s = "asd";

            for (int i = 0; i < 1000000; i++) {
                bench.mark();
                s = queue.poll();
                bench.measure();
            }

            System.out.println(bench.results());

            Thread.sleep(100);
        }

        System.out.println();

        // now test polling one element on a queue with size one

        for (int j = 0; j < 20; j++) {

            Benchmarker bench = new Benchmarker();

            String s = "asd";
            String x = "pela";

            for (int i = 0; i < 1000000; i++) {
                queue.offer(x);
                bench.mark();
                s = queue.poll();
                bench.measure();
                if (s != x) throw new Exception("bad!"); // identity check: must be the same object
            }

            System.out.println(bench.results());

            Thread.sleep(100);
        }

        System.out.println();

        // now test polling one element on a queue with size two

        for (int j = 0; j < 20; j++) {

            Benchmarker bench = new Benchmarker();

            String s = "asd";
            String x = "pela";

            for (int i = 0; i < 1000000; i++) {
                queue.offer(x);
                queue.offer(x);
                bench.mark();
                s = queue.poll();
                bench.measure();
                if (s != x) throw new Exception("bad!");
                queue.poll(); // remove the second element so the queue is empty again
            }

            System.out.println(bench.results());

            Thread.sleep(100);
        }
    }
}
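The post doesn't include Benchmarker. To run the benchmark, here is a hypothetical stand-in with the same mark()/measure()/results() methods, assuming it simply records System.nanoTime() deltas. It is not the author's class; its output format only imitates the results below.

```java
class Benchmarker {
    private long start;               // timestamp taken by the last mark()
    private long count;               // number of measure() calls
    private long total;               // sum of all measured intervals, in nanoseconds
    private long min = Long.MAX_VALUE;
    private long max;

    // Starts one measurement.
    public void mark() {
        start = System.nanoTime();
    }

    // Records the time elapsed since the last mark().
    public void measure() {
        long elapsed = System.nanoTime() - start;
        count++;
        total += elapsed;
        min = Math.min(min, elapsed);
        max = Math.max(max, elapsed);
    }

    // Summarizes all measurements so far.
    public String results() {
        long shownMin = count == 0 ? 0 : min;
        double avg = count == 0 ? 0.0 : (double) total / count;
        return "totalLogs=" + count + ", minTime=" + shownMin + ", maxTime=" + max
                + ", avgTime=" + avg + " (times in nanos)";
    }
}
```

Note that System.nanoTime() itself costs some nanoseconds per call, which is why the "test nothing" loop is worth running first as a baseline.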

The results:

totalLogs=1000000, minTime=0, maxTime=85000, avgTime=58.61 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=5281000, avgTime=63.35 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=725000, avgTime=59.71 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=25000, avgTime=58.13 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=378000, avgTime=58.45 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=15000, avgTime=57.71 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=170000, avgTime=58.11 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=1495000, avgTime=59.87 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=232000, avgTime=63.0 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=184000, avgTime=57.89 (times in nanos)

totalLogs=1000000, minTime=0, maxTime=2600000, avgTime=65.22 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=850000, avgTime=60.5 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=150000, avgTime=63.83 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=43000, avgTime=59.75 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=276000, avgTime=60.02 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=457000, avgTime=61.69 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=204000, avgTime=60.44 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=154000, avgTime=63.67 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=355000, avgTime=60.75 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=338000, avgTime=60.44 (times in nanos)

totalLogs=1000000, minTime=0, maxTime=345000, avgTime=110.93 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=396000, avgTime=100.32 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=298000, avgTime=98.93 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=1891000, avgTime=101.9 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=254000, avgTime=103.06 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=1894000, avgTime=100.97 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=230000, avgTime=99.21 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=348000, avgTime=99.63 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=922000, avgTime=99.53 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=168000, avgTime=99.12 (times in nanos)

totalLogs=1000000, minTime=0, maxTime=686000, avgTime=107.41 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=320000, avgTime=95.58 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=248000, avgTime=94.94 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=217000, avgTime=95.01 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=159000, avgTime=93.62 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=155000, avgTime=95.28 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=106000, avgTime=98.57 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=370000, avgTime=95.01 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=1836000, avgTime=96.21 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=212000, avgTime=98.62 (times in nanos)

Conclusion:

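The maxTime outliers can be scary, but the averages are stable. The empty loop (just mark/measure) costs about 58-63 ns, polling an empty queue about 60-65 ns, and polling a non-empty queue about 94-111 ns. Subtracting the measurement overhead, I think it is safe to conclude that polling a ConcurrentLinkedQueue costs roughly 40-50 ns (measured from a single thread, so without contention).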

Caty answered 25/3, 2012 at 19:01

I suppose the pipe could have better latency, as it may well be implemented with coroutines behind the scenes. The producer would then yield to the consumer immediately when data is available, rather than when the thread scheduler decides.

Pipes in general represent a producer-consumer problem and are likely to be implemented this way, so that both threads cooperate rather than being preempted externally.

Beecham answered 26/9, 2011 at 16:47

© 2022 - 2024 — McMap. All rights reserved.