I have a piece of code that monitors a directory for new files. Whenever a file is added to the directory, its contents are read and published to Kafka, and then the file is deleted.
This works when I make a single request, but as soon as I subject my code to 5 or 10 user requests from JMeter, the contents are published to Kafka successfully but the code is unable to delete the file. I get a FileSystemException with the message "The process cannot access the file because it is being used by another process."
I guess there is some concurrency issue that I am unable to see.
public void monitor() throws IOException, InterruptedException {
    Path faxFolder = Paths.get(TEMP_FILE_LOCATION);
    WatchService watchService = FileSystems.getDefault().newWatchService();
    faxFolder.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
    boolean valid = true;
    do {
        WatchKey watchKey = watchService.take();
        for (WatchEvent<?> event : watchKey.pollEvents()) {
            if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) {
                String fileName = event.context().toString();
                publishToKafka(new File(TEMP_FILE_LOCATION + fileName).toPath(), "topic");
            }
        }
        valid = watchKey.reset();
    } while (valid);
}

private void publishToKafka(Path path, String topic) {
    try (BufferedReader reader = Files.newBufferedReader(path)) {
        String input = null;
        while ((input = reader.readLine()) != null) {
            kafkaProducer.publishMessageOnTopic(input, topic);
        }
    } catch (IOException e) {
        LOG.error("Could not read buffered file to send message on kafka.", e);
    } finally {
        try {
            Files.deleteIfExists(path); // This is where I get the exception
        } catch (IOException e) {
            LOG.error("Problem in deleting the buffered file {}.", path.getFileName(), e);
        }
    }
}
Exception log:
java.nio.file.FileSystemException: D:\upload\notif-1479974962595.csv: The process cannot access the file because it is being used by another process.
at sun.nio.fs.WindowsException.translateToIOException(Unknown Source)
at sun.nio.fs.WindowsException.rethrowAsIOException(Unknown Source)
at sun.nio.fs.WindowsException.rethrowAsIOException(Unknown Source)
at sun.nio.fs.WindowsFileSystemProvider.implDelete(Unknown Source)
at sun.nio.fs.AbstractFileSystemProvider.deleteIfExists(Unknown Source)
at java.nio.file.Files.deleteIfExists(Unknown Source)
at com.panasonic.mdw.core.utils.MonitorDirectory$FileContentPublisher.publishToKafka(MonitorDirectory.java:193)
at com.panasonic.mdw.core.utils.MonitorDirectory$FileContentPublisher.sendData(MonitorDirectory.java:125)
at com.panasonic.mdw.core.utils.MonitorDirectory$FileContentPublisher.run(MonitorDirectory.java:113)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
newWatchService() inside a try-with-resources. Also, since you're using nio everywhere else, for consistency instead of new File(...).toPath() do faxFolder.resolve(filename). – Rally
WatchEvent's context is already a Path when watching a file system, so even the conversion to string is obsolete and faxFolder.resolve((Path)event.context()) is sufficient. I think you're right, reacting to the creation event immediately doesn't give the creator enough time to close the file. Interestingly, on my system, that has the opposite effect: reading the file fails as it is still being written, but deleting works, as the system postpones the actual removal until the creator closes the file. – Abc
publishToKafka() method would find null in between and consider that an end of input and then the code goes ahead for deletion which is not possible right now. – Tackett
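The suggestions in the comments above can be put together roughly as follows. This is only a sketch, not a confirmed fix: the class name WatchAndDelete, the helper deleteWithRetry, and the retry count and delay are made up for illustration, and it assumes the delete fails only because the process that created the file has not closed it yet. The Kafka publishing step is left out.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;

final class WatchAndDelete {

    // Hypothetical helper: retries the delete while the file is still locked.
    // Returns true once the file is deleted (or was already gone),
    // false if every attempt failed with an IOException.
    static boolean deleteWithRetry(Path path, int maxAttempts, long delayMillis)
            throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                Files.deleteIfExists(path);
                return true;
            } catch (IOException e) {
                // On Windows this is where "being used by another process" surfaces.
                if (attempt < maxAttempts) {
                    Thread.sleep(delayMillis);
                }
            }
        }
        return false;
    }

    // The WatchService is closed automatically by try-with-resources.
    static void monitor(Path dir) throws IOException, InterruptedException {
        try (WatchService watchService = dir.getFileSystem().newWatchService()) {
            dir.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
            WatchKey key;
            do {
                key = watchService.take();
                for (WatchEvent<?> event : key.pollEvents()) {
                    if (event.kind() == StandardWatchEventKinds.ENTRY_CREATE) {
                        // For ENTRY_CREATE the context is a Path relative to dir.
                        Path file = dir.resolve((Path) event.context());
                        // Reading and publishing the contents would go here (omitted).
                        // Retry count and delay are arbitrary illustration values.
                        if (!deleteWithRetry(file, 5, 200L)) {
                            System.err.println("Could not delete " + file);
                        }
                    }
                }
            } while (key.reset());
        }
    }
}
```

Retrying only the delete does not address the other problem raised in the comments: if the writer is still appending when the ENTRY_CREATE event arrives, the reader may reach end of input early and publish a partial file. Having the producer write to a temporary name and rename the file once it is complete is one common way around that, but whether it fits here depends on how the files are created.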