Call a function with each element of a stream in Data

Posted 2020-04-30 03:07

Question:

I have a streaming DataFrame in Databricks, and I want to perform an action on each element. Online I found special-purpose methods, such as writing the stream to the console or dumping it into memory, but I want to add some business logic and put some results into Redis.

To be more specific, this is what it would look like in the non-streaming case:

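import spark.implicits._ // needed for toDF (already imported in Databricks notebooks)

val someDataFrame = Seq(
  ("key1", "value1"),
  ("key2", "value2"),
  ("key3", "value3"),
  ("key4", "value4")
).toDF()

def someFunction(keyValuePair: (String, String)): Unit = {
  println(keyValuePair)
}

// collect brings all rows to the driver, then someFunction is called on each one
someDataFrame.collect.foreach(r => someFunction((r(0).toString, r(1).toString)))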

But if someDataFrame is not a static DataFrame but a streaming DataFrame (in my case coming from Kafka), I get this error:

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

Could anyone help me solve this problem?

Some important notes:

  • I've read the relevant documentation, such as Spark Streaming and Databricks Streaming, as well as a few other descriptions.

  • I know it must involve something like start() and awaitTermination(), but I don't know the exact syntax, and the descriptions did not help.

  • It would take pages to list everything I tried, so I'd rather not include it.

  • I do not want to solve the specific problem of displaying the result, so please do not provide a solution for that specific case. The real someFunction would look something like this:

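def someFunction(keyValuePair: (String, String)): Unit = {
  val someData = readSomeExternalData()          // hypothetical helper
  if (someCondition(keyValuePair, someData)) {   // hypothetical predicate on keyValuePair and someData
    doSomething(keyValuePair)                    // e.g. write a result to Redis
  }
}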

(The question "What is the purpose of ForeachWriter in Spark Structured Streaming?" does not provide a working example, so it does not answer my question.)

Answer 1:

Here is an example that reads a stream and uses foreachBatch to save every item to Redis through the Structured Streaming API.
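The general idea is that foreachBatch hands you each micro-batch as an ordinary, non-streaming Dataset, so the question's original collect.foreach(...) approach works again inside it. The sketch below illustrates that; it is not part of the original answer and assumes Spark 2.4 or later on the classpath. To stay self-contained it replaces the Kafka stream with Spark's internal test source MemoryStream, and readSomeExternalData, the condition, and doSomething are hypothetical stand-ins (a fixed key set and a driver-side buffer).

```scala
import java.nio.file.Files
import org.apache.spark.sql.{Dataset, SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import scala.collection.mutable.ArrayBuffer

object ForeachBatchSketch {
  // Hypothetical stand-in for readSomeExternalData(): a fixed set of allowed keys
  def readSomeExternalData(): Set[String] = Set("key1", "key3")

  // Hypothetical stand-in for doSomething: record the pair in a driver-side buffer
  val processed = ArrayBuffer.empty[(String, String)]
  def doSomething(keyValuePair: (String, String)): Unit = processed += keyValuePair

  // The question's someFunction, with a hypothetical condition (key is in the external data)
  def someFunction(keyValuePair: (String, String)): Unit = {
    val someData = readSomeExternalData()
    if (someData.contains(keyValuePair._1)) doSomething(keyValuePair)
  }

  def run(): List[(String, String)] = {
    val spark = SparkSession.builder().master("local[2]").appName("foreachBatch-sketch").getOrCreate()
    import spark.implicits._
    implicit val sqlContext: SQLContext = spark.sqlContext

    // MemoryStream (an internal Spark test source) stands in for the Kafka stream
    val input = MemoryStream[(String, String)]
    input.addData(("key1", "value1"), ("key2", "value2"), ("key3", "value3"), ("key4", "value4"))

    // A typed function value avoids foreachBatch's Scala/Java overload ambiguity on Scala 2.12
    val handleBatch: (Dataset[(String, String)], Long) => Unit =
      (batch, _) => batch.collect().foreach(someFunction) // each micro-batch is a static Dataset

    val query = input.toDS().writeStream
      .option("checkpointLocation", Files.createTempDirectory("foreach-batch").toString)
      .foreachBatch(handleBatch)
      .start()
    query.processAllAvailable() // a long-running job would call query.awaitTermination() instead
    query.stop()
    spark.stop()
    processed.toList
  }
}
```

Because the foreachBatch function runs on the driver, collect() pulls the whole micro-batch there. That is fine for small batches; large ones are better written with a batch-level API (as in the Redis code) or per partition with ForeachWriter.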

This is related to a previous question (DataFrame to RDD[(String, String)] conversion).

// import Spark and spark-redis
import org.apache.spark.sql._
import org.apache.spark.sql.types._

import com.redislabs.provider.redis._

// encoders for Dataset.map (already imported in Databricks notebooks)
import spark.implicits._

// schema of the CSV files
val userSchema = new StructType()
    .add("name", "string")
    .add("age", "string")

// create a data stream reader from a directory of CSV files
val csvDF = spark
  .readStream
  .format("csv")
  .option("sep", ";")
  .schema(userSchema)
  .load("./data") // directory where the CSV files are

// Redis connection settings
val redisConfig = new RedisConfig(new RedisEndpoint("localhost", 6379))
implicit val readWriteConfig: ReadWriteConfig = ReadWriteConfig.Default

// save each micro-batch to Redis after converting it to an RDD
// (a typed function value avoids the foreachBatch overload ambiguity on Scala 2.12)
val saveBatch: (Dataset[(String, String)], Long) => Unit =
  (batch, _) => sc.toRedisKV(batch.rdd)(redisConfig)

val query = csvDF
  .map(r => (r.getString(0), r.getString(1))) // (name, age) as a Dataset[(String, String)]
  .writeStream             // create a data stream writer
  .foreachBatch(saveBatch)
  .start()                 // start processing

query.awaitTermination()   // block until the query stops; needed in a standalone application
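For the ForeachWriter route mentioned in the question, a minimal working sketch might look like the following. It is an alternative to the answer above, not part of it, and assumes Spark 2.4 or later (where open receives an epochId). It uses Spark's internal MemoryStream source instead of Kafka, a hypothetical condition (non-empty value), and a JVM-wide queue as a stand-in for doSomething/Redis. That queue only works in local mode; on a cluster, open, process, and close run on the executors, so that is where a Redis connection would be opened, used, and closed.

```scala
import java.nio.file.Files
import java.util.concurrent.ConcurrentLinkedQueue
import org.apache.spark.sql.{ForeachWriter, SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import scala.collection.JavaConverters._

// Hypothetical sink shared within one JVM; only meaningful in local mode
object Collected {
  val items = new ConcurrentLinkedQueue[(String, String)]()
}

// open is called once per partition and epoch, process once per row, close at the end
class KeyValueWriter extends ForeachWriter[(String, String)] {
  override def open(partitionId: Long, epochId: Long): Boolean = {
    // e.g. open a Redis connection here (one per partition, not one per row)
    true // true means: process the rows of this partition
  }

  override def process(keyValuePair: (String, String)): Unit =
    if (keyValuePair._2.nonEmpty) Collected.items.add(keyValuePair) // hypothetical condition + doSomething

  override def close(errorOrNull: Throwable): Unit = {
    // e.g. close the Redis connection here
  }
}

object ForeachWriterSketch {
  def run(): List[(String, String)] = {
    val spark = SparkSession.builder().master("local[2]").appName("foreachWriter-sketch").getOrCreate()
    import spark.implicits._
    implicit val sqlContext: SQLContext = spark.sqlContext

    val input = MemoryStream[(String, String)]
    input.addData(("key1", "value1"), ("key2", ""), ("key3", "value3"))

    val query = input.toDS().writeStream
      .option("checkpointLocation", Files.createTempDirectory("foreach-writer").toString)
      .foreach(new KeyValueWriter)
      .start()
    query.processAllAvailable() // a long-running job would call query.awaitTermination() instead
    query.stop()
    spark.stop()
    Collected.items.asScala.toList
  }
}
```

foreachBatch is usually simpler when the target already has a batch write API, such as spark-redis's toRedisKV; ForeachWriter fits per-row side effects that need one connection per partition.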