I am trying to use TestContainers to set up integration testing for kafka. I am using generic apache kafka client in my code for setting up kafka producer and consumer.Now my scenario is to test a flow ,which internally calls kafka ,so while integration testing it,I want to set-up the kafka container,but encountering issues,which I have added below.Any other alternative is also fine
I have set-up below codes.
KafkaTestConfig:
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@TestConfiguration(proxyBeanMethods = false)
open class KafkaTestConfig {
companion object {
val kafkaNetwork = Network.newNetwork()
}
@Bean
@ServiceConnection
open fun kafkaContainer(): ConfluentKafkaContainer {
val kafkaContainer = ConfluentKafkaContainer("confluentinc/cp-kafka:7.8.0")
.apply {
withNetwork(kafkaNetwork)
// withListener("kafka:9092")
withExposedPorts(9092)
withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "PLAINTEXT:PLAINTEXT")
withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9092")
withEnv("KAFKA_ADVERTISED_LISTENERS", "PLAINTEXT://localhost:9092")
withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
withEnv("KAFKA_NUM_PARTITIONS", "1")
withNetworkAliases("kafka")
waitingFor(Wait.forListeningPort())
}
println("kafkaContainer: ${kafkaContainer.containerId} - ${kafkaContainer.getMappedPort(9092)}")
return kafkaContainer
}
@Bean
@DependsOn("kafkaContainer")
open fun schemaRegistryContainer(kafkaContainer: ConfluentKafkaContainer): GenericContainer<Nothing> {
val schemaRegistryContainer = GenericContainer<Nothing>("confluentinc/cp-schema-registry:7.8.0")
.apply {
withExposedPorts(8081)
withNetwork(kafkaNetwork)
withNetworkAliases("schema-registry")
withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
withEnv("SCHEMA_REGISTRY_CUB_KAFKA_MIN_BROKERS", "1")
withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://localhost:9092")
withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081")
waitingFor(Wait.forHttp("/subjects").forStatusCode(200))
}
println("schemaRegistryContainer: ${schemaRegistryContainer.containerId} - ${schemaRegistryContainer.firstMappedPort}")
return schemaRegistryContainer
}
@Bean
open fun setKafkaProperties(
kafkaContainer: ConfluentKafkaContainer,
schemaRegistryContainer: GenericContainer<Nothing>
): DynamicPropertyRegistrar {
return DynamicPropertyRegistrar { registry ->
// registry.add("spring.kafka.bootstrap-servers") { kafkaContainer.bootstrapServers }
registry.add("spring.kafka.bootstrap-servers") {
"${kafkaContainer.host}:${kafkaContainer.getMappedPort(9092)}"
}
registry.add("kafka.schema-registry-url") { "http://${schemaRegistryContainer.host}:${schemaRegistryContainer.firstMappedPort}" }
registry.add("schema.registry.url") { "http://${schemaRegistryContainer.host}:${schemaRegistryContainer.firstMappedPort}" }
}
}}
KafkaTestUtils
object KafkaTestUtils {
fun producerProps(): Properties = Properties().apply {
put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // Replace with actual Kafka bootstrap server
put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer::class.java.name)
put("schema.registry.url", "http://localhost:8081")
}
fun <K, V> createProducer(): KafkaProducer<K, V> =
KafkaProducer(producerProps())}
In my cucumber step def:
When("^ kafka producer is called with id (\\S+)$") { id: String ->
val producer = createProducer<String, TaskEvent>()
val record = ProducerRecord(
"TaskList", id,
TaskEvent.newBuilder()
.setClientId(profileId)
//other attributes
.setProviderUniqueId(id)
.build()
)
producer.send(record).get()
producer.close()
}
But while running this integration test I get error:
kafka-producer-network-thread ö producer-1Å DEBUG o.apache.kafka.clients.NetworkClient ÄProducer clientId=producer-1Å Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=producer-1, correlationId=8, headerVersion=2) and timeout 30000 to node -1: ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='3.8.1')
2025-06-27 10:35:24,205 Äkafka-producer-network-thread ö producer-1Å ERROR o.a.k.c.producer.internals.Sender ÄProducer clientId=producer-1Å Uncaught error in kafka producer I/O thread: ö org.apache.kafka.common.protocol.types.SchemaException: Buffer underflow while parsing response for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=producer-1, correlationId=8, headerVersion=2) ö at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:777) ö at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:949) ö at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:604) ö at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:348) ö at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:250) ö at java.base/java.lang.Thread.run(Thread.java:840) ö Caused by: java.nio.BufferUnderflowException: null ö at java.base/java.nio.Buffer.nextGetIndex(Buffer.java:707) ö at java.base/java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:435) ö at org.apache.kafka.common.protocol.ByteBufferAccessor.readInt(ByteBufferAccessor.java:43) ö at org.apache.kafka.common.message.ResponseHeaderData.read(ResponseHeaderData.java:90) ö at org.apache.kafka.common.message.ResponseHeaderData.<init>(ResponseHeaderData.java:66) ö at org.apache.kafka.common.requests.ResponseHeader.parse(ResponseHeader.java:100) ö at org.apache.kafka.common.requests.AbstractResponse.parseResponse(AbstractResponse.java:100) ö at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:775)
How can I fix it.I already tried changing the versions of image but still not working