
I am trying to write a while loop in a functional way in Scala. I want to populate a list with the messages from a queue (Kafka in this case, but that doesn't really matter).

I'm doing this for an integration test. Since Kafka runs remotely when the tests run in CI, the test sometimes fails because Kafka does not return any messages. So I wrote a loop that keeps polling Kafka until I get back all the results I expect (otherwise the test times out after a while and fails). This is what I have right now:

import java.time.Duration
import scala.jdk.CollectionConverters._

var result = List[Int]()
while (result.size < expectedNumberOfMessages) {
    result = result ++ kafkaConsumer.poll(Duration.ofSeconds(10)).records(KAFKA_TOPIC).asScala.toList.map(_.value.getPayload)
}

This works fine, but it looks horrible to me. Plus, if it were production code, it would also be inefficient. Can anyone suggest a better, more functional way of doing this?


2 Answers


If you plan on keeping the while loop, I would first suggest using a scala.collection.mutable.ListBuffer instead of an immutable List. Appending to an immutable List with ++ copies the whole existing list on every iteration, whereas a ListBuffer appends in place.
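A minimal sketch of that ListBuffer loop, runnable without Kafka. The `scriptedPoll` helper and the `ListBufferPolling` object are hypothetical: `scriptedPoll` stands in for `kafkaConsumer.poll(...)` by returning pre-scripted batches, then empty batches once the script runs out. Like the original loop, it can collect a few more than `expected` messages if the last batch overshoots.

```scala
import scala.collection.mutable.ListBuffer

object ListBufferPolling {
  // Hypothetical stand-in for kafkaConsumer.poll(...): each call returns the next
  // scripted batch, and an empty batch once the script has run out.
  def scriptedPoll(batches: List[List[Int]]): () => List[Int] = {
    var remaining = batches
    () => remaining match {
      case head :: tail =>
        remaining = tail
        head
      case Nil => Nil
    }
  }

  // Keep polling until at least `expected` messages have been collected.
  // ListBuffer appends in place, so earlier results are not copied on each iteration.
  def collect(expected: Int, poll: () => List[Int]): List[Int] = {
    val buffer = ListBuffer.empty[Int]
    while (buffer.size < expected) {
      buffer ++= poll()
    }
    buffer.toList // immutable List of everything collected
  }
}
```

For example, `collect(4, scriptedPoll(List(List(1, 2), Nil, List(3, 4, 5))))` keeps polling past the empty batch and returns `List(1, 2, 3, 4, 5)`.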

If you want a more "functional" way of writing the above code while keeping the Consumer API (instead of the Kafka Streams API), you could manually define a Scala Stream like so (in Scala 2.13 and later, Stream is deprecated in favour of LazyList):

import scala.util.Random

// mock Kafka's "poll": returns a random number (0 to 9) of random Ints
def poll(): List[Int] = {
    val size = Random.nextInt(10)
    println("fetching messages")
    Thread.sleep(1000)
    (1 to size).map(_ => Random.nextInt(10)).toList
}

lazy val s: Stream[Int] = Stream.continually(poll()).flatten

// s is now a Stream that will be evaluated when a certain number of messages is requested
// for example, fetching 40 results:

/*
scala> s.take(40).toList
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
res0: List[Int] = List(3, 6, 2, 7, 7, 8, 0, 4, 6, 2, 0, 3, 8, 9, 5, 8, 2, 9, 2, 7, 9, 2, 6, 1, 6, 7, 2, 4, 4, 6, 6, 3, 5, 7, 2, 0, 9, 4, 9, 4)
*/
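On Scala 2.13 and later, where Stream is deprecated, the same idea can be written with LazyList. This sketch swaps the random mock for a hypothetical `MockConsumer` that replays fixed batches and counts its `poll()` calls, which makes the laziness observable: polling stops as soon as `take` has enough elements. The `LazyListPolling` and `MockConsumer` names are illustrative only.

```scala
object LazyListPolling {
  // Hypothetical mock of a Kafka consumer: replays the scripted batches in order,
  // returns empty batches afterwards, and counts how many times poll() was called.
  final class MockConsumer(batches: List[List[Int]]) {
    private var remaining = batches
    var pollCount = 0

    def poll(): List[Int] = {
      pollCount += 1
      remaining match {
        case head :: tail =>
          remaining = tail
          head
        case Nil => Nil
      }
    }
  }

  // LazyList (Scala 2.13+) is the non-deprecated replacement for Stream.
  // continually(...) evaluates consumer.poll() only when more elements are needed,
  // and take(expected) stops asking once it has enough.
  def collect(expected: Int, consumer: MockConsumer): List[Int] =
    LazyList.continually(consumer.poll()).flatten.take(expected).toList
}
```

With batches `List(1, 2)`, `Nil` and `List(3, 4, 5)`, `collect(4, consumer)` returns `List(1, 2, 3, 4)` after three polls. A LazyList memoizes the elements it has evaluated, so keeping a reference to it (as the `lazy val s` above does) keeps every fetched message in memory; building a fresh one per call avoids that.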

Something like this perhaps?

import java.time.Duration
import scala.jdk.CollectionConverters._

def pollKafka = kafkaConsumer.poll(Duration.ofSeconds(10)).records(KAFKA_TOPIC).asScala.iterator.map(_.value.getPayload)

Iterator
  .continually(pollKafka)
  .flatten
  .take(expectedNumberOfMessages)
  .toList

Iterator is internally mutable, but if you use its high-level functional interface and don't reuse an Iterator, it's perfectly fine IMHO.
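A runnable sketch of the Iterator approach with Kafka mocked out. `pollBatch` is a hypothetical parameter standing in for "poll Kafka and map each record to its payload"; it returns a `java.lang.Iterable`, the same Java type `ConsumerRecords.records(topic)` returns, so the sketch also shows the `asScala` conversion from Scala 2.13's `scala.jdk.CollectionConverters` that a Java collection needs before Scala's `flatten` applies. The `IteratorPolling` name is illustrative.

```scala
import scala.jdk.CollectionConverters._

object IteratorPolling {
  // `pollBatch` is a hypothetical stand-in for polling Kafka and mapping each record
  // to its payload; it returns a java.lang.Iterable, like ConsumerRecords.records(topic).
  def collect(expected: Int, pollBatch: () => java.lang.Iterable[Int]): List[Int] =
    Iterator
      .continually(pollBatch().asScala) // call pollBatch again whenever the previous batch is used up
      .flatten                          // concatenate the batches into one sequence of messages
      .take(expected)                   // stop after `expected` messages; surplus ones are dropped
      .toList                           // force evaluation into an immutable List
}
```

Because `take` stops consuming as soon as it has enough elements, any surplus messages in the last batch are silently discarded, unlike the while loop, which keeps them.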

If you want to use functional streams all the way down, you could consider a library like fs2.
