Flink version is 1.10.0

The code is as follows:

public class WorkerOnlineStatusRun {
  private static String datahubEndpoint = GlobalParameter.getPublicEndpoint();
  private static String redisServer = GlobalParameter.getRedisServer();
  private static long datahubStartInMs;

  public static void main(String[] args) throws Exception {
    if (args.length != 0) {
      if (args[0] == null) {
        datahubStartInMs = GlobalParameter.getDatahubStartInMs();
      } else {
        datahubStartInMs =
            TimeUtils.convertLocalDateTimeStr2Long(args[0]); // e.g. "2022-05-26T08:55:00.000"
      }
    }

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // checkpoint every 10 min
    env.getCheckpointConfig().setCheckpointInterval(1_000L * 60 * 10);

    // use event time for the application
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // env.setParallelism(1);

    env.getConfig().setAutoWatermarkInterval(GlobalParameter.getWatermarkInterval());

    long currTS = System.currentTimeMillis();

    KeyedStream<WorkSign, Object> workStatus =
        env.fromElements(
                new WorkSign("worker_id1", currTS, WorkTimestampFlagType.STARTWORK),
                new WorkSign(
                    "worker_id1", currTS + 1_000L * 60 * 20, WorkTimestampFlagType.STOPWORK))
            .uid("worksign")
            .returns(new TypeHint<WorkSign>() {})
            .keyBy(r -> r.getWorker_uniqe_id()); // this lambda version fails

    DatahubSourceTrace.getSingleOutputStreamOperatorWorker(env, datahubEndpoint, datahubStartInMs)
        .uid("worker")
        .returns(new TypeHint<Worker>() {})
        .keyBy(r -> r.getWorker_uniqe_id())
        .connect(workStatus)
        .process(new checkWorkerOnlineStatusFunction());

    env.execute();
  }
}

It fails with the following error:

Exception in thread "main" java.lang.UnsupportedOperationException: Key types if input KeyedStreams don't match: String and GenericType<java.lang.Object>.
    at org.apache.flink.streaming.api.datastream.ConnectedStreams.transform(ConnectedStreams.java:366)
    at org.apache.flink.streaming.api.datastream.ConnectedStreams.process(ConnectedStreams.java:339)
    at org.apache.flink.streaming.api.datastream.ConnectedStreams.process(ConnectedStreams.java:307)
    at com.kursk.WorkerOnlineStatusRun.main(WorkerOnlineStatusRun.java:60)

But if I replace the lambda expression with a method reference (double colons):

public class WorkerOnlineStatusRun {
  private static String datahubEndpoint = GlobalParameter.getPublicEndpoint();
  private static String redisServer = GlobalParameter.getRedisServer();
  private static long datahubStartInMs;

  public static void main(String[] args) throws Exception {
    if (args.length != 0) {
      if (args[0] == null) {
        datahubStartInMs = GlobalParameter.getDatahubStartInMs();
      } else {
        datahubStartInMs =
            TimeUtils.convertLocalDateTimeStr2Long(args[0]); // e.g. "2022-05-26T08:55:00.000"
      }
    }

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // checkpoint every 10 min
    env.getCheckpointConfig().setCheckpointInterval(1_000L * 60 * 10);

    // use event time for the application
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // env.setParallelism(1);

    env.getConfig().setAutoWatermarkInterval(GlobalParameter.getWatermarkInterval());

    long currTS = System.currentTimeMillis();

    KeyedStream<WorkSign, Object> workStatus =
        env.fromElements(
                new WorkSign("worker_id1", currTS, WorkTimestampFlagType.STARTWORK),
                new WorkSign(
                    "worker_id1", currTS + 1_000L * 60 * 20, WorkTimestampFlagType.STOPWORK))
            .uid("worksign")
            .returns(new TypeHint<WorkSign>() {})
            .keyBy(WorkSign::getWorker_uniqe_id); // the only change: ClassName::instanceMethodName

    DatahubSourceTrace.getSingleOutputStreamOperatorWorker(env, datahubEndpoint, datahubStartInMs)
        .uid("worker")
        .returns(new TypeHint<Worker>() {})
        .keyBy(r -> r.getWorker_uniqe_id())
        .connect(workStatus)
        .process(new checkWorkerOnlineStatusFunction());

    env.execute();
  }
}

And it works fine!

Of course, getWorker_uniqe_id returns a String, but that still doesn't explain why the lambda expression doesn't work:

public String getWorker_uniqe_id() {
    return worker_uniqe_id;
}

I searched for Java method references but found no mention of any difference in behavior between a lambda and a method reference (double colons). Can anyone tell me what causes this error?

1 Answer

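OK, I may have found the answer to my own question.

The problem appears to be how the key type is determined, given this declaration:

KeyedStream<WorkSign, Object> workStatus.....

With the lambda, no conversion happens: the key type stays Object, the declared type, which is why the error reports GenericType<java.lang.Object>. With the method reference (double colons), the key type is taken from getWorker_uniqe_id itself, which returns String, so it matches the String key of the other stream.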
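One way to check this outside Flink is to look at what the compiler generates for each form. The sketch below is plain Java and only approximates what Flink's type extraction sees; it is not Flink's actual code. KeyTypeDemo, the local KeySelector interface (a stand-in modeled on Flink's interface of the same name), the stripped-down WorkSign class, and the helper methods are all assumptions made for illustration.

```java
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;

public class KeyTypeDemo {

  /** Hypothetical stand-in for Flink's KeySelector; Serializable so the lambda can be inspected. */
  public interface KeySelector<IN, KEY> extends Serializable {
    KEY getKey(IN value) throws Exception;
  }

  /** Stripped-down stand-in for the WorkSign class from the question. */
  public static class WorkSign {
    private final String worker_uniqe_id;

    public WorkSign(String workerUniqueId) {
      this.worker_uniqe_id = workerUniqueId;
    }

    public String getWorker_uniqe_id() {
      return worker_uniqe_id;
    }
  }

  /** Returns the JVM signature of the method that actually implements the selector. */
  public static String implSignature(KeySelector<?, ?> selector) {
    try {
      // Serializable lambdas and method references carry a generated writeReplace()
      // method that returns a SerializedLambda describing the implementing method.
      Method writeReplace = selector.getClass().getDeclaredMethod("writeReplace");
      writeReplace.setAccessible(true);
      SerializedLambda lambda = (SerializedLambda) writeReplace.invoke(selector);
      return lambda.getImplMethodSignature();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Could not inspect the selector", e);
    }
  }

  /** Same shape as the failing keyBy: a lambda whose target key type is Object. */
  public static String lambdaSignature() {
    KeySelector<WorkSign, Object> selector = r -> r.getWorker_uniqe_id();
    return implSignature(selector);
  }

  /** Same shape as the working keyBy: a method reference with the same target type. */
  public static String methodRefSignature() {
    KeySelector<WorkSign, Object> selector = WorkSign::getWorker_uniqe_id;
    return implSignature(selector);
  }

  public static void main(String[] args) {
    System.out.println("lambda:           " + lambdaSignature());
    System.out.println("method reference: " + methodRefSignature());
  }
}
```

With javac, the lambda should be compiled into a synthetic method whose return type follows the target type (Object), while the method reference should point directly at getWorker_uniqe_id, which returns String. If that is indeed what Flink inspects, declaring the variable as KeyedStream<WorkSign, String> should let the lambda version produce a String key as well, though I have not verified this against Flink 1.10.0.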
