I have some 17,000 files in S3 that look like this:
{"hour": "00", "month": "07", "second": "00", "year": "1970", "timezone": "-00:00", "day": "12", "minute": "00"}
{"hour": "00", "month": "07", "second": "01", "year": "1970", "timezone": "-00:00", "day": "12", "minute": "00"}
{"hour": "00", "month": "07", "second": "02", "year": "1970", "timezone": "-00:00", "day": "12", "minute": "00"}
{"hour": "00", "month": "07", "second": "03", "year": "1970", "timezone": "-00:00", "day": "12", "minute": "00"}
{"hour": "00", "month": "07", "second": "04", "year": "1970", "timezone": "-00:00", "day": "12", "minute": "00"}
I have one file per day. Each file contains one record per second, so 24 × 60 × 60 = 86,400 records per file. Each file has a file name like "YYYY-MM-DD".
Using boto3 I generate a list of the files in the bucket. Here I am selecting only 10 files using the prefix.
import boto3

s3_list = []
s3 = boto3.resource('s3')
my_bucket = s3.Bucket('time-waits-for-no-man')
# Collect the keys of all objects whose names start with the given prefix
for obj in my_bucket.objects.filter(Prefix='1972-05-1'):
    s3_list.append(obj.key)
This gives me a list of files (S3 keys). I then define a function to fetch a file and return the rows:
from pyspark.sql import Row

def FileRead(s3Key):
    s3obj = boto3.resource('s3').Object(bucket_name='bucket', key=s3Key)
    # Read the entire object body as one string
    contents = s3obj.get()['Body'].read().decode('utf-8')
    # Wraps the whole file in a single one-field Row
    yield Row(contents)
I then distribute this function using flatMap:
job = sc.parallelize(s3_list)
foo = job.flatMap(FileRead)
Problem
However, I can't work out how to properly pump these rows into a DataFrame.
>>> foo.toDF().show()
+--------------------+
| _1|
+--------------------+
|{"hour": "00", "m...|
|{"hour": "00", "m...|
|{"hour": "00", "m...|
|{"hour": "00", "m...|
|{"hour": "00", "m...|
|{"hour": "00", "m...|
|{"hour": "00", "m...|
|{"hour": "00", "m...|
|{"hour": "00", "m...|
|{"hour": "00", "m...|
+--------------------+
>>> foo.toDF().count()
10
Please could someone show me how to do this?
In the end I got it working with:
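Broadly: rework FileRead to split each file body into lines and parse each line with json.loads, then flatMap as before. A minimal sketch of that approach (not the verbatim code; names reused from above):

import json
import boto3
from pyspark.sql import Row

def FileRead(s3Key):
    s3obj = boto3.resource('s3').Object(bucket_name='bucket', key=s3Key)
    contents = s3obj.get()['Body'].read().decode('utf-8')
    # One Row per JSON line, instead of one Row per file
    for line in contents.splitlines():
        yield Row(**json.loads(line))

job = sc.parallelize(s3_list)
df = job.flatMap(FileRead).toDF()
df.show()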
Thanks @user6910411 for the inspiration.
You should probably use the json reader directly (spark.read.json / sqlContext.read.json), but if you know the schema you can try parsing the JSON strings manually:
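For example (a sketch: the first variant assumes Spark 2.x and an s3a connector; the second assumes foo is an RDD holding one JSON document per record):

# Option 1: let Spark read and parse the JSON lines straight from S3
df = spark.read.json("s3a://time-waits-for-no-man/1972-05-1*")

# Option 2: parse manually with an explicit schema
import json
from pyspark.sql.types import StructType, StructField, StringType

fields = ["day", "hour", "minute", "month", "second", "timezone", "year"]
schema = StructType([StructField(f, StringType(), True) for f in fields])

df = (foo
      .map(json.loads)                             # str -> dict
      .map(lambda d: tuple(d[f] for f in fields))  # dict -> tuple in schema order
      .toDF(schema))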
You can also use get_json_object:
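A sketch, assuming a DataFrame with a single string column _1 holding one JSON document per row (as in the output above):

from pyspark.sql.functions import get_json_object

cols = ["day", "hour", "minute", "month", "second", "timezone", "year"]
df = foo.toDF().select(
    [get_json_object("_1", "$." + c).alias(c) for c in cols]
)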