I have a JDBC connection with Apache Spark and PostgreSQL and I want to insert some data into my database. When I use append mode I need to specify an id for each DataFrame.Row. Is there any way for Spark to create primary keys?
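For context, the write looks something like this (a sketch; the URL, table name, and credentials are placeholders):

```scala
val props = new java.util.Properties()
props.setProperty("user", "postgres")
props.setProperty("password", "...")  // placeholder

// Append rows of df to an existing table; today every row must carry its own id
df.write.mode("append").jdbc("jdbc:postgresql://localhost/mydb", "mytable", props)
```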
Scala:
If all you need is unique numbers you can use zipWithUniqueId and recreate the DataFrame. First some imports and dummy data:
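The snippets below are a sketch of that recipe, assuming a Spark 1.x-style shell where sc and sqlContext are in scope, and foo/bar as dummy columns:

```scala
import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

val df = sc.parallelize(Seq(
  ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")
```

Extract schema for further usage:

```scala
val schema = df.schema
```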
Add id field:
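```scala
// zipWithUniqueId attaches a unique Long to every row; prepend it to the row values
val rows = df.rdd.zipWithUniqueId.map {
  case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)
}
```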
Create DataFrame:
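```scala
// New schema: a non-nullable id column followed by the original fields
val dfWithPK = sqlContext.createDataFrame(
  rows, StructType(StructField("id", LongType, false) +: schema.fields))
```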
The same thing in Python:
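A rough pyspark equivalent, under the same assumptions:

```python
from pyspark.sql.types import LongType, StructField, StructType

df = sc.parallelize([("a", -1.0), ("b", -2.0), ("c", -3.0)]).toDF(["foo", "bar"])
schema = df.schema

# Prepend the generated unique id to every row and rebuild the DataFrame
rows = df.rdd.zipWithUniqueId().map(lambda pair: [pair[1]] + list(pair[0]))
df_with_pk = sqlContext.createDataFrame(
    rows, StructType([StructField("id", LongType(), False)] + schema.fields))
```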
If you prefer consecutive numbers you can replace zipWithUniqueId with zipWithIndex, but it is a little bit more expensive.

Directly with the DataFrame API (universal Scala, Python, Java, R with pretty much the same syntax):
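In pyspark, for example:

```python
from pyspark.sql.functions import monotonically_increasing_id

df.withColumn("id", monotonically_increasing_id())
```

Note that the second argument of df.withColumn is monotonically_increasing_id(), not monotonically_increasing_id.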
Previously I've missed the monotonicallyIncreasingId function, which should work just fine as long as you don't require consecutive numbers:
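In Scala (this older name is deprecated in favor of monotonically_increasing_id in Spark 2.0+):

```scala
import org.apache.spark.sql.functions.monotonicallyIncreasingId

df.withColumn("id", monotonicallyIncreasingId).show()
```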
While useful, monotonicallyIncreasingId is non-deterministic. Not only may ids differ from execution to execution, but without additional tricks they cannot be used to identify rows when subsequent operations contain filters.

Note: It is also possible to use the rowNumber window function:
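For example, in pyspark (rowNumber was renamed row_number in later Spark versions):

```python
from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber  # row_number in newer versions

# Without partitionBy, Spark pulls all data into a single partition
w = Window.orderBy("foo")
df.withColumn("id", rowNumber().over(w)).show()
```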
Unfortunately, Spark moves all the data to a single partition and warns about serious performance degradation, so unless you have a natural way to partition your data and ensure uniqueness, it is not particularly useful at this moment.
I found the following solution to be relatively straightforward for the case where zipWithIndex() is the desired behavior, i.e. for those desiring consecutive integers.
In this case, we're using pyspark and relying on dictionary comprehension to map the original row object to a new dictionary which fits a new schema including the unique index.
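A sketch of that approach (the helper name and the id column name are mine):

```python
from pyspark.sql.types import LongType, StructField, StructType

def df_zip_with_index(df, col_name="id"):
    """Prepend a consecutive 0-based index column to df."""
    new_schema = StructType(
        [StructField(col_name, LongType(), False)] + df.schema.fields)

    # Dictionary comprehension rebuilds each row as a dict that fits the new schema
    new_rdd = df.rdd.zipWithIndex().map(
        lambda pair: {**{k: v for k, v in pair[0].asDict().items()},
                      col_name: pair[1]})

    return new_rdd.toDF(new_schema)
```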