Junit cannot delete @TempDir with file created by Spark Structured Streaming
Asked Answered
L

1

7

I created an integration test for my pipeline to check if the right CSV file is generated:

class CsvBatchSinkTest {

    @RegisterExtension
    static SparkExtension spark = new SparkExtension();

    @TempDir
    static Path directory;

    //this checks if the file is already available
    static boolean isFileWithSuffixAvailable(File directory, String suffix) throws IOException {
        return Files.walk(directory.toPath()).anyMatch(f -> f.toString().endsWith(suffix));
    }

    //this gets content of file
    static List<String> extractFileWithSuffixContent(File file, String suffix) throws IOException {
        return Files.readAllLines(
                Files.walk(file.toPath())
                        .filter(f -> f.toString().endsWith(suffix))
                        .findFirst()
                        .orElseThrow(AssertionException::new));
    }

    @Test
    @DisplayName("When correct dataset is sent to sink, then correct csv file should be generated.")
    void testWrite() throws IOException, InterruptedException {

        File file = new File(directory.toFile(), "output");


        List<Row> data =
                asList(RowFactory.create("value1", "value2"), RowFactory.create("value3", "value4"));

        Dataset<Row> dataset =
                spark.session().createDataFrame(data, CommonTestSchemas.SCHEMA_2_STRING_FIELDS);

         dataset.coalesce(1)
                .write()
                .option("header", "true")
                .option("delimiter", ";")
                .csv(file.getAbsolutePath());

        Awaitility.await()
                .atMost(10, TimeUnit.SECONDS)
                .until(() -> isFileWithSuffixAvailable(file, ".csv"));

        Awaitility.await()
                .atMost(10, TimeUnit.SECONDS)
                .untilAsserted(
                        () ->
                                assertThat(extractFileWithSuffixContent(file, ".csv"))
                                        .containsExactlyInAnyOrder("field1;field2", "value1;value2", "value3;value4"));
    }
}

The real code looks a little bit different, it is just an reproducible example.

Spark extension just starts local spark before every test and closes is after.

The test passes, but then when junit tries to clean up @TempDir following exception is thrown:

Failed to delete temp directory C:\Users\RK03GJ\AppData\Local\Temp\junit596680345801656194. The following paths could not be deleted

enter image description here

Can I somehow fix this error? I tried waiting for spark to stop using awaility, but I didn't really help.

Maybe I can somehow ignore this error?

Locality answered 24/5, 2019 at 10:0 Comment(1)
@jannis I added it into try and it works now, thanks. Can you add an answer, so I can accept it?Gosling
H
5

Quick guess: you need to close the stream returned by Files.walk. Quote from the docs:

If timely disposal of file system resources is required, the try-with-resources construct should be used to ensure that the stream's close method is invoked after the stream operations are completed.

-- https://docs.oracle.com/javase/8/docs/api/java/nio/file/Files.html#walk-java.nio.file.Path-java.nio.file.FileVisitOption...-

To fix this add a try-with-resources in the isFileWithSuffixAvailable method:

static boolean isFileWithSuffixAvailable(File directory, String suffix) throws IOException {
    try (Stream<Path> walk = Files.walk(directory.toPath())) {
        return walk.anyMatch(f -> f.toString().endsWith(suffix));
    }
}
Hanahanae answered 24/5, 2019 at 14:23 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.