
I'm trying to execute a Python UDF in SQL DDL (Flink 1.14.0).

Python file here:

from pyflink.table import DataTypes
from pyflink.table.udf import udf


@udf(input_types=[DataTypes.INT()], result_type=DataTypes.INT())
def add_one(a: int) -> int:
    return a + 1
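As a quick plain-Python sanity check (no Flink cluster needed), the underlying logic can be unit-tested by keeping an undecorated copy of the function; `add_one_plain` is a hypothetical helper for testing, not part of the job:

```python
# Hypothetical helper: the same logic as add_one, undecorated,
# so it can be tested without a PyFlink runtime.
def add_one_plain(a: int) -> int:
    return a + 1

print(add_one_plain(3))  # the value add1(3) should produce in SQL
```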

Then start the Flink cluster:

➜  flink-1.14.0 ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host magiclian-ubuntu.
Starting taskexecutor daemon on host magiclian-ubuntu.

Java code here:

public class PyUDF {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        //set cfg
        tEnv.getConfig().getConfiguration().setString("python.files",
                "/home/magic/workspace/python/flinkTestUdf/udfTest.py");
        tEnv.getConfig().getConfiguration().setString("python.client.executable", "python3");
        tEnv.executeSql(
                "CREATE TEMPORARY SYSTEM FUNCTION add1 AS 'udfTest.add_one' LANGUAGE PYTHON"
        );

        TableResult ret1 = tEnv.executeSql("select add1(3)");
        ret1.print();

        env.execute();
    }
}

Then run the job through the Flink client:

flink run /home/magic/workspace/flink-jobs/UDF/pythonUDF/target/pythonUDF-1.0.0.jar

The error is:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL validation failed. Cannot instantiate user-defined function 'add1'.
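One likely cause (a sketch, not a confirmed diagnosis): with `flink run`, SQL validation happens on the client before the `python.files` setting from the job code takes effect, so the client cannot find `udfTest.py`. Mirroring the flags from the working sql-client invocation on the `flink run` command line may fix it:

```shell
# Hedged sketch: pass the Python dependencies to the client as well,
# so SQL validation can instantiate the UDF.
# Paths are the ones from the question.
./bin/flink run \
  -pyfs /home/magic/workspace/python/flinkTestUdf/udfTest.py \
  -pyexec /usr/bin/python3 \
  /home/magic/workspace/flink-jobs/UDF/pythonUDF/target/pythonUDF-1.0.0.jar
```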

But when I use the SQL client to execute my Python UDF, it runs successfully.

Start sql-client:

PYFLINK_CLIENT_EXECUTABLE=/usr/bin/python3  ./sql-client.sh embedded -pyexec /usr/bin/python3 -pyfs home/magic/workspace/python/flinkTestUdf/udfTest.py

Then

create temporary system function add1 as 'udfTest.add_one' language python;

Then

select add1(3);

I got the correct result 4. Is there something wrong with my code?

I see that Python UDFs in SQL function DDL have been supported since version 1.11 (https://cwiki.apache.org/confluence/display/FLINK/FLIP-106%3A+Support+Python+UDF+in+SQL+Function+DDL), and I'm using 1.14.0.

Can anyone help me out?


2 Answers


Make sure all the dependencies are installed.

Java:

  • Java 8 or 11

  • Maven 3.5+

  • Flink jars:

      <dependencies>
             <dependency>
                 <groupId>org.apache.flink</groupId>
                 <artifactId>flink-java</artifactId>
                 <version>${flink.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.flink</groupId>
                 <artifactId>flink-streaming-java_2.11</artifactId>
                 <version>${flink.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.flink</groupId>
                 <artifactId>flink-clients_2.11</artifactId>
                 <version>${flink.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.flink</groupId>
                 <artifactId>flink-table-api-java-bridge_2.11</artifactId>
                 <version>${flink.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.flink</groupId>
                 <artifactId>flink-table-common</artifactId>
                 <version>${flink.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.flink</groupId>
                 <artifactId>flink-table-planner_2.11</artifactId>
                 <version>${flink.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.flink</groupId>
                 <artifactId>flink-python_2.11</artifactId>
                 <version>${flink.version}</version>
             </dependency>
         </dependencies>
    

Python:

  • Python 3.6+
  • Apache Beam (== 2.19.0)
  • pip (>= 7.1.0)
  • setuptools (>= 37.0.0)
  • apache-flink (== 1.14.0)
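Assuming pip is available, the Python side can be set up with the pinned PyFlink version (the matching Apache Beam dependency is normally pulled in transitively):

```shell
# Install the PyFlink distribution that matches the cluster version;
# this also installs its pinned apache-beam dependency.
python3 -m pip install apache-flink==1.14.0
```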



Make sure that the PyFlink version and the Flink version in your Java project match. For newcomers, the current pom.xml should be:

<properties>
        (...)
    <flink.version>1.15.1</flink.version>
</properties>
<dependencies>
    <!-- Apache Flink dependencies -->
    <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Add connector dependencies here. They must be in the default scope (compile). -->
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Add logging framework, to produce console output when running in the IDE. -->
    <!-- These dependencies are excluded from the application JAR by default. -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>2.17.2</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>2.17.2</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.17.2</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/net.sf.py4j/py4j -->
    <dependency>
        <groupId>net.sf.py4j</groupId>
        <artifactId>py4j</artifactId>
        <version>0.10.9.5</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table</artifactId>
        <version>${flink.version}</version>
        <type>pom</type>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- added as chunk -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-python -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-python_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- added as chunk -->
</dependencies>
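To check that the two versions actually agree, one option (a sketch; assumes Maven and pip are on the PATH and the command is run next to pom.xml):

```shell
# Print the Flink version declared in pom.xml ...
mvn -q -DforceStdout help:evaluate -Dexpression=flink.version
# ... and the installed PyFlink version; the two should match.
python3 -m pip show apache-flink | grep -i '^version'
```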

