I'm trying to get an example working that uses two different timeout values: a larger initial value for the first emission, then shorter values for all subsequent emissions. The example was converted to Kotlin from Java code written for RxJava 1.x, although I am attempting this in 2.x (not sure if that makes any difference).

The problem is that the timeout for the first event doesn't throw a TimeoutException. The first item is emitted after 500ms, so with the initial timeout set below 500ms I expect a stack trace to be printed, but I get output as if no timeout had occurred (the 40ms timeout for subsequent emissions does produce a stack trace, as expected). What is wrong with the following example that prevents the initial timeout from firing?

fun nextSolarEclipse(after: LocalDate): Observable<LocalDate> {
    return Observable
            .just(
                    LocalDate.of(2016, Month.MARCH, 9),
                    LocalDate.of(2016, Month.SEPTEMBER, 1),
                    LocalDate.of(2017, Month.FEBRUARY, 26),
                    LocalDate.of(2017, Month.AUGUST, 21),
                    LocalDate.of(2018, Month.FEBRUARY, 15),
                    LocalDate.of(2018, Month.JULY, 13),
                    LocalDate.of(2018, Month.AUGUST, 11),
                    LocalDate.of(2019, Month.JANUARY, 6),
                    LocalDate.of(2019, Month.JULY, 2),
                    LocalDate.of(2019, Month.DECEMBER, 26)
            )
            .skipWhile { date ->
                !date.isAfter(after)
            }
            .zipWith(
                    Observable.interval(500, 50, TimeUnit.MILLISECONDS),
                    { date, _ -> date }
            )
}

fun main(args: Array<String>) {
    nextSolarEclipse(LocalDate.now())
            .timeout<Long, Long>(
                    { Observable.timer(400, TimeUnit.MILLISECONDS) },
                    { Observable.timer(40, TimeUnit.MILLISECONDS) }
            )
            .subscribe(
                    { println(it) },
                    { it.printStackTrace() },
                    { println("Completed") }
            )

    TimeUnit.MILLISECONDS.sleep(2000)
}

Edit: 20-Jun-17

With Kotlin 1.1.2-5 in IntelliJ, and with the suggested alteration applied, I still get an error. Attempting to run the code anyway results, as I would expect, in:

Error:(34, 21) Kotlin: Interface Function does not have constructors


1 Answer


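This is yet another case of Kotlin accepting a lambda where an Observable instance is needed. The first argument of timeout is an ObservableSource, which is a single-method Java interface, so Kotlin SAM-converts your first lambda into an ObservableSource whose subscribe() body builds a timer and discards it. That source never signals, so the first timeout never fires. Pass the Observable itself instead: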

.timeout<Long, Long>(
     Observable.timer(400, TimeUnit.MILLISECONDS),
     Function { Observable.timer(40, TimeUnit.MILLISECONDS) }
)
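For reference, here is a minimal, self-contained sketch of the same call with the import spelled out. It assumes RxJava 2.x on the classpath. The helper name outcome and the 400ms per-item timeout are illustrative choices, not taken from the question. Without the explicit import of io.reactivex.functions.Function, the name Function resolves to kotlin.Function, which is the likely source of the "Interface Function does not have constructors" error.

```kotlin
import io.reactivex.Observable
import io.reactivex.functions.Function // must be imported, otherwise kotlin.Function is used
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

// Hypothetical helper: reports how a stream whose first item arrives
// after 500 ms terminates for a given first-item timeout.
fun outcome(firstTimeoutMs: Long): String {
    val error: Throwable? = Observable
            .interval(500, 50, TimeUnit.MILLISECONDS) // first item at 500 ms, then every 50 ms
            .take(3)
            .timeout<Long, Long>(
                    // An actual Observable instance, not a lambda
                    Observable.timer(firstTimeoutMs, TimeUnit.MILLISECONDS),
                    // SAM constructor for the RxJava Function interface
                    Function<Long, Observable<Long>> {
                        Observable.timer(400, TimeUnit.MILLISECONDS)
                    }
            )
            .ignoreElements()
            .blockingGet() // returns the terminal error, or null on normal completion

    return when (error) {
        null -> "completed"
        is TimeoutException -> "timeout"
        else -> "error"
    }
}

fun main(args: Array<String>) {
    println(outcome(400)) // first timeout shorter than the 500 ms initial delay
    println(outcome(600)) // first timeout longer than the initial delay
}
```

With these values the first call should end in a TimeoutException, and the second should complete normally, since every later item arrives well inside the 400ms per-item window.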

6 Comments

If I do that, I get "Interface Function does not have constructors", and I don't think I want to implement it as an anonymous object.
I'm not experienced with Kotlin and without that Function prefix the code won't compile for me. I'm guessing it's the notorious SAM issue. Perhaps you can write { it -> ... } instead.
If I try it any way other than as originally posted, I get the error "None of the following functions can be called with the arguments supplied". The function I think I want is @CheckReturnValue @SchedulerSupport public final fun <U : Any!, V : Any!> timeout(firstTimeoutIndicator: ObservableSource<Long!>!, itemTimeoutIndicator: Function<in LocalDate!, out ObservableSource<Long!>!>!): Observable<LocalDate!>! defined in io.reactivex.Observable, which matches what you suggest in your answer, but with or without Function I still get the aforementioned errors.
The example I gave in the answer works for me with Kotlin 1.1.2.
You may have to manually import io.reactivex.functions.Function.