3

My data is stored in Cosmos; it is hierarchical and spread across different tables.

Hierarchy of Data
OnBoardingDefinition -> List<FeatureOrder>
FeatureStepMappingDefinition -> List<Steps>
OnBoardingStepDefinition -> Step details

When I call Cosmos I get Flux<OnBoardingDefinition>, Flux<FeatureStepMappingDefinition> & Flux<OnBoardingStepDefinition>. I need to construct a complete OnBoarding object which will have all details of Ids.

// One Optional-wrapped stream per Cosmos document type (query arguments elided).
Optional<Flux<OnBoardingDefinition>> onBoardingDefinitionFlux = cosmosRepository.getCosmosDocuments(---);
Optional<Flux<FeatureStepMappingDefinition>> featureStepMappingDefinitionFlux = cosmosRepository.getCosmosDocuments(---);
Optional<Flux<OnBoardingStepDefinition>> onBoardingStepDefinitionFlux = cosmosRepository.getCosmosDocuments(----);

// Flatten the definitions into their feature orders, keep the requested ones
// that are flagged as required, and resolve each one to its steps. Every
// element of the result is itself a publisher, hence the nested type.
// NOTE(review): get() throws NoSuchElementException when the Optional is empty.
Flux<Flux<Mono<StepResponseDto>>> flux3 = onBoardingDefinitionFlux.get()
            .map(OnBoardingDefinition::getFeatureOrder)
            .flatMap(Flux::fromIterable)
            .filter(order -> features.contains(order.getFeatureCode()))
            .filter(order -> Objects.nonNull(order.getRequired()) && order.getRequired())
            .map(order -> getFromFeature(featureStepMappingDefinitionFlux, onBoardingStepDefinitionFlux, order));


/**
 * Resolves the steps mapped to a single feature.
 *
 * <p>Keeps the mappings whose feature code equals the given feature's code
 * (ignoring case), flattens their step lists, and hands every step to
 * {@code getStepResponseDtoMono}. Each emitted element is therefore a
 * {@code Mono} rather than a resolved DTO.
 *
 * <p>NOTE(review): {@code get()} throws {@code NoSuchElementException} when
 * the mapping Optional is empty.
 *
 * @param featureStepMappingDefinitionFlux feature-to-steps mappings
 * @param onBoardingStepDefinitionFlux     step definitions, passed through to the step lookup
 * @param featureOrder                     the feature whose steps are wanted
 * @return one {@code Mono<StepResponseDto>} per mapped step
 */
private Flux<Mono<StepResponseDto>> getFromFeature(
        Optional<Flux<FeatureStepMappingDefinition>> featureStepMappingDefinitionFlux,
        Optional<Flux<OnBoardingStepDefinition>> onBoardingStepDefinitionFlux,
        FeatureOrder featureOrder) {
    return featureStepMappingDefinitionFlux.get()
        .filter(mapping -> mapping.getFeatureCode().equalsIgnoreCase(featureOrder.getFeatureCode()))
        .map(FeatureStepMappingDefinition::getSteps)
        .flatMap(Flux::fromIterable)
        .map(mappedStep -> getStepResponseDtoMono(onBoardingStepDefinitionFlux, mappedStep));
}

/**
 * Looks up the response DTO for one step.
 *
 * <p>Considers only definitions whose active flag is non-null and true, and
 * whose step code equals the given step's code (ignoring case). The first
 * such definition is converted with {@code getStepResponseDto}.
 *
 * <p>NOTE(review): {@code get()} throws {@code NoSuchElementException} when
 * the Optional is empty.
 *
 * @param onBoardingStepDefinitionFlux step definitions to search
 * @param step                         the step to resolve
 * @return a Mono built from the first matching definition via {@code next()}
 */
private Mono<StepResponseDto> getStepResponseDtoMono(
        Optional<Flux<OnBoardingStepDefinition>> onBoardingStepDefinitionFlux,
        Step step) {
    return onBoardingStepDefinitionFlux.get()
        .filter(definition -> Objects.nonNull(definition.getActive()) && definition.getActive())
        .filter(definition -> definition.getStepCode().equalsIgnoreCase(step.getStepCode()))
        .map(definition -> getStepResponseDto(definition))
        .next();
}

