I am loading a file of JSON objects as a PySpark SchemaRDD. I want to change the "shape" of the objects (basically, I'm flattening them) and then insert into a Hive table. The problem I have is that the following returns a PipelinedRDD, not a SchemaRDD:
log_json.map(flatten_function)
(where log_json is a SchemaRDD).
Is there either a way to preserve type, cast back to the desired type, or efficiently insert from the new type?
The solution is applySchema, where flat_schema is a StructType representing the schema in the same way as you would obtain from log_json.schema() (but flattened, obviously). It looks like select is not available in Python, so you will have to registerTempTable and write it as a SQL statement, after setting up the function for use in SQL.
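A sketch of how those pieces might fit together, assuming a Spark 1.x SQLContext named sqlContext; flatten_function, flat_schema, and the column names below are illustrative stand-ins, not code from the original answer:

```python
def flatten_function(record):
    # Illustrative flattener: lift one level of nesting into
    # underscore-separated top-level keys.
    flat = {}
    for key, value in record.items():
        if isinstance(value, dict):
            for subkey, subvalue in value.items():
                flat[key + "_" + subkey] = subvalue
        else:
            flat[key] = value
    return flat

# Build the SQL statement once the function is registered for SQL use
# (column names here are made up):
columns = ["user", "time", "event"]
query = "SELECT flatten({0}) FROM log_json".format(", ".join(columns))

# With a live SQLContext (not runnable here):
# sqlContext.registerFunction("flatten", flatten_function)
# log_json.registerTempTable("log_json")
# flat_srdd = sqlContext.applySchema(log_json.map(flatten_function), flat_schema)
```

applySchema was the Spark 1.x API for stamping a StructType onto a plain RDD; in later versions the equivalent is createDataFrame.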
As @zero323 brought up, a function against * is probably not supported, so you can just create a function that takes in your columns and pass all of them in.
You can try this one... it is a bit long, but it works.
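The original code for this answer is not shown here; a hedged guess at the shape of such a solution is a recursive flattener that handles arbitrarily deep nesting (unlike a single-level version):

```python
def flatten(record, prefix=""):
    """Recursively flatten nested dicts, joining keys with underscores."""
    flat = {}
    for key, value in record.items():
        name = key if not prefix else prefix + "_" + key
        if isinstance(value, dict):
            # Descend into the nested dict, carrying the joined prefix.
            flat.update(flatten(value, name))
        else:
            flat[name] = value
    return flat
```

The flattened dicts can then be given a schema as in the accepted answer.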
More an idea than a real solution. Let's assume your data looks like this:
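The example data from this answer is not preserved here; a stand-in with one level of nesting might look like:

```python
import json

# Hypothetical newline-delimited JSON, one object per line:
sample_line = '{"foo": {"bar": 1, "baz": 2}, "qux": 3}'
record = json.loads(sample_line)
```

Written out to a file, data of this shape could be loaded with sqlContext.jsonFile(path) in Spark 1.x.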
First, let's load it and check the schema:
Now we register the table, as suggested by Justin Pihony, and extract the schema:
Instead of flattening the data, we can flatten the schema using something similar to this:
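A minimal sketch of what flatten_schema might look like. It is duck-typed over objects with .fields, .name, and .dataType attributes, so it should also accept a pyspark StructType such as the one returned by srdd.schema():

```python
def flatten_schema(schema, prefix=None):
    """Walk a StructType-like schema and return flattened field paths.

    Duck-typed: `schema` needs a `.fields` list whose entries have
    `.name` and `.dataType`; nested structs are detected by their own
    `.fields` attribute, which pyspark's StructType provides.
    """
    paths = []
    for field in schema.fields:
        name = field.name if prefix is None else prefix + "." + field.name
        if hasattr(field.dataType, "fields"):
            # Nested struct: recurse with the dotted prefix.
            paths.extend(flatten_schema(field.dataType, name))
        else:
            paths.append(name)
    return paths
```

In Spark you would call it as flatten_schema(srdd.schema()) to get a list of dotted field paths like foo.bar.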
Add a small helper to format the query string:
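One way to write the helper, assuming the flattened schema is a list of dotted field paths like foo.bar (the exact shape in the original answer may differ):

```python
def build_query(fields, table):
    # Alias each nested path to an underscore-joined column name,
    # e.g. "foo.bar" becomes "foo.bar AS foo_bar"; top-level
    # fields are selected as-is.
    cols = ", ".join(
        "{0} AS {1}".format(f, f.replace(".", "_")) if "." in f else f
        for f in fields
    )
    return "SELECT {0} FROM {1}".format(cols, table)
```

With the table registered, something like sqlContext.sql(build_query(flatten_schema(srdd.schema()), "logs")).collect() would then produce the flattened rows.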
And finally, the results:
Disclaimer: I didn't try to get very deep into the schema structure, so most likely there are some cases not covered by flatten_schema.