DataFrame to RDD[(String, String)] conversion

Posted 2020-05-06 09:04

I want to convert an org.apache.spark.sql.DataFrame to org.apache.spark.rdd.RDD[(String, String)] in Databricks. Can anyone help?

Background (and a better solution is also welcome): I have a Kafka stream which (after some steps) becomes a 2 column data frame. I would like to put this into a Redis cache, first column as a key and second column as a value.

More specifically, the input has this type: lastContacts: org.apache.spark.sql.DataFrame = [serialNumber: string, lastModified: bigint]. I try to put it into Redis as follows:

sc.toRedisKV(lastContacts)(redisConfig)

The error message looks like this:

notebook:20: error: type mismatch;
 found   : org.apache.spark.sql.DataFrame
    (which expands to)  org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
 required: org.apache.spark.rdd.RDD[(String, String)]
sc.toRedisKV(lastContacts)(redisConfig)

I have already tried a few ideas (such as calling .rdd), but none of them worked.

1 answer
祖国的老花朵
#2 · 2020-05-06 09:23

You can use df.rdd.map(row => ...) to convert the DataFrame to an RDD if you want to map each row to a different RDD element. df.map(row => ...).rdd also works, but only when Spark has an Encoder for the result type.

For example:

// toDF on a Seq needs spark.implicits._ (Databricks notebooks import it automatically)
val df = Seq(("table1", 432),
  ("table2", 567),
  ("table3", 987),
  ("table1", 789)).
  toDF("tablename", "Code")

df.show()

+---------+----+
|tablename|Code|
+---------+----+
|   table1| 432|
|   table2| 567|
|   table3| 987|
|   table1| 789|
+---------+----+

// Go through df.rdd first: df.map(...) needs an Encoder, and none exists for (Any, Any)
val rddDf = df.rdd.map(r => (r(0), r(1))) // Type: RDD[(Any, Any)]

OR

val rdd = df.rdd.map(r => (r(0).toString, r(1).toString)) // Type: RDD[(String, String)]
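Applied to the frame from the question, the same row-mapping idea might look like the sketch below. It assumes the two columns are named serialNumber (string) and lastModified (bigint) as shown in the question and that neither contains nulls. The object name, the helper toStringPairs and the sample rows are made up for illustration. The Redis call is left as a comment because it depends on the spark-redis setup from the question.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object LastContactsToPairs {
  // Hypothetical helper: turn [serialNumber: string, lastModified: bigint]
  // into string pairs. Assumes no nulls (getLong throws on a null value).
  def toStringPairs(df: DataFrame): RDD[(String, String)] =
    df.select("serialNumber", "lastModified")
      .rdd
      .map(row => (row.getString(0), row.getLong(1).toString))

  def main(args: Array[String]): Unit = {
    // Local session so the sketch runs outside a notebook; in Databricks `spark` already exists.
    val spark = SparkSession.builder().master("local[*]").appName("pairs-sketch").getOrCreate()
    import spark.implicits._

    // Made-up sample rows standing in for the real lastContacts frame.
    val lastContacts = Seq(("SN-001", 1588755840000L), ("SN-002", 1588756980000L))
      .toDF("serialNumber", "lastModified")

    val pairs = toStringPairs(lastContacts)
    pairs.collect().foreach(println)

    // With the question's setup in scope, the original call would then become:
    // sc.toRedisKV(pairs)(redisConfig)

    spark.stop()
  }
}
```

Reading the columns with getString and getLong rather than r(0).toString keeps the conversion explicit about the expected types, so a schema change fails loudly instead of silently stringifying something else.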

See https://community.hortonworks.com/questions/106500/error-in-spark-streaming-kafka-integration-structu.html regarding the error "AnalysisException: Queries with streaming sources must be executed with writeStream.start()".

You also need to wait for the query to terminate by calling query.awaitTermination(), which prevents the process from exiting while the query is active.
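If lastContacts is still a streaming DataFrame, one way to combine both points is to do the RDD conversion inside foreachBatch (available from Spark 2.4), where each micro-batch is an ordinary DataFrame. The following is a minimal sketch under that assumption, not the setup from the question: the built-in rate source stands in for the Kafka stream, and the object name, toPairs and handleBatch are hypothetical.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, concat, lit}

object StreamingPairsSketch {
  // Hypothetical helper: a micro-batch is a static DataFrame, so .rdd is allowed here.
  def toPairs(batch: DataFrame): RDD[(String, String)] =
    batch.rdd.map(r => (r.getString(0), r.getLong(1).toString))

  // Typed as a Scala function value so the Scala overload of foreachBatch is selected.
  val handleBatch: (DataFrame, Long) => Unit = (batch, batchId) => {
    val pairs = toPairs(batch)
    // Placeholder sink; with the question's setup this is where
    // sc.toRedisKV(pairs)(redisConfig) would go.
    println(s"batch $batchId: ${pairs.count()} pairs")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("stream-sketch").getOrCreate()

    // Stand-in for the Kafka stream: the rate source emits (timestamp, value: long) rows.
    val stream = spark.readStream.format("rate").option("rowsPerSecond", "5").load()
      .select(
        concat(lit("SN-"), col("value").cast("string")).as("serialNumber"),
        col("value").as("lastModified"))

    // Starting the query through writeStream.start() avoids the AnalysisException.
    val query = stream.writeStream.foreachBatch(handleBatch).start()

    // Bounded wait for the demo; awaitTermination() with no argument blocks until the query stops.
    query.awaitTermination(10000L)
    query.stop()
    spark.stop()
  }
}
```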
