I met problem with testing Kafka Producer after change custom Producer to KafkaTemplate.
For tests reason I wrote next class:
public class KafkaTestingTools {
static private Map<String, Consumer<Long, GenericData.Record>> consumers = new HashMap<>();
static public void sendMessage (String topic, String key, Object message, Schema schema) throws InterruptedException{
Properties properties = new Properties();
properties.put("schema.registry.url", "http://localhost:8081");
properties.put("bootstrap.servers", "http://localhost:9092");
properties.put("acks", "all");
properties.put("retries", 0);
properties.put("linger.ms", 1);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "com.logistics.mock.CustomKafkaAvroSerializer");
KafkaProducer<String, Object> producer = new KafkaProducer<>(properties);
CustomKafkaAvroDeserializer.setTopicScheme(topic, schema);
ProducerRecord<String, Object> record = new ProducerRecord<>(topic, key, message);
producer.send(record);
producer.close();
}
static public void registerConsumerContainer(EmbeddedKafkaBroker embeddedKafka, String topic, Schema schema) throws InterruptedException{
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup" + UUID.randomUUID().toString(), "true", embeddedKafka);
consumerProps.put("schema.registry.url", "http://localhost:8081");
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "com.logistics.mock.CustomKafkaAvroDeserializer");
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
ConsumerFactory<Long, GenericData.Record> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<Long, GenericData.Record> consumer = cf.createConsumer();
consumers.put(topic, consumer);
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topic);
CustomKafkaAvroDeserializer.setTopicScheme(topic, schema);
}
static public Object getSingleRecordFromRegisteredContainer(EmbeddedKafkaBroker embeddedKafka, String topic){
return SpecificData.get().deepCopy(
CustomKafkaAvroDeserializer.getTopicScheme(topic),
KafkaTestUtils.getSingleRecord(consumers.get(topic), topic).value()
);
}
}
Producer example:
@Service
@CommonsLog
public class PointProducer {
private final KafkaTemplate<String, ExportMessage> kafkaTemplate;
private final String topic;
@Autowired
public PointProducer(@Value("${kafka.producer.points}") String topic,
KafkaTemplate<String, ExportMessage> kafkaTemplate) {
this.topic = topic;
this.kafkaTemplate = kafkaTemplate;
}
public void produce(Point point) {
var message = new ExportMessage();
message.setId(point.getId());
log.warn("produce point: " + message.toString());
kafkaTemplate.send(topic, point.getId().toString(), message);
kafkaTemplate.flush();
}
kafka config
spring:
kafka:
bootstrap-servers: ${spring.embedded.kafka.brokers}
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
point-deserializer: com.logistics.mock.CustomKafkaAvroDeserializer
auto-offset-reset: latest
group-id: credit_file_test
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: com.logistics.mock.CustomKafkaAvroSerializer
schema-registry-url: http://localhost:8081
kafka.consumer.points: points_export
kafka.producer.files: common.file
kafka.producer.orders: common.order
kafka.producer.points: common.point
And tests looks like:
@SpringBootTest
@TestMethodOrder(OrderAnnotation.class)
@EmbeddedKafka(partitions = 1, topics = { "topic1", "topic2" }, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class ApplicationLogisticOrderTest {
@Test
@Order(1)
@WithMockUser(roles = "ADMIN")
void checkSendToKafka() throws Exception {
KafkaTestingTools.registerConsumerContainer(this.embeddedKafka, TOPIC1, Message.SCHEMA$);
Thread.sleep(3000);
prepareCustomizedLogisticOrder(t -> {
});
var mockMvc = MockMvcBuilders.webAppContextSetup(this.wac).build();
mockMvc.perform(MockMvcRequestBuilders.put("/orders/7000000/sendToKafka"));
}
And on line with perform I caught:
Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:478)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:468)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129)
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.<init>(AbstractKafkaSchemaSerDeConfig.java:177)
at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32)
at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:50)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:376)
I tried to put it in application.yml, in KafkaTestingTools properties, but nothing changed, it looks like Spring looks for this property in another place.
Maybe someone met this situation and know solution?
Thanks in advance.
KafkaTemplateanywhere in the above configuration, so it's not clear what you are talking about.CustomKafkaAvroDeserializer, notspring-kafka...PointProduceris used when/orders/7000000/sendToKafkais done. Please, show your YAML forspring.kafkaconfiguration.