I want to replace the following for-each loop using Java 8 Streams:

for (Rule rule : this.rules) {

    if (rule.getCondition().evaluate(rule, record)) {
        Records.emit(collector, outputStreamMapping.get(rule.getOutputStream()), tuple, recordId, record);
        collector.ack(tuple);

        ruleApplied = true;
        break;
    }
}

if (!ruleApplied) {
    LOGGER.warn("No rule was applied to record {}", record);
    LOGGER.debug("Rules: {}", this.rules);
    ErrorReporter.emitErrorNode(this.collector, recordId, componentName,
            "No matching rule for record " + record, record);
    collector.fail(tuple);
}

I want to iterate over a set of Rules, and evaluate each Rule's Condition. If the condition applies, I act on that record and stop processing. If no Rules are applied, then I want to log that and perform different processing of the record.

I'm not exactly sure how to do this, though. Any help and explanation would be greatly appreciated.

EDIT:

I've tried this:

    this.rules.stream().filter(rule -> rule.getCondition().evaluate(rule, record)).forEach((rule) -> {
        Records.emit(collector, outputStreamMapping.get(rule.getOutputStream()), tuple, recordId, record);
        collector.ack(tuple);

        ruleApplied = true;
        break;
    });

It doesn't like the break statement, of course, and also complains about ruleApplied not being final since it's declared outside the scope of the lambda.

Based on the answers I'm seeing, it seems as though the loop is the cleanest way to do this. I wasn't sure if there were other stream constructs that would allow me to encapsulate the logic (i.e., the break and the tracking boolean) in a different manner than how I did in the basic loop.

EDIT2:

Here's my solution based on this thread's advice:

Optional<Rule> possibleRule = rules.stream().filter(rule -> rule.getCondition().evaluate(rule, record))
        .findFirst();

if (possibleRule.isPresent()) {
    Records.emit(collector, outputStreamMapping.get(possibleRule.get().getOutputStream()), tuple, recordId,
            record);
    collector.ack(tuple);
} else {
    LOGGER.warn("No rule was applied to record {}", record);
    LOGGER.debug("Rules: {}", this.rules);
    ErrorReporter.emitErrorNode(this.collector, recordId, componentName,
            "No matching rule for record " + record, record);
    collector.fail(tuple);
}
  • The ruleApplied = true line in particular is going to be difficult. The result is not going to be as nice as the code you already have. Commented Aug 25, 2015 at 16:55
  • @LouisWasserman agreed. But it's still possible Commented Aug 25, 2015 at 16:56
  • @LouisWasserman: Filter, then project with a side-effect, and count the result? Horrible, but it would work... Commented Aug 25, 2015 at 16:56

2 Answers

You can do

Optional<Rule> match = rules.stream()
                       .filter(rule -> rule.getCondition().evaluate(rule, record))
                       .findFirst();
if (match.isPresent()) {
    Records.emit(collector, outputStreamMapping.get(match.get().getOutputStream()),
                 tuple, recordId, record);
    collector.ack(tuple);

    ruleApplied = true;
}

2 Comments

You don't need a separate ruleApplied variable. The logic you want to execute when no rule is applied can simply go in an else block.
@MrinalK.Samanta Correct. You would need to see how ruleApplied is being used, but the code that checks it could probably be merged into this if or else block. +1
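
As a rough illustration of the point made in these comments, here is a minimal, self-contained sketch of the findFirst pattern with the "no rule matched" handling in the else branch. The Rule class below is a hypothetical stand-in (a name plus a Predicate<String>), not the asker's real Rule or Condition types, and the returned strings stand in for Records.emit/collector.ack and for the logging/collector.fail path.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

public class FirstMatchDemo {

    // Hypothetical stand-in for the asker's Rule: a name plus a condition.
    static final class Rule {
        final String name;
        final Predicate<String> condition;

        Rule(String name, Predicate<String> condition) {
            this.name = name;
            this.condition = condition;
        }
    }

    // Returns a description of what happened to the record.
    static String process(List<Rule> rules, String record) {
        // filter is lazy and findFirst short-circuits, so on a sequential
        // stream no rule after the first match is evaluated.
        Optional<Rule> firstMatch = rules.stream()
                .filter(r -> r.condition.test(record))
                .findFirst();

        if (firstMatch.isPresent()) {
            // Stand-in for Records.emit(...) and collector.ack(tuple).
            return "applied " + firstMatch.get().name;
        } else {
            // Stand-in for the logging and collector.fail(tuple).
            return "no rule matched";
        }
    }

    public static void main(String[] args) {
        List<Rule> rules = Arrays.asList(
                new Rule("short", s -> s.length() < 4),
                new Rule("starts-with-a", s -> s.startsWith("a")));

        System.out.println(process(rules, "abc"));    // both rules match; the first one wins
        System.out.println(process(rules, "apple"));  // only the second rule matches
        System.out.println(process(rules, "banana")); // no rule matches
    }
}
```

Because the matched rule is carried out of the stream as an Optional, no mutable flag has to be captured by the lambda, which sidesteps the "not final" complaint from the question.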

You should be able to do this with Stream's anyMatch method and a Predicate that performs the necessary side effect of Records.emit, etc.

boolean ruleApplied = this.rules.stream().anyMatch(
    rule ->
    {
        boolean match = rule.getCondition().evaluate(rule, record);
        if (match)
        {
            Records.emit(collector, outputStreamMapping.get(rule.getOutputStream()), tuple, recordId, record);
            collector.ack(tuple);
        }
        return match;
    }
);

The anyMatch method is a short-circuiting operation that will stop processing when the Predicate returns true. The short-circuit will fulfill the break behavior you have.

From the Javadoc for Stream.anyMatch:

"Returns whether any elements of this stream match the provided predicate. May not evaluate the predicate on all elements if not necessary for determining the result. If the stream is empty then false is returned and the predicate is not evaluated.

This is a short-circuiting terminal operation."
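
The short-circuiting can be observed with a small, self-contained sketch. It counts how many elements the predicate sees before anyMatch returns. The class name, method name, and sample values are made up for this illustration, and the counter is instrumentation only; it assumes a sequential stream over an ordered list.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class AnyMatchShortCircuitDemo {

    // Returns how many elements the predicate was evaluated on before
    // anyMatch finished. The counter exists only to make the
    // short-circuiting visible in this demo.
    static int evaluationsUntilMatch(List<Integer> values, int target) {
        AtomicInteger evaluations = new AtomicInteger();
        values.stream().anyMatch(v -> {
            evaluations.incrementAndGet();
            return v == target; // v is unboxed, so this compares values
        });
        return evaluations.get();
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(10, 20, 30, 40, 50);

        // 30 is the third element, so the predicate stops after three calls.
        System.out.println(evaluationsUntilMatch(values, 30));

        // 99 is absent, so every element has to be checked.
        System.out.println(evaluationsUntilMatch(values, 99));
    }
}
```

Note that the Javadoc only says the predicate "may not" be evaluated on all elements, so code should not depend on exactly which elements are visited, especially with parallel streams.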

3 Comments

This seems like exactly what I want. I'll try it out and update with my findings.
@PeterLawrey's solution is much more straightforward and doesn't abuse side effects. I strongly recommend that solution instead; side effects in Predicates are deeply frowned upon by the documentation.
Reading the docs more closely, you're right. For my specific use case, coupling the expression evaluation with the action based on it is not problematic, but the practice is. Thanks for the caution.
