Terminate a Stream when there is no incoming Data after certain Timeout

I have an InputStream and OutputStream (there is no socket).

I have a stream-based code that does some mapping/filtering/grouping/processing.

My main goal is to terminate the stream once maxDuration has been exceeded:

void fillStreamMap(BufferedReader reader) {
    final Instant end = Instant.now().plusNanos(TimeUnit.NANOSECONDS.convert(maxDuration));

    this.map = reader.lines()
        .takeWhile(e -> checkTimeout(end))
        .map(this::jsonToBuyerEventInput)
        .filter(Objects::nonNull)
        .filter(getFilter()::apply)
        .limit(super.maxEvent)
        .collect(Collectors.groupingBy(BuyerEventInput::getBuyer));
}

boolean checkTimeout(Instant end){
    return Instant.now().getEpochSecond() <= end.getEpochSecond();
}

I'm using takeWhile, which is a very useful function, but it only checks the termination condition when a new event arrives.

So if no data is sent, the condition is never checked, because takeWhile takes a Predicate that is only evaluated against an incoming element.

Is there any way to accomplish this goal?

Godred answered 5/6, 2022 at 18:58 Comment(3)
There are several things to consider: 1. Because your application is based on interaction with the console, input and output should not be closed while the application is running. If you close a BufferedReader wrapped around System.in, you won't be able to use it anymore. 2. I found out that invoking close() on a volatile reference to the stream (I mean the Java 8 Stream) from a different thread has no impact on the stream pipeline execution: it continues to consume and process the console input and doesn't terminate. Kohn
... If we invoke close on the stream source (meaning the BufferedReader), it leads to issue 1 and the result still isn't achieved: the stream pipeline no longer reacts to the console input (since it's closed), but its terminal operation is not triggered, i.e. collect() doesn't produce the map and the method hangs indefinitely. Kohn
Take a look at Reactive Programming and Project Reactor; it may be a better tool than Streams for this problem. vinsguru.com/reactor-flux-file-reading Downward
C
2

Here is an approach that operates on Streams. The core function is timedTake(Stream<T> stream, long timeout, TimeUnit unit). The idea is to traverse the original stream using its raw Spliterator, which makes it possible to set a timeout.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Optional;
import java.util.Spliterator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Stream;

class Main {
    static <T> Stream<T> generateOrderedStream(Supplier<Optional<T>> s) {
        // Returns an ordered stream with the values of the Optionals returned by s.get(). An empty Optional ends the stream.
        // As pseudocode:
        //     for (Optional<T> o = s.get(); o.isPresent(); o = s.get())
        //         emit o.get();
        return Stream.iterate(s.get(), Optional::isPresent, prev -> s.get())
            .map(Optional::get);
    }

    static <T> Optional<T> advance(Spliterator<T> iter) {
        // Returns an Optional with the next element of the iterator, or an empty Optional if there are no more elements.
        // (This method is much nicer than calling iter.tryAdvance() directly.)
        final var r = new Object() { T elem; };
        return iter.tryAdvance(elem -> r.elem = elem) ? Optional.of(r.elem) : Optional.empty();
    }

    static ThreadFactory daemonThreadFactory() {
        return (r) -> {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            return thread;
        };
    }

    static <T> Stream<T> timedTake(Stream<T> stream, long timeout, TimeUnit unit) {
        // Traverses the stream until the timeout elapses and returns the traversed elements.
        final long deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
        final ExecutorService executor = Executors.newSingleThreadExecutor(daemonThreadFactory());
        final Spliterator<T> iter = stream.spliterator();
        return generateOrderedStream(() -> {
            try {
                Future<Optional<T>> future = executor.submit(() -> advance(iter));
                long remainingNanos = deadlineNanos - System.nanoTime();
                Optional<T> optElem = future.get(remainingNanos, TimeUnit.NANOSECONDS);
                if (!optElem.isPresent()) { // this is the end of the input stream, so clean up
                    executor.shutdownNow();
                }
                return optElem;
            } catch (TimeoutException e) {
                executor.shutdownNow();
                return Optional.empty(); // mark this as the end of the result stream
            } catch (ExecutionException e) {
                executor.shutdownNow();
                throw new RuntimeException(e.getCause());
            } catch (InterruptedException e) {
                executor.shutdownNow();
                Thread.currentThread().interrupt(); // restore the interrupt status for the caller
                throw new RuntimeException(e);
            }
        });
    }

    static void fillStreamMap(BufferedReader reader) {
        // streaming demo
        long maxDurationSecs = 5;
        timedTake(reader.lines(), maxDurationSecs, TimeUnit.SECONDS)
            .takeWhile(line -> !line.contains("[stop]"))
            .map(line -> "[mapped] " + line)
            .forEachOrdered(System.out::println);
    }

    public static void main(String[] args) {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        fillStreamMap(reader);
    }
}

Another approach is to operate at the Reader level, and read with a timeout from the BufferedReader (which presumably wraps System.in). Unfortunately, it's very hard to do this properly (see e.g. Set timeout for user's input, and the article Timeout on Console Input).

One idea from those linked pages is to poll BufferedReader.ready() until it returns true, and then call readLine(). This is ugly (because it uses polling) and unreliable, because readLine() can block even if ready() returned true – for example because an incomplete line is available (on Unix-like systems the user can achieve this by typing some text then pressing Ctrl+D instead of Enter).
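As a rough illustration of the polling idea (not code from the linked pages), the sketch below polls ready() with a short sleep until a deadline passes. The class name ReadyPollingSketch, the method readLineWithTimeout and the 10 ms polling interval are assumptions made for this example. The caveat above still applies: readLine() can block if only part of a line is available.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

class ReadyPollingSketch {
    // Polls reader.ready() until input is available or the timeout elapses.
    // Returns the line read, or an empty Optional on timeout or end of input.
    // CAVEAT: readLine() may still block if only an incomplete line is available.
    static Optional<String> readLineWithTimeout(BufferedReader reader, long timeout, TimeUnit unit) {
        final long deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
        try {
            while (!reader.ready()) {
                if (System.nanoTime() - deadlineNanos >= 0) {
                    return Optional.empty(); // timed out without any input
                }
                Thread.sleep(10); // polling interval (arbitrary choice for this sketch)
            }
            return Optional.ofNullable(reader.readLine()); // null (end of input) becomes empty
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return Optional.empty();
        }
    }
}
```

Note that the caller cannot tell a timeout from end of input with this signature; a real implementation would probably want to distinguish the two.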

Another idea is to create a background thread that repeatedly calls BufferedReader.readLine() and inserts the results into a BlockingQueue (such as ArrayBlockingQueue). Then the main thread can call take() or poll(timeout, unit) on the queue to obtain lines.

A limitation of this approach is that if you later want to read from the BufferedReader directly (as opposed to through the queue), it's pretty much impossible to avoid losing (at least) one line of input. This is because a thread can't be interrupted cleanly when it's blocked on readLine(), so if the main thread decides to stop early (e.g. because of a timeout) it can't prevent the background thread from reading the line it is currently waiting for.

You could try to "unread" the last line using mark(readAheadLimit) and reset(), but synchronization will be difficult: another thread could try to read from the BufferedReader before the background thread calls reset(). You'd probably have to synchronize using the lock field, but its access level is protected, so you'd only be able to access it using reflection or by subclassing BufferedReader. Also, reset() will fail if the line to be unread is longer than readAheadLimit.
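For reference, the basic mark()/reset() mechanics look like this in a single-threaded setting. This is only a minimal sketch: the class PeekLineSketch and the method peekLine are made up for the example, and it does nothing about the cross-thread synchronization problem described above.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;

class PeekLineSketch {
    // Reads the next line and then rewinds the reader, so that the same line
    // is returned again by the next readLine() call.
    // reset() can throw an IOException if more than readAheadLimit characters
    // were read since mark().
    static String peekLine(BufferedReader reader, int readAheadLimit) {
        try {
            reader.mark(readAheadLimit); // remember the current position
            String line = reader.readLine();
            reader.reset();              // "unread" the line
            return line;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With a reader over "alpha\nbeta\n", calling peekLine(reader, 1024) returns "alpha" and leaves the reader positioned so that readLine() returns "alpha" again.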

Here is an implementation that assumes you only read lines via the queue.

DISCLAIMER: Beware of bugs in these code snippets – multi-threading is tricky. I might try to improve the code another time.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Stream;

class InterruptibleLineReader {
    private static final String EOF = new String("<EOF>");
    BufferedReader reader;
    ArrayBlockingQueue<String> lines = new ArrayBlockingQueue<>(/* capacity: */ 2);
    Thread backgroundThread;
    IOException exception;

    public InterruptibleLineReader(BufferedReader reader) {
        this.reader = reader;
        // start a background thread to read lines
        backgroundThread = new Thread(this::backgroundTask);
        backgroundThread.setDaemon(true);
        backgroundThread.start();
    }

    public void close() {
        backgroundThread.interrupt();
        lines.clear();
        lines.add(EOF);
    }

    private void backgroundTask() {
        try {
            try {
                while (true) {
                    String line = reader.readLine();
                    if (Thread.interrupted()) {
                        // nothing to do (close() is responsible for lines.put(EOF) etc. in this case)
                        break;
                    } else if (line == null) {
                        lines.put(EOF);
                        break;
                    }
                    lines.put(line);
                }
            } catch (IOException e) {
                exception = e;
                lines.put(EOF);
            }
        } catch (InterruptedException e) {
            // nothing to do (close() is responsible for lines.put(EOF) etc. in this case)
        }
    }

    public String readLine(long timeout, TimeUnit unit) throws IOException, InterruptedException {
        String line = lines.poll(timeout, unit);
        if (line == EOF) { // EOF or IOException
            lines.put(EOF); // restore the EOF so that any concurrent (and future) calls to this method won't block
            if (exception != null) {
                throw exception;
            } else {
                return null;
            }
        }
        return line;
    }
}

class Main {
    static <T> Stream<T> generateOrderedStream(Supplier<Optional<T>> s) {
        // Returns an ordered stream with the values of the Optionals returned by s.get(). An empty Optional ends the stream.
        // As pseudocode:
        //     for (Optional<T> o = s.get(); o.isPresent(); o = s.get())
        //         emit o.get();
        return Stream.iterate(s.get(), Optional::isPresent, prev -> s.get())
            .map(Optional::get);
    }

    static Stream<String> timedReadLines(InterruptibleLineReader lineReader, long timeout, TimeUnit unit) {
        // Reads lines until the timeout elapses and returns them as a stream.
        final long deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
        return generateOrderedStream(() -> {
            try {
                long remainingNanos = deadlineNanos - System.nanoTime();
                return Optional.ofNullable(lineReader.readLine(remainingNanos, TimeUnit.NANOSECONDS));
            } catch (IOException|InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
    }

    static void fillStreamMap(InterruptibleLineReader lineReader) {
        // streaming demo
        long maxDurationSecs = 5;
        timedReadLines(lineReader, maxDurationSecs, TimeUnit.SECONDS)
            .takeWhile(line -> !line.contains("[stop]"))
            .map(line -> "[mapped] " + line)
            .forEachOrdered(System.out::println);
    }

    public static void main(String[] args) {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));

        // stream lines
        InterruptibleLineReader lineReader = new InterruptibleLineReader(reader);
        fillStreamMap(lineReader);
        lineReader.close();

        /*
        // attempt to use the BufferedReader directly
        // NOTE: several lines may be lost (depending on the capacity of the ArrayBlockingQueue and how quickly the lines are consumed)
        System.out.println("--- reading directly from BufferedReader ---");
        while (true) {
            try {
                String line = reader.readLine();
                if (line == null) { break; }
                System.out.println("[raw] " + line);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        */
    }
}

Here is a more sophisticated implementation that only loses one line of input if you close the queue and read directly from the BufferedReader. It uses a custom "0-capacity" queue to ensure that at most one line will be lost.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Stream;

class InterruptibleLineReader {
    BufferedReader reader;
    ZeroCapacityBlockingQueue<String> lines = new ZeroCapacityBlockingQueue<>(); // a null line indicates EOF or IOException
    Thread backgroundThread;
    IOException exception;
    boolean eof;

    public InterruptibleLineReader(BufferedReader reader) {
        this.reader = reader;
        // start a background thread to read lines
        backgroundThread = new Thread(this::backgroundTask);
        backgroundThread.setDaemon(true);
        backgroundThread.start();
    }

    private void markAsEOF() {
        eof = true;
        if (lines.poll() != null) { // markAsEOF() should not be called when there are unconsumed lines
            throw new IllegalStateException();
        }
        lines.offer(null); // unblock threads that are waiting on the queue
    }

    public void close() {
        backgroundThread.interrupt();
        // warn if there is an unconsumed line, and consume it so we can indicate EOF
        String line = lines.poll();
        if (line != null) {
            System.err.println("InterruptibleLineReader: warning: discarding unconsumed line during close(): '" + line + "'");
        }
        markAsEOF();
    }

    private void backgroundTask() {
        try {
            while (true) {
                String line = reader.readLine();
                if (Thread.interrupted()) {
                    if (line != null) {
                        System.err.println("InterruptibleLineReader: warning: discarding line that was read after close(): '" + line + "'");
                    }
                    // nothing further to do (close() is responsible for calling markAsEOF() in this case)
                    break;
                } else if (line == null) { // EOF
                    markAsEOF();
                    break;
                }
                lines.put(line); // this blocks until the line has been consumed ("0-capacity" behaviour)
                if (Thread.interrupted()) {
                    // nothing to do (close() is responsible for calling markAsEOF() in this case)
                    break;
                }
            }
        } catch (IOException e) {
            exception = e;
            markAsEOF();
        } catch (InterruptedException e) {
            // nothing to do (close() is responsible for calling markAsEOF() in this case)
        }
    }

    public String readLine() throws IOException, InterruptedException {
        String line = lines.take();
        if (line == null) { // EOF or IOException
            markAsEOF(); // restore the null so that any concurrent (and future) calls to this method won't block
            if (exception != null) {
                throw exception;
            } else {
                return null; // EOF
            }
        } else {
            return line;
        }
    }

    public String readLine(long timeout, TimeUnit unit) throws IOException, InterruptedException {
        String line = lines.poll(timeout, unit);
        if (line == null && eof) { // EOF or IOException (not timeout)
            markAsEOF(); // restore the null so that any concurrent (and future) calls to this method won't block
            if (exception != null) {
                throw exception;
            } else {
                return null; // EOF
            }
        } else {
            return line;
        }
    }
}

class ZeroCapacityBlockingQueue<T> {
    int count;
    T item;

    public synchronized boolean add(T x) {
        // does not block (i.e. behaves as if the capacity is actually 1)
        if (count == 1) {
            throw new IllegalStateException("Queue full");
        }
        item = x;
        count++;
        notifyAll();
        return true;
    }

    public synchronized boolean offer(T x) {
        // does not block (i.e. behaves as if the capacity is actually 1)
        if (count == 1) {
            return false;
        }
        return add(x);
    }

    public synchronized void put(T x) throws InterruptedException {
        // blocks until the item has been removed ("0-capacity" behaviour)
        while (count == 1) {
            wait();
        }
        add(x);
        while (count == 1 && item == x) {
            wait();
        }
    }

    public synchronized T remove() {
        if (count == 0) {
            throw new NoSuchElementException();
        }
        T x = item;
        item = null;
        count--;
        notifyAll();
        return x;
    }

    public synchronized T poll() {
        if (count == 0) {
            return null;
        }
        return remove();
    }

    public synchronized T take() throws InterruptedException {
        while (count == 0) {
            wait();
        }
        return remove();
    }

    public synchronized T poll(long timeout, TimeUnit unit) throws InterruptedException {
        long deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
        while (count == 0) {
            long remainingNanos = deadlineNanos - System.nanoTime();
            if (remainingNanos <= 0) {
                return null;
            }
            TimeUnit.NANOSECONDS.timedWait(this, remainingNanos);
        }
        return remove();
    }
}

class Main {
    static <T> Stream<T> generateOrderedStream(Supplier<Optional<T>> s) {
        // Returns an ordered stream with the values of the Optionals returned by s.get(). An empty Optional ends the stream.
        // As pseudocode:
        //     for (Optional<T> o = s.get(); o.isPresent(); o = s.get())
        //         emit o.get();
        return Stream.iterate(s.get(), Optional::isPresent, prev -> s.get())
            .map(Optional::get);
    }

    static Stream<String> timedReadLines(InterruptibleLineReader lineReader, long timeout, TimeUnit unit) {
        // Reads lines until the timeout elapses and returns them as a stream.
        final long deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
        return generateOrderedStream(() -> {
            try {
                long remainingNanos = deadlineNanos - System.nanoTime();
                return Optional.ofNullable(lineReader.readLine(remainingNanos, TimeUnit.NANOSECONDS));
            } catch (IOException|InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
    }

    static void fillStreamMap(InterruptibleLineReader lineReader) {
        // streaming demo
        long maxDurationSecs = 5;
        timedReadLines(lineReader, maxDurationSecs, TimeUnit.SECONDS)
            .takeWhile(line -> !line.contains("[stop]"))
            .map(line -> "[mapped] " + line)
            .forEachOrdered(System.out::println);
    }

    public static void main(String[] args) {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));

        // stream lines
        InterruptibleLineReader lineReader = new InterruptibleLineReader(reader);
        fillStreamMap(lineReader);
        lineReader.close();

        /*
        // attempt to use the BufferedReader directly
        // NOTE: a line will be lost
        System.out.println("--- reading directly from BufferedReader ---");
        while (true) {
            try {
                String line = reader.readLine();
                if (line == null) { break; }
                System.out.println("[raw] " + line);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        */
    }
}

Here is an example run of the second InterruptibleLineReader implementation, i.e. the one with the "0-capacity" queue (with the last part of main() uncommented). The timestamps are in seconds and ">" denotes input.

0.06 --- streaming lines using InterruptibleLineReader for 5.0 sec  ---
0.82 > one
0.83 [mapped] one
1.76 > two
1.76 [mapped] two
2.73 > three
2.73 [mapped] three
5.06 --- reading directly from BufferedReader ---
6.93 > four
6.94 InterruptibleLineReader: warning: discarding line that was read after close(): 'four'
7.76 > five
7.76 [raw] five
8.60 > six
8.60 [raw] six

Note how the line "four" was lost. To avoid losing lines, don't use the underlying BufferedReader after the InterruptibleLineReader instance is created.

(If you really need a BufferedReader after that point, you could write a dummy subclass of BufferedReader that wraps InterruptibleLineReader and forwards readLine() calls to it. The other BufferedReader methods, such as read() and mark(), can't be implemented easily.)
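A minimal sketch of such a forwarding subclass might look like the following. The names LineSource and ForwardingBufferedReader are hypothetical; LineSource stands in for the blocking readLine() of InterruptibleLineReader so that the example is self-contained. Only readLine() is forwarded, and a few of the other methods are shown rejecting calls outright.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.StringReader;

// Hypothetical stand-in for the blocking readLine() of InterruptibleLineReader.
interface LineSource {
    String readLine() throws IOException, InterruptedException;
}

class ForwardingBufferedReader extends BufferedReader {
    private final LineSource source;

    ForwardingBufferedReader(LineSource source) {
        super(new StringReader("")); // dummy underlying reader; it is never read from
        this.source = source;
    }

    @Override
    public String readLine() throws IOException {
        try {
            return source.readLine(); // forward to the queue-backed reader
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new InterruptedIOException("interrupted while waiting for a line");
        }
    }

    // Character-level access and marking are not supported in this sketch.
    @Override
    public int read() {
        throw new UnsupportedOperationException("only readLine() is supported");
    }

    @Override
    public int read(char[] cbuf, int off, int len) {
        throw new UnsupportedOperationException("only readLine() is supported");
    }

    @Override
    public void mark(int readAheadLimit) {
        throw new UnsupportedOperationException("only readLine() is supported");
    }
}
```

With the classes above, something like new ForwardingBufferedReader(lineReader::readLine) could be passed to code that expects a BufferedReader and only calls readLine().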

Calchas answered 9/6, 2022 at 0:50 Comment(1)
I don't know how to thank you @Tom. This is exactly what I was looking for. You didn't just write a solution, you also wrote up a couple of approaches and topics to explore. I appreciate your efforts. Godred
D
2

You can execute your method inside an executor. Assuming a single-thread executor is enough for your case, here is the code:

    public void executeFillStreamMap(BufferedReader reader, long timeout) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // a method reference cannot take arguments, so use a lambda
        Future<?> result = executor.submit(() -> fillStreamMap(reader));
        try {
            result.get(timeout, TimeUnit.NANOSECONDS); // timeout is in nanoseconds
        } catch (TimeoutException e) {
            result.cancel(true); // get() only stops waiting; cancel(true) interrupts the task
        } catch (InterruptedException | ExecutionException e) {
            // handle exceptions
        } finally {
            executor.shutdown();
        }
    }

You won't need the takeWhile call or the Instant you defined. If the task takes longer than the defined timeout, get throws a TimeoutException. It does not stop the task by itself, so cancel(true) is called to interrupt it. Note that a thread blocked in readLine() may not respond to the interrupt.

Editor's note

With this method, if a timeout occurs the partial results are not available.
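The timeout behaviour of Future.get can be checked with a small sketch like the one below. The class FutureTimeoutSketch and the method runWithTimeout are names invented for this example. It only reports whether the task finished in time, which matches the note above: nothing the task produced is returned on timeout.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureTimeoutSketch {
    // Runs the task on a separate thread and waits at most `timeout` for it.
    // Returns true if the task finished in time, false otherwise.
    static boolean runWithTimeout(Runnable task, long timeout, TimeUnit unit) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> future = executor.submit(task);
        try {
            future.get(timeout, unit);
            return true;
        } catch (TimeoutException e) {
            // get() has only stopped waiting; the task is still running,
            // so ask for it to be interrupted
            future.cancel(true);
            return false;
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause()); // the task itself failed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            future.cancel(true);
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Whether the task actually stops after cancel(true) depends on the task reacting to interruption, for example by letting an InterruptedException from Thread.sleep() end its work.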

Disbud answered 5/6, 2022 at 22:39 Comment(6)
Can't be precise but looks good enough I think. Monck
Thanks, but when I try this it gives me a timeout error at the end of the duration and the map is empty. Godred
Yes indeed. This either returns the entire output or nothing at all. What about processing the BufferedReader old-style and adding the timeout condition to the while loop? Like: while ((thisLine = br.readLine()) != null && checkTimeout(end)) { /* logic on line */ } Disbud
But it will also wait for the next line, like the takeWhile function. Godred
And I don't want to lose the buffered data, so all-or-nothing won't work for me I guess :( I appreciate your comment though. Godred
So maybe this::jsonToBuyerEventInput or getFilter()::apply is a time-consuming task? Then either run the time-consuming task in an executor and keep takeWhile as well, or use while ((thisLine = br.readLine()) != null) { /* logic on line */ } and execute the logic inside the while loop in an executor. This way, you add an element to the map for every line, and if you hit the timeout while processing a task you break out of the while loop. Disbud
C
2

Here is an approach that operates on Streams. The core function is timedTake(Stream<T> stream, long timeout, TimeUnit unit). The idea is to traverse the original stream using its raw Spliterator, which makes it possible to set a timeout.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Optional;
import java.util.Spliterator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Stream;

class Main {
    static <T> Stream<T> generateOrderedStream(Supplier<Optional<T>> s) {
        // Returns an ordered stream with the values of the Optionals returned by s.get(). An empty Optional ends the stream.
        // As pseudocode:
        //     for (Optional<T> o = s.get(); o.isPresent(); o = s.get())
        //         emit o.get();
        return Stream.iterate(s.get(), Optional::isPresent, prev -> s.get())
            .map(Optional::get);
    }

    static <T> Optional<T> advance(Spliterator<T> iter) {
        // Returns an Optional with the next element of the iterator, or an empty Optional if there are no more elements.
        // (This method is much nicer than calling iter.tryAdvance() directly.)
        final var r = new Object() { T elem; };
        return iter.tryAdvance(elem -> r.elem = elem) ? Optional.of(r.elem) : Optional.empty();
    }

    static ThreadFactory daemonThreadFactory() {
        return (r) -> {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            return thread;
        };
    }

    static <T> Stream<T> timedTake(Stream<T> stream, long timeout, TimeUnit unit) {
        // Traverses the stream until the timeout elapses and returns the traversed elements.
        final long deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
        final ExecutorService executor = Executors.newSingleThreadExecutor(daemonThreadFactory());
        final Spliterator<T> iter = stream.spliterator();
        return generateOrderedStream(() -> {
            try {
                Future<Optional<T>> future = executor.submit(() -> advance(iter));
                long remainingNanos = deadlineNanos - System.nanoTime();
                Optional<T> optElem = future.get(remainingNanos, TimeUnit.NANOSECONDS);
                if (!optElem.isPresent()) { // this is the end of the input stream, so clean up
                    executor.shutdownNow();
                }
                return optElem;
            } catch (TimeoutException e) {
                executor.shutdownNow();
                return Optional.empty(); // mark this as the end of the result stream
            } catch (ExecutionException e) {
                executor.shutdownNow();
                throw new RuntimeException(e.getCause());
            } catch (InterruptedException e) {
                executor.shutdownNow();
                throw new RuntimeException(e);
            }
        });
    }

    static void fillStreamMap(BufferedReader reader) {
        // streaming demo
        long maxDurationSecs = 5;
        timedTake(reader.lines(), maxDurationSecs, TimeUnit.SECONDS)
            .takeWhile(line -> !line.contains("[stop]"))
            .map(line -> "[mapped] " + line)
            .forEachOrdered(System.out::println);
    }

    public static void main(String[] args) {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        fillStreamMap(reader);
    }
}

Another approach is to operate at the Reader level, and read with a timeout from the BufferedReader (which presumably wraps System.in). Unfortunately, it's very hard to do this properly (see e.g. Set timeout for user's input, and the article Timeout on Console Input).

One idea from those linked pages is to poll BufferedReader.ready() until it returns true, and then call readLine(). This is ugly (because it uses polling) and unreliable, because readLine() can block even if ready() returned true – for example because an incomplete line is available (on Unix-like systems the user can achieve this by typing some text then pressing Ctrl+D instead of Enter).

Another idea is to create a background thread that repeatedly calls BufferedReader.readLine() and inserts the results into a BlockingQueue (such as ArrayBlockingQueue). Then the main thread can call take() or poll(timeout, unit) on the queue to obtain lines.

A limitation of this approach is that if you later want to read from the BufferedReader directly (as opposed to through the queue), it's pretty much impossible to avoid losing (at least) one line of input. This is because a thread can't be interrupted cleanly when it's blocked on readLine(), so if the main thread decides to stop early (e.g. because of a timeout) it can't prevent the background thread from reading the line it is currently waiting for.

You could try to "unread" the last line using mark(readAheadLimit) and reset(), but synchronization will be difficult – another thread could try to read from the BufferedReader before the background thread calls reset(). You'd probably have to synchronize using the the lock field, however its access level is protected so you'd only be able to access it using reflection or by subclassing BufferedReader. Also, reset() will fail if the line to be unread is longer than readAheadLimit.

Here is an implementation that assumes you only read lines via the queue.

DISCLAIMER: Beware of bugs in these code snippets – multi-threading is tricky. I might try improve the code another time.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Stream;

class InterruptibleLineReader {
    private static final String EOF = new String("<EOF>");
    BufferedReader reader;
    ArrayBlockingQueue<String> lines = new ArrayBlockingQueue<>(/* capacity: */ 2);
    Thread backgroundThread;
    IOException exception;

    public InterruptibleLineReader(BufferedReader reader) {
        this.reader = reader;
        // start a background thread to read lines
        backgroundThread = new Thread(this::backgroundTask);
        backgroundThread.setDaemon(true);
        backgroundThread.start();
    }

    public void close() {
        backgroundThread.interrupt();
        lines.clear();
        lines.add(EOF);
    }

    private void backgroundTask() {
        try {
            try {
                while (true) {
                    String line = reader.readLine();
                    if (Thread.interrupted()) {
                        // nothing to do (close() is responsible for lines.put(EOF) etc. in this case)
                        break;
                    } else if (line == null) {
                        lines.put(EOF);
                        break;
                    }
                    lines.put(line);
                }
            } catch (IOException e) {
                exception = e;
                lines.put(EOF);
            }
        } catch (InterruptedException e) {
            // nothing to do (close() is responsible for lines.put(EOF) etc. in this case)
        }
    }

    public String readLine(long timeout, TimeUnit unit) throws IOException, InterruptedException {
        String line = lines.poll(timeout, unit);
        if (line == EOF) { // EOF or IOException
            lines.put(EOF); // restore the EOF so that any concurrent (and future) calls to this method won't block
            if (exception != null) {
                throw exception;
            } else {
                return null;
            }
        }
        return line;
    }
}

class Main {
    static <T> Stream<T> generateOrderedStream(Supplier<Optional<T>> s) {
        // Returns an ordered stream with the values of the Optionals returned by s.get(). An empty Optional ends the stream.
        // As pseudocode:
        //     for (Optional<T> o = s.get(); o.isPresent(); o = s.get())
        //         emit o.get();
        return Stream.iterate(s.get(), Optional::isPresent, prev -> s.get())
            .map(Optional::get);
    }

    static Stream<String> timedReadLines(InterruptibleLineReader lineReader, long timeout, TimeUnit unit) {
        // Reads lines until the timeout elapses and returns them as a stream.
        final long deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
        return generateOrderedStream(() -> {
            try {
                long remainingNanos = deadlineNanos - System.nanoTime();
                return Optional.ofNullable(lineReader.readLine(remainingNanos, TimeUnit.NANOSECONDS));
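            } catch (IOException|InterruptedException e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt(); // restore the interrupt status before rethrowing
                }
                throw new RuntimeException(e);
            }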
        });
    }

    static void fillStreamMap(InterruptibleLineReader lineReader) {
        // streaming demo
        long maxDurationSecs = 5;
        timedReadLines(lineReader, maxDurationSecs, TimeUnit.SECONDS)
            .takeWhile(line -> !line.contains("[stop]"))
            .map(line -> "[mapped] " + line)
            .forEachOrdered(System.out::println);
    }

    public static void main(String[] args) {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));

        // stream lines
        InterruptibleLineReader lineReader = new InterruptibleLineReader(reader);
        fillStreamMap(lineReader);
        lineReader.close();

        /*
        // attempt to use the BufferedReader directly
        // NOTE: several lines may be lost (depending on the capacity of the ArrayBlockingQueue and how quickly the lines are consumed)
        System.out.println("--- reading directly from BufferedReader ---");
        while (true) {
            try {
                String line = reader.readLine();
                if (line == null) { break; }
                System.out.println("[raw] " + line);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        */
    }
}

Here is a more sophisticated implementation that loses at most one line of input if you close the InterruptibleLineReader and then read directly from the BufferedReader. It uses a custom "0-capacity" queue: the background thread's put() blocks until the line has been consumed, so it never reads ahead by more than one line.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Stream;

class InterruptibleLineReader {
    BufferedReader reader;
    ZeroCapacityBlockingQueue<String> lines = new ZeroCapacityBlockingQueue<>(); // a null line indicates EOF or IOException
    Thread backgroundThread;
    IOException exception;
    boolean eof;

    public InterruptibleLineReader(BufferedReader reader) {
        this.reader = reader;
        // start a background thread to read lines
        backgroundThread = new Thread(this::backgroundTask);
        backgroundThread.setDaemon(true);
        backgroundThread.start();
    }

    private void markAsEOF() {
        eof = true;
        if (lines.poll() != null) { // markAsEOF() should not be called when there are unconsumed lines
            throw new IllegalStateException();
        }
        lines.offer(null); // unblock threads that are waiting on the queue
    }

    public void close() {
        backgroundThread.interrupt();
        // warn if there is an unconsumed line, and consume it so we can indicate EOF
        String line = lines.poll();
        if (line != null) {
            System.err.println("InterruptibleLineReader: warning: discarding unconsumed line during close(): '" + line + "'");
        }
        markAsEOF();
    }

    private void backgroundTask() {
        try {
            while (true) {
                String line = reader.readLine();
                if (Thread.interrupted()) {
                    if (line != null) {
                        System.err.println("InterruptibleLineReader: warning: discarding line that was read after close(): '" + line + "'");
                    }
                    // nothing further to do (close() is responsible for calling markAsEOF() in this case)
                    break;
                } else if (line == null) { // EOF
                    markAsEOF();
                    break;
                }
                lines.put(line); // this blocks until the line has been consumed ("0-capacity" behaviour)
                if (Thread.interrupted()) {
                    // nothing to do (close() is responsible for calling markAsEOF() in this case)
                    break;
                }
            }
        } catch (IOException e) {
            exception = e;
            markAsEOF();
        } catch (InterruptedException e) {
            // nothing to do (close() is responsible for calling markAsEOF() in this case)
        }
    }

    public String readLine() throws IOException, InterruptedException {
        String line = lines.take();
        if (line == null) { // EOF or IOException
            markAsEOF(); // restore the null so that any concurrent (and future) calls to this method won't block
            if (exception != null) {
                throw exception;
            } else {
                return null; // EOF
            }
        } else {
            return line;
        }
    }

    public String readLine(long timeout, TimeUnit unit) throws IOException, InterruptedException {
        String line = lines.poll(timeout, unit);
        if (line == null && eof) { // EOF or IOException (not timeout)
            markAsEOF(); // restore the null so that any concurrent (and future) calls to this method won't block
            if (exception != null) {
                throw exception;
            } else {
                return null; // EOF
            }
        } else {
            return line;
        }
    }
}

class ZeroCapacityBlockingQueue<T> {
    int count;
    T item;

    public synchronized boolean add(T x) {
        // does not block (i.e. behaves as if the capacity is actually 1)
        if (count == 1) {
            throw new IllegalStateException("Queue full");
        }
        item = x;
        count++;
        notifyAll();
        return true;
    }

    public synchronized boolean offer(T x) {
        // does not block (i.e. behaves as if the capacity is actually 1)
        if (count == 1) {
            return false;
        }
        return add(x);
    }

    public synchronized void put(T x) throws InterruptedException {
        // blocks until the item has been removed ("0-capacity" behaviour)
        while (count == 1) {
            wait();
        }
        add(x);
        while (count == 1 && item == x) {
            wait();
        }
    }

    public synchronized T remove() {
        if (count == 0) {
            throw new NoSuchElementException();
        }
        T x = item;
        item = null;
        count--;
        notifyAll();
        return x;
    }

    public synchronized T poll() {
        if (count == 0) {
            return null;
        }
        return remove();
    }

    public synchronized T take() throws InterruptedException {
        while (count == 0) {
            wait();
        }
        return remove();
    }

    public synchronized T poll(long timeout, TimeUnit unit) throws InterruptedException {
        long deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
        while (count == 0) {
            long remainingNanos = deadlineNanos - System.nanoTime();
            if (remainingNanos <= 0) {
                return null;
            }
            TimeUnit.NANOSECONDS.timedWait(this, remainingNanos);
        }
        return remove();
    }
}

class Main {
    static <T> Stream<T> generateOrderedStream(Supplier<Optional<T>> s) {
        // Returns an ordered stream with the values of the Optionals returned by s.get(). An empty Optional ends the stream.
        // As pseudocode:
        //     for (Optional<T> o = s.get(); o.isPresent(); o = s.get())
        //         emit o.get();
        return Stream.iterate(s.get(), Optional::isPresent, prev -> s.get())
            .map(Optional::get);
    }

    static Stream<String> timedReadLines(InterruptibleLineReader lineReader, long timeout, TimeUnit unit) {
        // Reads lines until the timeout elapses and returns them as a stream.
        final long deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
        return generateOrderedStream(() -> {
            try {
                long remainingNanos = deadlineNanos - System.nanoTime();
                return Optional.ofNullable(lineReader.readLine(remainingNanos, TimeUnit.NANOSECONDS));
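            } catch (IOException|InterruptedException e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt(); // restore the interrupt status before rethrowing
                }
                throw new RuntimeException(e);
            }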
        });
    }

    static void fillStreamMap(InterruptibleLineReader lineReader) {
        // streaming demo
        long maxDurationSecs = 5;
        timedReadLines(lineReader, maxDurationSecs, TimeUnit.SECONDS)
            .takeWhile(line -> !line.contains("[stop]"))
            .map(line -> "[mapped] " + line)
            .forEachOrdered(System.out::println);
    }

    public static void main(String[] args) {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));

        // stream lines
        InterruptibleLineReader lineReader = new InterruptibleLineReader(reader);
        fillStreamMap(lineReader);
        lineReader.close();

        /*
        // attempt to use the BufferedReader directly
        // NOTE: a line will be lost
        System.out.println("--- reading directly from BufferedReader ---");
        while (true) {
            try {
                String line = reader.readLine();
                if (line == null) { break; }
                System.out.println("[raw] " + line);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        */
    }
}

Here is an example run of the second implementation (with the last part of main() uncommented). The timestamps are in seconds and ">" denotes input.

0.06 --- streaming lines using InterruptibleLineReader for 5.0 sec  ---
0.82 > one
0.83 [mapped] one
1.76 > two
1.76 [mapped] two
2.73 > three
2.73 [mapped] three
5.06 --- reading directly from BufferedReader ---
6.93 > four
6.94 InterruptibleLineReader: warning: discarding line that was read after close(): 'four'
7.76 > five
7.76 [raw] five
8.60 > six
8.60 [raw] six

Note how the line "four" was lost. To avoid losing lines, don't use the underlying BufferedReader after the InterruptibleLineReader instance is created.

(If you really need a BufferedReader after that point, you could write a dummy subclass of BufferedReader that wraps InterruptibleLineReader and forwards readLine() calls to it. The other BufferedReader methods, such as read() and mark(), can't be implemented easily.)
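
As a rough sketch of that dummy subclass: the names LineSource and ForwardingBufferedReader below are made up for illustration and are not part of the code above. LineSource stands in for the blocking readLine() of the second InterruptibleLineReader, so with that implementation you could presumably pass lineReader::readLine. Only readLine() is forwarded; the other read methods simply refuse to work.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.StringReader;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for InterruptibleLineReader.readLine() (assumption, not part of the answer's code).
interface LineSource {
    String readLine() throws IOException, InterruptedException;
}

// Dummy BufferedReader subclass that forwards readLine() to a LineSource.
class ForwardingBufferedReader extends BufferedReader {
    private final LineSource source;

    ForwardingBufferedReader(LineSource source) {
        super(new StringReader("")); // placeholder reader; the superclass buffer is never used
        this.source = source;
    }

    @Override
    public String readLine() throws IOException {
        try {
            return source.readLine();
        } catch (InterruptedException e) {
            // BufferedReader.readLine() cannot throw InterruptedException, so translate it
            Thread.currentThread().interrupt();
            InterruptedIOException ioe = new InterruptedIOException("interrupted while waiting for a line");
            ioe.initCause(e);
            throw ioe;
        }
    }

    // Character-level access is not supported by this sketch.
    @Override
    public int read() {
        throw new UnsupportedOperationException("only readLine() is forwarded");
    }

    @Override
    public int read(char[] cbuf, int off, int len) {
        throw new UnsupportedOperationException("only readLine() is forwarded");
    }

    @Override
    public boolean markSupported() {
        return false;
    }
}

class ForwardingReaderDemo {
    public static void main(String[] args) throws IOException {
        // A queue-backed LineSource for the demo; poll() returns null (EOF) once it is empty.
        Queue<String> pending = new ArrayDeque<>(List.of("one", "two"));
        BufferedReader reader = new ForwardingBufferedReader(pending::poll);
        for (String line = reader.readLine(); line != null; line = reader.readLine()) {
            System.out.println("[forwarded] " + line); // prints "[forwarded] one", then "[forwarded] two"
        }
    }
}
```

Code that only ever calls readLine() on the reader it is handed should work with such a wrapper; anything that relies on read(), mark() or reset() will fail fast instead of silently bypassing the queue.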

Calchas answered 9/6, 2022 at 0:50 Comment(1)
I don't know how to thank you, @Tom. This is exactly what I was looking for. You didn't just write a solution; you also suggested a couple of approaches and topics to explore. I appreciate your efforts. - Godred
