
I ran into a problem testing a Kafka producer after switching from a custom Producer to KafkaTemplate.

For testing purposes I wrote the following class:

public class KafkaTestingTools {

    static private Map<String, Consumer<Long, GenericData.Record>> consumers = new HashMap<>();

    static public void sendMessage (String topic, String key, Object message, Schema schema) throws InterruptedException{
        Properties properties = new Properties();
        properties.put("schema.registry.url", "http://localhost:8081");
        properties.put("bootstrap.servers", "http://localhost:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("linger.ms", 1);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "com.logistics.mock.CustomKafkaAvroSerializer");
        KafkaProducer<String, Object> producer = new KafkaProducer<>(properties);

        CustomKafkaAvroDeserializer.setTopicScheme(topic, schema);

        ProducerRecord<String, Object> record = new ProducerRecord<>(topic, key, message);
        producer.send(record);
        producer.close();
    }

    static public void registerConsumerContainer(EmbeddedKafkaBroker embeddedKafka, String topic, Schema schema) throws InterruptedException{
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup" + UUID.randomUUID().toString(), "true", embeddedKafka);
        consumerProps.put("schema.registry.url", "http://localhost:8081");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "com.logistics.mock.CustomKafkaAvroDeserializer");
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        ConsumerFactory<Long, GenericData.Record> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        Consumer<Long, GenericData.Record> consumer = cf.createConsumer();
        consumers.put(topic, consumer);

        embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topic);

        CustomKafkaAvroDeserializer.setTopicScheme(topic, schema);
    }

    static public Object getSingleRecordFromRegisteredContainer(EmbeddedKafkaBroker embeddedKafka, String topic){
        return SpecificData.get().deepCopy(
            CustomKafkaAvroDeserializer.getTopicScheme(topic),
            KafkaTestUtils.getSingleRecord(consumers.get(topic), topic).value()
        );
    }
    
}

Producer example:

@Service
@CommonsLog
public class PointProducer {

    private final KafkaTemplate<String, ExportMessage> kafkaTemplate;
    private final String topic;

    @Autowired
    public PointProducer(@Value("${kafka.producer.points}") String topic,
            KafkaTemplate<String, ExportMessage> kafkaTemplate) {
        this.topic = topic;
        this.kafkaTemplate = kafkaTemplate;
    }

    public void produce(Point point) {
        var message = new ExportMessage();
        message.setId(point.getId());
        log.warn("produce point: " + message.toString());
        kafkaTemplate.send(topic, point.getId().toString(), message);
        kafkaTemplate.flush();
    }
}

Kafka config:

spring:
  kafka:
    bootstrap-servers: ${spring.embedded.kafka.brokers}
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      point-deserializer: com.logistics.mock.CustomKafkaAvroDeserializer
      auto-offset-reset: latest
      group-id: credit_file_test
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.logistics.mock.CustomKafkaAvroSerializer
    schema-registry-url: http://localhost:8081

kafka.consumer.points: points_export
kafka.producer.files: common.file
kafka.producer.orders: common.order
kafka.producer.points: common.point

And the test looks like this:

@SpringBootTest
@TestMethodOrder(OrderAnnotation.class)
@EmbeddedKafka(partitions = 1, topics = { "topic1", "topic2" }, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class ApplicationLogisticOrderTest {
    @Test
    @Order(1)
    @WithMockUser(roles = "ADMIN")
    void checkSendToKafka() throws Exception {
        KafkaTestingTools.registerConsumerContainer(this.embeddedKafka, TOPIC1, Message.SCHEMA$);
        Thread.sleep(3000);
        prepareCustomizedLogisticOrder(t -> {
        });
        var mockMvc = MockMvcBuilders.webAppContextSetup(this.wac).build();
        mockMvc.perform(MockMvcRequestBuilders.put("/orders/7000000/sendToKafka"));

    }
}

On the line with perform I got:

Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
    at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:478)
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:468)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129)
    at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.<init>(AbstractKafkaSchemaSerDeConfig.java:177)
    at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:50)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:376)

I tried putting it in application.yml and in the KafkaTestingTools properties, but nothing changed. It looks like Spring reads this property from somewhere else.

Has anyone run into this and found a solution?

Thanks in advance.

  • Spring doesn't "look" for any Kafka or serializer properties; it just supplies them to the Kafka clients from the properties. You don't show a KafkaTemplate anywhere in the above configuration, so it's not clear what you are talking about. Commented Nov 17, 2021 at 20:44
  • Please show the whole stack trace. It might well be related to your CustomKafkaAvroDeserializer, not spring-kafka... Commented Nov 17, 2021 at 20:46
  • @GaryRussell I added the producer to the post. Commented Nov 17, 2021 at 20:56
  • @ArtemBilan I added the stack trace. Commented Nov 17, 2021 at 20:58
  • I guess your PointProducer is used when /orders/7000000/sendToKafka is done. Please, show your YAML for spring.kafka configuration. Commented Nov 17, 2021 at 21:17

1 Answer


The problem is here:

spring:
  kafka:
    schema-registry-url: http://localhost:8081

Spring Boot does not manage any such property. Moreover, the name schema-registry-url does not match the schema.registry.url key that the serializer requires.

Consider changing it to this:

spring:
  kafka:
    producer:
      properties:
        "schema.registry.url": http://localhost:8081

See docs for more info: https://docs.spring.io/spring-boot/docs/current/reference/html/features.html#features.messaging.kafka.additional-properties
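
To make the mechanism concrete, here is a minimal sketch in plain Java of why the nested form works. It is a simplified model, not Spring Boot or Kafka code: the class ProducerConfigSketch and its methods buildProducerConfig and requireString are hypothetical names made up for illustration, and the typed values are copied from the question. The assumption is that entries under spring.kafka.producer.properties are copied verbatim into the map given to the producer, while an unknown key such as spring.kafka.schema-registry-url never gets there.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for how the producer config map is assembled.
// This is NOT Spring Boot or Kafka code; all names here are hypothetical.
class ProducerConfigSketch {

    // Builds the map handed to the Kafka producer: a few typed settings,
    // plus every entry found under spring.kafka.producer.properties.
    static Map<String, Object> buildProducerConfig(Map<String, String> additionalProperties) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", "localhost:9092");
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("value.serializer", "com.logistics.mock.CustomKafkaAvroSerializer");
        // Pass-through entries are copied verbatim, key names included.
        config.putAll(additionalProperties);
        return config;
    }

    // Mimics the serializer's check for a required setting that has no default.
    static String requireString(Map<String, Object> config, String key) {
        Object value = config.get(key);
        if (value == null) {
            throw new IllegalStateException(
                "Missing required configuration \"" + key + "\" which has no default value.");
        }
        return value.toString();
    }

    public static void main(String[] args) {
        // spring.kafka.schema-registry-url is not a known key, so nothing is passed through.
        Map<String, Object> broken = buildProducerConfig(Map.of());
        try {
            requireString(broken, "schema.registry.url");
        } catch (IllegalStateException e) {
            System.out.println("broken: " + e.getMessage());
        }

        // With the key placed under spring.kafka.producer.properties, it arrives intact.
        Map<String, Object> fixed = buildProducerConfig(
            Map.of("schema.registry.url", "http://localhost:8081"));
        System.out.println("fixed: " + requireString(fixed, "schema.registry.url"));
    }
}
```

The same reasoning would apply on the consumer side: if the deserializer is configured through Spring Boot rather than through a hand-built map as in KafkaTestingTools, the key would likely need to go under spring.kafka.consumer.properties.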
