I am trying to read lines from a reader and group them into blocks that belong together.

Source text:

bla1
bla2
### block separator ###
bla3
bla4
### block separator ###
...

I need to get the two blocks (bla1, bla2) and (bla3, bla4).

Code:

import org.apache.commons.lang3.StringUtils;
import rx.Observable;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.Reader;
import java.util.Iterator;

public class BlockBuilder {

  public static void main(String[] args) {

    try {
      FileReader fileReader = new FileReader("/path/to/some/file");
      LineIterable lineIterable = new LineIterable(fileReader);

      Observable.from(lineIterable)
          .buffer(100)
          // Needed instead of time/count: until line matches condition
          // something like .buffer(line -> line.equals("### block separator ###"))
          .forEach(gatheredLines -> {
            String gatheredBlock = StringUtils.join(gatheredLines, '\n');
            System.out.println(gatheredBlock);
            System.out.println("###### ###### ###### ######");
          });
    } catch (Exception ex) {
      ex.printStackTrace();
    }
  }

  private static class LineIterable implements Iterable<String> {
    private final Iterator<String> iterator;
    public LineIterable(Reader reader) {
      iterator = new BufferedReader(reader).lines().iterator();
    }
    @Override
    public Iterator<String> iterator() {
      return iterator;
    }
  } 
}

It doesn't matter whether buffer or window is used, and I may be completely wrong to think of those two in the first place.

I thought it must be possible with the bufferClosingSelector for buffer or the closingSelector for window. Both are functions that create an Observable whose emissions trigger the closing of the current buffer or window, but I can't see how to get hold of the current line inside them.

1 Answer

You can publish your source and use it for both buffering and buffer boundary:

Observable<String> source = Observable.just(
        "a", "b", "#", 
        "c", "d", "e", "#", 
        "f", "g");

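// publish() shares one subscription to source between both uses of p
source.publish(p ->
        // the values to collect: everything except the separator
        p.filter(v -> !"#".equals(v))
        // each separator closes the current buffer and starts the next one
        .buffer(() -> p.filter(v -> "#".equals(v))))
.subscribe(System.out::println);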
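
For comparison, here is the same grouping written as a plain loop with no Rx involved. It is only a sketch of the result the snippet above is aiming for: the class name BlockSplitter and the method split are made up for this illustration, and dropping an empty trailing block is a choice of the sketch, not something taken from RxJava.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BlockSplitter {

  // Hypothetical helper: splits lines into blocks at every line equal to the separator.
  // The separator line itself is dropped. A trailing block with no closing separator
  // is kept if it contains at least one line.
  public static List<List<String>> split(Iterable<String> lines, String separator) {
    List<List<String>> blocks = new ArrayList<>();
    List<String> current = new ArrayList<>();
    for (String line : lines) {
      if (separator.equals(line)) {
        // separator reached: close the current block and start a new one
        blocks.add(current);
        current = new ArrayList<>();
      } else {
        current.add(line);
      }
    }
    if (!current.isEmpty()) {
      blocks.add(current);
    }
    return blocks;
  }

  public static void main(String[] args) {
    List<String> lines = Arrays.asList("a", "b", "#", "c", "d", "e", "#", "f", "g");
    System.out.println(split(lines, "#")); // prints [[a, b], [c, d, e], [f, g]]
  }
}
```

The loop needs the whole input to be iterable synchronously, which is fine for a file but does not cover sources that push values over time; that is where the publish/buffer combination earns its keep.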
4 Comments

That would be great and it compiles with no error. But as soon as I try this code I get a huge error: Error:(16, 25) java: no suitable method found for buffer(()->p.filt[...]s(v))) method rx.Observable.<TClosing>buffer(rx.functions.Func0<? extends rx.Observable<? extends TClosing>>) is not applicable (cannot infer type-variable(s) TClosing (argument mismatch; bad return type in lambda expression rx.Observable<java.lang.String> cannot be converted to ? extends rx.Observable<? extends TClosing>)) [...]
Works for me from Eclipse 4.5. You may need to add an explicit <String> before the second p.filter() call.
Can someone explain what is the use of publish() in this scenario?
The signals of the source must be multicast to multiple sub-flows without subscribing to the source multiple times (which is not guaranteed to emit the same data).
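
A small plain-Java illustration of the "not guaranteed to emit the same data" part, using the same trick as the LineIterable in the question: an Iterable that hands out one shared Iterator. This is a sketch without Rx; OneShotSource, oneShotLines and drain are hypothetical names, and draining the Iterable twice merely stands in for two independent subscriptions.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class OneShotSource {

  // Like the question's LineIterable: every call to iterator() returns the same
  // underlying Iterator, so the lines can only be consumed once.
  public static Iterable<String> oneShotLines(String text) {
    Iterator<String> shared = new BufferedReader(new StringReader(text)).lines().iterator();
    return () -> shared;
  }

  // Consumes everything the source still has to offer, as one consumer would.
  public static List<String> drain(Iterable<String> source) {
    List<String> out = new ArrayList<>();
    for (String s : source) {
      out.add(s);
    }
    return out;
  }

  public static void main(String[] args) {
    Iterable<String> lines = oneShotLines("bla1\nbla2\n");
    System.out.println(drain(lines)); // first consumer:  [bla1, bla2]
    System.out.println(drain(lines)); // second consumer: []
  }
}
```

Two separate consumers of such a source do not see the same lines, which is why the buffered values and the buffer boundaries should both be derived from one shared subscription.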
