Using AutoCloseable interfaces inside Stream API [duplicate]
M

3

14

Today I tried to refactor the following code, which reads ids from files in a directory:

Set<Long> ids = new HashSet<>();
for (String fileName : fileSystem.list("my-directory")) {
    InputStream stream = fileSystem.openInputStream(fileName);
    BufferedReader br = new BufferedReader(new InputStreamReader(stream));
    String line;
    while ((line = br.readLine()) != null) {
        ids.add(Long.valueOf(line.trim()));
    }
    br.close();
}

to use the Stream API:

Set<Long> ids = fileSystem.list("my-directory").stream()
    .map(fileSystem::openInputStream)
    .map(is -> new BufferedReader(new InputStreamReader(is)))
    .flatMap(BufferedReader::lines)
    .map(String::trim)
    .map(Long::valueOf)
    .collect(Collectors.toSet());

Then I realized that the IO streams are never closed, and I don't see a simple way to close them because they are created inside the pipeline.

Any ideas?

Update: the FileSystem in the example is HDFS, so Files#lines and similar methods can't be used.

Mcgruter answered 25/4, 2017 at 11:10 Comment(3)
Also, don't forget that you need to close the Stream created by fileSystem.list("my-directory").stream() if you are accessing the actual file system. Streams are AutoCloseable, so this can be done with a try-with-resources block. - Autobahn
@GeraldMücke I checked it; neither solution helps here. try-with-resources has exactly the same problem as Luan Nico's answer. - Mcgruter
@JarrodRoberson since you had time to close the question, maybe you could elaborate on how the wrapper solution or try-with-resources from that answer would help here? - Mcgruter
C
17

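You can hook into the stream to close resources once its elements have been consumed: flatMap closes each mapped stream after its contents have been placed into the pipeline, which runs any handler registered with onClose. So the reader can be closed after all of its lines have been read with the following modification: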

.flatMap(reader -> reader.lines().onClose(() -> close(reader)))

Here close(AutoCloseable) is a helper that handles the IOException.
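
The close helper itself is not shown in the answer. A minimal sketch of what it might look like, assuming it simply rethrows checked exceptions as unchecked ones. The class name CloseDemo, the helper bodies, the isClosed probe, and the in-memory readers standing in for HDFS files are all illustrative assumptions, not part of the original code:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class CloseDemo {
    // Hypothetical helper: closes the resource and converts checked exceptions,
    // so it can be called from the Runnable passed to onClose.
    static void close(AutoCloseable resource) {
        try {
            resource.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // Reads one id per line from each reader. flatMap closes every inner
    // stream after its lines are consumed, which triggers the onClose handler.
    static Set<Long> readIds(List<BufferedReader> readers) {
        return readers.stream()
                .flatMap(reader -> reader.lines().onClose(() -> close(reader)))
                .map(String::trim)
                .map(Long::valueOf)
                .collect(Collectors.toSet());
    }

    // Demo-only probe: BufferedReader.ready() throws IOException once closed.
    static boolean isClosed(BufferedReader reader) {
        try {
            reader.ready();
            return false;
        } catch (IOException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // In-memory readers stand in for the files opened from the file system.
        List<BufferedReader> readers = Arrays.asList(
                new BufferedReader(new StringReader("1\n2\n")),
                new BufferedReader(new StringReader(" 3 \n")));
        System.out.println(readIds(readers));
        System.out.println(isClosed(readers.get(0)));
    }
}
```

Note that a reader is only closed this way if the pipeline actually reaches it; a stream that is built but never consumed leaves its resources open.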

As a proof of concept, the following code and output have been tested:

import java.util.stream.Stream;

class Test {
    public static void main(String[] args) {
        Stream.of(1, 2, 3).flatMap(i ->
                Stream.of(i, i * 2).onClose(() ->
                        System.out.println("Closed!")
                )
        ).forEach(System.out::println);
    }
}

1
2
Closed!
2
4
Closed!
3
6
Closed!
Cinder answered 25/4, 2017 at 11:39 Comment(7)
oh, maybe this is what I was looking for - Mcgruter
great, it works - Mcgruter
@Cinder yes, definitely plus one. This is exactly why onClose exists: to close resources that you otherwise can't. - Fdic
@Cinder a good example showing how to defer closing an AutoCloseable resource when it is used as a stream. I would upvote more than once if I could. - Kulun
This is correct, but if I were performing a code review, I would suggest that a plain loop is far more readable. - Consecutive
@Consecutive It would probably be good to know about this feature if you have a component that returns a Stream object, so it cleans up after use. - Cinder
Using the UncheckedCloseable of this answer, you may simply use .flatMap(br -> br.lines().onClose(UncheckedCloseable.wrap(br))). - Terat
F
4

Why not a bit simpler, via Files.lines:

Set<Long> ids;
try (Stream<String> s = Files.lines(Paths.get("yourpath" + fileName))) {
    // Keep the result; the stream (and the file) is closed when the block exits.
    ids = s.map(String::trim)
           .map(Long::valueOf)
           .collect(Collectors.toSet());
}
Fdic answered 25/4, 2017 at 11:22 Comment(5)
because my filesystem is hdfs :) - Mcgruter
It operates on other abstractions. I can't use the Files.lines method because I don't have a java.nio.file.Path and the related classes. - Mcgruter
@Fdic Paths (and therefore Files.lines) works only with some standard file systems by default. The fileSystem in the question is probably not the best example, because it seems to be something custom or from some framework. - Prehensile
@Kapep thank you, makes sense. - Fdic
Yes, I updated the question to clarify. I'm looking for a solution at the java-stream level rather than file-system workarounds. - Mcgruter
F
0

I haven't tested the actual code, but maybe something along these lines?

Set<Long> ids = fileSystem.list("my-directory").stream()
    .map(fileSystem::openInputStream)
    .flatMap(is -> {
        try (BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
            return br.lines().map(String::trim).map(Long::valueOf);
        } catch (IOException e) {
            // close() can throw a checked exception, which a lambda cannot propagate
            throw new UncheckedIOException(e);
        }
    })
    .collect(Collectors.toSet());

It's not as pretty as yours, of course, but I believe it's the closest version that lets you close the reader.

Florio answered 25/4, 2017 at 11:14 Comment(3)
Won't the IO stream be closed inside flatMap before the lines are actually returned? I need to test that. - Mcgruter
I'm not sure, @AdamSkywalker, that's a very good point. Can you test it in your code? I'm not sure what this fileSystem variable is. - Florio
Yes, I tested it on a local file system and got java.io.UncheckedIOException: java.io.IOException: Stream closed, so it seems my first comment is correct. - Mcgruter
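
The failure reported in the last comment comes from laziness: br.lines() reads nothing until a terminal operation runs, and by then try-with-resources has already closed the reader. A small sketch that reproduces this with an in-memory reader, alongside one possible workaround that reads the lines eagerly while the reader is still open. The class and method names are illustrative assumptions, and StringReader stands in for a real file:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyLinesDemo {
    // Returns a lazy stream backed by a reader that is closed on return,
    // so consuming the stream later fails.
    static Stream<String> lazyLines(String content) {
        try (BufferedReader br = new BufferedReader(new StringReader(content))) {
            return br.lines();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads every line while the reader is still open, then streams the list.
    static Stream<String> eagerLines(String content) {
        try (BufferedReader br = new BufferedReader(new StringReader(content))) {
            List<String> lines = br.lines().collect(Collectors.toList());
            return lines.stream();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        try {
            lazyLines("1\n2\n").forEach(System.out::println);
        } catch (UncheckedIOException e) {
            System.out.println("lazy variant failed: " + e.getCause());
        }
        eagerLines("1\n2\n").forEach(System.out::println);
    }
}
```

The eager variant trades memory for simplicity, since each file's lines are held in a list before being streamed. The onClose approach from the accepted answer avoids that buffering.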
