Creating a SparkSQL UDF in Java outside of SQLCont

Posted 2019-01-25 22:49

Problem

I would like to create a User-Defined Function in Java that can be called as a Java method within a chain of Apache Spark operators. I'm having trouble finding Java examples that don't require the UDF to exist inside of a SQL query.

Versions

  • Java 8
  • Scala 2.10.6
  • Apache Spark 1.6.0 Pre-built for Hadoop 2.6.0

What I've Tried That Works

I can successfully create a UDF in Java. However, I can't use this unless it's in a SQL query:

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;

sqlContext.udf().register("udfUppercase",
    (String string) -> string.toUpperCase(), DataTypes.StringType);

DataFrame oldDF = // a simple DataFrame with a "name" column
oldDF.registerTempTable("df");
DataFrame newDF = sqlContext.sql("SELECT udfUppercase(name) AS name_upper FROM df");

Where I'm Stuck

I would expect a non-SQL method-call-style UDF in Java to look something like this:

import static org.apache.spark.sql.functions.udf;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;

UserDefinedFunction udfUppercase = udf(
    (String string) -> string.toUpperCase(), DataTypes.StringType);

DataFrame oldDF = // a simple DataFrame with a "name" column
newDF = oldDF.withColumn("name_upper", udfUppercase(oldDF.col("name")));

Compiling this leads to a compiler error on the line beginning with "UserDefinedFunction", so obviously my attempt at guessing the right signature is incorrect:

error: no suitable method found for udf((String st[...]ase(),DataType)
    UserDefinedFunction udfUppercase = udf((String string) -> string.toUpperCase(), DataTypes.StringType);
method functions.<RT#1>udf(Function0<RT#1>,TypeTags.TypeTag<RT#1>) is not applicable
    (cannot infer type-variable(s) RT#1
    (argument mismatch; Function0 is not a functional interface
    multiple non-overriding abstract methods found in interface Function0))

This error continues with detail for each of the inferred udf() signatures attempted.
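The core of this message can be reproduced without Spark. Below is a minimal plain-Java sketch, assuming (as the error text reports for Function0) that javac sees the Scala function interfaces as having more than one abstract method. The names SamDemo, TwoAbstractMethods, and OneAbstractMethod are hypothetical stand-ins and are not part of Spark or Scala.

```java
public class SamDemo {
    // Hypothetical stand-in for an interface that javac sees with several
    // abstract methods. A lambda cannot target it.
    interface TwoAbstractMethods<T, R> {
        R apply(T value);
        String describe();
    }

    // Hypothetical stand-in for a Java-friendly interface with exactly one
    // abstract method. A lambda can target it.
    interface OneAbstractMethod<T, R> {
        R call(T value);
    }

    static String viaLambda(String input) {
        OneAbstractMethod<String, String> upper = (String s) -> s.toUpperCase();
        return upper.call(input);
    }

    static String viaAnonymousClass(String input) {
        // The next line would fail to compile, because TwoAbstractMethods
        // is not a functional interface:
        // TwoAbstractMethods<String, String> bad = (String s) -> s.toUpperCase();
        TwoAbstractMethods<String, String> upper = new TwoAbstractMethods<String, String>() {
            @Override public String apply(String s) { return s.toUpperCase(); }
            @Override public String describe() { return "uppercase"; }
        };
        return upper.apply(input);
    }

    public static void main(String[] args) {
        System.out.println(viaLambda("alice"));
        System.out.println(viaAnonymousClass("bob"));
    }
}
```

This only illustrates the Java rule that a lambda needs a target with a single abstract method. It does not by itself make functions.udf usable from Java.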

What I Need

I need to fix the Java code so that I can define and use the udfUppercase UDF without embedding it in a SQL query. I feel like I'm missing something very simple, fundamental, and possibly syntax-y, but could be completely off base.

Working Solution (courtesy of zero323 below)

There's no good way to register and use a Java UDF as a Java method, but a UDF registered in the SQLContext can be inserted into a chain of operators using callUDF().

import static org.apache.spark.sql.functions.callUDF;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;

// Register the UDF under a name so it can be looked up later.
sqlContext.udf().register("udfUppercase",
    (String string) -> string.toUpperCase(), DataTypes.StringType);

DataFrame oldDF = // a simple DataFrame with a "name" column
DataFrame newDF = oldDF.withColumn("name_upper", callUDF("udfUppercase", oldDF.col("name")));

Also, be sure to use callUDF() and not the deprecated callUdf() which has a different method signature.
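As a rough mental model of register-then-call-by-name, here is a plain-Java sketch of a name-keyed function registry. It is only an analogy under stated assumptions: NamedFunctionRegistry and its methods are hypothetical, operate on plain strings rather than columns, and are not how Spark implements udf().register() or callUDF().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration only; not a Spark class.
public class NamedFunctionRegistry {
    private final Map<String, Function<String, String>> functions = new HashMap<>();

    // Store the function under a name, loosely like udf().register(name, ...).
    public void register(String name, Function<String, String> fn) {
        functions.put(name, fn);
    }

    // Look the function up by name and apply it, loosely like callUDF(name, ...).
    public String call(String name, String arg) {
        Function<String, String> fn = functions.get(name);
        if (fn == null) {
            throw new IllegalArgumentException("No function registered as " + name);
        }
        return fn.apply(arg);
    }

    public static void main(String[] args) {
        NamedFunctionRegistry registry = new NamedFunctionRegistry();
        registry.register("udfUppercase", (String s) -> s.toUpperCase());
        System.out.println(registry.call("udfUppercase", "alice"));
    }
}
```

The point of the analogy is that the caller never holds a typed function object. The name string is the only handle, which is why a misspelled name fails at run time rather than at compile time.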

1 Answer
霸刀☆藐视天下
Reply #2 · 2019-01-25 23:28

Spark >= 2.3

SPARK-22945 (add java UDF APIs in the functions object) adds a simplified udf API, similar to the Scala and Python ones:

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;

UserDefinedFunction udfUppercase = udf(
  (String s) -> s.toUpperCase(), DataTypes.StringType
);

// df is assumed to be a DataFrame with a "name" column
df.select(udfUppercase.apply(col("name")));

Spark < 2.3

Long story short, the functions.udf methods are not designed for Java interoperability. All variants require TypeTags, and while it is possible to generate these manually (I am pretty sure I've seen Daniel Darabos showing how to do it on Stack Overflow), it is something you probably want to avoid.

If for some reason you want to avoid writing the UDF in Scala, the simplest thing is to register the UDF and call it by name:

sqlContext.udf().register("udfUppercase",
  (String string) -> string.toUpperCase(), DataTypes.StringType);

df.select(callUDF("udfUppercase", col("name")));