2

I have code that reads a file and converts and returns a flux of a list of hashmaps in groups of 10. Initially I was taking an input of the entire file content as a string, but I felt like it would use too much memory, and want to read the file directly. Is my implementation actually saving any memory? I feel like using .collectList contradicts what I am doing.

public Flux<List<Map<String, String>>> processCsvFile(File csvFile, String delimiter) {

    notBlank(csvFile.getPath(), "the file must not be empty");
    notBlank(delimiter, "the delimiter must not be empty");

    return Flux.using(
            () -> Files.lines(csvFile.toPath()),
            Flux::fromStream,
            stream -> stream.close()
        )
        .collectList()
        .flatMapMany(lines -> {
            final var headers = Splitter.on(delimiter)
                    .trimResults()
                    .splitToList(lines.get(0).replaceAll(Pattern.quote(delimiter) + "+$", ""));

            return Flux.fromIterable(lines.subList(1, lines.size()))
                    .map(line -> {
                        final var values =
                                Splitter.on(delimiter).trimResults().splitToList(line.replaceAll(Pattern.quote(delimiter) + "+$", ""));
                        Map<String, String> outputMap = new HashMap<>();
                        for (var i = 0; i < headers.size(); i++) {
                            outputMap.put(headers.get(i), i < values.size() ? values.get(i) : "");
                        }
                        return outputMap;
                    })
                    .buffer(10);
        })
        .onErrorResume(e -> {
            log.atError()
                    .addKeyValue("csvFile", csvFile.getName())
                    .setCause(e)
                    .log("Error processing CSV file");
            return Flux.error(new RuntimeException("Error processing CSV file", e));
        });
}

Here is an example file content:

id|name|description|created_at|
id1|name1|desc1|created1|
id2|name2|desc2|created2|
id3|name3|desc3|created3|
id4|name4|desc4|created4|
id5|name5|desc5|created5|
id6|name6|desc6|created6|
id7|name7|desc7|created7|
id8|name8|desc8|created8|
id9|name9|desc9|created9|
id10|name10|desc10|created10|
id11|name11|desc11|created11|
id12|name12|desc12|created12|
1
  • 1
    The Flux class you are using appears to be reactor.core.publisher.Flux. Although Reactor integrates with Spring, this "Flux" is not part of Spring WebFlux. If I'm reading that correctly then please update your tags appropriately -- the question is about project-reactor, not spring-webflux. Commented Feb 20 at 14:45

2 Answers 2

2

Yep, using .collectList() will load the entire file content in memory before processing, defeating your purpose of saving memory. On the other hand, using buffer() is good.

Sign up to request clarification or add additional context in comments.

Comments

1

I feel like using .collectList contradicts what I am doing.

It does. That consumes all elements from the initial flux until that completes, storing them all into a List that is then post-processed to produce the final flux of lists of maps. That defeats your purpose of not needing to read the whole file at once. What's more, the whole file is read before processCsvFile() even returns -- not, as one might expect, when someone subscribes to the returned Flux.

You can group incoming lines by buffer()ing the original flux, yielding a flux of Lists of lines. You seem to already be aware of this, so it's unclear why you are use it in the convoluted way you do. Just buffer the flux directly.

You can also transform the elements of a flux by map()ing it. In your case, it makes more sense to do that before grouping than after.

The main issue to overcome here is that you are treating the first line of your CSV specially, such that its contents inform the processing of all the other lines. One way to deal with that is as you have done: collect all of the lines and process them together. If you want to avoid that then you need a way to separate that initial line and capture its contents in a way that is visible when processing each other line, and here is where filter() comes in, and the fact that lambdas close over their context.

Overall, then, maybe something more like this:

public Flux<List<Map<String, String>>> processCsvFile(File csvFile, String delimiter) {

    notBlank(csvFile.getPath(), "the file must not be empty");
    notBlank(delimiter, "the delimiter must not be empty");

    List<String> headers = new ArrayList<>();
    Pattern delimPattern = Pattern.compile("\s*" + Pattern.quote(delimiter) + "\s*");

    return Flux.using(
            () -> Files.lines(csvFile.toPath()),
            Flux::fromStream,
            stream -> stream.close()
        ).filter(line -> {
            if (headers.isEmpty()) {
                headers.addAll(Arrays.asList(delimPattern.split(line)));
                return false;
            }
            return true;
        })
        .map(line -> {
            String[] columns = delimPattern.split(line);
            return IntStream.range(0, headers.size())
                .collect(
                    HashMap::new,
                    (map, i) -> { map.put(headers.get(i), (i < columns.length) ? columns[i] : null); },
                    HashMap::putAll
                );
        })
        .buffer(10)
        .onErrorResume(e -> {
            log.atError()
                    .addKeyValue("csvFile", csvFile.getName())
                    .setCause(e)
                    .log("Error processing CSV file");
            return Flux.error(new RuntimeException("Error processing CSV file", e));
        });
}

Because lambdas close over their context, those used by the various fluxes involved in the above will capture references to the headers and delimPattern objects, so that they can continue to use -- and share! -- them after even after the function returns.

Do note also that although I have mimicked your approach to CSV parsing, that should not be taken as an endorsement. Most CSV dialects are more complicated than that naive approach supports. Unless you have very close control over the input, such that you can be sure that an approach like that is sufficient, you should instead choose and use a well-designed CSV parser that accounts for issues such as quotation and escaping. There are several available. In addition to being more robust, that would surely simplify the above, probably by a lot, or perhaps even moot it altogether.

2 Comments

One thing that comes to mind is that the Files.lines() could be pulled out of the chain to be able to up front read one line from it and parse it, and then feed it back into the Flux code where the header parsing logic is then no longer needed.
That sounds good, @Gimby, but I'm having trouble seeing a good way to implement it, given that it involves pulling the first element from a stream while preserving access to the tail. (I can think of at least one bad way of doing it.) I'm not inclined to devote much more thought to it, though, because a better solution would leverage a well-built CSV parser instead of hand-rolling a quick & dirty one, and that would completely change the picture.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.