I need to simplify the above process to avoid nested Flux & Monos. I tried multiple approaches but they didn't work. I have created sample code and the output I am expecting. The sample is written with plain Java 8 streams; I want to achieve the same using Flux.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.UUID;
    import java.util.stream.Collectors;
    import reactor.core.publisher.Flux;

    //Sample code
    /**
     * Minimal reproducible example: resolves the feature codes of each
     * {@link OnBoarding} to their mapped step codes, then to the matching
     * {@link Step} definitions, and collects the result into an
     * {@link OnBoardingResponse}.
     */
    public class MRE {

        /** A step definition: its code, an active flag and a display name. */
        public static class Step {
            private String stepCode;
            private Boolean active;
            private String name;

            public Step(String stepCode, Boolean active, String name) {
                this.stepCode = stepCode;
                this.active = active;
                this.name = name;
            }

            public Boolean getActive() {
                return active;
            }

            public void setActive(Boolean active) {
                this.active = active;
            }

            public String getStepCode() {
                return stepCode;
            }

            public void setStepCode(String stepCode) {
                this.stepCode = stepCode;
            }

            public String getName() {
                return name;
            }

            public void setName(String name) {
                this.name = name;
            }

            @Override
            public String toString() {
                return "Step{" +
                    "stepCode='" + stepCode + '\'' +
                    ", active=" + active +
                    ", name='" + name + '\'' +
                    '}';
            }
        }

        /** A reference to a step by its code, as stored inside a feature mapping. */
        public static class Steps {
            private String stepCode;

            public Steps(String stepCode) {
                this.stepCode = stepCode;
            }

            public String getStepCode() {
                return stepCode;
            }

            public void setStepCode(String stepCode) {
                this.stepCode = stepCode;
            }

            @Override
            public String toString() {
                return "Steps{" +
                    "stepCode='" + stepCode + '\'' +
                    '}';
            }
        }

        /** Maps one feature code to the step references that belong to it. */
        public static class FeatureStepMapping {
            private String featureCode;
            // Always assigned by the constructor, so no field initialiser is needed.
            private List<Steps> steps;

            public FeatureStepMapping(String featureCode, List<Steps> steps) {
                this.featureCode = featureCode;
                this.steps = steps;
            }

            public String getFeatureCode() {
                return featureCode;
            }

            public void setFeatureCode(String featureCode) {
                this.featureCode = featureCode;
            }

            public List<Steps> getSteps() {
                return steps;
            }

            public void setSteps(List<Steps> steps) {
                this.steps = steps;
            }

            @Override
            public String toString() {
                return "FeatureStepMapping{" +
                    "featureCode='" + featureCode + '\'' +
                    ", steps=" + steps +
                    '}';
            }
        }

        /** The outward-facing view of a step: its code and name only. */
        public static class StepResponse {
            private String stepCode;
            private String name;

            public StepResponse(String stepCode, String name) {
                this.stepCode = stepCode;
                this.name = name;
            }

            public String getStepCode() {
                return stepCode;
            }

            public void setStepCode(String stepCode) {
                this.stepCode = stepCode;
            }

            public String getName() {
                return name;
            }

            public void setName(String name) {
                this.name = name;
            }

            @Override
            public String toString() {
                return "StepResponse{" +
                    "stepCode='" + stepCode + '\'' +
                    ", name='" + name + '\'' +
                    '}';
            }
        }


        /** An on-boarding definition: the list of feature codes it covers. */
        public static class OnBoarding {
            // Always assigned by the constructor, so no field initialiser is needed.
            private List<String> featureCodes;

            public OnBoarding(List<String> featureCodes) {
                this.featureCodes = featureCodes;
            }

            public List<String> getFeatureCodes() {
                return featureCodes;
            }

            public void setFeatureCodes(List<String> featureCodes) {
                this.featureCodes = featureCodes;
            }

            @Override
            public String toString() {
                return "OnBoarding{" +
                    "featureCodes=" + featureCodes +
                    '}';
            }
        }

        /** The assembled result: an id plus the resolved step responses. */
        public static class OnBoardingResponse {
            private String id;
            // Always assigned by the constructor, so no field initialiser is needed.
            private List<StepResponse> stepResponses;

            public OnBoardingResponse(String id, List<StepResponse> stepResponses) {
                this.id = id;
                this.stepResponses = stepResponses;
            }

            public String getId() {
                return id;
            }

            public void setId(String id) {
                this.id = id;
            }

            public List<StepResponse> getStepResponses() {
                return stepResponses;
            }

            public void setStepResponses(List<StepResponse> stepResponses) {
                this.stepResponses = stepResponses;
            }

            @Override
            public String toString() {
                return "OnBoardingResponse{" +
                    "id='" + id + '\'' +
                    ", stepResponses=" + stepResponses +
                    '}';
            }
        }

        /**
         * Builds the sample data, resolves it with plain Java 8 streams and
         * prints the resulting {@link OnBoardingResponse}.
         *
         * @param args unused
         */
        public static void main(String[] args) {

            Step step1 = new Step("S1", true, "Step1");
            Step step2 = new Step("S2", true, "Step2");
            Step step3 = new Step("S3", true, "Step3");

            List<Step> stepList = Arrays.asList(step1, step2, step3);
            Flux<List<Step>> stepFlux = Flux.just(stepList);


            FeatureStepMapping featureStepMapping1 = new FeatureStepMapping("f1", new ArrayList<>());
            featureStepMapping1.getSteps().add(new Steps("S1"));
            featureStepMapping1.getSteps().add(new Steps("S2"));

            FeatureStepMapping featureStepMapping2 = new FeatureStepMapping("f2", new ArrayList<>());
            // S3 belongs to f2; adding it to featureStepMapping1 would leave f2 without any steps.
            featureStepMapping2.getSteps().add(new Steps("S3"));

            List<FeatureStepMapping> featureStepMappingList = Arrays.asList(featureStepMapping1, featureStepMapping2);
            Flux<List<FeatureStepMapping>> stepFeature = Flux.just(featureStepMappingList);


            List<OnBoarding> onBoardingList = Arrays.asList(new OnBoarding(Arrays.asList("f1", "f2")));
            Flux<List<OnBoarding>> onBoardingFlux = Flux.just(onBoardingList);

            // Goal: obtain a Mono<OnBoardingResponse>. Keep the Flux<List<>> shapes
            // above as they are; assume that is what the database returns directly.
            // The three Flux variables are not consumed by the plain-Java pipeline below.

            //With Plain Java8
            List<StepResponse> stepResponses = onBoardingList.stream()
                // every feature code of every on-boarding definition
                .flatMap(onBoarding -> onBoarding.getFeatureCodes().stream())
                // the mappings registered for that feature code (case-insensitive)
                .flatMap(feature -> featureStepMappingList.stream()
                    .filter(featureStep -> featureStep.getFeatureCode().equalsIgnoreCase(feature)))
                // the step references held by each mapping
                .flatMap(featureStep -> featureStep.getSteps().stream())
                // the step definitions whose code matches the reference (case-insensitive)
                .flatMap(stepRef -> stepList.stream()
                    .filter(definedStep -> definedStep.getStepCode().equalsIgnoreCase(stepRef.getStepCode())))
                .map(definedStep -> new StepResponse(definedStep.getStepCode(), definedStep.getName()))
                .collect(Collectors.toList());

            OnBoardingResponse onBoardingResponse = new OnBoardingResponse(UUID.randomUUID().toString(), stepResponses);

            System.out.println(onBoardingResponse);
        }
    }

