I have a DataFrame with an array of bytes in Spark (Python)
DF.select(DF.myfield).show(1, False)
+----------------+
|myfield |
+----------------+
|[00 8F 2B 9C 80]|
+----------------+
I'm trying to convert this array to a string
'008F2B9C80'
then to the numeric value
int('008F2B9C80',16)/1000000
> 2402.0
I have found some UDF samples, so I can already extract a single element of the array like this:
import pyspark.sql.functions as f
u = f.udf(lambda a: format(a[1], 'x'))
DF.select(u(DF['myfield'])).show()
+------------------+
|<lambda>(myfield) |
+------------------+
| 8f|
+------------------+
Now, how do I iterate over the whole array?
Is it possible to do all the operations I need inside the UDF?
Maybe there is a better way to do the cast?
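Maybe a single UDF along these lines is what I need, but I'm not sure it is correct or the best approach (a rough, untested sketch, reusing the f alias from above):
import pyspark.sql.functions as f
from pyspark.sql.types import DoubleType

# hex-format every byte, join them, parse as base 16, then scale
hex_to_double = f.udf(
    lambda a: int(''.join(format(b, '02x') for b in a), 16) / 1000000,
    DoubleType()
)
DF.select(hex_to_double(DF['myfield'])).show(1, False)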
Thanks for your help
Here is the Scala DataFrame solution. You need to import scala.math.BigInt.
scala> val df = Seq((Array("00","8F","2B","9C","80"))).toDF("id")
df: org.apache.spark.sql.DataFrame = [id: array<string>]
scala> df.withColumn("idstr",concat_ws("",'id)).show
+--------------------+----------+
| id| idstr|
+--------------------+----------+
|[00, 8F, 2B, 9C, 80]|008F2B9C80|
+--------------------+----------+
scala> import scala.math.BigInt
import scala.math.BigInt
scala> def convertBig(x:String):String = BigInt(x.sliding(2,2).map( x=> Integer.parseInt(x,16)).map(_.toByte).toArray).toString
convertBig: (x: String)String
scala> val udf_convertBig = udf( convertBig(_:String):String )
udf_convertBig: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
scala> df.withColumn("idstr",concat_ws("",'id)).withColumn("idBig",udf_convertBig('idstr)).show(false)
+--------------------+----------+----------+
|id |idstr |idBig |
+--------------------+----------+----------+
|[00, 8F, 2B, 9C, 80]|008F2B9C80|2402000000|
+--------------------+----------+----------+
scala>
There is no Spark type equivalent to Scala's BigInt, so I'm converting the udf() result to a string.
I have found a Python solution too:
from pyspark.sql.functions import udf
spark.udf.register('ByteArrayToDouble', lambda x: int.from_bytes(x, byteorder='big', signed=False) / 10e5)
spark.sql('select myfield, ByteArrayToDouble(myfield) myfield_python, convert_binary(hex(myfield))/1000000 myfield_scala from my_table').show(1, False)
+-------------+-----------------+----------------+
|myfield |myfield_python |myfield_scala |
+-------------+-----------------+----------------+
|[52 F4 92 80]|1391.76 |1391.76 |
+-------------+-----------------+----------------+
only showing top 1 row
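For reference, the same registered UDF should also be callable through the DataFrame API instead of SQL; a rough sketch (assuming the DataFrame behind my_table is available in a variable, here called df):
from pyspark.sql.functions import expr

# df is assumed to be the DataFrame that was registered as my_table above
df.withColumn('myfield_python', expr('ByteArrayToDouble(myfield)')).show(1, False)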
I'm now able to benchmark the two solutions.
Thank you for your valuable help!
I came across this question while answering your newest one.
Suppose you have the df as
+--------------------+
| myfield|
+--------------------+
|[00, 8F, 2B, 9C, 80]|
| [52, F4, 92, 80]|
+--------------------+
Now you can use the following UDF:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

def func(val):
    return int("".join(val), 16) / 1000000
func_udf = udf(lambda x: func(x), FloatType())
And to create the output, use
df = df.withColumn("myfield1", func_udf("myfield"))
This yields:
+--------------------+--------+
| myfield|myfield1|
+--------------------+--------+
|[00, 8F, 2B, 9C, 80]| 2402.0|
| [52, F4, 92, 80]| 1391.76|
+--------------------+--------+
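In case it helps to reproduce this, the sample df above can be built with something like the following sketch (note that the column holds arrays of hex strings here, not raw bytes):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(["00", "8F", "2B", "9C", "80"],), (["52", "F4", "92", "80"],)],
    ["myfield"]
)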