Suppose you want to store events from a stream in a database, and the client library for that database is asynchronous.

For example, there is a method writeEvent(event: MyEvent): Future[Boolean] that has to be called inside the observer's onNext whenever the next event is emitted. Is there a good way to do this other than blocking on the Future?

The only way I can currently see to implement this is to create a custom Scheduler that lets me return the thread to the pool until the async code inside onNext completes.

1 Answer

You do not want to block inside your onNext subscriber callback; that would defeat the purpose of Rx. The asynchronous call can be chained into the stream in a more idiomatic way.

I don't work with Futures much, but I wonder if Observable.from(Future) would work:

someStream
    .flatMap(evt => Observable.from(writeEvent(evt)))
    .subscribe(
        next => ...,
        err => ...
    )
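
For reference, here is a more complete sketch of the same idea, assuming RxScala (rx.lang.scala) is on the classpath. MyEvent, the stub writeEvent, and the three-element stream are hypothetical stand-ins for the real event type, database client, and source. The latch is only there so the demo JVM does not exit before the writes finish; it is not part of the technique.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import rx.lang.scala.Observable

object WriteEventsSketch {
  // Hypothetical event type standing in for the real one.
  final case class MyEvent(id: Int)

  // Hypothetical stand-in for the asynchronous database client:
  // it always reports a successful write.
  def writeEvent(event: MyEvent): Future[Boolean] = Future(true)

  def main(args: Array[String]): Unit = {
    // Hypothetical source stream with three events.
    val someStream: Observable[MyEvent] =
      Observable.just(MyEvent(1), MyEvent(2), MyEvent(3))

    val done = new CountDownLatch(1)

    someStream
      // Each Future becomes a one-element Observable; flatMap merges them
      // into the outer stream without blocking any thread on the Future.
      .flatMap(evt => Observable.from(writeEvent(evt)))
      .subscribe(
        ok => println(s"write acknowledged: $ok"),
        err => { println(s"write failed: ${err.getMessage}"); done.countDown() },
        () => done.countDown()
      )

    // Demo only: keep the JVM alive until the stream terminates.
    done.await(5, TimeUnit.SECONDS)
  }
}
```

Note that flatMap merges the inner results as they arrive, so acknowledgements are not guaranteed to come back in the order the events were emitted, and a failed Future terminates the whole stream through the error callback.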
1 Comment

Thanks for your answer. It seems that it could work; I'll try it and report back later.
