
I am trying to use Testcontainers to set up integration tests for Kafka. My code uses the plain Apache Kafka client to create the Kafka producer and consumer. I want to test a flow that internally calls Kafka, so for the integration test I want to start a Kafka container, but I am running into the issue described below. Any alternative approach is also fine.

I have set up the following code.

KafkaTestConfig:

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@TestConfiguration(proxyBeanMethods = false)
open class KafkaTestConfig {

    companion object {
        val kafkaNetwork = Network.newNetwork()
    }

    @Bean
    @ServiceConnection
    open fun kafkaContainer(): ConfluentKafkaContainer {
        val kafkaContainer = ConfluentKafkaContainer("confluentinc/cp-kafka:7.8.0")
            .apply {
                withNetwork(kafkaNetwork)
                //   withListener("kafka:9092")

                withExposedPorts(9092)
                withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "PLAINTEXT:PLAINTEXT")
                withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9092")
                withEnv("KAFKA_ADVERTISED_LISTENERS", "PLAINTEXT://localhost:9092")
                withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
                withEnv("KAFKA_NUM_PARTITIONS", "1")
                withNetworkAliases("kafka")
                waitingFor(Wait.forListeningPort())
            }
        println("kafkaContainer: ${kafkaContainer.containerId} - ${kafkaContainer.getMappedPort(9092)}")
        return kafkaContainer
    }

    @Bean
    @DependsOn("kafkaContainer")
    open fun schemaRegistryContainer(kafkaContainer: ConfluentKafkaContainer): GenericContainer<Nothing> {
        val schemaRegistryContainer = GenericContainer<Nothing>("confluentinc/cp-schema-registry:7.8.0")
            .apply {
                withExposedPorts(8081)
                withNetwork(kafkaNetwork)
                withNetworkAliases("schema-registry")
                withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
                withEnv("SCHEMA_REGISTRY_CUB_KAFKA_MIN_BROKERS", "1")
                withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://localhost:9092")
                withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081")
                waitingFor(Wait.forHttp("/subjects").forStatusCode(200))
            }
        println("schemaRegistryContainer: ${schemaRegistryContainer.containerId} - ${schemaRegistryContainer.firstMappedPort}")
        return schemaRegistryContainer
    }

    @Bean
    open fun setKafkaProperties(
        kafkaContainer: ConfluentKafkaContainer,
        schemaRegistryContainer: GenericContainer<Nothing>
    ): DynamicPropertyRegistrar {
        return DynamicPropertyRegistrar { registry ->
            // registry.add("spring.kafka.bootstrap-servers") { kafkaContainer.bootstrapServers }

            registry.add("spring.kafka.bootstrap-servers") {
                "${kafkaContainer.host}:${kafkaContainer.getMappedPort(9092)}"
            }

            registry.add("kafka.schema-registry-url") { "http://${schemaRegistryContainer.host}:${schemaRegistryContainer.firstMappedPort}" }
            registry.add("schema.registry.url") { "http://${schemaRegistryContainer.host}:${schemaRegistryContainer.firstMappedPort}" }
        }
    }
}

KafkaTestUtils:

object KafkaTestUtils {

    fun producerProps(): Properties = Properties().apply {
        put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // Replace with actual Kafka bootstrap server
        put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
        put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer::class.java.name)
        put("schema.registry.url", "http://localhost:8081")
    }

    fun <K, V> createProducer(): KafkaProducer<K, V> =
        KafkaProducer(producerProps())
}
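
A note on the helper above: it hardcodes `localhost:9092` and `http://localhost:8081`, while Testcontainers normally publishes container ports on random host ports. The following is a minimal sketch of a parameterized variant, assuming the addresses are read from the started containers and passed in by the caller. The parameter names are hypothetical, and the config keys are written as plain strings (the values behind constants such as `ProducerConfig.BOOTSTRAP_SERVERS_CONFIG`) so that the snippet needs only the standard library.

```kotlin
import java.util.Properties

// Sketch only: builds producer properties from addresses supplied by the caller
// instead of hardcoded localhost ports. Parameter names are hypothetical.
fun producerProps(bootstrapServers: String, schemaRegistryUrl: String): Properties =
    Properties().apply {
        // "bootstrap.servers" is the string value of ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
        put("bootstrap.servers", bootstrapServers)
        // Serializer class names match the ones used in the original helper
        put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
        put("schema.registry.url", schemaRegistryUrl)
    }
```

In a test, the arguments would come from the running containers, for example `kafkaContainer.bootstrapServers` and the schema registry's host and mapped port, rather than from fixed values.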

In my Cucumber step definition:


When("^ kafka producer is called with id (\\S+)$") { id: String ->
    val producer = createProducer<String, TaskEvent>()

    val record = ProducerRecord(
        "TaskList", id,
        TaskEvent.newBuilder()
            .setClientId(profileId)
            //other attributes
            .setProviderUniqueId(id)
            .build()
    )
    producer.send(record).get()
    producer.close()
}

But when I run this integration test, I get the following error:

[kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient [Producer clientId=producer-1] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=producer-1, correlationId=8, headerVersion=2) and timeout 30000 to node -1: ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='3.8.1')
2025-06-27 10:35:24,205 [kafka-producer-network-thread | producer-1] ERROR o.a.k.c.producer.internals.Sender [Producer clientId=producer-1] Uncaught error in kafka producer I/O thread:
org.apache.kafka.common.protocol.types.SchemaException: Buffer underflow while parsing response for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=producer-1, correlationId=8, headerVersion=2)
    at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:777)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:949)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:604)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:348)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:250)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.nio.BufferUnderflowException: null
    at java.base/java.nio.Buffer.nextGetIndex(Buffer.java:707)
    at java.base/java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:435)
    at org.apache.kafka.common.protocol.ByteBufferAccessor.readInt(ByteBufferAccessor.java:43)
    at org.apache.kafka.common.message.ResponseHeaderData.read(ResponseHeaderData.java:90)
    at org.apache.kafka.common.message.ResponseHeaderData.<init>(ResponseHeaderData.java:66)
    at org.apache.kafka.common.requests.ResponseHeader.parse(ResponseHeader.java:100)
    at org.apache.kafka.common.requests.AbstractResponse.parseResponse(AbstractResponse.java:100)
    at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:775)

How can I fix this? I have already tried changing the image versions, but it still does not work.

  • Are you trying to test the test-container setup itself (the progression of the Kafka message from sender to receiver), or are you trying to test how a Kafka consumer handles the message, and you are basically emulating Kafka in order to get a message into your consumption pipeline? If you are 'merely' trying to test your consumer, you can bypass the emulator entirely. Commented Jun 27 at 10:20
  • In my flow, I receive an HTTP request and, after performing certain operations on it, I need to send it to a Kafka topic. So I need some way to simulate the scenario where it publishes to Kafka. Currently it fails when the code tries to send, since it tries to invoke the actual code. Commented Jun 27 at 10:26
  • Are both your container image and the library you are using the latest ones available? There seem to be broken versions that can yield this exception accidentally. Commented Jun 27 at 10:42
  • Going through the current version of NetworkClient, I can also see that the exception you get would be thrown at line 828, not 777, of the NetworkClient class if you were using the latest version, which suggests that a newer version is already available. Commented Jun 27 at 10:51
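
The suggestion in the first comment, bypassing the broker when only the surrounding flow is under test, could be sketched as follows. This is a minimal illustration, assuming the production code publishes through a small interface that a test can replace. `EventPublisher`, `PublishedEvent`, `InMemoryPublisher`, and `handleRequest` are hypothetical names, not part of the question's code or of any Kafka library.

```kotlin
// Hypothetical abstraction over "send to Kafka": the production implementation
// would wrap a KafkaProducer, the test implementation only records messages.
interface EventPublisher {
    fun publish(topic: String, key: String, value: String)
}

data class PublishedEvent(val topic: String, val key: String, val value: String)

// Test double: keeps every published event in memory, so no broker is needed.
class InMemoryPublisher : EventPublisher {
    val events = mutableListOf<PublishedEvent>()

    override fun publish(topic: String, key: String, value: String) {
        events += PublishedEvent(topic, key, value)
    }
}

// Stand-in for the flow under test: processes a request, then publishes the result.
fun handleRequest(id: String, publisher: EventPublisher) {
    val payload = "task-event-for-$id" // placeholder for the real processing
    publisher.publish("TaskList", id, payload)
}
```

A step definition could then assert on `publisher.events` instead of reading from a topic. This does not exercise serialization or broker connectivity, so it would complement a container-based test rather than replace it.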
