Selecting a range of elements in an array in Spark SQL

Posted 2019-01-25 16:06

I use spark-shell to perform the operations below.

I recently loaded a table with an array column in spark-sql.

Here is the DDL for the same:

create table test_emp_arr(
    dept_id string,
    dept_nm string,
    emp_details Array<string>
)

The data looks something like this:

+-------+-------+-------------------------------+
|dept_id|dept_nm|                    emp_details|
+-------+-------+-------------------------------+
|     10|Finance|[Jon, Snow, Castle, Black, Ned]|
|     20|     IT|            [Ned, is, no, more]|
+-------+-------+-------------------------------+

I can query the emp_details column like this:

sqlContext.sql("select emp_details[0] from emp_details").show

Problem

I want to query a range of elements in the collection:

Expected query to work

sqlContext.sql("select emp_details[0-2] from emp_details").show

or

sqlContext.sql("select emp_details[0:2] from emp_details").show

Expected output

+-------------------+
|        emp_details|
+-------------------+
|[Jon, Snow, Castle]|
|      [Ned, is, no]|
+-------------------+

In pure Scala, if I have an array such as:

val emp_details = Array("Jon","Snow","Castle","Black")

I can get the elements in the range 0 to 2 using

emp_details.slice(0,3)

which returns

Array(Jon, Snow, Castle)

I am not able to apply the above operation to the array in spark-sql.

Thanks

7 Answers
beautiful° · 2019-01-25 16:38

Since Spark 2.4 you can use the slice function. In Python:

pyspark.sql.functions.slice(x, start, length)

Collection function: returns an array containing all the elements in x from index start (or starting from the end if start is negative) with the specified length.

...

New in version 2.4.

from pyspark.sql.functions import slice

df = spark.createDataFrame([
    (10, "Finance", ["Jon", "Snow", "Castle", "Black", "Ned"]),
    (20, "IT", ["Ned", "is", "no", "more"])
], ("dept_id", "dept_nm", "emp_details"))

df.select(slice("emp_details", 1, 3).alias("emp_details")).show()
+-------------------+
|        emp_details|
+-------------------+
|[Jon, Snow, Castle]|
|      [Ned, is, no]|
+-------------------+

In Scala:

def slice(x: Column, start: Int, length: Int): Column

Returns an array containing all the elements in x from index start (or starting from the end if start is negative) with the specified length.

import org.apache.spark.sql.functions.slice

val df = Seq(
    (10, "Finance", Seq("Jon", "Snow", "Castle", "Black", "Ned")),
    (20, "IT", Seq("Ned", "is", "no", "more"))
).toDF("dept_id", "dept_nm", "emp_details")

df.select(slice($"emp_details", 1, 3) as "emp_details").show
+-------------------+
|        emp_details|
+-------------------+
|[Jon, Snow, Castle]|
|      [Ned, is, no]|
+-------------------+

The same thing can of course be done in SQL:

SELECT slice(emp_details, 1, 3) AS emp_details FROM df

Important:

Please note that, unlike Seq.slice, values are indexed from one and the second argument is a length, not an end position.
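
To make the indexing difference concrete, here is a minimal spark-shell sketch (assuming Spark 2.4+):

// Scala collections: zero-based start index, exclusive end index
Seq("Jon", "Snow", "Castle", "Black", "Ned").slice(0, 3)   // Seq(Jon, Snow, Castle)

// Spark SQL slice: one-based start index, second argument is a length
spark.sql("select slice(array('Jon','Snow','Castle','Black','Ned'), 1, 3) as s").show(false)
// prints [Jon, Snow, Castle]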

在下西门庆 · 2019-01-25 16:41

You can use the function array to build a new Array out of the three values:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

val input = sqlContext.sql("select emp_details from emp_details")

val arr: Column = col("emp_details")
val result = input.select(array(arr(0), arr(1), arr(2)) as "emp_details")

result.show()
// +-------------------+
// |        emp_details|
// +-------------------+
// |[Jon, Snow, Castle]|
// |      [Ned, is, no]|
// +-------------------+
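
The same construction can also be expressed in SQL; a minimal sketch, assuming the data is registered as a table named emp_details as in the question:

// hypothetical SQL equivalent of array(arr(0), arr(1), arr(2)) above
sqlContext.sql(
  "select array(emp_details[0], emp_details[1], emp_details[2]) as emp_details from emp_details"
).show()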
倾城 Initia · 2019-01-25 16:51

Edit 2: For those who want to avoid a udf at the expense of readability ;-)

If you really want to do it in one step, you will have to use Scala to create a lambda function returning a sequence of Column and wrap it in an array. This is a bit involved, but it's one step:

val df = List(List("Jon", "Snow", "Castle", "Black", "Ned")).toDF("emp_details")

df.withColumn("slice", array((0 until 3).map(i => $"emp_details"(i)):_*)).show(false)    


+-------------------------------+-------------------+
|emp_details                    |slice              |
+-------------------------------+-------------------+
|[Jon, Snow, Castle, Black, Ned]|[Jon, Snow, Castle]|
+-------------------------------+-------------------+

The :_* works a bit of magic to pass a list to a so-called variadic function (array in this case, which constructs the SQL array). But I would advise against using this solution as is; put the lambda in a named function

def slice(from: Int, to: Int) = array((from until to).map(i => $"emp_details"(i)):_*)

for code readability. Note that in general, sticking to Column expressions (without using udf) gives better performance.
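
With the helper in scope, usage against the same df looks like this (a sketch, same assumptions as above):

// the named helper reads more clearly than the inline variadic call
df.withColumn("slice", slice(0, 3)).show(false)
// slice column: [Jon, Snow, Castle]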

Edit: To do it in a single SQL statement (as you ask in your question...), following the same logic you can generate the SQL query using Scala (not saying it's the most readable):

def sliceSql(emp_details: String, from: Int, to: Int): String =
  "array(" + (from until to).map(i => emp_details + "[" + i + "]").mkString(",") + ")"
val sqlQuery = "select emp_details, " + sliceSql("emp_details", 0, 3) + " as slice from emp_details"

sqlContext.sql(sqlQuery).show

+-------------------------------+-------------------+
|emp_details                    |slice              |
+-------------------------------+-------------------+
|[Jon, Snow, Castle, Black, Ned]|[Jon, Snow, Castle]|
+-------------------------------+-------------------+

Note that you can replace until with to in order to specify the last element taken rather than the element at which the iteration stops.

淡お忘 · 2019-01-25 16:57

Here is my generic slice UDF; it supports arrays of any element type. It is a little ugly because you need to know the element type in advance.

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.UserDefinedFunction

def arraySlice(arr: Seq[AnyRef], from: Int, until: Int): Seq[AnyRef] =
  if (arr == null) null else arr.slice(from, until)

def slice(elemType: DataType): UserDefinedFunction =
  udf(arraySlice _, ArrayType(elemType))

fs.select(slice(StringType)($"emp_details", lit(1), lit(2)))
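
For the 0-to-2 range from the question, a usage sketch on the same dataframe (here called fs, as above); note the UDF takes Column arguments, hence lit:

// first three elements, matching the expected output in the question
fs.select(slice(StringType)($"emp_details", lit(0), lit(3)) as "emp_details").show()
// => [Jon, Snow, Castle] and [Ned, is, no]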
戒情不戒烟 · 2019-01-25 17:00

Use nested split: concat_ws joins the array into a comma-separated string, splitting that string on concat(',', emp_details[3]) drops everything from the fourth element on, and the remaining prefix is split back into an array (this assumes the array has at least four elements):

split(split(concat_ws(',',emp_details),concat(',',emp_details[3]))[0],',')

scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala> val spark=SparkSession.builder().getOrCreate()
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@1d637673

scala> val df = spark.read.json("file:///Users/gengmei/Desktop/test/test.json")
18/12/11 10:09:32 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
df: org.apache.spark.sql.DataFrame = [dept_id: bigint, dept_nm: string ... 1 more field]

scala> df.createOrReplaceTempView("raw_data")

scala> df.show()
+-------+-------+--------------------+
|dept_id|dept_nm|         emp_details|
+-------+-------+--------------------+
|     10|Finance|[Jon, Snow, Castl...|
|     20|     IT| [Ned, is, no, more]|
+-------+-------+--------------------+


scala> val df2 = spark.sql(
     | s"""
     | |select dept_id,dept_nm,split(split(concat_ws(',',emp_details),concat(',',emp_details[3]))[0],',') as emp_details from raw_data
     | """.stripMargin)
df2: org.apache.spark.sql.DataFrame = [dept_id: bigint, dept_nm: string ... 1 more field]

scala> df2.show()
+-------+-------+-------------------+
|dept_id|dept_nm|        emp_details|
+-------+-------+-------------------+
|     10|Finance|[Jon, Snow, Castle]|
|     20|     IT|      [Ned, is, no]|
+-------+-------+-------------------+
对你真心纯属浪费 · 2019-01-25 17:01

Use selectExpr() and the split() function in Apache Spark.

For example:

fs.selectExpr("(split(emp_details, ','))[0] as e1",
              "(split(emp_details, ','))[1] as e2",
              "(split(emp_details, ','))[2] as e3")
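
Note that split expects a string column, so this assumes emp_details is stored as a comma-separated string. For the array column from the question you can index directly (emp_details[0]), or, staying with this approach, join the array first with concat_ws; an untested sketch:

// sketch: flatten the array to a comma-separated string, then split and index
fs.selectExpr("split(concat_ws(',', emp_details), ',')[0] as e1",
              "split(concat_ws(',', emp_details), ',')[1] as e2",
              "split(concat_ws(',', emp_details), ',')[2] as e3")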