
We have a source cluster with security protocol SASL_SSL and a target cluster without any security, just PLAINTEXT. MirrorMaker2 running in Kafka Connect mode logs messages about a connection problem:

INFO [AdminClient clientId=src->tgt|my_mm2_connector|replication-source-admin] Cancelled in-flight API_VERSIONS request with correlation id 0 due to node -1 being disconnected (elapsed time since creation: 334ms, elapsed time since send: 334ms, request timeout: 3600000ms) (org.apache.kafka.clients.NetworkClient:344)

I've noticed two different AdminClientConfigs in logs:

  1. client.id = src->tgt|my_mm2_connector|offset-syncs-target-admin that connects to kafka_plaintext:9092 bootstrap server
  2. client.id = src->tgt|my_mm2_connector|replication-source-admin that connects to kafa_sasl_ssl:9092 bootstrap server

Both of them were created with security.protocol=PLAINTEXT. If I change admin.security.protocol to SASL_SSL, it affects both AdminClientConfigs as well.

Is it possible to replicate topics between a secured and an unsecured Kafka cluster?

Here is my mm2 config:

{
  "name": "my_mm2_connector",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "source.cluster.alias": "src",
    "target.cluster.alias": "tgt",
    "source.cluster.bootstrap.servers": "kafa_sasl_ssl:9092",
    "target.cluster.bootstrap.servers": "kafka_plaintext:9092",
    "topics": "test_topic",
    "tasks.max": 1,
    "replication.factor": 1,
    "offset-syncs.topic.replication.factor": 1,
    "offset-syncs.topic.location": "target",
    "enabled": true,
    "source.security.protocol": "SASL_SSL",
    "source.ssl.keystore.type": "JKS",
    "source.ssl.truststore.type": "JKS",
    "source.ssl.truststore.location": "/opt/ssl/kafka.truststore.jks",
    "source.ssl.keystore.password": "changeit",
    "source.ssl.keystore.location": "/opt/ssl/kafka.keystore.jks",
    "source.ssl.truststore.password": "changeit",
    "source.sasl.mechanism": "PLAIN",
    "source.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"kafka\" password=\"kafka-password\"",
    "target.security.protocol": "PLAINTEXT",
    "admin.security.protocol": "PLAINTEXT"
  }
}

2 Answers


It turns out there is a group of client-scoped parameters like these:

source.consumer.security.protocol
target.producer.security.protocol
source.admin.security.protocol
target.admin.security.protocol

and so on. So now my config looks like this, and it works:

//mm2.json
{
  "name": "my_mm2_connector",
  "config": {
  "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
  "source.cluster.alias": "src",
  "target.cluster.alias": "tgt",
  "source.cluster.bootstrap.servers": "kafa_sasl_ssl:9092",
  "target.cluster.bootstrap.servers": "kafka_plaintext:9092",
  "topics": "test_topic",
  "tasks.max": 1,
  "replication.factor": 1,
  "offset-syncs.topic.replication.factor": 1,
  "offset-syncs.topic.location": "target",
  "enabled": true,
  "source.security.protocol": "SASL_SSL",
  "source.ssl.keystore.type": "JKS",
  "source.ssl.truststore.type": "JKS",
  "source.ssl.truststore.location": "/opt/ssl/kafka.truststore.jks",
  "source.ssl.keystore.password": "changeit",
  "source.ssl.keystore.location": "/opt/ssl/kafka.keystore.jks",
  "source.ssl.truststore.password": "changeit",
  "source.sasl.mechanism": "PLAIN",
  "source.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"kafka\" password=\"kafka-password\";",
  "source.consumer.security.protocol": "SASL_SSL",
  "source.consumer.ssl.keystore.type": "JKS",
  "source.consumer.ssl.truststore.type": "JKS",
  "source.consumer.ssl.truststore.location": "/opt/ssl/kafka.truststore.jks",
  "source.consumer.ssl.keystore.password": "changeit",
  "source.consumer.ssl.keystore.location": "/opt/ssl/kafka.keystore.jks",
  "source.consumer.ssl.truststore.password": "changeit",
  "source.consumer.sasl.mechanism": "PLAIN",
  "source.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"kafka\" password=\"kafka-password\";",
  "source.admin.security.protocol": "SASL_SSL",
  "source.admin.ssl.keystore.type": "JKS",
  "source.admin.ssl.truststore.type": "JKS",
  "source.admin.ssl.truststore.location": "/opt/ssl/kafka.truststore.jks",
  "source.admin.ssl.keystore.password": "changeit",
  "source.admin.ssl.keystore.location": "/opt/ssl/kafka.keystore.jks",
  "source.admin.ssl.truststore.password": "changeit",
  "source.admin.sasl.mechanism": "PLAIN",
  "source.admin.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"kafka\" password=\"kafka-password\";",
  "target.security.protocol": "PLAINTEXT"
}
}

root@kafka$>curl -X POST -H "Content-Type: application/json" http://kafka-connect:8083/connectors -d @mm2.json
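After POSTing the connector, the Connect REST API can confirm it started; a quick check, assuming the same host/port and connector name as in the example above:

```shell
# Query the connector's state; a healthy deployment reports "RUNNING"
# for both the connector and each of its tasks.
curl -s http://kafka-connect:8083/connectors/my_mm2_connector/status
```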

1 Comment

My understanding is that, without specifying the consumer or admin config, all properties are passed into all client types.
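That inheritance can be sketched as a toy precedence model (this is illustrative only, not MirrorMaker2's actual resolution code, and the exact override ordering is an assumption): cluster-wide keys apply first, then client-type keys such as `admin.`, then fully scoped keys such as `source.admin.`:

```python
def resolve_client_config(raw: dict, cluster: str, client: str) -> dict:
    """Toy model: later (more specific) scopes override earlier ones."""
    resolved = {}
    # Least specific to most specific: "source.", "admin.", "source.admin."
    for prefix in (cluster + ".", client + ".", cluster + "." + client + "."):
        for key, value in raw.items():
            if not key.startswith(prefix):
                continue
            rest = key[len(prefix):]
            # Don't let "source.admin.x" leak in while stripping only "source."
            if prefix == cluster + "." and rest.startswith(client + "."):
                continue
            resolved[rest] = value
    return resolved

# Mirrors the question: a bare "admin." key reaches BOTH clusters' admin
# clients unless a scoped "source.admin." key overrides it.
question_config = {
    "admin.security.protocol": "PLAINTEXT",
    "source.security.protocol": "SASL_SSL",
    "source.admin.security.protocol": "SASL_SSL",
}
print(resolve_client_config(question_config, "source", "admin"))
# → {'security.protocol': 'SASL_SSL'}
print(resolve_client_config(question_config, "target", "admin"))
# → {'security.protocol': 'PLAINTEXT'}
```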

I tried a test for you. Actually, there are two important things:

1) Created an admin user:

/opt/kafka/bin/kafka-acls.sh \
    --bootstrap-server broker1:9092,broker2:9092,broker3:9092 \
    --command-config /opt/kafka/config/admin_client.conf \
    --add \
    --allow-principal "User:kafka" \
    --operation All \
    --topic '*' \
    --group '*' \
    --cluster
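The --command-config file above holds the credentials that kafka-acls.sh itself uses to reach the brokers; a minimal sketch of /opt/kafka/config/admin_client.conf, assuming the same SCRAM-SHA-256 setup and placeholder passwords as the config further down:

```properties
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="kafka" password="cagri3541";
ssl.truststore.location=/opt/kafka/config/jks/broker1.truststore.jks
ssl.truststore.password=cagri3541
```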

2) security.protocol:

  • security.protocol: we are trying to go from SASL_SSL to PLAINTEXT, but you used:

    "source.sasl.mechanism": "PLAIN",  # must be GSSAPI or SCRAM
    
  • you also made a typo:

    "source.ecurity.protocol": "SASL_SSL",  # "ecurity" should be "security"
    

Here's what I did (I checked topic to topic; all data migrated):

# Define the clusters to be used
clusters = A, B

# Connection info for each cluster
A.bootstrap.servers = broker1:9092,broker2:9092,broker3:9092
B.bootstrap.servers = kafka1:9092,kafka2:9092,kafka3:9092

# Source cluster (A) - SASL_SSL with SCRAM-SHA-256
A.security.protocol = SASL_SSL
A.sasl.mechanism = SCRAM-SHA-256
A.sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="cagri3541";
A.ssl.keystore.location = /opt/kafka/config/jks/broker1.keystore.jks
A.ssl.keystore.password = cagri3541
A.ssl.key.password = cagri3541
A.ssl.truststore.location = /opt/kafka/config/jks/broker1.truststore.jks
A.ssl.truststore.password = cagri3541

# Target cluster (B) - PLAINTEXT
B.security.protocol = PLAINTEXT

# Enable/disable replication flows
A->B.enabled = true
B->A.enabled = false

# Replication topic filter
A->B.topics = cumba-test-topic

# Offset and heartbeat management
A->B.exclude.internal.topics = true
A->B.sync.group.offsets.enabled = true
A->B.emit.heartbeats.enabled = true
A->B.emit.checkpoints.enabled = true

# Replication policy - keep topic names unchanged
replication.policy.class = org.apache.kafka.connect.mirror.IdentityReplicationPolicy

# Converters
key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter
header.converter = org.apache.kafka.connect.converters.ByteArrayConverter

# Interval tuning (in seconds)
emit.heartbeats.interval.seconds = 1
emit.checkpoints.interval.seconds = 1
refresh.topics.interval.seconds = 5
refresh.groups.interval.seconds = 5
sync.topic.configs.interval.seconds = 5
sync.group.offsets.interval.seconds = 5

# Internal task tuning
tasks.max = 12

# Replication factor for new and internal topics
replication.factor = 3
checkpoints.topic.replication.factor = 3
heartbeats.topic.replication.factor = 3
offset-syncs.topic.replication.factor = 3

# Replication factor for Kafka Connect internal topics
config.storage.replication.factor = 3
offset.storage.replication.factor = 3
status.storage.replication.factor = 3

# Flush timeout for offset commits
offset.flush.timeout.ms = 10800000

# Producer idempotence
producer.override.enable.idempotence = true
producer.override.acks = all
producer.override.retries = 2147483647
producer.override.max.in.flight.requests.per.connection = 5
producer.override.retry.backoff.ms = 1000
producer.override.request.timeout.ms = 5000
producer.override.delivery.timeout.ms = 120000
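A flat properties file like this is the format for MirrorMaker2's dedicated mode rather than the Connect REST API, so it would be started with the script that ships in the Kafka distribution (the mm2.properties filename and path are assumptions):

```shell
# Launch a dedicated MirrorMaker2 cluster driven by the properties file above.
/opt/kafka/bin/connect-mirror-maker.sh /opt/kafka/config/mm2.properties
```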
