How to call a method based on a JSON object in Scala Spark?

Posted 2019-09-21 17:20

Question:

I have two functions like the ones below:

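import org.apache.spark.sql.functions.lit

// assumes a SparkSession named spark is in scope (as in spark-shell)
def method1(ip: String, r: Double, op: String) = {
  val data = spark.read.option("header", true).csv(ip)
  // keep the distinct (c, S) pairs and add a constant column R holding r
  val r3 = data.select("c", "S").dropDuplicates("c", "S").withColumn("R", lit(r))
  r3.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save(op)
}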

def method2(ip: String, op: String) = {
  val data = spark.read.option("header", true).csv(ip)
  // dropDuplicates can only use columns that survived the select
  val r3 = data.select("c", "S").dropDuplicates("c", "S")
  r3.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save(op)
}

I want to call these methods based on a JSON object parameter. For example, if my input JSON is:

{"name":"method1","ip":"Or.csv","r":1.0,"op":"oppath"}

Then it should call method1 with "Or.csv", 1.0 and "oppath" as parameters. That is, the "name" field of the JSON object gives the method name, and the remaining fields are its parameters.

Please help me with this.

Answer 1:

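First, read the JSON into a DataFrame with Spark. By default, spark.read.json expects one JSON object per line, which matches the example input (for a pretty-printed, multi-line file, add .option("multiLine", true)).

val df = spark.read.json("path to the json file")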

which should give you a DataFrame like this:

scala> df.show()
+------+-------+------+---+
|    ip|   name|    op|  r|
+------+-------+------+---+
|Or.csv|method1|oppath|1.0|
+------+-------+------+---+
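Note the column order in this output: it is not the key order of the JSON input. Spark's JSON schema inference sorts top-level fields by name, so ip is column 0 and r is column 3, and a positional getter such as getString(1) returns name, not ip. The plain-Scala sketch below (no Spark needed; InferredOrder is a hypothetical name) reproduces that ordering, which is why reading fields by name, e.g. row.getAs[String]("ip"), is the safer choice.

```scala
// Hypothetical illustration of the column order Spark infers for the example JSON.
object InferredOrder {
  // Top-level keys in the order they appear in the input JSON.
  val jsonKeys: Seq[String] = Seq("name", "ip", "r", "op")

  // Inferred JSON schemas list fields sorted by name, so the DataFrame follows this order.
  val columns: Seq[String] = jsonKeys.sorted

  // Column index a positional getter (getString(i), getDouble(i)) would need for each field.
  val positions: Map[String, Int] = columns.zipWithIndex.toMap
}
```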

Next, define method1 in the Spark shell:

   scala> def method1(ip:String,r:Double,op:String)={
         | val data = spark.read.option("header", true).csv(ip).toDF()
         | val r3= data.select("c", "S").dropDuplicates("C", "S").withColumn("R", lit(r))
         | r3.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save(op)
         | }

method1: (ip: String, r: Double, op: String)Unit

Next, define method2:

   scala> def method2(ip:String,op:String)={
         | val data = spark.read.option("header", true).csv(ip).toDF()
         | val r3= data.select("c", "S").dropDuplicates("C", "S")
         | r3.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save(op)
         | }

method2: (ip: String, op: String)Unit

Finally, dispatch on the "name" field of the JSON:

scala> val row = df.first()
scala> if (row.getAs[String]("name") == "method1") method1(row.getAs[String]("ip"), row.getAs[Double]("r"), row.getAs[String]("op")) else if (row.getAs[String]("name") == "method2") method2(row.getAs[String]("ip"), row.getAs[String]("op"))

This calls method1 or method2 depending on the value of the "name" field in the JSON object.
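Because method1 and method2 launch Spark jobs themselves, the choice between them has to be made on the driver in ordinary Scala, not inside a column expression such as when. If more methods are added later, a lookup table from method name to handler keeps the dispatch in one place. Below is a minimal plain-Scala sketch of that idea, assuming the JSON object has already been turned into a Map[String, Any]. JsonDispatch, registry, dispatch and the string-returning stubs are hypothetical names; the stubs only describe the call instead of running Spark.

```scala
// Hypothetical sketch: names and stubs are illustrative, not part of the original answer.
object JsonDispatch {
  // Parsed JSON object: field name -> value (assumed already parsed, e.g. from a Spark Row).
  type Params = Map[String, Any]

  // Stubs standing in for the Spark-writing methods; they only describe the call.
  def method1(ip: String, r: Double, op: String): String = s"method1($ip, $r, $op)"
  def method2(ip: String, op: String): String = s"method2($ip, $op)"

  // Method name -> handler that extracts its own parameters from the JSON fields.
  // A missing parameter field makes p(key) throw NoSuchElementException.
  val registry: Map[String, Params => String] = Map(
    "method1" -> ((p: Params) => method1(p("ip").toString, p("r").toString.toDouble, p("op").toString)),
    "method2" -> ((p: Params) => method2(p("ip").toString, p("op").toString))
  )

  // Read "name", look up its handler and run it; a missing or unknown name becomes a Left.
  def dispatch(params: Params): Either[String, String] =
    params.get("name").map(_.toString) match {
      case Some(name) => registry.get(name).map(handler => handler(params)).toRight(s"unknown method: $name")
      case None       => Left("missing \"name\" field")
    }
}
```

For the JSON in the question, JsonDispatch.dispatch(Map("name" -> "method1", "ip" -> "Or.csv", "r" -> 1.0, "op" -> "oppath")) returns Right("method1(Or.csv, 1.0, oppath)"). With Spark, the map could be built from the first row with row.getValuesMap[Any](row.schema.fieldNames), and the stubs replaced by the real methods, with the handler type changed to Params => Unit.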