I have code that reads a CSV file, converts each row to a HashMap, and returns a Flux that emits lists of those maps in groups of 10. Initially I took the entire file content as a String input, but I felt that would use too much memory, so I now want to read the file directly. Is my implementation actually saving any memory? I feel like using .collectList() contradicts what I am doing.
public Flux<List<Map<String, String>>> processCsvFile(File csvFile, String delimiter) {
    notBlank(csvFile.getPath(), "the file must not be empty");
    notBlank(delimiter, "the delimiter must not be empty");
    return Flux.using(
            () -> Files.lines(csvFile.toPath()),
            Flux::fromStream,
            stream -> stream.close()
        )
        .collectList()
        .flatMapMany(lines -> {
            final var headers = Splitter.on(delimiter)
                .trimResults()
                .splitToList(lines.get(0).replaceAll(Pattern.quote(delimiter) + "+$", ""));
            return Flux.fromIterable(lines.subList(1, lines.size()))
                .map(line -> {
                    final var values =
                        Splitter.on(delimiter).trimResults().splitToList(line.replaceAll(Pattern.quote(delimiter) + "+$", ""));
                    Map<String, String> outputMap = new HashMap<>();
                    for (var i = 0; i < headers.size(); i++) {
                        outputMap.put(headers.get(i), i < values.size() ? values.get(i) : "");
                    }
                    return outputMap;
                })
                .buffer(10);
        })
        .onErrorResume(e -> {
            log.atError()
                .addKeyValue("csvFile", csvFile.getName())
                .setCause(e)
                .log("Error processing CSV file");
            return Flux.error(new RuntimeException("Error processing CSV file", e));
        });
}
Here is an example file content:
id|name|description|created_at|
id1|name1|desc1|created1|
id2|name2|desc2|created2|
id3|name3|desc3|created3|
id4|name4|desc4|created4|
id5|name5|desc5|created5|
id6|name6|desc6|created6|
id7|name7|desc7|created7|
id8|name8|desc8|created8|
id9|name9|desc9|created9|
id10|name10|desc10|created10|
id11|name11|desc11|created11|
id12|name12|desc12|created12|
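On the memory question: .collectList() waits for the source to complete and holds every line in one List, so the version above still materializes the whole file before the first batch is emitted. One possible way to avoid that, sketched below, is to take the header from the first element with switchOnFirst and map the remaining lines as they arrive. This is only a sketch under assumptions: the class name StreamingCsv and the helpers splitLine and toRow are hypothetical, String.split stands in for the Guava Splitter call, the method takes a Path instead of a File, and the validation and logging from the original are left out.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Stream;

import reactor.core.publisher.Flux;

public class StreamingCsv {

    // Hypothetical helper: plain-JDK stand-in for the Guava Splitter call.
    // String.split drops trailing empty fields, which also covers the trailing delimiter.
    static List<String> splitLine(String line, String delimiter) {
        List<String> fields = new ArrayList<>();
        for (String field : line.split(Pattern.quote(delimiter))) {
            fields.add(field.trim());
        }
        return fields;
    }

    // Hypothetical helper: pairs each header with its value, or "" when the value is missing.
    static Map<String, String> toRow(List<String> headers, List<String> values) {
        Map<String, String> row = new LinkedHashMap<>();
        for (int i = 0; i < headers.size(); i++) {
            row.put(headers.get(i), i < values.size() ? values.get(i) : "");
        }
        return row;
    }

    public static Flux<List<Map<String, String>>> processCsvFile(Path csvFile, String delimiter) {
        // Lines are pulled from the file lazily; the stream is closed on completion, error, or cancel.
        Flux<String> source = Flux.using(() -> Files.lines(csvFile), Flux::fromStream, Stream::close);

        return source
            .<Map<String, String>>switchOnFirst((first, all) -> {
                if (!first.hasValue()) {
                    // Empty file or early error: nothing to map, just propagate the terminal signal.
                    return all.thenMany(Flux.<Map<String, String>>empty());
                }
                List<String> headers = splitLine(first.get(), delimiter);
                // 'all' still contains the header line, so skip it before mapping the rows.
                return all.skip(1).map(line -> toRow(headers, splitLine(line, delimiter)));
            })
            // Groups rows into lists of up to 10; no full-file list is ever built.
            .buffer(10);
    }
}
```

With the 12-row sample above and "|" as the delimiter, this should emit one list of 10 maps followed by one list of 2. Note that Files.lines is still blocking I/O, so where the subscription runs remains a separate decision.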
The Flux class you are using appears to be reactor.core.publisher.Flux. Although Reactor integrates with Spring, this "Flux" is not part of Spring WebFlux. If I'm reading that correctly, then please update your tags appropriately: the question is about project-reactor, not spring-webflux.