I have the following code:

source
    .mapValues(value -> value + " Stream it!!!")
    .print(Printed.toSysOut());

As you can see, mapValues expects a lambda expression.

I am using a Java library, but the application is written in Scala. How do I pass a Scala lambda to Java code?

I tried the following:

source
  .mapValues(value => value + "hello")
  .print(Printed.toSysOut)

But the compiler complains:

[error]   (x$1: org.apache.kafka.streams.kstream.Printed[String,?0(in value x$1)])Unit <and>
[error]   (x$1: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: String, _ >: ?0(in value x$1), String])Unit <and>
[error]   (x$1: String)Unit
[error]  cannot be applied to (org.apache.kafka.streams.kstream.Printed[Nothing,Nothing])
[error]       .print(Printed.toSysOut)
[error]        ^
[error] two errors found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 2 s, completed Nov 19, 2017 7:53:44 PM
  • Generally, Scala is stricter about types than Java, so quite often you need to specify the generic type parameters explicitly when calling a Java method. Commented Sep 10, 2019 at 17:19

2 Answers

15

It depends on your version of Scala.

In Scala 2.12, function literals can be used where a Java functional (single-abstract-method) interface is expected, and Java lambdas can be passed where a Scala function type is expected.

App1.java

import java.util.function.Function;

public class App1 {
    public static void method(Function<String, String> function) {
        System.out.println(function.apply("a"));
    }

    public static void main(String[] args) {
        App.method1((String s) -> s.toUpperCase());
    }
}

App.scala

object App {
  def main(args: Array[String]): Unit = {
    App1.method((s: String) => s.toUpperCase)
  }

  def method1(function: String => String): Unit = {
    println(function("xyz"))
  }
}

In 2.11 you can use scala-java8-compat:

libraryDependencies += "org.scala-lang.modules" %% "scala-java8-compat" % "0.8.0"

App1.java

import java.util.function.Function;
import static scala.compat.java8.JFunction.func;

public class App1 {
    public static void method(Function<String, String> function) {
        System.out.println(function.apply("a"));
    }

    public static void main(String[] args) {
        App.method1(func((String s) -> s.toUpperCase()));
    }
}

App.scala

import scala.compat.java8.FunctionConverters._

object App {
  def main(args: Array[String]): Unit = {
    App1.method(((s: String) => s.toUpperCase).asJava)
  }

  def method1(function: String => String): Unit = {
    println(function("xyz"))
  }
}

Alternatively in 2.11 in Scala you can define implicit converters between java.util.function.Function and scala.Function1.
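
A minimal sketch of such converters might look like the following. The names FunctionConversions, toJavaFunction, toScalaFunction and applyJava are made up for illustration and come from neither the question nor any library; applyJava only stands in for a Java method that takes a java.util.function.Function.

```scala
import java.util.function.{Function => JFunction}
import scala.language.implicitConversions

// Hypothetical helper object; the names are illustrative only.
object FunctionConversions {
  // Wraps a Scala function in a java.util.function.Function.
  implicit def toJavaFunction[A, B](f: A => B): JFunction[A, B] =
    new JFunction[A, B] {
      override def apply(a: A): B = f(a)
    }

  // Wraps a java.util.function.Function in a Scala function.
  implicit def toScalaFunction[A, B](f: JFunction[A, B]): A => B =
    (a: A) => f.apply(a)
}

object ConversionDemo {
  import FunctionConversions._

  // Stand-in for a Java method that expects a java.util.function.Function.
  def applyJava(f: JFunction[String, String]): String = f.apply("a")

  def main(args: Array[String]): Unit = {
    val upper: String => String = _.toUpperCase
    // The implicit conversion turns the Scala function value into a JFunction.
    println(applyJava(upper)) // prints "A"
  }
}
```

Note that these converters only target java.util.function.Function. A library-specific interface would need its own converter written the same way.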

So if you use 2.11 try

source
  .mapValues(((value: String) => value + "hello").asJava)
  .print(Printed.toSysOut)

or

source
  .mapValues(((value: String) => value + "hello").asJava)
  .print(Printed.toSysOut[String, String])
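
If the Java method expects a library-specific single-method interface rather than java.util.function.Function (Kafka Streams' mapValues takes a ValueMapper), one fallback that does not depend on the Scala version is an explicit anonymous class. The sketch below uses a hypothetical stand-in trait named ValueMapper and a hypothetical mapValue method so that it compiles without the Kafka dependency; with the real library you would import org.apache.kafka.streams.kstream.ValueMapper instead.

```scala
// Hypothetical stand-in for a Java single-method interface such as
// Kafka's ValueMapper, declared here only to keep the sketch self-contained.
trait ValueMapper[V, VR] {
  def apply(value: V): VR
}

object AnonymousMapperDemo {
  // Hypothetical stand-in for a Java method that accepts the interface.
  def mapValue[V, VR](value: V, mapper: ValueMapper[V, VR]): VR =
    mapper.apply(value)

  def main(args: Array[String]): Unit = {
    // An explicit anonymous class needs no lambda conversion at all.
    val appendHello = new ValueMapper[String, String] {
      override def apply(value: String): String = value + "hello"
    }
    println(mapValue("kafka ", appendHello)) // prints "kafka hello"
  }
}
```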

5

The error message lists the types of arguments that print supports. One of them is:

org.apache.kafka.streams.kstream.Printed[String,?0(in value x$1)]

From the error message you can see that you're providing Printed.toSysOut with a type of:

org.apache.kafka.streams.kstream.Printed[Nothing,Nothing]

According to the Kafka 1.0 javadoc (Printed was not present in Kafka 0.11), toSysOut is defined as:

public static <K,V> Printed<K,V> toSysOut()

So the problem is that Scala infers Nothing for both K and V. You need to provide the types explicitly.

The following will probably work:

source
  .mapValues[String](value => value + " Stream it!!!")
  .print(Printed.toSysOut[String,String])
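
The inference problem can be reproduced without Kafka. The sketch below uses hypothetical stand-in classes named Printed and Stream (not the real Kafka types) with an overloaded print method. On Scala 2, the commented-out call is expected to fail in the same way as in the question, because the argument to an overloaded method is typed before the expected parameter type is known.

```scala
// Hypothetical stand-ins for Printed and KStream; not the Kafka classes.
class Printed[K, V]

object Printed {
  // As in the Java factory, K and V appear only in the result type.
  def toSysOut[K, V]: Printed[K, V] = new Printed[K, V]
}

class Stream[K, V] {
  // Two overloads, loosely mirroring the alternatives in the error message.
  def print(printed: Printed[K, V]): String = "printed"
  def print(label: String): String = "label: " + label
}

object NothingInferenceDemo {
  def run(): String = {
    val source = new Stream[String, String]
    // source.print(Printed.toSysOut)
    // ^ expected to be rejected on Scala 2: the argument is inferred as
    //   Printed[Nothing, Nothing], which matches neither overload.
    source.print(Printed.toSysOut[String, String])
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Supplying the type arguments at the call site removes the need for inference, which is why the explicit form compiles.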

4 Comments

In 2.11 this line probably will not compile: .mapValues(value => value + "hello").
I've tried as you said, but the compiler still says [error] (x$1: org.apache.kafka.streams.kstream.Printed[String,?0(in value x$1)])Unit <and> [error] (x$1: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: String, _ >: ?0(in value x$1), String])Unit <and> [error] (x$1: String)Unit [error] cannot be applied to (org.apache.kafka.streams.kstream.Printed[String,String]) [error] .print(Printed.toSysOut[String, String]) [error] ^ [error] one error found [error] (compile:compileIncremental) Compilation failed
Scala may be having trouble inferring the KStream type after the call to mapValues. According to the javadocs for mapValues you can supply a type to make this clearer. Can you try something like .mapValues[String](...) to help the Scala compiler work out the type?
You save my day. Thanks so much.
