Question:
I use spark-shell to do the operations below.
I recently loaded a table with an array column in spark-sql.
Here is the DDL for it:
create table test_emp_arr(
dept_id string,
dept_nm string,
emp_details Array<string>
)
The data looks something like this:
+-------+-------+-------------------------------+
|dept_id|dept_nm|                    emp_details|
+-------+-------+-------------------------------+
|     10|Finance|[Jon, Snow, Castle, Black, Ned]|
|     20|     IT|            [Ned, is, no, more]|
+-------+-------+-------------------------------+
I can query the emp_details column like this:
sqlContext.sql("select emp_details[0] from emp_details").show
Problem
I want to query a range of elements in the collection.
I expected one of these queries to work:
sqlContext.sql("select emp_details[0-2] from emp_details").show
or
sqlContext.sql("select emp_details[0:2] from emp_details").show
Expected output
+-------------------+
|        emp_details|
+-------------------+
|[Jon, Snow, Castle]|
|      [Ned, is, no]|
+-------------------+
In pure Scala, if I have an array such as:
val emp_details = Array("Jon","Snow","Castle","Black")
I can get the elements in the range 0 to 2 using
emp_details.slice(0,3)
which returns
Array(Jon, Snow, Castle)
I am not able to apply this array operation in spark-sql.
Thanks
Answer 1:
Here is a solution using a user-defined function (UDF), which has the advantage of working for any slice size you want. It simply wraps Scala's built-in slice method in a UDF:
import sqlContext.implicits._
import org.apache.spark.sql.functions._
val slice = udf((array : Seq[String], from : Int, to : Int) => array.slice(from,to))
Example with a sample of your data:
val df = sqlContext.sql("select array('Jon', 'Snow', 'Castle', 'Black', 'Ned') as emp_details")
df.withColumn("slice", slice($"emp_details", lit(0), lit(3))).show
Produces the expected output
+--------------------+-------------------+
|         emp_details|              slice|
+--------------------+-------------------+
|[Jon, Snow, Castl...|[Jon, Snow, Castle]|
+--------------------+-------------------+
You can also register the UDF in your sqlContext and use it like this:
sqlContext.udf.register("slice", (array : Seq[String], from : Int, to : Int) => array.slice(from,to))
sqlContext.sql("select array('Jon','Snow','Castle','Black','Ned'),slice(array('Jon','Snow','Castle','Black','Ned'),0,3)")
You won't need lit anymore with this solution.
Answer 2:
Edit 2: For those who want to avoid a UDF at the expense of readability ;-)
If you really want to do it in one step, you will have to use Scala to create a lambda function returning a sequence of Column and wrap it in an array. This is a bit involved, but it's one step:
val df = List(List("Jon", "Snow", "Castle", "Black", "Ned")).toDF("emp_details")
df.withColumn("slice", array((0 until 3).map(i => $"emp_details"(i)):_*)).show(false)
+-------------------------------+-------------------+
|emp_details                    |slice              |
+-------------------------------+-------------------+
|[Jon, Snow, Castle, Black, Ned]|[Jon, Snow, Castle]|
+-------------------------------+-------------------+
The :_* works a bit of magic to pass a list to a so-called variadic function (array in this case, which constructs the SQL array). But I would advise against using this solution as is. Put the lambda function in a named function
def slice(from: Int, to: Int) = array((from until to).map(i => $"emp_details"(i)):_*)
for code readability. Note that, in general, sticking to Column expressions (without using udf) performs better.
Edit: To do it in a SQL statement (as you ask in your question), you can follow the same logic and generate the SQL query with Scala (not saying it's the most readable):
def sliceSql(emp_details: String, from: Int, to: Int): String = "Array(" + (from until to).map(i => emp_details + "[" + i.toString + "]").mkString(",") + ")"
val sqlQuery = "select emp_details, " + sliceSql("emp_details", 0, 3) + " as slice from emp_details"
sqlContext.sql(sqlQuery).show
+-------------------------------+-------------------+
|emp_details                    |slice              |
+-------------------------------+-------------------+
|[Jon, Snow, Castle, Black, Ned]|[Jon, Snow, Castle]|
+-------------------------------+-------------------+
Note that you can replace until by to in order to specify the index of the last element taken rather than the index at which the iteration stops.
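To make the difference between until and to concrete, here is a minimal sketch in plain Scala (no Spark needed). The object name RangeBounds and the helper indexExprs are hypothetical, introduced only for illustration; the helper builds the same kind of emp_details[i] fragments that the generated SQL query is made of.

```scala
// Hypothetical helper for illustration only; not part of Spark.
object RangeBounds {
  // Builds "column[i]" fragments for every index in the given range.
  def indexExprs(column: String, indices: Range): Seq[String] =
    indices.map(i => s"$column[$i]")

  // `until` excludes the upper bound: indices 0, 1, 2.
  val exclusive: Seq[String] = indexExprs("emp_details", 0 until 3)

  // `to` includes the upper bound: indices 0, 1, 2 as well.
  val inclusive: Seq[String] = indexExprs("emp_details", 0 to 2)
}
```

Both values contain the fragments for indices 0, 1 and 2, so `0 until 3` and `0 to 2` select the same three elements.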
Answer 3:
You can use the function array to build a new Array out of the three values:
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
val input = sqlContext.sql("select emp_details from emp_details")
val arr: Column = col("emp_details")
val result = input.select(array(arr(0), arr(1), arr(2)) as "emp_details")
result.show()
// +-------------------+
// |        emp_details|
// +-------------------+
// |[Jon, Snow, Castle]|
// |      [Ned, is, no]|
// +-------------------+
Answer 4:
Since Spark 2.4 you can use the slice function. In Python:
pyspark.sql.functions.slice(x, start, length)
Collection function: returns an array containing all the elements in x from index start (or starting from the end if start is negative) with the specified length.
...
New in version 2.4.
from pyspark.sql.functions import slice
df = spark.createDataFrame([
(10, "Finance", ["Jon", "Snow", "Castle", "Black", "Ned"]),
(20, "IT", ["Ned", "is", "no", "more"])
], ("dept_id", "dept_nm", "emp_details"))
df.select(slice("emp_details", 1, 3).alias("empt_details")).show()
+-------------------+
|       empt_details|
+-------------------+
|[Jon, Snow, Castle]|
|      [Ned, is, no]|
+-------------------+
In Scala
def slice(x: Column, start: Int, length: Int): Column
Returns an array containing all the elements in x from index start (or starting from the end if start is negative) with the specified length.
import org.apache.spark.sql.functions.slice
val df = Seq(
(10, "Finance", Seq("Jon", "Snow", "Castle", "Black", "Ned")),
(20, "IT", Seq("Ned", "is", "no", "more"))
).toDF("dept_id", "dept_nm", "emp_details")
df.select(slice($"emp_details", 1, 3) as "empt_details").show
+-------------------+
|       empt_details|
+-------------------+
|[Jon, Snow, Castle]|
|      [Ned, is, no]|
+-------------------+
The same thing can of course be done in SQL:
SELECT slice(emp_details, 1, 3) AS emp_details FROM df
Important:
Please note that, unlike Seq.slice, values are indexed from one (not zero) and the second argument is a length, not an end position.
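As a small illustration of that difference, the sketch below converts Seq.slice-style bounds into the (start, length) pair used by Spark's slice. The object SliceArgs and the method toSparkSlice are hypothetical names made up for this example; they are not part of Spark.

```scala
// Hypothetical helper for illustration only; not part of Spark.
object SliceArgs {
  // Converts Seq.slice-style bounds (0-based `from`, exclusive `until`)
  // into the (start, length) pair that Spark's slice expects,
  // where start is 1-based. Assumes 0 <= from <= until.
  def toSparkSlice(from: Int, until: Int): (Int, Int) =
    (from + 1, until - from)
}
```

For example, SliceArgs.toSparkSlice(0, 3) gives (1, 3), which matches the slice(emp_details, 1, 3) call used in this answer and corresponds to emp_details.slice(0, 3) in plain Scala.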
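Answer 5:
Use the selectExpr() and split() functions in Apache Spark. Note that split() takes a string, so this applies when emp_details is a comma-separated string rather than an array.
For example:
fs.selectExpr("((split(emp_details, ','))[0]) as e1", "((split(emp_details, ','))[1]) as e2", "((split(emp_details, ','))[2]) as e3")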
Answer 6:
Here is my generic slice UDF, which supports arrays of any type. It is a little bit ugly because you need to know the element type in advance.
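import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
// Null-safe wrapper around Seq.slice
def arraySlice(arr: Seq[AnyRef], from: Int, until: Int): Seq[AnyRef] =
if (arr == null) null else arr.slice(from, until)
// Builds a slice UDF that returns an array of the given element type
def slice(elemType: DataType): UserDefinedFunction =
udf(arraySlice _, ArrayType(elemType))
// UDF arguments must be Columns, so the integer bounds are wrapped in lit
fs.select(slice(StringType)($"emp_details", lit(1), lit(2)))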
Answer 7:
Use nested split:
split(split(concat_ws(',',emp_details),concat(',',emp_details[3]))[0],',')
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
scala> val spark=SparkSession.builder().getOrCreate()
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@1d637673
scala> val df = spark.read.json("file:///Users/gengmei/Desktop/test/test.json")
18/12/11 10:09:32 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
df: org.apache.spark.sql.DataFrame = [dept_id: bigint, dept_nm: string ... 1 more field]
scala> df.createOrReplaceTempView("raw_data")
scala> df.show()
+-------+-------+--------------------+
|dept_id|dept_nm|         emp_details|
+-------+-------+--------------------+
|     10|Finance|[Jon, Snow, Castl...|
|     20|     IT| [Ned, is, no, more]|
+-------+-------+--------------------+
scala> val df2 = spark.sql(
| s"""
| |select dept_id,dept_nm,split(split(concat_ws(',',emp_details),concat(',',emp_details[3]))[0],',') as emp_details from raw_data
| """.stripMargin)
df2: org.apache.spark.sql.DataFrame = [dept_id: bigint, dept_nm: string ... 1 more field]
scala> df2.show()
+-------+-------+-------------------+
|dept_id|dept_nm|        emp_details|
+-------+-------+-------------------+
|     10|Finance|[Jon, Snow, Castle]|
|     20|     IT|      [Ned, is, no]|
+-------+-------+-------------------+
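The nested split expression in this answer can be traced step by step with plain Scala strings. This is only a sketch: the object NestedSplitTrace and the method firstN are hypothetical names, and it assumes that String.split behaves like Spark's split for these simple inputs (both treat the delimiter as a regular expression).

```scala
// Plain-Scala trace of the nested split expression; illustration only.
object NestedSplitTrace {
  // Mirrors split(split(concat_ws(',', arr), concat(',', arr[n]))[0], ',').
  // Assumes arr has more than n elements and that no element contains
  // a comma or regex metacharacters.
  def firstN(arr: Seq[String], n: Int): Seq[String] = {
    val joined = arr.mkString(",")      // concat_ws(',', arr)
    val marker = "," + arr(n)           // concat(',', arr[n])
    val head = joined.split(marker)(0)  // split(joined, marker)[0]
    head.split(",").toSeq               // split(head, ',')
  }
}
```

With n = 3 this returns the first three elements for both sample rows. In this plain-Scala model the cut happens at the first occurrence of the marker, so an array with repeated values such as Seq("a", "b", "a", "b") yields only Seq("a"); the Spark expression probably shares this limitation.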