我在S3约200文件,例如, a_file.json.bz2
,这些文件的每一行是一个JSON格式的记录,但某些字段被序列化pickle.dumps
,例如datetime
字段。 每个文件约1GB后bzip
压缩。 现在我需要的火花,处理这些文件(pyspark,实际上),但我不能甚至每一个记录了。 那么,什么是最好的做法吗?
所述ds.take(10)
给出
[(0, u'(I551'),
(6, u'(dp0'),
(11, u'Vadv_id'),
(19, u'p1'),
(22, u'V479883'),
(30, u'p2'),
(33, u'sVcpg_id'),
(42, u'p3'),
(45, u'V1913398'),
(54, u'p4')]
显然,拆分是不是每个记录。
谢谢。
我有这个问题阅读GPG加密文件 。 您可以使用wholeTextFiles
丹尼尔暗示,但是你必须阅读大量文件时作为整个文件将处理前被加载到内存要小心。 如果文件过大,它可以崩溃的执行人。 我用parallelize
和flatMap
。 也许沿东西线
def read_fun_generator(filename):
with bz2.open(filename, 'rb') as f:
for line in f:
yield line.strip()
bz2_filelist = glob.glob("/path/to/files/*.bz2")
rdd_from_bz2 = sc.parallelize(bz2_filelist).flatMap(read_fun_generator)
您可以通过访问输入文件的文件(而不是行由行) SparkContext.wholeTextFiles
。 然后,您可以使用flatMap
解压缩和解析自己的代码行。
事实上,它是由有问题pickle
。 通过查看压缩后的文件内容,这的确是
(I551
(dp0
Vadv_id
p1
V479883
p2
sVcpg_id
p3
V1913398
p4
这给了我麻烦解析。 我知道我可以只pick.load(file)
多次获得对象,但是无法找到星火迅速解决,我只能通过行访问加载的文件一致。 此外,在该文件中的记录具有可变域和长度,这使得它更难以破解。
我结束了重新生成这些bz2
从源文件,因为它实际上是更容易和更快。 而且我知道,Spark和Hadoop的支持bz2
压缩完美所以不需要额外的动作。