//Output 
OnBoardingResponse{id='77b647c5-84ae-4fde-8039-ea5deed1a2bd', stepResponses=[StepResponse{stepCode='S1', name='Step1'}, StepResponse{stepCode='S2', name='Step2'}, StepResponse{stepCode='S3', name='Step3'}]}
18
  • The first thing I'd usually do is to try to extract steps as methods. Your map(featureOrder -> looks like a good candidate. Commented May 14, 2020 at 2:36
  • Initially, I created a Map of FeatureCode and Feature but my main issue is how to handle nested Flux and Monos. Once that is resolved I can extract them as methods. I have extracted as methods. Commented May 14, 2020 at 2:55
  • 1
    @cody123 Cheers I'll have a look later Commented May 14, 2020 at 6:36
  • 1
    Why is the database returning Flux<List<>>? A Flux is basically a reactive list. Are you creating these entities yourself? No packages I know of would do this. Commented May 14, 2020 at 6:44
  • 1
    Don't return Optional<Flux<T>>; skip the Optional. If there is no result, return Flux.empty() instead. Wrapping fluxes in Optionals is unnecessary. Commented May 14, 2020 at 7:11

1 Answer 1

2

Using your example, you could do something similar to this to prevent nesting:

    // Step definitions as a flat Flux<Step> instead of a Flux<List<Step>>.
    Flux<Step> stepFlux = Flux.just(
            new Step("S1", true, "Step1"),
            new Step("S2", true, "Step2"),
            new Step("S3", true, "Step3"));

    // Feature f1 maps to steps S1 and S2.
    FeatureStepMapping featureStepMapping1 = new FeatureStepMapping("f1", new ArrayList<>());
    featureStepMapping1.getSteps().add(new Steps("S1"));
    featureStepMapping1.getSteps().add(new Steps("S2"));

    // Feature f2 maps to step S3.
    FeatureStepMapping featureStepMapping2 = new FeatureStepMapping("f2", new ArrayList<>());
    featureStepMapping2.getSteps().add(new Steps("S3"));

    Flux<FeatureStepMapping> stepFeature = Flux.just(featureStepMapping1,featureStepMapping2);
    Flux<OnBoarding> onBoarding = Flux.just(new OnBoarding(Arrays.asList("f1", "f2")));

    onBoarding
            .map(OnBoarding::getFeatureCodes)
            // Turn the flux of code lists into a flux of individual feature codes
            .flatMap(Flux::fromIterable)
            // Pair every feature code with every stepFeature as a tuple.
            // Both window publishers are Flux.never(), so no pairing is cut off;
            // effectively the same as iterating two nested lists.
            .join(stepFeature,s -> Flux.never(),s -> Flux.never(),Tuples::of)
            // Drop the pairs whose feature codes don't match (case-insensitive)
            .filter(t -> t.getT2().getFeatureCode().equalsIgnoreCase(t.getT1()))
            // Keep only the stepFeature half of each pair
            .map(Tuple2::getT2)
            // Map each stepFeature to its list of step references
            .map(FeatureStepMapping::getSteps)
            // Flatten the lists into individual step references
            .flatMap(Flux::fromIterable)
            // Same pairing technique, this time against stepFlux
            .join(stepFlux,s -> Flux.never(),s -> Flux.never(),Tuples::of)
            // Drop the pairs whose step codes don't match (case-insensitive)
            .filter(t -> t.getT2().getStepCode().equalsIgnoreCase(t.getT1().getStepCode()))
            // Keep only the Step definition half of each pair
            .map(Tuple2::getT2)
            // Create step responses
            .map(step -> new StepResponse(step.getStepCode(), step.getName()))
            // Collect to Mono<List<StepResponse>>
            .collectList()
            // Create the onboarding response with a random id
            .map(stepList -> new OnBoardingResponse(UUID.randomUUID().toString(),stepList))
            // Subscribing is not required: the resulting Mono can be returned or composed further.
            // It is subscribed here only to print the result.
            .subscribe(System.out::println);

Also if you genuinely get Flux<List<>> you can convert to a normal Flux by using

// Turns a Flux<List<T>> into a plain Flux<T> of the lists' elements.
fluxList.flatMap(Flux::fromIterable)
Sign up to request clarification or add additional context in comments.

2 Comments

It worked like a charm. Could you please share some resources so I can learn more about react? I started a few days back.
@cody123 You can use projectreactor.io/docs/core/release/reference for a lot of the basic stuff. For everything else I just look at the source code; it's surprisingly well commented for the higher functions.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.