
So I started playing with Advent of Code, and I would like to use Project Reactor to find the solutions in a reactive way.

I have implemented a solution that works partially, but not quite how I want it, because it can also read lines partially when there is no more space in the buffer.

The input to run the following function can be found here: https://adventofcode.com/2022/day/1/input

    public static Flux<String> getLocalInputForStackOverflow(String filePath) throws IOException {
        Path dayPath = Path.of(filePath);
        FileOutputStream resultDay = new FileOutputStream(basePath.resolve("result_day.txt").toFile());

        return DataBufferUtils
                .readAsynchronousFileChannel(
                        () -> AsynchronousFileChannel.open(dayPath),
                        new DefaultDataBufferFactory(),
                        64)
                .map(DataBuffer::asInputStream)
                .map(db -> {
                    try {
                        resultDay.write(db.readAllBytes());
                        resultDay.write("\n".getBytes());
                        return db;
                    } catch (IOException e) { // also covers FileNotFoundException
                        throw new RuntimeException(e);
                    }
                })
                .map(InputStreamReader::new)
                .map(is -> new BufferedReader(is).lines())
                .flatMap(Flux::fromStream);
    }

The point of this function is to read the lines of the file in a reactive way.

I used the FileOutputStream to write what I read into another file and then compare the resulting file with the original, because I noticed that some lines are only partially read when there is no more space in the buffer. So the try-catch .map() can be ignored.

My questions here would be:

  1. Is there a more optimal way to read files asynchronously in a reactive way?

  2. Is there a more optimal way to read a file asynchronously, line by line, with a limited buffer, while making sure that only whole lines are read?

Workarounds that I've found are:

  1. Increase the buffer to read the whole file in one run -> not an optimal solution
  2. Use the following function, but this raises a warning: Possibly blocking call in non-blocking context could lead to thread starvation
    public static Flux<String> getLocalInput1(int day) throws IOException {
        Path dayPath = getFilePath(day);
        return Flux.using(() -> Files.lines(dayPath),
                Flux::fromStream,
                BaseStream::close);
    }
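One way to address that warning (a sketch on my part, not from the question or the answer below): keep the blocking Files.lines call, but subscribe the resulting Flux on Reactor's scheduler for blocking work, Schedulers.boundedElastic(), so the file I/O never runs on an event-loop thread. The class and method names here are illustrative.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.BaseStream;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class BoundedElasticLines {

    // Same Flux.using pipeline as above, but subscribed on boundedElastic
    // so the blocking stream consumption stays off non-blocking threads.
    public static Flux<String> lines(Path path) {
        return Flux.using(() -> Files.lines(path),
                        Flux::fromStream,
                        BaseStream::close)
                .subscribeOn(Schedulers.boundedElastic());
    }
}
```

This does not make the read non-blocking; it only confines the blocking calls to a scheduler that is designed for them, which is usually enough to satisfy the "possibly blocking call" inspection.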

1 Answer

You're almost there. Just use BufferedReader instead of Files.lines.

In Spring WebFlux, a simple way to read files reactively is the Reactor Core library's Flux.using method. It creates a Flux that consumes a resource, performs some operations on it, and then cleans up the resource when the Flux terminates.

Example of reading a file asynchronously and reactively:

    Flux<String> flux = Flux.using(

            // resource factory creates the FileReader instance
            () -> new FileReader("/path/to/file.txt"),

            // transformer function turns the FileReader into a Flux of lines
            reader -> Flux.fromStream(new BufferedReader(reader).lines()),

            // resource cleanup function closes the FileReader when the Flux terminates
            reader -> {
                try {
                    reader.close();
                } catch (IOException e) {
                    throw Exceptions.propagate(e);
                }
            }
    );

Subscribe to the Flux and consume the lines of the file as they are emitted; this prints each line to the console as it is read from the file:

flux.subscribe(line -> System.out.println(line));

In a similar way, we can solve it while controlling each line explicitly with Flux.generate:

    Flux<String> flux = Flux.generate(

            // state supplier opens the reader once, on subscription
            () -> new BufferedReader(new FileReader("/path/to/file.txt")),

            // generator function reads one line per request and emits it,
            // then returns the reader as the next state
            (bufferedReader, sink) -> {
                try {
                    String line = bufferedReader.readLine();
                    if (line != null) {
                        sink.next(line);
                    } else {
                        sink.complete();
                    }
                } catch (IOException e) {
                    sink.error(e);
                }
                return bufferedReader;
            },

            // state consumer closes the reader when the Flux terminates
            reader -> Mono.fromRunnable(() -> {
                try {
                    reader.close();
                } catch (IOException e) {
                    // Handle exception
                }
            }).subscribe()
    );
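For the DataBufferUtils pipeline from the question there is also a fully non-blocking way to get only whole lines regardless of buffer size: let a decoder re-assemble lines across buffer boundaries. This is a sketch that assumes Spring's StringDecoder (from spring-core) is available; it keeps the partial tail of each buffer internally and only emits complete lines.

```java
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Path;

import org.springframework.core.ResolvableType;
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;

import reactor.core.publisher.Flux;

public class NonBlockingLines {

    public static Flux<String> lines(Path path) {
        Flux<DataBuffer> buffers = DataBufferUtils.readAsynchronousFileChannel(
                () -> AsynchronousFileChannel.open(path),
                new DefaultDataBufferFactory(),
                64); // small buffer: a line may span several buffers

        // StringDecoder splits on \n and \r\n by default (delimiters are
        // stripped) and buffers any partial line until the next chunk arrives.
        return StringDecoder.textPlainOnly()
                .decode(buffers, ResolvableType.forClass(String.class), null, null);
    }
}
```

Unlike the Files.lines variants, nothing here blocks: the channel read is asynchronous and the line splitting happens as buffers arrive.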

4 Comments

Hi, your solution looks good, but for reader -> reader.close() I receive a warning that it is an unhandled exception. Then, if I surround it with a try/catch, there is another warning that reader.close() is a possibly blocking call in a non-blocking operation. Is there a way to optimise this behaviour?
Yes, for simplicity I did not include the try-catch boilerplate. The "blocking call" warning can be avoided with Mono.fromRunnable. The answer is updated.
Isn't Flux.fromStream a blocking call too? And does it need to be wrapped in a subscribeOn/publishOn pair?
For the Flux.generate case too, isn't bufferedReader.readLine() a blocking call? Doesn't it need to be subscribed on a blocking-friendly scheduler?
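The last two comments are right that readLine() blocks. A common mitigation (a sketch, not part of the accepted answer; the class and method names are illustrative) is to subscribe the generated Flux on Schedulers.boundedElastic(), so that the blocking reads run on a scheduler intended for blocking work:

```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class GenerateOnBoundedElastic {

    public static Flux<String> lines(String path) {
        return Flux.<String, BufferedReader>generate(
                        () -> new BufferedReader(new FileReader(path)),
                        (reader, sink) -> {
                            try {
                                String line = reader.readLine();
                                if (line != null) {
                                    sink.next(line);
                                } else {
                                    sink.complete();
                                }
                            } catch (IOException e) {
                                sink.error(e);
                            }
                            return reader; // carry the reader as state
                        },
                        reader -> {
                            try {
                                reader.close();
                            } catch (IOException e) {
                                // best-effort close
                            }
                        })
                // blocking readLine() now runs on a blocking-friendly scheduler
                .subscribeOn(Schedulers.boundedElastic());
    }
}
```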
