1

I have an Observable[T] which emits continuously and I want an Observable[List[T]] which emits the last elements in a specified duration for each element the source emits. Example

Observable. range(0, 100)   
    .delayExecution(1.second)
    // -- add here some operator combination which takes the parameter of 3 seconds

//should output:
// t0: List(0)
// t+1s: List(0,1)
// t+2s: List(0,1,2)
// t+3s: List(1,2,3)
// t+4s: List(2,3,4)

Note that the emmited List contains the last accumulated elements in the specified duration, and is emitted on every source item. The bufferTimed operator does not emit on every source item.

I am thinking of implemting an operator similar to monix.reactive.internal.operators.BufferTimedObservable, but with the logic described above, but I don't want to put in that effort if there is an easier way

2
  • Is there a scan or fold method available? You'd add new item at the end but limit to 3 max. Commented Nov 7, 2024 at 6:22
  • I want a similar thing to what you said, only it should be time based, not number-of-items based. Imagine the source not emitting items at equal intervals. Commented Nov 7, 2024 at 6:28

2 Answers 2

1

Your own solution is perfectly fine, but I wanted to propose a minor improvement:

import monix.reactive._
import scala.concurrent.duration._
import scala.collection.mutable
import scala.jdk.DurationConverters.JavaDurationOps
import java.time

def tailTimed[A](
    source: Observable[A],
    timespan: FiniteDuration,
): Observable[Seq[A]] = {
  source
    .scan(mutable.ArrayDeque.empty[(time.Instant, A)])((deque, next) => {
      val timestamp = time.Instant.now()
      deque.dropWhileInPlace {
        case (t, _) =>
          time.Duration.between(t, timestamp).toScala > timespan
      }
      deque += timestamp -> next
    })
    .map(_.view.map(_._2).toIndexedSeq)
}

It is mostly equivalent, but internally uses an ArrayDeque to have fast access to the two ends of the queue. It takes advantage of the fact that older elements are always at the start to avoid scanning the whole collection and new elements can be added to it efficiently.

Out of curiosity, I also implemented this as its own Observable:

import monix.execution.Ack.Continue
import monix.execution.{Ack, Cancelable, Scheduler}
import monix.reactive.Observable
import monix.reactive.observers.Subscriber

import java.time.Instant
import scala.collection.mutable
import scala.concurrent.Future
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.jdk.DurationConverters.ScalaDurationOps

private final class TailTimedObservable[+A](
    source: Observable[A],
    timespan: FiniteDuration,
) extends Observable[Seq[A]] {

  require(timespan > Duration.Zero, "timespan must be strictly positive")

  def unsafeSubscribeFn(out: Subscriber[Seq[A]]): Cancelable = {
    source.unsafeSubscribeFn(new Subscriber[A] {
      implicit val scheduler: Scheduler = out.scheduler

      private[this] val deque = mutable.ArrayDeque.empty[(Instant, A)]

      override def onNext(elem: A): Future[Ack] = {
        val tElem = java.time.Instant.now()
        deque.dropWhileInPlace {
          case (t, _) =>
            java.time.Duration
              .between(t, tElem)
              .minus(timespan.toJava)
              .isPositive
        }
        deque += ((tElem, elem))
        out.onNext(deque.view.map(_._2).toIndexedSeq)
        Continue
      }

      override def onError(ex: Throwable): Unit = out.onError(ex)

      override def onComplete(): Unit = out.onComplete()
    })
  }
}

Since this does not require a periodic action, this is a lot simpler than BufferTimedObservable.

It should be noted that there's a lot of optimization that can be potentially done here (mainly avoiding copying the deque before emitting it to the consumer), as well as testability improvements (e.g. avoiding calling Instant.now() and accepting a Clock as a parameter instead).

While the earlier implementation is probably more than good for most use cases, if these latter possible improvements are meaningful to you, then the custom Observable probably puts you in a better position to achieve them.

Sign up to request clarification or add additional context in comments.

2 Comments

Ok, after some experimentation I think copying is more or less unavoidable. If you have some sort of "view" of the underlying deque the previous output might change (which might be fine if you only need to interact with the "latest" item, but might otherwise just be a cause of confusion.
This is nice work, would merit its place as an out-of-the box operator in the rx implementations
1

After some suggestion in the comments, I came up with something that does the job, maybe someone has a more elegant solution?

import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global
import java.time.ZonedDateTime
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps

val events = Observable.range(0, 100).delayOnNext(1 second)

val timeWindow = 5 seconds

val timestamps = events.map(_ => ZonedDateTime.now())

events.zip(timestamps).scan(Seq[(Long, ZonedDateTime)]())((acc, event) => {
  (acc :+ event).filter(_._2.isAfter(ZonedDateTime.now().minusSeconds(timeWindow.toSeconds)))
}).map(_.map(_._1)).foreach(println)

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.