I have two comma-separated string columns (sourceAuthors
and targetAuthors
).
val df = Seq(
("Author1,Author2,Author3","Author2,Author3,Author1")
).toDF("source","target")
I'd like to add another column nCommonAuthors
with the number of common Authors.
I've tried doing this by this way:
def myUDF = udf { (s1: String, s2: String) =>
s1.split(",")
s2.split(",")
s1.intersect(s2).length
}
val newDF = myDF.withColumn("nCommonAuthors", myUDF($"source", $"target"))
I get the following error :
Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type Unit is not supported
Any idea why I get this error? How to find the common elements among two columns?
Based on SCouto answer, I give you the complete solution that worked for me:
def myUDF: UserDefinedFunction = udf(
(s1: String, s2: String) => {
val splitted1 = s1.split(",")
val splitted2 = s2.split(",")
splitted1.intersect(splitted2).length
})
val spark = SparkSession.builder().master("local").getOrCreate()
import spark.implicits._
val df = Seq(("Author1,Author2,Author3","Author2,Author3,Author1")).toDF("source","target")
df.show(false)
+-----------------------+-----------------------+
|source |target |
+-----------------------+-----------------------+
|Author1,Author2,Author3|Author2,Author3,Author1|
+-----------------------+-----------------------+
val newDF: DataFrame = df.withColumn("nCommonAuthors", myUDF('source,'target))
newDF.show(false)
+-----------------------+-----------------------+--------------+
|source |target |nCommonAuthors|
+-----------------------+-----------------------+--------------+
|Author1,Author2,Author3|Author2,Author3,Author1|3 |
+-----------------------+-----------------------+--------------+
Unless I misunderstood your problem, there are standard functions that can help you (so you don't have to write a UDF), i.e. split
and array_intersect
.
Given the following dataset:
val df = Seq(("Author1,Author2,Author3","Author2,Author3"))
.toDF("source","target")
scala> df.show(false)
+-----------------------+---------------+
|source |target |
+-----------------------+---------------+
|Author1,Author2,Author3|Author2,Author3|
+-----------------------+---------------+
You could write the following structured query:
val intersect = array_intersect(split('source, ","), split('target, ","))
val solution = df.select(intersect as "common_elements")
scala> solution.show(false)
+------------------+
|common_elements |
+------------------+
|[Author2, Author3]|
+------------------+
That error means that your udf is returning unit ( no return at all, as void un Java )
Try this. You are applying the intersect over the original s1 and S2 rather than over the splitted ones.
def myUDF = udf((s1: String, s2: String) =>{
val splitted1 = s1.split(",")
val splitted2= s2.split(",")
splitted1.intersect(splitted2).length
}
)