I created an integration test for my pipeline to check if the right CSV file is generated:
class CsvBatchSinkTest {

    @RegisterExtension
    static SparkExtension spark = new SparkExtension();

    @TempDir
    static Path directory;

    // Checks whether a file with the given suffix is already available.
    static boolean isFileWithSuffixAvailable(File directory, String suffix) throws IOException {
        return Files.walk(directory.toPath()).anyMatch(f -> f.toString().endsWith(suffix));
    }

    // Reads the content of the first file with the given suffix.
    static List<String> extractFileWithSuffixContent(File file, String suffix) throws IOException {
        return Files.readAllLines(
            Files.walk(file.toPath())
                .filter(f -> f.toString().endsWith(suffix))
                .findFirst()
                .orElseThrow(AssertionException::new));
    }

    @Test
    @DisplayName("When correct dataset is sent to sink, then correct csv file should be generated.")
    void testWrite() throws IOException, InterruptedException {
        File file = new File(directory.toFile(), "output");
        List<Row> data =
            asList(RowFactory.create("value1", "value2"), RowFactory.create("value3", "value4"));
        Dataset<Row> dataset =
            spark.session().createDataFrame(data, CommonTestSchemas.SCHEMA_2_STRING_FIELDS);
        dataset.coalesce(1)
            .write()
            .option("header", "true")
            .option("delimiter", ";")
            .csv(file.getAbsolutePath());
        Awaitility.await()
            .atMost(10, TimeUnit.SECONDS)
            .until(() -> isFileWithSuffixAvailable(file, ".csv"));
        Awaitility.await()
            .atMost(10, TimeUnit.SECONDS)
            .untilAsserted(
                () ->
                    assertThat(extractFileWithSuffixContent(file, ".csv"))
                        .containsExactlyInAnyOrder("field1;field2", "value1;value2", "value3;value4"));
    }
}
The real code looks a little different; this is just a reproducible example.
The Spark extension just starts a local Spark session before every test and closes it afterwards.
The test passes, but when JUnit then tries to clean up the @TempDir, the following exception is thrown:
Failed to delete temp directory C:\Users\RK03GJ\AppData\Local\Temp\junit596680345801656194. The following paths could not be deleted
Can I somehow fix this error? I tried waiting for Spark to stop using Awaitility, but it didn't really help.
Maybe I can somehow ignore this error?
Quick guess: you need to close the stream returned by Files.walk. Quote from the docs:
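A minimal sketch of that fix, assuming the two helpers from the question keep their signatures: each Files.walk call is wrapped in try-with-resources, so the stream is closed as soon as the helper returns. The class name WalkHelpers is hypothetical, and AssertionError stands in for the question's AssertionException, whose origin is not shown.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical holder class for the two helpers from the question.
final class WalkHelpers {

    // Returns true if any path under `directory` ends with `suffix`.
    static boolean isFileWithSuffixAvailable(File directory, String suffix) throws IOException {
        // try-with-resources closes the stream, and with it the open directories it references.
        try (Stream<Path> paths = Files.walk(directory.toPath())) {
            return paths.anyMatch(p -> p.toString().endsWith(suffix));
        }
    }

    // Reads all lines of the first path under `directory` that ends with `suffix`.
    static List<String> extractFileWithSuffixContent(File directory, String suffix) throws IOException {
        try (Stream<Path> paths = Files.walk(directory.toPath())) {
            Path match = paths
                    .filter(p -> p.toString().endsWith(suffix))
                    .findFirst()
                    // AssertionError is an assumption replacing the question's AssertionException.
                    .orElseThrow(() -> new AssertionError("no file ending with " + suffix));
            return Files.readAllLines(match);
        }
    }
}
```

If the unclosed streams are indeed the cause, this would be consistent with the symptom in the question: the assertions pass, but a directory handle that is still open can prevent Windows from deleting the temp directory afterwards.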