1

I am using default server.properties/zookeeper.properties files provided by Kafka framework.

I am trying to create a simple NodeJS app which would send messages to Producer and consume them.

Below is NodeJS code.

config.js

module.exports = {
  kafka_topic: 'catalog',
  kafka_server: 'localhost:9092',
};

nodejs-producer.js

const kafka = require('kafka-node');
const config = require('./config');

try {

    // set the desired timeout in options
    const options = {
        timeout: 5000,
    };
    const Producer = kafka.Producer;
    const client = new kafka.KafkaClient({kafkaHost: config.kafka_server, requestTimeout: 5000});
    const producer = new Producer(client);
    const kafka_topic = config.kafka_topic;
    let payloads = [
        {
            topic: kafka_topic,
            messages: 'This is test message'
        }
    ];

    producer.on('ready', async function() {
        let push_status = producer.send(payloads, (err, data) => {
            if (err) {
                console.log(err.toString());
                console.log('[kafka-producer -> '+kafka_topic+']: broker update failed');
            } else {
                console.log(data.toString());
                console.log('[kafka-producer -> '+kafka_topic+']: broker update success');
            }
        });
    });

    producer.on('error', function(err) {
        console.log(err);
        console.log('[kafka-producer -> '+kafka_topic+']: connection errored');
        throw err;
    });
}
catch(e) {
    console.log(e);
}

kafka version = 2.8.0 kafka-node version = 5.0.0

I am getting the error - Error: LeaderNotAvailable

How to fix this? I tried playing with different values in server.properties file like advertised.listeners but didn't get solution.

1

3 Answers 3

0

I have already answered this problem here

In short: this problem happens when trying to produce messages to a topic that doesn't exist.

You may configure your kafka installation to automatically create topic in such case: what will then happen is - in order: you will still receive the error message and the framework will create the topic. In my case i then had to re-produce the same message a second time but this was on an old version of Kafka.

EDIT: here a link to a post which explains how to setup your kafka configuration to automatically create kafka topics.

Sign up to request clarification or add additional context in comments.

1 Comment

I have created the topic already via CLI command. Details of topic are as below: -> Command: /zookeeper-shell.sh ZooKeeper -server localhost:2181 get /brokers/topics/catalog -> Result: {"partitions":{"0":[3,1],"1":[1,2],"2":[2,3]},"topic_id":"1nKW2XXXXXXmtkNQ_A","adding_replicas":{},"removing_replicas":{},"version":3}
0

I have also faced same issue while sending a message. I solved the issue by adding a partition in the payload and same partition is used in the consumer also.

Code I have used

Comments

0

Since I got this error in the development environment. I solved this problem by deleting the zookeeper snapshot and Kafka consumer offset.

NOTE: Don't do this on production.

rm -rf /tmp/zookeeper
rm -rf /tmp/kafka-logs

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.