Transfer data from a database to Spark using sparklyr

Posted 2019-01-23 21:26

I have some data in a database, and I want to work with it in Spark, using sparklyr.

I can use a DBI-based package to import the data from the database into R:

dbconn <- dbConnect(<some connection args>)
data_in_r <- dbReadTable(dbconn, "a table") 

and then copy the data from R to Spark using:

sconn <- spark_connect(<some connection args>)
data_ptr <- copy_to(sconn, data_in_r)

Copying twice is slow for big datasets.

How can I copy data directly from the database into Spark?

sparklyr has several spark_read_*() functions for importing data, but nothing database-related. sdf_import() looks like a possibility, but it isn't clear how to use it in this context.

1 Answer

萌系小妹纸
Answered 2019-01-23 22:03

sparklyr >= 0.6.0

You can use spark_read_jdbc().
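A minimal sketch of a direct JDBC read with spark_read_jdbc(); the URL, table name, credentials, and driver class below are placeholders taken from the PostgreSQL example later in this answer — adjust them for your database:

```r
library(sparklyr)

sc <- spark_connect(master = "local")

# Read the table straight from the database into Spark --
# no intermediate copy through an R data frame.
data_ptr <- spark_read_jdbc(
  sc,
  name = "foo",  # name of the resulting Spark table
  options = list(
    url      = "jdbc:postgresql://host/database",
    dbtable  = "table",
    user     = "scott",
    password = "tiger",
    driver   = "org.postgresql.Driver"
  )
)
```

The returned data_ptr is a regular sparklyr table reference, so it can be used with dplyr verbs directly.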

sparklyr < 0.6.0

I hope there is a more elegant solution out there, but here is a minimal example using the low-level API:

  • Make sure that Spark has access to the required JDBC driver, for example by adding its coordinates to spark.jars.packages. With PostgreSQL (adjust for the current driver version) you could add:

    spark.jars.packages org.postgresql:postgresql:9.4.1212
    

    to SPARK_HOME/conf/spark-defaults.conf
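
    Alternatively — an untested sketch — the same property can be set through sparklyr's connection config instead of editing spark-defaults.conf:

```r
library(sparklyr)

# Equivalent to the spark-defaults.conf entry above: pass the JDBC
# driver coordinates through the connection config. The version
# string is a placeholder -- use a current driver release.
config <- spark_config()
config[["spark.jars.packages"]] <- "org.postgresql:postgresql:9.4.1212"

sc <- spark_connect(master = "local", config = config)
```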

  • Load data and register as temporary view:

    name <- "foo"
    
    spark_session(sc) %>% 
      # Get a DataFrameReader and select the JDBC source
      invoke("read") %>% 
      invoke("format", "jdbc") %>% 
      # JDBC URL and table name
      invoke("option", "url", "jdbc:postgresql://host/database") %>% 
      invoke("option", "dbtable", "table") %>% 
      # Optional credentials
      invoke("option", "user", "scott") %>%
      invoke("option", "password", "tiger") %>% 
      # Driver class, here for PostgreSQL
      invoke("option", "driver", "org.postgresql.Driver") %>% 
      # Read and register as a temporary view
      # (Spark 2.x; use registerTempTable in 1.x)
      invoke("load") %>% 
      invoke("createOrReplaceTempView", name)
    

    You can pass multiple options at once using an environment:

    invoke("options", as.environment(list(
      user="scott", password="tiger", url="jdbc:..."
    )))
    
  • Load temporary view with dplyr:

    dplyr::tbl(sc, name)
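
    Once the view is registered, standard dplyr verbs are translated to Spark SQL and executed in Spark; collect() brings the (reduced) result back into R. A sketch, where foo is the view name registered above and x is a placeholder column:

```r
library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local")

# Reference the temporary view registered in the previous step
foo <- tbl(sc, "foo")

# Filtering and counting run inside Spark; only the final
# result is pulled back into R by collect()
result <- foo %>%
  filter(x > 10) %>%
  count() %>%
  collect()
```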
    
  • Be sure to read about further JDBC options, with a focus on partitionColumn, lowerBound, upperBound, and numPartitions.

  • For additional details see, for example, How to use JDBC source to write and read data in (Py)Spark? and How to improve performance for slow Spark jobs using DataFrame and JDBC connection?
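
The partitioning options mentioned above combine with spark_read_jdbc() (sparklyr >= 0.6.0) like this — a hedged sketch where the column name, bounds, and partition count are placeholders you must match to an indexed numeric column in your table:

```r
library(sparklyr)

sc <- spark_connect(master = "local")

# Parallel JDBC read: Spark issues numPartitions queries, splitting
# the numeric column `id` into ranges between lowerBound and
# upperBound. All values below are placeholders.
data_ptr <- spark_read_jdbc(
  sc,
  name = "foo_partitioned",
  options = list(
    url             = "jdbc:postgresql://host/database",
    dbtable         = "table",
    driver          = "org.postgresql.Driver",
    partitionColumn = "id",
    lowerBound      = "1",
    upperBound      = "100000",
    numPartitions   = "8"
  )
)
```

Without these options the whole table is read through a single connection, which defeats Spark's parallelism for large tables.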
