
I am trying to create a Kafka connector after the connect-distributed command is executed. I wrote an entrypoint.sh script and used it with CMD. My Dockerfile looks like this:

FROM confluentinc/cp-kafka
RUN mkdir /plugins /config
COPY kafka-connect-couchbase-*.jar /plugins/
COPY config /config/
RUN chmod +x /config/stage/entrypoint.sh
ENV EXPOSED_PORT=8083
CMD /config/stage/entrypoint.sh

My entrypoint script is:

connect-distributed config/"${DEPLOY_ENV}"/connect-distributed.properties
curl -X POST -H "Content-Type: application/json" -d @config.json http://localhost:8083/connectors

DEPLOY_ENV is irrelevant; it comes from Jenkins. The config file and connect-distributed.properties are irrelevant too, and they are correct: I tested them manually.

Kafka Connect starts without a problem; however, the curl command that creates the connector has no effect.

In short, I want to create a connector after connect-distributed has started, without sending any REST request from outside the container. How do I achieve that?

2 Answers


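In your script, connect-distributed runs in the foreground and does not return while the worker is running, so the curl on the next line is never reached. You need to start the worker in the background and make sure that you wait for the Kafka Connect worker to fully start up before calling its REST API.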

BTW, you're better off starting with the Kafka Connect base image:

FROM confluentinc/cp-kafka-connect-base:5.5.0

Normally you'd use Confluent Hub to install the connector, but it looks like the Couchbase one isn't there, so you'll have to copy in the JAR as you've done.
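Wherever the JAR ends up, the worker only loads connectors found on its plugin.path; with the Confluent images that is usually set through the CONNECT_PLUGIN_PATH environment variable, so it needs to include the directory the JAR was copied into (/plugins in the question). A quick check is to look for the connector class in the worker's GET /connector-plugins response. A minimal sketch; the helper name plugin_loaded is hypothetical, and the class name com.couchbase.connect.kafka.CouchbaseSinkConnector in the usage line is an assumption, so check the connector's own documentation:

```shell
#!/bin/bash
# Hypothetical helper: succeeds if the JSON from GET /connector-plugins
# (read on stdin) lists a plugin whose "class" field equals the given class.
plugin_loaded() {
  local class="$1"
  # Strip whitespace so both compact and pretty-printed JSON match,
  # then do a plain-text match on the "class" field (no jq required)
  tr -d ' \n\t' | grep -Fq "\"class\":\"${class}\""
}
```

Usage, once the worker is up (the class name is an assumption):

curl -s http://localhost:8083/connector-plugins | plugin_loaded com.couchbase.connect.kafka.CouchbaseSinkConnector && echo "Couchbase sink connector is loaded"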

The actual script to fire up Kafka Connect in the Connect image is /etc/confluent/docker/run, so your /config/stage/entrypoint.sh should look like this:

#!/bin/bash
# Launch the worker in the background
/etc/confluent/docker/run &

# Wait for it to start running
# Change the port here if not using the default
echo -e "\n\n=============\nWaiting for Kafka Connect to start listening on localhost ⏳\n=============\n"
while [ "$(curl -s -o /dev/null -w '%{http_code}' http://localhost:8083/connectors)" -ne 200 ] ; do
  echo -e "\t" "$(date)" " Kafka Connect listener HTTP state: " "$(curl -s -o /dev/null -w '%{http_code}' http://localhost:8083/connectors)" " (waiting for 200)"
  sleep 5
done
echo -e "$(date)" "\n\n--------------\n\o/ Kafka Connect is ready! Listener HTTP state: " "$(curl -s -o /dev/null -w '%{http_code}' http://localhost:8083/connectors)" "\n--------------\n"

# Now create your connector
## Inline config example:
curl -i -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/sink-file-jsonschema-as-json/config \
    -d '{
            "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "tasks.max": 1,
            "file": "/jsonschema-as-json.txt",
            "topics": "test-jsonschema"
}'
## External file example:
curl -X POST -H "Content-Type: application/json" -d @config.json http://localhost:8083/connectors

# Keep the container alive: block on the background worker so the script
# (and therefore the container) does not exit once the connector is created
wait
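The wait loop in the script above polls forever, so a worker that never comes up leaves the script hanging silently. A minimal sketch of the same polling with an upper bound; the function name wait_for_connect and the default limit of about 300 seconds are assumptions, not part of the Confluent image:

```shell
#!/bin/bash
# Hypothetical helper: poll a Kafka Connect REST URL until it returns HTTP 200,
# giving up after roughly max_wait seconds so a worker that never starts
# cannot hang the script forever.
wait_for_connect() {
  local url="${1:-http://localhost:8083/connectors}"
  local max_wait="${2:-300}"   # assumed default: about 5 minutes
  local interval="${3:-5}"     # seconds between polls, as in the loop above
  local waited=0 code
  while true; do
    # --max-time stops a single request from blocking indefinitely;
    # curl reports 000 as the status when it cannot connect at all
    code="$(curl -s -o /dev/null --max-time 5 -w '%{http_code}' "$url" || true)"
    if [ "$code" = "200" ]; then
      echo "$(date) Kafka Connect is ready (HTTP $code)"
      return 0
    fi
    if [ "$waited" -ge "$max_wait" ]; then
      echo "$(date) Gave up after ${waited}s (last HTTP state: $code)" >&2
      return 1
    fi
    echo "$(date) Kafka Connect listener HTTP state: $code (waiting for 200)"
    sleep "$interval"
    waited=$((waited + interval))
  done
}
```

In the entrypoint, the while loop could then be replaced by wait_for_connect http://localhost:8083/connectors 300 5 || exit 1, so a failed start ends the script with a non-zero status instead of waiting indefinitely.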

See also https://rmoff.net/2018/12/15/docker-tips-and-tricks-with-ksql-and-kafka/


5 Comments

Thanks for the answer. Where do you pass the connect-distributed.properties file in this example?
You don't; it's done through environment variables, which the image uses to build the properties file at runtime.
With a little tweak to check whether the connector already exists, I got it working: if [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors/cbconnector) -ne 200 ]; then curl -X POST -H "Content-Type: application/json" -d @config/stage/config.json http://localhost:8083/connectors; fi. Just one additional question: since we are running Kafka Connect in the background here, Kubernetes does not see the container as running. Do you have any example for Kubernetes? I didn't find one on your blog.
Instead of a check and POST, just use PUT :) I don't have any examples for kube, sorry
I figured it out and added it as an answer. Thanks.
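As the reply in the comments says, the Confluent images generate the worker's properties file from environment variables. For simple property names the convention is: upper-case the name, replace dots with underscores and prefix it with CONNECT_ (so bootstrap.servers becomes CONNECT_BOOTSTRAP_SERVERS). A minimal sketch that applies that convention to an existing connect-distributed.properties so the same settings can be passed as environment variables; the function name is hypothetical, and it assumes plain key=value lines whose keys contain only letters, digits and dots (names with dashes or underscores need extra escaping that is not handled here):

```shell
#!/bin/bash
# Hypothetical helper: convert "key=value" lines of a Kafka Connect worker
# properties file (read on stdin) into CONNECT_* environment-variable lines.
# Assumes no whitespace around '=' and keys made only of letters, digits and dots.
props_to_connect_env() {
  local line key value
  while IFS= read -r line || [ -n "$line" ]; do
    # Skip blank lines and comments ('#' or '!' in properties files)
    case "$line" in
      ''|'#'*|'!'*) continue ;;
    esac
    [[ "$line" == *=* ]] || continue
    key="${line%%=*}"      # everything before the first '='
    value="${line#*=}"     # everything after it (values may contain '=')
    key="${key//./_}"      # dots become underscores
    printf 'CONNECT_%s=%s\n' "${key^^}" "$value"   # upper-case, CONNECT_ prefix
  done
}
```

For example, props_to_connect_env < config/stage/connect-distributed.properties > connect.env produces a file that docker run --env-file connect.env can consume.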

Thanks to Robin Moffatt's brilliant solution, which I combined with my own requirements, I got it working.

Since I deploy the image to Kubernetes, running /etc/confluent/docker/run & in the background causes the container to go into the Completed state instead of Running. This makes the container unreachable from outside through the REST interface, for example:

http://some-ip:31682/connectors

To solve this, I used the Dockerfile from the original question but modified Robin's script: I removed the Confluent docker run command and added an if to check beforehand whether the connector already exists.

bash -c ' \
echo -e "\n\n=============\nWaiting for Kafka Connect to start listening on localhost ⏳\n=============\n"
while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200 ] ; do
  echo -e "\t" $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)"
  sleep 5
done
echo -e $(date) "\n\n--------------\n\o/ Kafka Connect is ready! Listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) "\n--------------\n"

if [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors/cbconnector2) -ne 200 ]
then
  curl -X POST -H "Content-Type: application/json" -d @config/stage/config.json http://localhost:8083/connectors
fi'

After that, I modified the Kubernetes deployment file by adding a postStart lifecycle hook that executes the entrypoint.sh script:

lifecycle:
  postStart:
    exec:
      command: ["/bin/sh", "/config/stage/entrypoint.sh"] 

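Basically, it first starts Kafka Connect, and after the process (pod) has started, the hook simply executes my custom shell script to create the Kafka connectors. Kubernetes does not guarantee that a postStart hook runs before or after the container's entrypoint, which is why the script still waits for the REST API to return 200 before creating the connector.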
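Because the postStart hook runs every time the container starts, the creation step needs to be safe to repeat. As Robin suggested in the comments on his answer, the existence check can be replaced by PUT /connectors/<name>/config, which creates the connector if it does not exist and updates its configuration otherwise. Note that the PUT body is just the connector's configuration map, not the {"name": ..., "config": {...}} wrapper that POST /connectors takes, so config.json would need to be in that shape. A minimal sketch; the helper name put_connector and the file name cbconnector2-config.json are hypothetical:

```shell
#!/bin/bash
# Hypothetical helper: create or update a connector idempotently via the
# Kafka Connect REST API and print the HTTP status code
# (201 when the connector was created, 200 when its config was updated,
# 000 when the worker could not be reached).
put_connector() {
  local name="$1"
  local config_file="$2"                        # body: the bare config map only
  local base_url="${3:-http://localhost:8083}"
  curl -s -o /dev/null --max-time 10 -w '%{http_code}' -X PUT \
    -H "Content-Type: application/json" \
    --data @"$config_file" \
    "${base_url}/connectors/${name}/config" || true
}
```

Usage in the hook script (file name hypothetical):

code="$(put_connector cbconnector2 /config/stage/cbconnector2-config.json)"
case "$code" in 200|201) echo "connector ready" ;; *) echo "connector request failed: HTTP $code" >&2; exit 1 ;; esac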

Hope this helps anyone with a similar use case. I am also open to other (better) solutions. Many thanks to Robin Moffatt.
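One alternative, sketched here as an assumption rather than something tested in this thread: keep the worker as the container's foreground process by exec-ing it, and start the connector-creation script in the background from the same entrypoint. The container then stays Running for as long as the worker does, without a postStart hook. The helper name start_worker_with_setup and the script path /config/stage/create-connectors.sh are hypothetical:

```shell
#!/bin/bash
# Hypothetical helper: run a setup script (e.g. one that waits for the REST
# API and creates connectors) in the background, then replace the current
# shell with the worker command so the worker runs in the foreground and
# signals sent to this process reach the worker directly.
start_worker_with_setup() {
  local setup_script="$1"
  shift
  "$setup_script" &   # connector creation runs alongside the worker
  exec "$@"           # the worker replaces this shell; the container lives as long as it does
}
```

In entrypoint.sh this would be called as start_worker_with_setup /config/stage/create-connectors.sh /etc/confluent/docker/run, where create-connectors.sh is the waiting script from this answer, made executable and given a #!/bin/bash line.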

1 Comment

This is exactly what I was looking for. Works well when deploying in K8S. I used envsubst to substitute environment variables in my config.json.
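A minimal sketch of the envsubst approach from the comment above, assuming GNU gettext's envsubst is installed in the image and config.json is kept as a template with ${VAR} placeholders. The variable names COUCHBASE_SEEDS and COUCHBASE_BUCKET, the helper name and the file names are hypothetical. Passing an explicit variable list to envsubst restricts substitution to those names, so any other $ characters in the JSON are left untouched:

```shell
#!/bin/bash
# Hypothetical helper: render a connector config template to stdout,
# substituting only the listed (hypothetical) environment variables.
# The variables must be exported so that envsubst can see them.
render_connector_config() {
  local template="$1"
  # Single quotes keep the shell from expanding the list; envsubst reads it literally
  envsubst '${COUCHBASE_SEEDS} ${COUCHBASE_BUCKET}' < "$template"
}
```

For example: render_connector_config /config/stage/config.json.tmpl > /tmp/config.json, then POST or PUT /tmp/config.json as in the scripts above.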
