
I am trying to connect Kafka to Flink and run via sql-client.sh. However, no matter what I do with .yaml and libraries, I keep getting the error:

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
        at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: org.apache.flink.table.api.ValidationException: Unable to create catalog 'myKafka'.

Catalog options are:
'type'='kafka'
        at org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:270)
        at org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.createCatalog(LegacyTableEnvironmentInitializer.java:217)
        at org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.lambda$initializeCatalogs$1(LegacyTableEnvironmentInitializer.java:120)
        at java.util.HashMap.forEach(HashMap.java:1289)
        at org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.initializeCatalogs(LegacyTableEnvironmentInitializer.java:117)
        at org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.initializeSessionState(LegacyTableEnvironmentInitializer.java:105)
        at org.apache.flink.table.client.gateway.context.SessionContext.create(SessionContext.java:233)
        at org.apache.flink.table.client.gateway.local.LocalContextUtils.buildSessionContext(LocalContextUtils.java:100)
        at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:91)
        at org.apache.flink.table.client.SqlClient.start(SqlClient.java:88)
        at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
        ... 1 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.CatalogFactory' in the classpath.

Available factory identifiers are:

generic_in_memory
        at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319)
        at org.apache.flink.table.factories.FactoryUtil.getCatalogFactory(FactoryUtil.java:455)
        at org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:251)
        ... 11 more

My sql-conf is pretty simple (I didn't include sensitive information such as bootstrap servers):

catalogs:
  - name: myKafka
    type: kafka

In addition, the library folder includes the following jars:

  • flink-avro-confluent-registry-1.13.2.jar
  • flink-connector-kafka_2.12-1.13.2.jar
  • flink-sql-connector-kafka_2.12-1.13.2.jar
  • kafka-clients-2.0.0-cdh6.1.1.jar

The Flink version: 1.13.2. The Kafka version: 2.0.0-cdh6.1.1.

Solution (thanks to @Niko for pointing me in the right direction): I modified sql-conf.yaml to use a Hive catalog and created the Kafka table inside SQL Client. My sql-conf.yaml now looks like:

execution:
  type: streaming
  result-mode: table
  planner: blink
  current-database: default
  current-catalog: myhive

catalogs:
  - name: myhive
    type: hive
    hive-version: 2.1.1-cdh6.0.1
    hive-conf-dir: /etc/hive/conf
  
deployment:
  m: yarn-cluster
  yqu: ABC_XYZ

I ran it and, inside sql-client.sh, created the Kafka table with the necessary connection properties.
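For completeness, the DDL I ran inside sql-client.sh looked roughly like the following sketch; the column names, topic, servers, and group id here are placeholders, not my actual values:

```sql
-- Sketch of the Kafka source table created inside sql-client.sh
-- (topic, servers, and group id are placeholders)
CREATE TABLE kafka_source (
  id INT,
  payload STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'my_topic',
  'properties.bootstrap.servers' = 'broker1:9092',
  'properties.group.id' = 'flink_sql_client',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```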

  • In your case, I think you should not configure the catalog type as kafka; all you need is to create a source table with the connector type set to kafka. Commented Nov 30, 2021 at 12:36

1 Answer


All catalogs defined using YAML must provide a type property that specifies the type of catalog. The following types are supported out of the box:

  • generic_in_memory
  • hive
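For example, a catalog of the built-in generic_in_memory type can be declared in the same YAML format (the catalog name here is illustrative):

```yaml
# A minimal catalog definition using the built-in in-memory type
catalogs:
  - name: my_memory_catalog
    type: generic_in_memory
```

There is no kafka catalog type; Kafka enters the picture as a table connector, not as a catalog.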

You can read more about it in the official documentation.

You can create your so-called initialization SQL file like:

CREATE CATALOG MyCatalog WITH (
  'type' = 'hive',
  'default-database' = 'my_database',
  'hive-conf-dir' = '/etc/hive/conf'
);

USE CATALOG MyCatalog;

CREATE TABLE MyTable(
  MyField1 INT,
  MyField2 STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'YOUR_TOPIC',
  'properties.bootstrap.servers' = 'localhost',
  'properties.group.id' = 'some_id',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
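Flink 1.13 added the `-i` option to pass such an initialization file when starting the client, e.g. `./bin/sql-client.sh -i init.sql`. Once the file has run, the Kafka-backed table can be queried like any other table:

```sql
-- Inside the SQL client, after the init file has been applied
SELECT MyField1, MyField2 FROM MyTable;
```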

1 Comment

Thank you. I avoided the init file altogether but regardless, it worked.
