I have the following code:
source
  .mapValues(value -> value + " Stream it!!!")
  .print(Printed.toSysOut());
As you can see, mapValues expects a lambda expression.
I am using a Java library, but the application is written in Scala. How do I pass a Scala lambda to Java code?
I tried the following:
source
  .mapValues(value => value + "hello")
  .print(Printed.toSysOut)
But the compiler complains:
[error] (x$1: org.apache.kafka.streams.kstream.Printed[String,?0(in value x$1)])Unit <and>
[error] (x$1: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: String, _ >: ?0(in value x$1), String])Unit <and>
[error] (x$1: String)Unit
[error] cannot be applied to (org.apache.kafka.streams.kstream.Printed[Nothing,Nothing])
[error] .print(Printed.toSysOut)
[error] ^
[error] two errors found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 2 s, completed Nov 19, 2017 7:53:44 PM
The error message lists the argument types that print supports. One of them is:
org.apache.kafka.streams.kstream.Printed[String,?0(in value x$1)]
From the error message you can see that you're passing Printed.toSysOut with a type of:
org.apache.kafka.streams.kstream.Printed[Nothing,Nothing]
According to the Kafka 1.0 javadoc (Printed was not present in Kafka 0.11), toSysOut is defined as:
public static <K,V> Printed<K,V> toSysOut()
So the problem is that Scala is inferring both K and V as Nothing. You need to provide the types explicitly.
The following will probably work:
source
  .mapValues[String](value => value + " Stream it!!!")
  .print(Printed.toSysOut[String, String])
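The inference problem can be reproduced without Kafka. The following is a minimal sketch, not the Kafka API: Printed and Stream here are hypothetical stand-ins written only for illustration, with two print overloads loosely mirroring the ones listed in the error message.

```scala
// Hypothetical stand-ins for the Kafka types (not the real classes).
// Like Java generics seen from Scala, Printed is invariant in K and V.
final class Printed[K, V]

object Printed {
  def toSysOut[K, V]: Printed[K, V] = new Printed[K, V]
}

final class Stream[K, V] {
  // Two overloads, so the argument is type-checked before a target type is known.
  def print(printed: Printed[K, V]): String = "printed"
  def print(label: String): String = "label: " + label
}

object InferenceDemo {
  def main(args: Array[String]): Unit = {
    val stream = new Stream[String, String]

    // Expected to be rejected by Scala 2, as in the error above:
    // K and V are inferred as Nothing, and Printed[Nothing, Nothing]
    // is not a Printed[String, String].
    // stream.print(Printed.toSysOut)

    // Supplying the type arguments explicitly resolves the overload.
    println(stream.print(Printed.toSysOut[String, String]))
  }
}
```

Because print is overloaded, the compiler cannot use the parameter type to guide inference for the argument, which is why the explicit type arguments are needed.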
It depends on your version of Scala.
In 2.12, Scala function literals can be used where Java functional (single-abstract-method) interfaces are expected, and Java lambdas can be passed where Scala functions are expected.
App1.java
import java.util.function.Function;

public class App1 {
    public static void method(Function<String, String> function) {
        System.out.println(function.apply("a"));
    }

    public static void main(String[] args) {
        App.method1((String s) -> s.toUpperCase());
    }
}
App.scala
object App {
  def main(args: Array[String]): Unit = {
    App1.method((s: String) => s.toUpperCase)
  }

  def method1(function: String => String): Unit = {
    println(function("xyz"))
  }
}
In 2.11 you can use scala-java8-compat:
libraryDependencies += "org.scala-lang.modules" %% "scala-java8-compat" % "0.8.0"
App1.java
import java.util.function.Function;
import static scala.compat.java8.JFunction.func;

public class App1 {
    public static void method(Function<String, String> function) {
        System.out.println(function.apply("a"));
    }

    public static void main(String[] args) {
        App.method1(func((String s) -> s.toUpperCase()));
    }
}
App.scala
import scala.compat.java8.FunctionConverters._

object App {
  def main(args: Array[String]): Unit = {
    App1.method(((s: String) => s.toUpperCase).asJava)
  }

  def method1(function: String => String): Unit = {
    println(function("xyz"))
  }
}
Alternatively, in 2.11 you can define implicit converters in Scala between java.util.function.Function and scala.Function1.
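A minimal sketch of such converters is below. The names FunctionConversions, scalaToJava, javaToScala, and callJava are made up for this example; callJava merely stands in for a Java method that expects a java.util.function.Function. The same pattern could presumably be adapted to a library-specific single-method interface.

```scala
import java.util.function.{Function => JavaFunction}
import scala.language.implicitConversions

// Hypothetical helper object; the names are illustrative only.
object FunctionConversions {
  // Wrap a Scala function in an anonymous java.util.function.Function.
  implicit def scalaToJava[A, B](f: A => B): JavaFunction[A, B] =
    new JavaFunction[A, B] {
      override def apply(a: A): B = f(a)
    }

  // Wrap a java.util.function.Function as a Scala function.
  implicit def javaToScala[A, B](f: JavaFunction[A, B]): A => B =
    (a: A) => f.apply(a)
}

object ConversionDemo {
  import FunctionConversions._

  // Stand-in for a Java API that takes java.util.function.Function.
  def callJava(f: JavaFunction[String, String]): String = f.apply("a")

  def main(args: Array[String]): Unit = {
    // The Scala function literal is converted implicitly by scalaToJava.
    println(callJava((s: String) => s.toUpperCase)) // prints "A"
  }
}
```

With the converters in scope, call sites stay free of explicit wrapping, at the cost of the conversion being less visible than an explicit .asJava.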
So if you use 2.11, try:
source
  .mapValues((value => value + "hello").asJava)
  .print(Printed.toSysOut)
or
source
  .mapValues(((value: String) => value + "hello").asJava)
  .print(Printed.toSysOut[String, String])