
I followed the instructions below to set up a multi-node Kafka cluster. Now, how do I connect to ZooKeeper? Is it okay to connect to just one ZooKeeper node from the producer/consumer side in Java, or is there a way to connect to all the ZooKeeper nodes?

Setting up a multi-node Apache ZooKeeper cluster

On every node of the cluster, add the following lines to the file kafka/config/zookeeper.properties:

    server.1=zNode01:2888:3888
    server.2=zNode02:2888:3888
    server.3=zNode03:2888:3888
    #add here more servers if you want
    initLimit=5
    syncLimit=2

On every node of the cluster, create a file called myid in the folder specified by the dataDir property (by default the folder is /tmp/zookeeper). The myid file should contain only the id of the znode ('1' for zNode01, '2' for zNode02, etc.).
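
For reference, a complete zookeeper.properties on each node would then look roughly like the sketch below (dataDir and clientPort are shown with Kafka's stock default values; adjust dataDir to a persistent location in production):

    dataDir=/tmp/zookeeper
    clientPort=2181
    initLimit=5
    syncLimit=2
    server.1=zNode01:2888:3888
    server.2=zNode02:2888:3888
    server.3=zNode03:2888:3888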

Setting up a multi-broker Apache Kafka cluster

On every node of the cluster, modify the property zookeeper.connect in the file kafka/config/server.properties:

    zookeeper.connect=zNode01:2181,zNode02:2181,zNode03:2181

On every node of the cluster, modify the property host.name in the file kafka/config/server.properties:

    host.name=zNode0x

On every node of the cluster, modify the property broker.id in the file kafka/config/server.properties (every broker in the cluster must have a unique id).
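
Putting the broker-side changes together, the relevant fragment of server.properties on, say, zNode01 might look like this (the broker.id and host.name values are illustrative, following the numbering used for myid above):

    broker.id=1
    host.name=zNode01
    zookeeper.connect=zNode01:2181,zNode02:2181,zNode03:2181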

2 Answers


You can pass all the broker nodes to the producer or consumer. Kafka is intelligent enough to connect to the node that holds the data you need, based on the partition and its replicas.

Here is the consumer code:

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    Properties props = new Properties();
    // List several brokers; the client discovers the rest of the cluster from them.
    props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("foo", "bar"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
    }

You can find more info here

Note: the problem with this approach is that it will open multiple connections to find out which node holds the data. For more robust and scalable systems you can maintain a map of partition number to node name; this will help with load balancing as well.
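
One way to build such a map is to ask the client itself, since it already fetches this metadata. Below is a minimal sketch using KafkaConsumer.partitionsFor(), assuming the same bootstrap servers as above and a topic named my-topic:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.PartitionInfo;

    Properties props = new Properties();
    props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    // partitionsFor() returns one PartitionInfo per partition,
    // including which broker currently leads it.
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        for (PartitionInfo p : consumer.partitionsFor("my-topic")) {
            System.out.printf("partition %d -> leader %s%n", p.partition(), p.leader());
        }
    }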

Here is the producer sample:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    Properties props = new Properties();
    // Same bootstrap list as the consumer; any one reachable broker is enough to start.
    props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);
    for (int i = 0; i < 100; i++)
        producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

    producer.close();

More info here.


5 Comments

What about creating multiple partitions for a topic? How can this be done? Don't we need to delegate that through the ZkClient like this? Discussed here: stackoverflow.com/questions/27036923/…
AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration);
You can use AdminUtils to create topics. But the better practice is to create them on the node itself with a command, because it's a one-time task. Command format: /bin/kafka-topics.sh --zookeeper c6401.ambari.apache.org:2181 --create --topic test_topic --partitions 2 --replication-factor 2 (output: Created topic "test_topic").
I have to create topics dynamically when I send data over to the producer.
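
For the dynamic case, here is a hedged sketch of the AdminUtils call mentioned above. Note it relies on the old Scala kafka core jar (not kafka-clients), and the timeout values are illustrative:

    import java.util.Properties;
    import kafka.admin.AdminUtils;
    import kafka.utils.ZkUtils;

    // ZkUtils.apply(zkConnect, sessionTimeoutMs, connectionTimeoutMs, isZkSecurityEnabled)
    ZkUtils zkUtils = ZkUtils.apply("zNode01:2181,zNode02:2181,zNode03:2181", 30000, 30000, false);
    try {
        // topic, partitions, replication factor, per-topic config overrides
        AdminUtils.createTopic(zkUtils, "test_topic", 2, 2, new Properties());
    } finally {
        zkUtils.close();
    }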

There is no need to pass ZooKeeper connection properties to the Kafka clients (Producer and Consumer).

From Kafka v9 and above, the Kafka Producer and Consumer don't communicate with ZooKeeper.

4 Comments

I'm using v9, and I'm getting the following exception saying that ZooKeeper info is required in the properties:
Caused by: java.lang.IllegalArgumentException: requirement failed: Missing required property 'zookeeper.connect' at scala.Predef$.require(Predef.scala:233) at kafka.utils.VerifiableProperties.getString(VerifiableProperties.scala:177) at kafka.utils.ZKConfig.<init>(ZkUtils.scala:740)
Use KafkaProducer and KafkaConsumer from the kafka-clients library.
The only dependency needed to access a Kafka broker/server is the kafka-clients jar. Remove all the other Kafka jars/dependencies from your project, then test again.
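
For example, with Maven that means a single dependency along these lines (the version shown is just an illustrative 0.9.x release):

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.9.0.1</version>
    </dependency>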
