Commit b803b84

committed
Added example of usage
1 parent ceaf4f2 commit b803b84

17 files changed: +451 −50 lines changed


README.md

Lines changed: 4 additions & 6 deletions

````diff
@@ -1,10 +1,8 @@
 # Prioritizing Event Processing with Apache Kafka
 
-Implement message prioritization in [Apache Kafka](https://kafka.apache.org) is often a hard task because Kafka doesn't support broker-level reordering of messages like some messaging technologies do. Though some developers see this as a limitation, the reality is that it isn't because Kafka is not supposed to allow message reordering. Kafka is a distributed [commit log](https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying) and therefore messages are immutable and so their ordering is within partitions. This doesn't change the fact the developers may need to implement message prioritization in Kafka.
+Implementing event processing prioritization in [Apache Kafka](https://kafka.apache.org) is often a hard task because Kafka doesn't support broker-level reordering of messages the way some messaging technologies do. Though some developers see this as a limitation, it isn't one: Kafka was never meant to reorder messages. Kafka is a distributed [commit log](https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying), so messages are immutable and their ordering is guaranteed only within partitions. This doesn't change the fact that developers may need to implement event processing prioritization in Kafka.
 
-This project aims to address this problem while still proving a way to keep the implementation code simple. In Kafka, [partitions are a unit-of-parallelism, unit-of-storage, and unit-of-durability](https://www.buildon.aws/posts/in-the-land-of-the-sizing-the-one-partition-kafka-topic-is-king/01-what-are-partitions). However, when developers write code to handle partitions directly they end up writing a rather more complex code, and often need to give up of some facilities that the Kafka architecture provides such as automatic rebalancing of consumers when new partitions are added and/or when a group leader fails. This becomes even more important when developers are interacting with Kafka via frameworks like [Kafka Connect](https://kafka.apache.org/documentation/#connect) and [Kafka Streams](https://kafka.apache.org/documentation/streams/) that, by design, don't expect that partitions are handled directly.
-
-This project addresses message prioritization by grouping partitions into simpler abstractions called buckets that express priority given their size. Bigger buckets mean a higher priority, and smaller buckets mean less priority. The project also addresses code simplicity by providing a way to do all of this with the pluggable architecture of Kafka.
+In Kafka, [partitions are a unit-of-parallelism, unit-of-storage, and unit-of-durability](https://www.buildon.aws/posts/in-the-land-of-the-sizing-the-one-partition-kafka-topic-is-king/01-what-are-partitions). However, when developers write code to handle partitions directly, they end up writing rather complex code, and often have to give up some of the facilities that the Kafka architecture provides, such as automatic rebalancing of consumers when new partitions are added and/or when a group leader fails. This becomes even more important when developers interact with Kafka via frameworks like [Kafka Connect](https://kafka.apache.org/documentation/#connect) and [Kafka Streams](https://kafka.apache.org/documentation/streams/) that, by design, don't expect partitions to be handled directly. This project addresses event processing prioritization via the bucket pattern: it groups partitions into simpler abstractions called buckets, whose size expresses priority. Bigger buckets mean higher priority; smaller buckets mean lower priority. The project also addresses code simplicity by doing all of this through Kafka's pluggable architecture.
 
 Let's understand how this works with an example.
 
@@ -16,7 +14,7 @@ To ensure that each message will end up in their respective bucket, use the `Buc
 
 ![Assignor Overview](images/assignor-overview.png)
 
-With the bucket priority, you can implement message prioritization by having more consumers working on buckets with higher priorities, while buckets with less priority can have fewer consumers. Message prioritization can also be obtained by executing these consumers in an order that gives preference to processing high priority buckets before the less priority ones. While coordinating this execution might involve some extra coding from your part (perhaps using some sort of scheduler) you don't have to implement low-level code to manage partition assignment and keep your consumers simple by leveraging the standard `subscribe()` and `poll()` methods.
+With the bucket priority, you can implement event processing prioritization by having more consumers work on buckets with higher priority, while lower-priority buckets get fewer consumers. Event processing prioritization can also be achieved by executing these consumers in an order that processes high-priority buckets before lower-priority ones. While coordinating this execution may involve some extra coding on your part (perhaps using some sort of scheduler), you don't have to implement low-level code to manage partition assignment, and you keep your consumers simple by leveraging the standard `subscribe()` and `poll()` methods.
 
 ## Building the project
 
@@ -80,7 +78,7 @@ Discarding any message that can't be sent to any of the buckets is also possible
 
 ```bash
 configs.setProperty(BucketPriorityConfig.FALLBACK_PARTITIONER_CONFIG,
-    "blog.buildon.aws.streaming.kafka.DiscardPartitioner");
+    "code.buildon.aws.streaming.kafka.DiscardPartitioner");
 ```
 
 ## Using the assignor
````
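The bucket idea above can be sketched in plain Java with no Kafka dependency. The bucket names and the 70/30 allocation below are illustrative assumptions, not the project's API; the real assignor performs this distribution internally:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BucketAllocationSketch {

    // Distribute a topic's partitions among named buckets proportionally to
    // their allocation percentages, filling the highest-priority bucket first.
    static Map<String, List<Integer>> allocate(int partitionCount,
            LinkedHashMap<String, Integer> allocation) {
        Map<String, List<Integer>> buckets = new LinkedHashMap<>();
        int next = 0;
        for (Map.Entry<String, Integer> e : allocation.entrySet()) {
            int size = (int) Math.round(partitionCount * (e.getValue() / 100.0));
            List<Integer> partitions = new ArrayList<>();
            for (int i = 0; i < size && next < partitionCount; i++) {
                partitions.add(next++);
            }
            buckets.put(e.getKey(), partitions);
        }
        return buckets;
    }

    public static void main(String[] args) {
        LinkedHashMap<String, Integer> allocation = new LinkedHashMap<>();
        allocation.put("Platinum", 70); // bigger bucket: higher priority
        allocation.put("Gold", 30);     // smaller bucket: lower priority
        // With 6 partitions: Platinum gets partitions 0-3, Gold gets 4-5
        System.out.println(allocate(6, allocation));
    }
}
```

A bucket is thus just a contiguous block of partitions; more consumers can then be pointed at the bigger block.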

docker-compose.yml

Lines changed: 30 additions & 0 deletions

```diff
@@ -0,0 +1,30 @@
+services:
+
+  kafka:
+    image: confluentinc/cp-kafka:7.3.2
+    hostname: kafka
+    container_name: kafka
+    ports:
+      - "9092:9092"
+    environment:
+      KAFKA_NODE_ID: 1
+      KAFKA_BROKER_ID: 1
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
+      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092'
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+      KAFKA_PROCESS_ROLES: 'broker,controller'
+      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'
+      KAFKA_LISTENERS: 'PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092'
+      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
+      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
+      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
+    volumes:
+      - ./scripts/workaround.sh:/tmp/workaround.sh
+    command: "bash -c '/tmp/workaround.sh && /etc/confluent/docker/run'"
+    healthcheck:
+      test: echo srvr | nc kafka 9092 || exit 1
+      interval: 5s
+      retries: 10
```

pom.xml

Lines changed: 26 additions & 37 deletions

```diff
@@ -1,13 +1,13 @@
-<?xml version="1.0" ?>
+<?xml version="1.0"?>
 
-<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
-    http://maven.apache.org/xsd/maven-4.0.0.xsd"
+<project
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
     xmlns="http://maven.apache.org/POM/4.0.0"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
 
     <modelVersion>4.0.0</modelVersion>
 
-    <groupId>blog.buildon.aws.streaming.kafka</groupId>
+    <groupId>code.buildon.aws.streaming.kafka</groupId>
     <artifactId>bucket-priority-pattern</artifactId>
     <description>Pattern that groups topic partitions into buckets so these buckets can be processed in a given priority order.</description>
     <version>1.0.0</version>
@@ -25,6 +25,7 @@
 
     <properties>
         <kafka.clients.version>3.4.1</kafka.clients.version>
+        <slf4j.api.version>2.0.7</slf4j.api.version>
         <junit.jupiter.version>5.9.3</junit.jupiter.version>
     </properties>
 
@@ -33,7 +34,11 @@
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
             <version>${kafka.clients.version}</version>
-            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.api.version}</version>
         </dependency>
         <dependency>
             <groupId>org.junit.jupiter</groupId>
@@ -49,45 +54,29 @@
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-compiler-plugin</artifactId>
                 <version>3.11.0</version>
-                <inherited>true</inherited>
                 <configuration>
-                    <release>11</release>
+                    <release>17</release>
                 </configuration>
             </plugin>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-source-plugin</artifactId>
-                <version>3.3.0</version>
-                <executions>
-                    <execution>
-                        <id>attach-sources</id>
-                        <goals>
-                            <goal>jar</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-surefire-plugin</artifactId>
-                <version>3.1.2</version>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.6.0</version>
                 <configuration>
-                    <argLine>
-                        --illegal-access=permit
-                    </argLine>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-failsafe-plugin</artifactId>
-                <version>3.1.2</version>
-                <configuration>
-                    <argLine>
-                        --illegal-access=permit
-                    </argLine>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
                 </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
             </plugin>
         </plugins>
     </build>
-
-</project>
+</project>
```

scripts/workaround.sh

Lines changed: 19 additions & 0 deletions

```diff
@@ -0,0 +1,19 @@
+#!/bin/sh
+
+##########################################################################
+################################ Important ###############################
+##########################################################################
+## This script implements workarounds for the current Docker image of  ##
+## Apache Kafka from Confluent. Eventually, newer images will fix the  ##
+## issues found here, and this script will no longer be required.      ##
+##########################################################################
+
+# Workaround: Remove check for KAFKA_ZOOKEEPER_CONNECT parameter
+sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure
+
+# Workaround: Ignore cub zk-ready
+sed -i 's/cub zk-ready/echo ignore zk-ready/' /etc/confluent/docker/ensure
+
+# KRaft required: Format the storage directory with a new cluster ID
+export KAFKA_CLUSTER_ID="p8fFEbKGQ22B6M_Da_vCBw"
+echo "kafka-storage format --ignore-formatted -t $KAFKA_CLUSTER_ID -c /etc/kafka/kafka.properties" >> /etc/confluent/docker/ensure
```
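The effect of the first `sed` workaround can be demonstrated on a stand-in file (the `dub ensure` line below is only illustrative of the kind of line the real configure script contains):

```shell
# Apply the same sed edit the workaround script uses, but on a temp file
# standing in for /etc/confluent/docker/configure.
tmp=$(mktemp)
printf '%s\n' \
  'dub ensure KAFKA_ZOOKEEPER_CONNECT' \
  'dub ensure KAFKA_ADVERTISED_LISTENERS' > "$tmp"

# Drop every line mentioning KAFKA_ZOOKEEPER_CONNECT (KRaft mode needs no ZooKeeper)
sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' "$tmp"

cat "$tmp"   # only the KAFKA_ADVERTISED_LISTENERS line remains
rm -f "$tmp"
```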
AllOrdersConsumer.java

Lines changed: 76 additions & 0 deletions

```java
package blog.buildon.aws.streaming.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import static blog.buildon.aws.streaming.kafka.utils.KafkaUtils.ALL_ORDERS;
import static blog.buildon.aws.streaming.kafka.utils.KafkaUtils.createTopic;
import static blog.buildon.aws.streaming.kafka.utils.KafkaUtils.getConfigs;

public class AllOrdersConsumer {

    private class ConsumerThread extends Thread {

        private String threadName;
        private KafkaConsumer<String, String> consumer;

        public ConsumerThread(String threadName, Properties configs) {

            this.threadName = threadName;

            configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

            configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

            configs.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            configs.setProperty(ConsumerConfig.GROUP_ID_CONFIG, ALL_ORDERS + "-group");

            consumer = new KafkaConsumer<>(configs);
            consumer.subscribe(Arrays.asList(ALL_ORDERS));

        }

        @Override
        public void run() {
            for (;;) {
                ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofSeconds(Integer.MAX_VALUE));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format("[%s] Key = %s, Partition = %d",
                        threadName, record.key(), record.partition()));
                }
            }
        }

    }

    private final List<ConsumerThread> consumerThreads = new ArrayList<>();

    private void run(int numberOfThreads, Properties configs) {
        for (int i = 0; i < numberOfThreads; i++) {
            String threadName = String.format("Consumer-Thread-%d", i);
            consumerThreads.add(new ConsumerThread(threadName, configs));
        }
        consumerThreads.stream().forEach(ct -> ct.start());
    }

    public static void main(String[] args) {
        createTopic(ALL_ORDERS, 6, (short) 3);
        if (args.length >= 1) {
            int numberOfThreads = Integer.parseInt(args[0]);
            new AllOrdersConsumer().run(numberOfThreads, getConfigs());
        }
    }

}
```
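Each thread above runs its own `KafkaConsumer` in the same group, so the group's assignor spreads the topic's six partitions across the threads. The arithmetic of a roughly even spread can be sketched without Kafka (illustrative only; this is not the consumer's actual assignment logic):

```java
import java.util.Arrays;

public class PartitionSpreadSketch {

    // Roughly-even spread of p partitions over n group members:
    // the first (p % n) members each take one extra partition.
    static int[] spread(int p, int n) {
        int[] counts = new int[n];
        for (int i = 0; i < n; i++) {
            counts[i] = p / n + (i < p % n ? 1 : 0);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Six partitions (as created by AllOrdersConsumer) over four threads
        System.out.println(Arrays.toString(spread(6, 4))); // prints [2, 2, 1, 1]
    }
}
```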
AllOrdersProducer.java

Lines changed: 62 additions & 0 deletions

```java
package blog.buildon.aws.streaming.kafka;

import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;

import static blog.buildon.aws.streaming.kafka.utils.KafkaUtils.ALL_ORDERS;
import static blog.buildon.aws.streaming.kafka.utils.KafkaUtils.createTopic;
import static blog.buildon.aws.streaming.kafka.utils.KafkaUtils.getConfigs;

public class AllOrdersProducer {

    private void run(Properties configs) {

        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class.getName());

        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(configs)) {

            AtomicInteger counter = new AtomicInteger(0);
            String[] buckets = {"Platinum", "Gold"};

            for (;;) {

                int value = counter.incrementAndGet();
                int index = Utils.toPositive(value) % buckets.length;
                String recordKey = buckets[index] + "-" + value;

                ProducerRecord<String, String> record =
                    new ProducerRecord<>(ALL_ORDERS, recordKey, "Value");

                producer.send(record, (metadata, exception) -> {
                    System.out.println(String.format(
                        "Record with key '%s' was sent to partition %d",
                        recordKey, metadata.partition()));
                });

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }

            }

        }

    }

    public static void main(String[] args) {
        createTopic(ALL_ORDERS, 6, (short) 3);
        new AllOrdersProducer().run(getConfigs());
    }

}
```
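The producer's key scheme simply alternates between the two bucket names. A dependency-free sketch of that scheme, reimplementing `Utils.toPositive` (which in kafka-clients is the mask `v & 0x7fffffff`) so it runs without Kafka on the classpath:

```java
public class BucketKeySketch {

    // Same masking kafka-clients' Utils.toPositive applies, reproduced here
    // so the sketch has no external dependency.
    static int toPositive(int v) {
        return v & 0x7fffffff;
    }

    public static void main(String[] args) {
        String[] buckets = {"Platinum", "Gold"};
        // Mirrors AllOrdersProducer: an incrementing counter alternates buckets
        for (int value = 1; value <= 4; value++) {
            int index = toPositive(value) % buckets.length;
            System.out.println(buckets[index] + "-" + value);
        }
        // prints Gold-1, Platinum-2, Gold-3, Platinum-4
    }
}
```

Note that the key prefix alone does not route a record to a bucket's partitions: the default partitioner hashes the whole key, so deterministic bucket routing relies on the partitioner described in the README.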
