Apply filter condition on dataframe created from JSON

Posted 2019-03-03 00:46

Question:

I am working with a dataframe created from JSON, and I want to apply a filter condition to it.

val jsonStr = """{ "metadata": [{ "key": 84896, "value": 54 },{ "key": 1234, "value": 12 }]}"""
val rdd = sc.parallelize(Seq(jsonStr))
val df = sqlContext.read.json(rdd)

Schema of df:

root
 |-- metadata: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: long (nullable = true)
 |    |    |-- value: long (nullable = true)

Now I need to filter the dataframe, which I am trying to do as:

val df1=df.where("key == 84896")

which throws the following error:

ERROR Executor - Exception in task 0.0 in stage 1.0 (TID 1)
org.apache.spark.sql.AnalysisException: cannot resolve '`key`' given input columns: [metadata]; line 1 pos 0;
'Filter ('key = 84896)

The reason I want to use the where clause is that I want to pass expression strings directly, e.g. ( (key == 999, value == 55) || (key == 1234, value == 12) )

Answer 1:

From what I understand from your question and comment, you are trying to filter the dataframe rows with the expression ( (key == 999, value == 55) || (key == 1234, value == 12) ).

First of all, the expression needs changes, since as written it is not a valid Spark SQL expression and cannot be applied to a dataframe, so you need to rewrite it:

val expression = """( (key == 999, value == 55) || (key == 1234, value == 12) )"""
val actualExpression = expression.replace(",", " and").replace("||", "or")

which should give you a new, valid expression:

( (key == 999 and value == 55) or (key == 1234 and value == 12) )
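As a quick sanity check, the rewrite above can be wrapped in a small helper. The helper name toSparkSqlExpr is just for illustration:

```scala
// Illustrative helper (name is hypothetical) wrapping the answer's rewrite:
// every comma becomes " and" and every "||" becomes "or", so the string
// turns into a valid Spark SQL boolean expression.
def toSparkSqlExpr(raw: String): String =
  raw.replace(",", " and").replace("||", "or")

val expression = "( (key == 999, value == 55) || (key == 1234, value == 12) )"
val actualExpression = toSparkSqlExpr(expression)
// actualExpression: ( (key == 999 and value == 55) or (key == 1234 and value == 12) )
```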

Now that you have a valid expression, your dataframe needs modification too, as you can't apply such an expression to a column whose schema is an array of structs.

So you need the explode function to expand the array elements into separate rows, and then the .* notation to select all fields of the struct as separate columns:

import org.apache.spark.sql.functions.explode
import sqlContext.implicits._ // for the $"..." column syntax

val df1 = df.withColumn("metadata", explode($"metadata"))
  .select($"metadata.*")

which should give you a dataframe like:

+-----+-----+
|key  |value|
+-----+-----+
|84896|54   |
|1234 |12   |
+-----+-----+

And finally, use the valid expression on the generated dataframe:

df1.where(s"${actualExpression}")
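Putting the pieces together, applying the rewritten expression to the exploded dataframe should keep only the row matching the second disjunct. A sketch, assuming the df1 and actualExpression values built above:

```scala
// Sketch: filter the exploded dataframe with the rewritten expression.
// df1 has rows (84896, 54) and (1234, 12); only the second satisfies
// (key == 999 and value == 55) or (key == 1234 and value == 12),
// so the filter leaves only the row (1234, 12).
val filtered = df1.where(actualExpression)
filtered.show()
```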

I hope the answer is helpful.



Answer 2:

First, you should use explode to get an easy-to-work-with dataframe. Then you can select both the key and value of your given input:

import org.apache.spark.sql.functions.explode
import sqlContext.implicits._ // for the $"..." column syntax

val explodedDF = df.withColumn("metadata", explode($"metadata"))
  .select("metadata.key", "metadata.value")

Output:

+-----+-----+
|  key|value|
+-----+-----+
|84896|   54|
| 1234|   12|
+-----+-----+

This way you'll be able to perform your filtering logic as usual:

scala> explodedDF.where("key == 84896").show
+-----+-----+
|  key|value|
+-----+-----+
|84896|   54|
+-----+-----+

You can combine filtering conditions; some examples below:

explodedDF.where("key == 84896 AND value == 54")
explodedDF.where("(key == 84896 AND value == 54) OR key == 1234")
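If you'd rather avoid building SQL strings, the same filters can also be expressed with the typed Column API. A sketch using the standard ===, &&, and || Column operators on the explodedDF built above:

```scala
import org.apache.spark.sql.functions.col

// Equivalent filters written with Column expressions instead of SQL strings.
// Note the extra parentheses: && binds tighter than || only by convention,
// so grouping explicitly keeps the intent clear.
explodedDF.where(col("key") === 84896 && col("value") === 54)
explodedDF.where((col("key") === 84896 && col("value") === 54) || col("key") === 1234)
```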