Read a bytes column in Spark

Posted 2020-06-04 04:16

I have a data set which contains an ID field that is in an unknown (and not friendly) encoding. I can read the single column using plain python and verify that the values are distinct and consistent across multiple data sets (i.e. it can be used as a primary key for joining).

When loading the file using spark.read.csv, it seems that Spark is decoding the column as UTF-8. However, some of the byte sequences are not valid UTF-8 and are converted to the Unicode character U+FFFD REPLACEMENT CHARACTER (EF BF BD in hex when encoded as UTF-8).

Is there a way to force Spark to read the column as bytes and not as a string?

Here is some code that can be used to recreate my issue (let column a be the ID field):

Create a File with Sample Data

# Every field is a bytes literal so the rows can be joined and written in binary mode
data = [
    (b'\xba\xed\x85\x8e\x91\xd4\xc7\xb0', b'1', b'a'),
    (b'\xba\xed\x85\x8e\x91\xd4\xc7\xb1', b'2', b'b'),
    (b'\xba\xed\x85\x8e\x91\xd4\xc7\xb2', b'3', b'c')
]

with open('sample.csv', 'wb') as f:
    header = [b"a", b"b", b"c"]
    f.write(b",".join(header) + b"\n")
    for d in data:
        f.write(b",".join(d) + b"\n")

Read using Pandas

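import pandas as pd
# Python 3: latin-1 maps each byte to one character, so encoding back recovers the raw bytes
df = pd.read_csv(
    "sample.csv",
    encoding="latin-1",
    converters={"a": lambda x: x.encode("latin-1").hex()},
)
print(df)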
#                  a  b  c
#0  baed858e91d4c7b0  1  a
#1  baed858e91d4c7b1  2  b
#2  baed858e91d4c7b2  3  c

Try reading the same file using Spark

spark_df = spark.read.csv("sample.csv", header=True)
spark_df.show()
#+-----+---+---+
#|a    |b  |c  |
#+-----+---+---+
#|�텎��ǰ|1  |a  |
#|�텎��DZ|2  |b  |
#|�텎��Dz|3  |c  |
#+-----+---+---+

Yikes! OK, so how about converting to hex?

import pyspark.sql.functions as f
spark_df.withColumn("a", f.hex("a")).show(truncate=False)
#+----------------------------+---+---+
#|a                           |b  |c  |
#+----------------------------+---+---+
#|EFBFBDED858EEFBFBDEFBFBDC7B0|1  |a  |
#|EFBFBDED858EEFBFBDEFBFBDC7B1|2  |b  |
#|EFBFBDED858EEFBFBDEFBFBDC7B2|3  |c  |
#+----------------------------+---+---+

(In this example the values are distinct, but that's not true in my larger file.)

As you can see, the values are close, but some of the bytes have been replaced by EFBFBD.
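
The substitution can be reproduced outside Spark. The following is a minimal plain Python 3 sketch, assuming that Python's errors="replace" handling behaves like Spark's lenient UTF-8 decoding for these particular bytes (it yields the same hex as the first row above). It also shows why distinct IDs can collide: different invalid bytes all become the same U+FFFD.

```python
# Sketch only: errors="replace" is assumed to mimic Spark's lenient UTF-8 decoding.
raw = b'\xba\xed\x85\x8e\x91\xd4\xc7\xb0'

# Invalid bytes become U+FFFD; valid sequences (ED 85 8E and C7 B0) survive.
decoded = raw.decode("utf-8", errors="replace")
as_hex = decoded.encode("utf-8").hex().upper()
print(as_hex)  # EFBFBDED858EEFBFBDEFBFBDC7B0

# Two different invalid bytes decode to the same character, so information is lost.
collide_a = b'\xba'.decode("utf-8", errors="replace")
collide_b = b'\x91'.decode("utf-8", errors="replace")
print(collide_a == collide_b)  # True
```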

Is there any way to read the file in Spark (maybe using rdd?) so that my output looks like the pandas version:

#+----------------+---+---+
#|a               |b  |c  |
#+----------------+---+---+
#|baed858e91d4c7b0|1  |a  |
#|baed858e91d4c7b1|2  |b  |
#|baed858e91d4c7b2|3  |c  |
#+----------------+---+---+

I've tried casting to byte and specifying the schema so that this column is ByteType() but that didn't work.

Edit

I am using Spark v 2.1.

2 answers
祖国的老花朵
#2 · 2020-06-04 04:57

How about storing it base64-encoded and decoding it when you read it?

Storing

import base64

# b64encode returns bytes, so the other fields are bytes literals too
data = [
    (base64.b64encode(b'\xba\xed\x85\x8e\x91\xd4\xc7\xb0'), b'1', b'a'),
    (base64.b64encode(b'\xba\xed\x85\x8e\x91\xd4\xc7\xb1'), b'2', b'b'),
    (base64.b64encode(b'\xba\xed\x85\x8e\x91\xd4\xc7\xb2'), b'3', b'c')
]

with open('sample.csv', 'wb') as f:
    header = [b"a", b"b", b"c"]
    f.write(b",".join(header) + b"\n")
    for d in data:
        f.write(b",".join(d) + b"\n")

Reading

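import pyspark.sql.functions as f

spark_df = spark.read.csv("sample.csv", header=True)
# Python's base64.b64decode cannot operate on a Spark column; unbase64 is the
# Spark SQL function that decodes base64 text into a binary column
spark_df = spark_df.withColumn("a", f.unbase64("a"))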
孤傲高冷的网名
#3 · 2020-06-04 05:05

The problem is rooted in the fact that delimited files are poorly suited to binary data.

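If there is a known, consistent encoding for the text, use the charset option (the Spark 2.x CSV reader also accepts it under the name encoding). See https://github.com/databricks/spark-csv#features (I don't know of a good place in the 2.x docs where delimited reading options are described, so I still go back to the 1.x docs). I would recommend experimenting with a single-byte encoding such as ISO-8859-1, which assigns a character to every byte value. US-ASCII is only 7-bit, so bytes above 0x7F would still be lost.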
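
As a rough sketch of the ISO-8859-1 idea: the first function checks in plain Python that every byte value survives a decode/encode round trip through that encoding, and the second shows how the same idea might look in PySpark. The helper names latin1_round_trips and read_ids_as_hex are made up for illustration, the column name "a" comes from the question, and the Spark part has not been verified against Spark 2.1.

```python
def latin1_round_trips():
    """Return True if every byte value survives decode + encode through ISO-8859-1."""
    return all(
        bytes([b]).decode("iso-8859-1").encode("iso-8859-1") == bytes([b])
        for b in range(256)
    )


def read_ids_as_hex(spark, path):
    """Hypothetical helper: read the CSV as ISO-8859-1, return column "a" as lowercase hex."""
    # Imported lazily so the pure-Python check above runs without Spark installed.
    import pyspark.sql.functions as F

    df = spark.read.csv(path, header=True, encoding="ISO-8859-1")
    # encode() turns the string back into its original bytes; hex() renders them.
    return df.withColumn("a", F.lower(F.hex(F.encode("a", "ISO-8859-1"))))


print(latin1_round_trips())  # True
```

This still assumes the ID bytes never contain the delimiter, a quote character, or a newline, since the file is split on those before any decoding happens.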

If there is no such encoding, you would need to either transform the input to a different format, e.g., base64 encoding the first column, or manipulate the read data to get it back to what you need.
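
Transforming the input could look something like the sketch below, which rewrites the file in plain Python 3 with the first column base64-encoded before Spark ever sees it. The function name encode_first_column is made up for illustration, and the code assumes a header row, at least two columns, and ID bytes that never contain the separator or a newline.

```python
import base64


def encode_first_column(src_path, dst_path, sep=b","):
    """Copy a delimited file, replacing the first field of each data row with base64 text."""
    with open(src_path, "rb") as src, open(dst_path, "wb") as dst:
        dst.write(next(src))  # header row is copied unchanged
        for line in src:
            # Split off only the first field; the rest of the row is left as-is.
            first, rest = line.rstrip(b"\r\n").split(sep, 1)
            dst.write(base64.b64encode(first) + sep + rest + b"\n")
```

The rewritten file is plain ASCII in its first column, so it can be read with spark.read.csv as usual and decoded afterwards.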
