I have n Java classes that share one superclass (the data model). A list of these classes is the input parameter to a Scala method in which I want to create resultStreams, and from that process method I have to call Java generic methods. How can I resolve this? I tried using [_ <: SpecificRecordBase] and [SpecificRecordBase] in the method call, but got the same result.

Error

Error:(146, 88) type mismatch;
 found   : Class[_$3] where type _$3 <: org.apache.avro.specific.SpecificRecordBase
 required: Class[org.apache.avro.specific.SpecificRecordBase]
Note: _$3 <: org.apache.avro.specific.SpecificRecordBase, but Java-defined class Class is invariant in type T.
You may wish to investigate a wildcard type such as `_ <: org.apache.avro.specific.SpecificRecordBase`. (SLS 3.2.10)
                AvroHelper.deSerializeAvroObject(record.value, cl))(TypeInformation.of(cl)))

Scala code

object GenerickRunnerStackOverFlow {
  def process(inputClasses: List[Class[_ <: SpecificRecordBase]]): Unit = {
    val newStream: DataStream[KafkaSourceType] = env.addSource(....).uid(...).filter(...)

    val resultStreams = inputClasses.map(
      cl => newStream.map(record =>
                AvroHelper.deSerializeAvroObject(record.value, cl))(TypeInformation.of(cl)))

        ...
  }

  def main(args: Array[String]): Unit = {
    val topicToClasses: List[Class[_ <: SpecificRecordBase]] = List(Types.RECORD_1.getClassType, Types.RECORD_2.getClassType)
    process(topicToClasses)
  }
}

Java method spec

public static <A extends SpecificRecord> A deSerializeAvroObject(byte[] object, Class<A> clazz){ ...}

Model

    public class Record1 extends SpecificRecordBase {}
    public class Record2 extends SpecificRecordBase {}
    ...
    public enum Types {
      RECORD_1(Record1.class),
      RECORD_2(Record2.class);
      ....

      private Class<? extends SpecificRecordBase> clazz;
      public Class<? extends SpecificRecordBase> getClassType() {return this.clazz;}
}

I also get the same error message with the Scala addSink method:

def addSink(sinkFunction : org.apache.flink.streaming.api.functions.sink.SinkFunction[T]) : org.apache.flink.streaming.api.datastream.DataStreamSink[T] = { /* compiled code */ }

I wrote a wrapper method:

def addSinkWithSpecificRecordBase[A <: SpecificRecordBase](
    stream: DataStream[A],
    sink: BucketingSink[A]): DataStreamSink[A] = stream.addSink(sink)

Calling it as follows:

val result = topicToSinkStream.foreach { el =>
  val stream: DataStream[_ <: SpecificRecordBase] = el._2._1
  val sink: BucketingSink[_ <: SpecificRecordBase] = el._2._2
  addSinkWithSpecificRecordBase(stream, sink)
}

There was an error:

Error:(209, 37) type mismatch;
 found   : org.apache.flink.streaming.api.scala.DataStream[_$9] where type _$9 <: org.apache.avro.specific.SpecificRecordBase
 required: org.apache.flink.streaming.api.scala.DataStream[org.apache.avro.specific.SpecificRecordBase]
Note: _$9 <: org.apache.avro.specific.SpecificRecordBase, but class DataStream is invariant in type T.
You may wish to define T as +T instead. (SLS 4.5)
      addSinkWithSpecificRecordBase(stream, sink)

Where topicToSinkStream is:

Map[String, (DataStream[_ <: SpecificRecordBase], BucketingSink[_ <: SpecificRecordBase])]

I also tried removing SpecificRecordBase from the method's generic signature and adding + and - variance annotations to the method's type parameters, but neither helped.

1 Answer

The issue is that the type of AvroHelper.deSerializeAvroObject(record.value, cl) is inferred as SpecificRecordBase, because _ <: SpecificRecordBase is only allowed as a type argument, not as the type of an expression. The fix is to extract a helper function whose type parameter names the record type:

def processClass[A <: SpecificRecordBase](cl: Class[A], newStream: DataStream[KafkaSourceType]): DataStream[A] =
  newStream.map(record => AvroHelper.deSerializeAvroObject(record.value, cl))(TypeInformation.of(cl))

(if you define it locally you can also use newStream without making it an argument) and then

val resultStreams = inputClasses.map(cl => processClass(cl, newStream))
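
To try the same pattern without Flink or Avro on the classpath, here is a minimal sketch, assuming Scala 2.12 or 2.13. Every name in it (RecordBase, FakeStream, FakeTypeInfo, deserialize, mapWith) is a hypothetical stand-in for SpecificRecordBase, DataStream, TypeInformation, AvroHelper.deSerializeAvroObject and DataStream.map respectively, not the real API:

```scala
object WildcardHelperDemo {
  // Hypothetical stand-ins for SpecificRecordBase and two generated records.
  class RecordBase
  class Record1 extends RecordBase
  class Record2 extends RecordBase

  // Invariant container, playing the role of DataStream[T].
  final case class FakeStream[T](items: List[T])

  // Invariant evidence, playing the role of TypeInformation[T].
  final case class FakeTypeInfo[T](cls: Class[T])

  // Stand-in for the Java generic method: builds an A from its Class[A].
  def deserialize[A <: RecordBase](bytes: Array[Byte], cl: Class[A]): A =
    cl.getDeclaredConstructor().newInstance()

  // Stand-in for map with an explicit evidence parameter list.
  def mapWith[T, R](in: FakeStream[T])(f: T => R)(info: FakeTypeInfo[R]): FakeStream[R] =
    FakeStream(in.items.map(f))

  // The helper: A gives the unknown record type a name, so the function
  // result and the evidence are both typed with the same A.
  def processClass[A <: RecordBase](cl: Class[A], in: FakeStream[Array[Byte]]): FakeStream[A] =
    mapWith(in)(bytes => deserialize(bytes, cl))(FakeTypeInfo(cl))

  // Calls the helper once per class in a wildcard-typed list and
  // returns the first element of each resulting stream.
  def run(): List[RecordBase] = {
    val classes: List[Class[_ <: RecordBase]] = List(classOf[Record1], classOf[Record2])
    val in = FakeStream(List(Array.emptyByteArray))
    val results: List[FakeStream[_ <: RecordBase]] = classes.map(cl => processClass(cl, in))
    results.map(_.items.head)
  }
}
```

Each call to processClass binds A to the element type of that particular Class value, which is why the list itself can keep its wildcard type.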

2 Comments

Thank you! Could you please help me with the addSink method of DataStream[A]? I updated the question.
The type of your topicToSinkStream allows having a DataStream of one record type and BucketingSink of another, so it needs to be fixed first.
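
One possible way to fix that type, sketched under the assumption of Scala 2.12 or 2.13: keep each stream and its sink in a small class with a single type parameter, so both are forced to share one record type. StreamWithSink, attach, FakeStream and FakeSink are hypothetical names; FakeStream and FakeSink only stand in for DataStream and BucketingSink, and the wrapper here returns a count instead of a DataStreamSink:

```scala
import scala.collection.mutable.ListBuffer

object PairedSinkDemo {
  // Hypothetical stand-ins for SpecificRecordBase and two generated records.
  class RecordBase
  class Record1 extends RecordBase
  class Record2 extends RecordBase

  // Invariant container, playing the role of DataStream[T].
  final case class FakeStream[T](items: List[T])

  // Invariant sink, playing the role of BucketingSink[T]; it records what it receives.
  final class FakeSink[T] {
    private val buf = ListBuffer.empty[T]
    def write(t: T): Unit = buf += t
    def written: List[T] = buf.toList
  }

  // Hypothetical pairing type: one A for both the stream and the sink,
  // so a Record1 stream cannot be combined with a Record2 sink.
  final case class StreamWithSink[A <: RecordBase](stream: FakeStream[A], sink: FakeSink[A])

  // Same shape as the wrapper in the question; returns how many items were written.
  def addSinkTyped[A <: RecordBase](stream: FakeStream[A], sink: FakeSink[A]): Int = {
    stream.items.foreach(sink.write)
    sink.written.size
  }

  // Helper with a named type parameter, called once per map entry.
  def attach[A <: RecordBase](pair: StreamWithSink[A]): Int =
    addSinkTyped(pair.stream, pair.sink)

  def run(): Map[String, Int] = {
    val topicToSinkStream: Map[String, StreamWithSink[_ <: RecordBase]] = Map(
      "topic1" -> StreamWithSink[Record1](FakeStream(List(new Record1, new Record1)), new FakeSink[Record1]),
      "topic2" -> StreamWithSink[Record2](FakeStream(List(new Record2)), new FakeSink[Record2])
    )
    topicToSinkStream.map { case (topic, pair) => topic -> attach(pair) }
  }
}
```

With the tuple type from the question, the two wildcards are independent, so the compiler has no evidence that the stream and the sink agree; the pairing class carries that evidence in its single type parameter.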
