
I'm running a Flink cluster in Docker in my local environment, and I've copied these JAR files into /opt/flink/lib/ of the image:

flink-cdc-dist-3.3.0.jar
flink-cdc-pipeline-connector-mysql-3.3.0.jar
flink-sql-connector-mysql-cdc-3.3.0.jar
mysql-connector-java-8.0.27.jar
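
To be concrete about that step: the end state is that all four JARs sit under /opt/flink/lib on both the JobManager and the TaskManager, and I restart both afterwards. A rough sketch of how I copy and verify them (the container names jobmanager/taskmanager are just what my compose setup uses, so treat them as assumptions):

# Copy the connector JARs into both containers and restart them
for c in jobmanager taskmanager; do
  docker cp flink-cdc-dist-3.3.0.jar                      "$c":/opt/flink/lib/
  docker cp flink-cdc-pipeline-connector-mysql-3.3.0.jar  "$c":/opt/flink/lib/
  docker cp flink-sql-connector-mysql-cdc-3.3.0.jar       "$c":/opt/flink/lib/
  docker cp mysql-connector-java-8.0.27.jar               "$c":/opt/flink/lib/
  docker restart "$c"
done

# Verify the JARs actually landed in the lib directory
docker exec jobmanager ls /opt/flink/lib | grep -iE 'cdc|mysql'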

Then I wrote a Python file like this:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Define the MySQL CDC source table
t_env.execute_sql(f"""
        CREATE TABLE my_table (
            xxx xxx
            ...
        ) WITH (
            'connector' = 'mysql-cdc',
            'hostname' = 'xxx',
            'port' = 'xxx',
            'username' = 'xxx',
            'password' = 'xxx',
            'database-name' = 'xxx',
            'table-name' = 'xxx'
        )
    """)

# Define the S3 sink table
t_env.execute_sql(f"""
        CREATE TABLE s3_table (
            xxx xxx,
            xxx xxx
            ...
        ) WITH (
            'connector' = 'filesystem',
            'path' = 's3://xxx/xxx',
            'format' = 'json'
        )
    """)

# Insert data from MySQL CDC source to S3 sink
t_env.execute_sql(f"""
        INSERT INTO s3_table
        SELECT * FROM my_table
    """)

# Execute the Flink job
env.execute('MySQL CDC to S3 Flink SQL Job')

Finally, I try to submit this to Flink by running the following command locally:

flink run -py flink_job.py -pyFiles requirements.txt 
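
For completeness, the submission goes against the Dockerized cluster's REST endpoint; a minimal sketch, assuming the JobManager publishes its REST port on localhost:8081:

# Point the CLI at the JobManager of the Docker cluster (address is an assumption)
flink run -m localhost:8081 -py flink_job.py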

But Flink returns an error:

org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.my_table'.
Table options are:

'connector'='mysql-cdc'
'database-name'='xxx'
'hostname'='xxx'
'password'='******'
'port'='xxx'
'table-name'='xxx'
'username'='xxx'
...
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'mysql-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
print
python-input-format
...

Why do I get this error even though I've copied those .jar files to Flink's lib dir? How should I submit the job to Flink?

  • Did you restart your Flink cluster after adding the JARs? Did you add the JARs to each node? Commented Mar 4 at 11:02
  • @RobinMoffatt I've done both Commented Mar 5 at 1:08
