Is there a way to prevent ClosedByInterruptException?
In the following example, I have one file being used by two threads (in the real application there could be any number of threads):

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class A {
    static volatile boolean running = true;

    public static void main(String[] args) throws IOException, InterruptedException {
        String name = "delete.me";
        new File(name).deleteOnExit();
        RandomAccessFile raf = new RandomAccessFile(name, "rw");
        FileChannel fc = raf.getChannel();

        Thread monitor = new Thread(() -> {
            try {
                while (running) {
                    System.out.println(name + " is " + (fc.size() >> 10) + " KB");

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        System.out.println("Interrupted");
                        Thread.currentThread().interrupt();
                    }
                }
            } catch (IOException e) {
                System.err.println("Monitor thread died");
                e.printStackTrace();
            }
        });
        monitor.setDaemon(true);
        monitor.start();

        Thread writer = new Thread(() -> {
            ByteBuffer bb = ByteBuffer.allocateDirect(32);
            try {
                while (running) {
                    bb.position(0).limit(32);
                    fc.write(bb);

                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        System.out.println("Interrupted");
                        Thread.currentThread().interrupt();
                    }
                }
            } catch (IOException e) {
                System.err.println("Writer thread died");
                e.printStackTrace();
            }
        });

        writer.setDaemon(true);
        writer.start();

        Thread.sleep(5000);
        monitor.interrupt();
        Thread.sleep(2000);
        running = false;
        raf.close();
    }
}

Rather than creating a RandomAccessFile and a memory mapping for each thread, I have one file and one memory mapping shared between threads. But there is a catch: if any thread is interrupted, the resource is closed.

delete.me is 0 KB
delete.me is 2 KB
delete.me is 4 KB
delete.me is 6 KB
delete.me is 8 KB
Interrupted
Monitor thread died
java.nio.channels.ClosedByInterruptException
    at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
    at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:315)
    at A.lambda$main$0(A.java:19)
    at java.lang.Thread.run(Thread.java:748)
Writer thread died
java.nio.channels.ClosedChannelException
    at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110)
    at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:199)
    at A.lambda$main$1(A.java:41)
    at java.lang.Thread.run(Thread.java:748)

Is there any way to prevent the FileChannel from being closed just because one thread using it was interrupted?


EDIT: What I want to avoid doing is the following, as I suspect it won't work for Java 9+:

private void doNotCloseOnInterrupt(FileChannel fc) {
    try {
        Field field = AbstractInterruptibleChannel.class
                .getDeclaredField("interruptor");
        field.setAccessible(true);
        field.set(fc, (Interruptible) thread
                -> Jvm.warn().on(getClass(), fc + " not closed on interrupt"));
    } catch (Exception e) {
        Jvm.warn().on(getClass(), "Couldn't disable close on interrupt", e);
    }
}

BTW, with the above hack the call to fc.size() returns the size as expected.

Saltpeter answered 10/9, 2018 at 15:28 Comment(13)
I thought: that sounds like an interesting but easy-to-answer thingy. Then I saw the upvote count, and that you asked the question. Something tells me that I don't even have to try to find an answer here. Now let the true guru level guys come in ...Chicle
The only possible way I can think of is what you just excluded in your edit.Gradygrae
@MaxVollmer It works for Java 8, but I suspect not for Java 9+.Saltpeter
wouldn't having different mappings for each thread just solve this? A bit confused...Georgetta
@Georgetta Sure. The files are a few 100 GB, and we could have 10s of threads, which is a few TB of virtual memory. As it's only virtual memory, it doesn't have to be a problem, but when you have multiple Java processes of 10s of TB each, it makes sys admins nervous.Saltpeter
@Georgetta the real problem is ensuring developers who use our libraries create and close this resource for each thread. Often they have no control over the thread creation and don't know when they go away.Saltpeter
Hello, why not use an AsynchronousFileChannel? It does not seem to be tied to the thread context.Politic
There have been a number of discussions on nio-dev about introducing a new OpenOption for this but no conclusions. In the meantime, there isn't any supported way to get a FileChannel that is not interruptible. Hacking the interruptor field is not reliable, depends on JDK internals, and could break at any time.Karimakarin
@PeterLawrey seems like you are out of luck as per Alan's comment; a pity thoughGeorgetta
I have difficulties trying to understand your question. You are saying something about memory mapping, but there is no memory mapping in your example code, which is crucial as the entire problem does not apply to memory mapping. Since you are using FileChannel, there wouldn’t be any problem creating multiple FileChannel instances on the same file, as this has nothing to do with the virtual memory (as just creating a file channel is not memory mapping).Daguerre
@Daguerre adding memory mapping is a lot more complicated but I can try. Perhaps I am incorrectly assuming that once a FileChannel is closed you can't use the memory mapping associated with it.Saltpeter
@PeterLawrey the memory mapping is entirely unaffected by the closing of the filechannel. In fact, I usually close the channel as early as possible, to minimize the resource usage. E.g. MappedByteBuffer mapped; try(FileChannel fc = FileChannel.open(name, READ, WRITE, CREATE_NEW, DELETE_ON_CLOSE)) { mapped = fc.map(FileChannel.MapMode.READ_WRITE, position, size); } /* use of mapped */ Note that the mapped buffer prevents DELETE_ON_CLOSE from deleting immediately on close, which turns it into a “delete on exit” behavior, which I found to be more reliable than File.deleteOnExit()Daguerre
Like @Daguerre said, a MappedByteBuffer is still valid even after closing the FileChannel which created it. In fact, it is valid until garbage collected. Be aware that memory mapping is not well suited for growing files and gives strange results on some operating systems when writing past the boundaries. It can be a good choice if you intend to allocate a given part of the file to each thread, but if all threads need to write sequentially you will need to allocate enough bytes and check whether the bound is reached, in order to create a new file (or expand the current file and map the newly created part).Politic
D
14

Since you said you want “one memory mapping shared between threads”, there is no such problem at all, as memory mapping is not affected by the closing of a FileChannel. In fact, it’s a good strategy to close the channel as soon as possible, to reduce the resources held by the application.

E.g.

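// Assumes static imports of java.nio.file.StandardOpenOption.* (READ, WRITE, CREATE_NEW, DELETE_ON_CLOSE)
static volatile boolean running = true;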

public static void main(String[] args) throws IOException {
    Path name = Paths.get("delete.me");
    MappedByteBuffer mapped;
    try(FileChannel fc1 = FileChannel.open(name, READ,WRITE,CREATE_NEW,DELETE_ON_CLOSE)) {
        mapped = fc1.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
    }
    Thread thread1 = new Thread(() -> {
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50));
        while(running && !Thread.interrupted()) {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
            byte[] b = new byte[5];
            mapped.position(4000);
            mapped.get(b);
            System.out.println("read "+new String(b, StandardCharsets.US_ASCII));
        }
    });
    thread1.setDaemon(true);
    thread1.start();
    Thread thread2 = new Thread(() -> {
        byte[] b = "HELLO".getBytes(StandardCharsets.US_ASCII);
        while(running && !Thread.interrupted()) {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
            mapped.position(4000);
            mapped.put(b);
            System.out.println("wrote "+new String(b, StandardCharsets.US_ASCII));
            byte b1 = b[0];
            System.arraycopy(b, 1, b, 0, b.length-1);
            b[b.length-1] = b1;
        }
        mapped.force();
    });
    thread2.setDaemon(true);
    thread2.start();
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
    thread2.interrupt();
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
    running = false;
}

This demonstrates that the threads can read and write their data after the channel has been closed, and that interrupting the writing thread does not stop the reading thread.
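The same point can be reduced to a single-threaded sketch. This is only an illustration under stated assumptions: the class name `MappedAfterClose`, the method `roundTrip`, and the temp file are hypothetical and not part of the example above. The buffer is mapped, the channel is closed, and only then is the buffer written and read back.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper class, for illustration only.
final class MappedAfterClose {

    /** Writes text through a mapping whose channel is already closed, then reads it back. */
    static String roundTrip(String text) throws IOException {
        Path file = Files.createTempFile("mapped-after-close", ".bin");
        file.toFile().deleteOnExit();
        Files.write(file, new byte[4096]); // pre-size the file so the mapped region exists

        MappedByteBuffer mapped;
        try (FileChannel fc = FileChannel.open(file,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            mapped = fc.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
        } // the channel is closed here; the mapping is used only afterwards

        byte[] out = text.getBytes(StandardCharsets.US_ASCII);
        mapped.position(0);
        mapped.put(out);

        byte[] in = new byte[out.length];
        mapped.position(0);
        mapped.get(in);
        return new String(in, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) throws IOException {
        System.out.println("read " + roundTrip("HELLO"));
    }
}
```

Because no FileChannel is touched after the mapping is created, there is nothing left for an interrupt to close.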

If you need to perform FileChannel operations in addition to memory mapped I/O, there is no problem in using multiple FileChannel instances, so closing one channel does not affect the other. E.g.

static volatile boolean running = true;

public static void main(String[] args) throws IOException {
    Path name = Paths.get("delete.me");
    try(FileChannel fc1 = FileChannel.open(name,READ,WRITE,CREATE_NEW,DELETE_ON_CLOSE);
        FileChannel fc2 = FileChannel.open(name,READ,WRITE)) {
        Thread thread1 = new Thread(() -> {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50));
            try {
                MappedByteBuffer mapped = fc1.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
                while(running && !Thread.interrupted()) {
                    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
                    byte[] b = new byte[5];
                    mapped.position(4000);
                    mapped.get(b);
                    System.out.println("read from map "
                        +new String(b, StandardCharsets.US_ASCII)
                        +", file size "+fc1.size());
                }
            }catch(IOException ex) {
                ex.printStackTrace();
            }
        });
        thread1.setDaemon(true);
        thread1.start();
        Thread thread2 = new Thread(() -> {
            byte[] b = "HELLO".getBytes(StandardCharsets.US_ASCII);
            try {
                MappedByteBuffer mapped = fc2.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
                fc2.position(4096);
                try {
                    while(running && !Thread.interrupted()) {
                        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
                        mapped.position(4000);
                        mapped.put(b);
                        System.out.println("wrote to mapped "
                            +new String(b, StandardCharsets.US_ASCII));
                        byte b1 = b[0];
                        System.arraycopy(b, 1, b, 0, b.length-1);
                        b[b.length-1] = b1;
                        fc2.write(ByteBuffer.wrap(b));
                    }
                } finally { mapped.force(); }
            }catch(IOException ex) {
                ex.printStackTrace();
            }
        });
        thread2.setDaemon(true);
        thread2.start();
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
        thread2.interrupt();
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
        running = false;
    }
}

Here, the interruption of one thread does close its channel, but does not affect the other. Further, even when each thread acquires its own MappedByteBuffer from its own channel, changes show through to the other, even without the use of force(). Of course, the latter is defined to be a system dependent behavior, not guaranteed to work on every system.

But as shown with the first example, you may still create the shared buffers from just one of the channels at the start, while performing the I/O operations on a different channel, one per thread. It then doesn’t matter whether, or which, channels get closed; the mapped buffers are not affected.
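A compact way to check this behavior deterministically is to set the interrupt status of the current thread before using one of two channels, instead of interrupting a second thread. This is a minimal sketch; the class `ChannelPerThreadSketch`, the method `demo`, and the temp file are assumptions made for illustration.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical demo class, for illustration only.
final class ChannelPerThreadSketch {

    /** Returns { sawClosedByInterrupt, firstChannelOpen, secondChannelOpen, mappingUsable }. */
    static boolean[] demo() throws IOException {
        Path file = Files.createTempFile("channel-per-thread", ".bin");
        file.toFile().deleteOnExit();
        Files.write(file, new byte[4096]); // pre-size the file for the mapping

        try (FileChannel first = FileChannel.open(file,
                     StandardOpenOption.READ, StandardOpenOption.WRITE);
             FileChannel second = FileChannel.open(file,
                     StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // The shared mapping is created from the channel that is about to be closed.
            MappedByteBuffer shared = first.map(FileChannel.MapMode.READ_WRITE, 0, 4096);

            boolean sawClosedByInterrupt = false;
            Thread.currentThread().interrupt(); // simulate an interrupt arriving before the I/O call
            try {
                first.write(ByteBuffer.wrap(new byte[] {1}), 0);
            } catch (ClosedByInterruptException e) {
                sawClosedByInterrupt = true; // the interrupted thread's channel has been closed
            } finally {
                Thread.interrupted(); // clear the flag so the remaining calls run normally
            }

            second.write(ByteBuffer.wrap(new byte[] {2}), 1); // the other channel still works
            shared.put(2, (byte) 3);                          // so does the mapping
            boolean mappingUsable = shared.get(2) == 3;

            return new boolean[] {
                sawClosedByInterrupt, first.isOpen(), second.isOpen(), mappingUsable };
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(java.util.Arrays.toString(demo()));
    }
}
```

In a real application each thread would hold its own channel, for example through a ThreadLocal, and reopen it after an interrupt; that part is left out here.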

Daguerre answered 13/9, 2018 at 17:13 Comment(0)
G
10

You can use reflection to access the interruptor field illegally and get the sun.nio.ch.Interruptible class type from there to create a proxy instance:

private void doNotCloseOnInterrupt(FileChannel fc) {
    try {
        Field field = AbstractInterruptibleChannel.class.getDeclaredField("interruptor");
        Class<?> interruptibleClass = field.getType();
        field.setAccessible(true);
        field.set(fc, Proxy.newProxyInstance(
                interruptibleClass.getClassLoader(), 
                new Class[] { interruptibleClass },
                new InterruptibleInvocationHandler()));
    } catch (final Exception e) {
        Jvm.warn().on(getClass(), "Couldn't disable close on interrupt", e);
    }
}

public class InterruptibleInvocationHandler implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
    {
        // TODO: Check method and handle accordingly
        return null;
    }
}

In Java 9 this works with a single warning, as it runs by default with --illegal-access=permit.

However, this flag might be removed in future versions, and the best way to ensure this works long-term is to use the flag --add-opens:

--add-opens java.base/sun.nio.ch=your-module
--add-opens java.base/java.nio.channels.spi=your-module

Or, if you aren't working with modules (not recommended):

--add-opens java.base/sun.nio.ch=ALL-UNNAMED
--add-opens java.base/java.nio.channels.spi=ALL-UNNAMED

This works with Java 9, Java 10 and the current JDK 11 Early-Access Build (28 (2018/8/23)).

Gradygrae answered 10/9, 2018 at 17:31 Comment(7)
sorry... not even those are long termGeorgetta
@Georgetta I rather trust official docs than a comment on SO: "In preparation for a JDK release that denies illegal access, applications and libraries should be tested with --illegal-access=deny." The code I posted works with --illegal-access=deny and --add-opens. I am not aware of any official information that --add-exports or --add-opens won't be supported in the foreseeable future.Gradygrae
you are right, of course; unless that comment comes from the people who actually wrote jigsaw in the first place, of course :)Georgetta
Hm, fair point, didn't see that. Well, if there actually will be a future Java version where it's impossible to access the internals of the JDK, then what OP is asking is, well, impossible, except by reimplementing the required classes oneself. But until then, this solution works.Gradygrae
What is curious is why it delegates this functionality in the first place. It appears the only purpose for having this field is so it can be changed.Saltpeter
I would have given you a bounty also, except it won't let me. !?Saltpeter
@PeterLawrey I guess the system doesn't like my answer? xD I appreciate the gesture, though.Gradygrae
P
8

With an AsynchronousFileChannel, ClosedByInterruptException is never thrown. The channel just does not seem to care about the interrupt.

Test done using jdk 1.8.0_72

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicLong;

public class A {
    static volatile boolean running = true;

    public static void main(String[] args) throws IOException, InterruptedException {
        String name = "delete.me";
        Path path = new File(name).toPath();
        AtomicLong position = new AtomicLong(0);

        AsynchronousFileChannel fc = AsynchronousFileChannel.open(path,
                StandardOpenOption.CREATE_NEW, StandardOpenOption.DELETE_ON_CLOSE,
                StandardOpenOption.READ, StandardOpenOption.WRITE,
                StandardOpenOption.SYNC);

        CompletionHandler<Integer, Object> handler =
                new CompletionHandler<Integer, Object>() {
                @Override
                public void completed(Integer result, Object attachment) {
                    //System.out.println(attachment + " completed with " + result + " bytes written");
                    position.getAndAdd(result);
                }
                @Override
                public void failed(Throwable e, Object attachment) {
                    System.err.println(attachment + " failed with:");
                    e.printStackTrace();
                }
            };

        Runnable monitorRun = () -> {
            try {
                while (running) {
                    System.out.println(name + " is " + (fc.size() >> 10) + " KB");

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        System.out.println("Interrupted");
                        Thread.currentThread().interrupt();
                        System.out.println("Interrupt call failed so return");
                        return;
                    }
                }
            } catch (IOException e) {
                System.err.println("Monitor thread died");
                e.printStackTrace();
            }
        };

        Thread monitor = new Thread(monitorRun);
        monitor.setDaemon(true);
        monitor.start();

        Thread writer = new Thread(() -> {
            ByteBuffer bb = ByteBuffer.allocateDirect(32);
            try {
                while (running) {
                    bb.position(0).limit(32);
                    fc.write(bb,position.get(),null,handler);

                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        System.out.println("Interrupted");
                        Thread.currentThread().interrupt();
                    }
                }
            } catch (Exception e) {
                System.err.println("Writer thread died");
                e.printStackTrace();
            }
        });

        writer.setDaemon(true);
        writer.start();

        Thread.sleep(5000);
        monitor.interrupt();
        Thread.sleep(2000);
        monitor = new Thread(monitorRun);
        monitor.start();
        Thread.sleep(5000);
        running = false;
        fc.close();
    }
}

This generates the following output:

delete.me is 0 KB
delete.me is 3 KB
delete.me is 6 KB
delete.me is 9 KB
delete.me is 12 KB
Interrupted
Interrupt call failed so return
delete.me is 21 KB
delete.me is 24 KB
delete.me is 27 KB
delete.me is 30 KB
delete.me is 33 KB
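The observation can also be checked without timing-dependent threads. The following is a rough sketch, assuming a temp file and hypothetical names (`AsyncChannelInterruptSketch`, `writeWhileInterrupted`): it sets the interrupt status of the calling thread, uses the channel, and then verifies that the channel is still open.

```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Future;

// Hypothetical demo class, for illustration only.
final class AsyncChannelInterruptSketch {

    /** Returns the file size if the channel survived the interrupt, or -1 if it was closed. */
    static long writeWhileInterrupted() throws Exception {
        Path file = Files.createTempFile("async-interrupt", ".bin");
        file.toFile().deleteOnExit();

        try (AsynchronousFileChannel fc = AsynchronousFileChannel.open(file,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            Future<Integer> pending;
            Thread.currentThread().interrupt(); // interrupt status is set while the channel is used
            try {
                fc.size(); // the call that failed for the FileChannel in the question
                pending = fc.write(ByteBuffer.allocate(32), 0);
            } finally {
                // Clear the flag: Future.get() below is itself interruptible.
                Thread.interrupted();
            }
            pending.get(); // wait for the background write to complete
            return fc.isOpen() ? fc.size() : -1;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("size after interrupted use: " + writeWhileInterrupted());
    }
}
```

Note that waiting on the returned Future remains interruptible, so code that blocks on results still has to handle InterruptedException; only the channel itself is left open.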
Politic answered 11/9, 2018 at 14:9 Comment(4)
I need this to work with memory mapping. Can we avoid using a RandomAccessFile as well (two file handles is ok)Saltpeter
Sorry, but unlike FileChannel, AsynchronousFileChannel does not provide any map function. The solution already does not use a RandomAccessFile. What you're requesting seems to go beyond the scope of this question and may need more details about what you're trying to achieve. Please provide a link to a new question and I will gladly participate.Politic
FYI: Creating a MappedByteBuffer from FileChannel.map and using it to write and to get the size of the already written content also solves the ClosedByInterruptException issue.Politic
It is helpful. I also need memory mapping but I suspect all the operations which could have this problem could be migrated.Saltpeter
