Issue with Kafka stream filtering

2019-07-31 04:12发布

问题:

I'm trying to run a basic app from the following example:

https://github.com/confluentinc/examples/blob/3.3.x/kafka-streams/src/main/scala/io/confluent/examples/streams/MapFunctionScalaExample.scala

However I'm getting an exception at this line:

// Variant 1: using `mapValues`
val uppercasedWithMapValues: KStream[Array[Byte], String] = textLines.mapValues(_.toUpperCase())

Error:(33, 25) missing parameter type for expanded function ((x$1) => x$1.toUpperCase()) textLines.mapValues(_.toUpperCase())

Error I'm getting if I hover cursor over the code:

Type mismatch, expected: ValueMapper[_ >: String, _ <: NotInferedVR], actual: (Any) => Any Cannot resolve symbol toUpperCase

Contents of my sbt file:

name := "untitled1"

version := "0.1"

scalaVersion := "2.11.11"

// https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11
libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.11.0.0"

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.0"

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams
libraryDependencies += "org.apache.kafka" % "kafka-streams" % "0.11.0.0"

// https://mvnrepository.com/artifact/org.apache.kafka/connect-api
libraryDependencies += "org.apache.kafka" % "connect-api" % "0.11.0.0"

I'm really not sure how to proceed with that as I'm quite new to Scala. I'd like to know what's the issue and how to fix it.

回答1:

From http://docs.confluent.io/current/streams/faq.html#scala-compile-error-no-type-parameter-java-defined-trait-is-invariant-in-type-t

The root cause of this problem is Scala-Java interoperability – the Kafka Streams API is implemented in Java, but your application is written in Scala. Notably, this problem is caused by how the type systems of Java and Scala interact. Generic wildcards in Java, for example, are often causing such Scala issues.

To fix the problem you would need to declare types explicitly in your Scala application in order for the code to compile. For example, you may need to break a single statement that chains multiple DSL operations into multiple statements, where each statement explicitly declares the respective return types. The StreamToTableJoinScalaIntegrationTest demonstrates how the types of return variables are explicitly declared.

Update

Kafka 2.0 (will be released in June) contains a proper Scala API that avoid those issues. Compare https://cwiki.apache.org/confluence/display/KAFKA/KIP-270+-+A+Scala+Wrapper+Library+for+Kafka+Streams