For recursion kind of use-cases, Reactor offers Mono.expand (and other variants like Flux.expand or Flux.expandDeep).
The expand operator is like flatMap, except that is it also re-applied on elements it produces (therefore, making a recursion effect).
For your use case, a pseudo-code could be:
int count = 2;
Mono<Response> firstPage = service.getFirstPage(count);
Flux<Response> allPages = firstPage.expand(response -> {
if (response.moreAvailable) return service.nextPage(response.offset, count);
else return Mono.empty();
});
Here is a complete minimal reproducible example that mock the service returning pages with in-memory records:
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Collections;
import java.util.List;
import static java.lang.Math.min;
public class ExpandPages {
// Data models
record Student(int id, String name) {}
record Page(int remaining, int nextOffset, List<Student> students) {}
/**
* Mock a service that sends pages of students
* @param all All available students
*/
record StudentRegistry(List<Student> all) {
StudentRegistry {
if (all == null || all.isEmpty()) throw new IllegalArgumentException("Null or empty student list");
all = Collections.unmodifiableList(all);
}
/**
* Request a page of students.
*
* @return A single page of students, starting at provided offset, with a maximum count of provided count.
*/
public Mono<Page> next(int offset, int count) {
if (offset < 0 || offset >= all.size()) throw new IllegalArgumentException("Bad offset");
if (count < 1) throw new IllegalArgumentException("Bad count");
count = min(count, all.size() - offset);
int nextOffset = offset + count;
int remaining = all.size() - nextOffset;
return Mono.just(new Page(remaining, nextOffset, all.subList(offset, offset + count)));
}
}
public static void main(String[] args) {
final var registry = new StudentRegistry(List.of(
new Student(1, "John"),
new Student(2, "Jane"),
new Student(3, "Jack"),
new Student(4, "Jules"),
new Student(5, "Julie"),
new Student(6, "James"),
new Student(7, "Joe"),
new Student(8, "Johanna"),
new Student(9, "Jolly Jumper")
));
final int queriedCount = 2;
Flux<Page> pages = registry
// Get first page
.next(0, queriedCount)
// Recurse on each received page: check if there's more, then ask for the next available page
.expand(response -> {
System.out.println("Received page "+response);
if (response.remaining() <= 0) {
System.out.println("No more page to fetch.");
return Mono.empty(); // Ends recursion
} else {
return registry.next(response.nextOffset(), min(queriedCount, response.remaining()));
}
});
// Trigger flow consumption: print all received students
pages.flatMapIterable(Page::students)
.doOnNext(System.out::println)
.blockLast();
}
}