My cluster configuration, class details, and jar versions are described in my earlier question, "org.apache.kafka.common.KafkaException: Failed to construct kafka consumer".

I have started the ZooKeeper server, the Kafka server, and the Kafka REST server. I then deploy my Spring Boot war file, spring-kafka-webhook-service.war, on Tomcat.

When I post messages through the Kafka REST proxy client, I get the error below, which suggests that the @KafkaListener method fails to read the incoming ConsumerRecord. Any input would be highly appreciated.

My Kafka REST properties are currently configured as below:

confluent-3.3.0/etc/kafka-rest/kafka-rest.properties

id=kafka-rest-test-server
schema.registry.url=http://localhost:8081
zookeeper.connect=localhost:2181

Error log after deploying the war on Tomcat:

2017-12-26 09:11:01.143 ERROR 20430 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: ConsumerRecord(topic = inventory, partition = 0, offset = 3, CreateTime = 1514279460946, checksum = 1183108784, serialized key size = -1, serialized value size = 72, key = null, value = InventoryEvent [id=7798, eventType='inventory.transaction', qtyReq='5', qtyLevel='27'])

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.psl.kafka.spring.InventoryEventReceiver.listenWithHeaders(com.psl.kafka.spring.InventoryEvent,java.lang.String,java.lang.Integer,int,java.lang.String)]
Bean [com.psl.kafka.spring.InventoryEventReceiver@798267fb]; nested exception is org.springframework.messaging.MessageHandlingException: Missing header 'kafka_receivedMessageKey' for method parameter type [class java.lang.Integer], failedMessage=GenericMessage [payload=InventoryEvent [id=7798, eventType='inventory.transaction', qtyReq='5', qtyLevel='27'], headers={kafka_offset=3, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=inventory}]
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:183) ~[spring-kafka-1.1.7.RELEASE.jar:na]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:72) ~[spring-kafka-1.1.7.RELEASE.jar:na]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47) ~[spring-kafka-1.1.7.RELEASE.jar:na]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:792) [spring-kafka-1.1.7.RELEASE.jar:na]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:736) [spring-kafka-1.1.7.RELEASE.jar:na]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:568) [spring-kafka-1.1.7.RELEASE.jar:na]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_151]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_151]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]
Caused by: org.springframework.messaging.MessageHandlingException: Missing header 'kafka_receivedMessageKey' for method parameter type [class java.lang.Integer]
        at org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver.handleMissingValue(HeaderMethodArgumentResolver.java:100) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
        at org.springframework.messaging.handler.annotation.support.AbstractNamedValueMethodArgumentResolver.resolveArgument(AbstractNamedValueMethodArgumentResolver.java:103) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
        at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:112) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:135) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
        at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-1.1.7.RELEASE.jar:na]
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:174) ~[spring-kafka-1.1.7.RELEASE.jar:na]
        ... 8 common frames omitted

2 Answers

I changed the @KafkaListener method to take only the POJO InventoryEvent as a parameter:

InventoryEvent event

instead of

@Payload InventoryEvent event,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) String offset

This solved the issue because kafka_receivedMessageKey is never sent over Kafka, as explained in this SO answer by Artem Bilan: https://stackoverflow.com/a/32125453/786676
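
For readers who want to keep the header parameters, the failure in the log can be reproduced with a small plain-Java model. This is only a sketch: `HeaderResolutionSketch` and `resolveHeader` are hypothetical names, not Spring classes, and the logic mirrors only the rule visible in the stack trace, namely that a required header whose value is null is reported as missing.

```java
import java.util.HashMap;
import java.util.Map;

public class HeaderResolutionSketch {

    /**
     * Simplified model of resolving a header-bound listener argument.
     * This is NOT Spring's code; it only mirrors the rule that a null or
     * absent value fails when the header is marked as required.
     */
    static Object resolveHeader(Map<String, Object> headers, String name, boolean required) {
        Object value = headers.get(name);
        if (value == null && required) {
            throw new IllegalStateException("Missing header '" + name + "'");
        }
        return value; // may be null when the header is optional
    }

    public static void main(String[] args) {
        // Headers as they appear in the failed message from the error log.
        Map<String, Object> headers = new HashMap<>();
        headers.put("kafka_offset", 3L);
        headers.put("kafka_receivedMessageKey", null); // record was sent without a key
        headers.put("kafka_receivedPartitionId", 0);
        headers.put("kafka_receivedTopic", "inventory");

        // Optional header: the argument simply resolves to null.
        System.out.println("optional key -> "
                + resolveHeader(headers, "kafka_receivedMessageKey", false));

        // Required header (the @Header default): resolution fails.
        try {
            resolveHeader(headers, "kafka_receivedMessageKey", true);
        } catch (IllegalStateException e) {
            System.out.println("required key -> " + e.getMessage());
        }
    }
}
```

In the listener itself, the corresponding change would be to declare the key parameter as `@Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) Integer key`, keeping the wrapper type `Integer` so that a null key can be passed in.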

2 Comments

It is sent if the producer is Spring Integration Kafka. On the other hand, @Header has a required option that lets you ignore an absent header.
Ok got it! Thanks

To make this work, I had to generate a message ID (the key) in the producer and pass it to the send() method, like so:

public void sendMessage(CallStack callStack) {
    String topicName = "my-topic-name";
    // The generated ID doubles as the Kafka record key
    String messageId = UUID.randomUUID().toString();
    callStack.setId(messageId);
    ListenableFuture<SendResult<String, CallStack>> future =
            kafkaTemplate.send(topicName, messageId, callStack);

    // Check the producer result asynchronously to avoid blocking the thread
    future.addCallback(new ListenableFutureCallback<SendResult<String, CallStack>>() {
        @Override
        public void onFailure(@NotNull Throwable throwable) {
            log.error("Failed to send message", throwable);
        }

        @Override
        public void onSuccess(SendResult<String, CallStack> result) {
            log.info(String.format("Produced:\ntopic: %s\noffset: %d\npartition: %d\nvalue size: %d",
                    result.getRecordMetadata().topic(),
                    result.getRecordMetadata().offset(),
                    result.getRecordMetadata().partition(),
                    result.getRecordMetadata().serializedValueSize()));
        }
    });
}

Consumer Service:

@KafkaListener(topics = "perfEvent", containerFactory = "kafkaListenerContainerFactory")
public void consume(final @Payload CallStack callStack,
                    // Kafka offsets are 64-bit, so bind the header as a long
                    final @Header(KafkaHeaders.OFFSET) long offset,
                    final @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
                    final @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                    final @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                    final @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
    log.info(String.format("#### -> Consumed message -> TIMESTAMP: %d\n%s\noffset: %d\nkey: %s\npartition: %d\ntopic: %s",
            ts, callStack, offset, key, partition, topic));
    log.info("Persisting message id: {}", callStack.getId());
    callStackRepository.insert(callStack);
}
