1

I am getting an Exception in thread "main" java.lang.IllegalAccessError: class org.apache.flink.state.api.runtime.SavepointLoader tried to access protected method org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(Ljava/lang/String;)Lorg/apache/flink/runtime/state/CompletedCheckpointStorageLocation; (org.apache.flink.state.api.runtime.SavepointLoader and org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage are in unnamed module of loader 'app')

Using flink 1.8. Using below maven repo :

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-state-processor-api_2.12</artifactId>
      <version>1.9.1</version>
    </dependency>

Source code snippet

        ExecutionEnvironment bEnv   = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint = Savepoint.load(bEnv, "/home/utlesh/Documents/savepoint", new MemoryStateBackend()) ;
        savepoint.readListState("input-events-source-01", "Custom Source", TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>(){}));

Getting exception on second line which calls below function

    public static ExistingSavepoint load(ExecutionEnvironment env, String path, StateBackend stateBackend) throws IOException {
    org.apache.flink.runtime.checkpoint.savepoint.Savepoint savepoint = SavepointLoader.loadSavepoint(path);
    ...
    ...
}

Which calls below function :

    package org.apache.flink.state.api.runtime;

    public static Savepoint loadSavepoint(String savepointPath) throws IOException {
        CompletedCheckpointStorageLocation location = AbstractFsCheckpointStorage
            .resolveCheckpointPointer(savepointPath);

        try (DataInputStream stream = new DataInputStream(location.getMetadataHandle().openInputStream())) {
            return Checkpoints.loadCheckpointMetadata(stream, Thread.currentThread().getContextClassLoader());
        }
    }

which calls below function :

    package org.apache.flink.runtime.state.filesystem;

    protected static CompletedCheckpointStorageLocation resolveCheckpointPointer(String checkpointPointer) throws IOException {
        checkNotNull(checkpointPointer, "checkpointPointer");
        checkArgument(!checkpointPointer.isEmpty(), "empty checkpoint pointer");
       ...
       ...
}

If we see carefully, protected function of different package is called here. Is this a bug in flink maven repo or it's me using it wrong way ? Is there any other way to deserialize or read flink savepoint and checkpoint ?

2 Answers 2

1

There seems to be an dependency version mismatch for your flink.

Add the below dependencies to the pom.xml and build again, also remove the old version dependency of the flink-clients from same file.

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.11</artifactId>
      <version>1.9.1</version>
</dependency>
Sign up to request clarification or add additional context in comments.

Comments

1

The State Processor API can only be used in batch jobs running Flink 1.9 or greater, but it can be used to read savepoints and checkpoints that were written by streaming jobs running older versions of Flink (back to Flink 1.6).

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.