How to convert a Row to JSON in Spark 2 (Scala)

Posted 2019-01-24 11:15

Is there a simple way to convert a given Row object to JSON?

I found this question about converting a whole DataFrame to JSON output: Spark Row to JSON

But I just want to convert a single Row to JSON. Here is pseudocode for what I am trying to do.

More precisely, I am reading JSON as input into a DataFrame. I am producing a new output that is mainly based on columns, but with one JSON field for all the info that does not fit into the columns.

My question: what is the easiest way to write this function, convertRowToJson()?

def convertRowToJson(row: Row): String = ???

def transformVenueTry(row: Row): Try[Venue] = {
  Try({
    val name = row.getString(row.fieldIndex("name"))
    val metadataRow = row.getStruct(row.fieldIndex("meta"))
    val score: Double = calcScore(row)
    val combinedRow: Row = metadataRow ++ ("score" -> score)
    val jsonString: String = convertRowToJson(combinedRow)
    Venue(name = name, json = jsonString)
  })
}

Psidom's solution:

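import scala.util.parsing.json.JSONObject

def convertRowToJSON(row: Row): String = {
  // Map each field name to its value, then serialize the flat map.
  val m = row.getValuesMap(row.schema.fieldNames)
  JSONObject(m).toString()
}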

only works if the Row has a single level; it does not work with a nested Row. This is the schema:

StructType(
    StructField(indicator,StringType,true),   
    StructField(range,
    StructType(
        StructField(currency_code,StringType,true),
        StructField(maxrate,LongType,true), 
        StructField(minrate,LongType,true)),true))
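
For a schema like this one, the getValuesMap approach leaves the nested `range` value as a Row, so it is not serialized as a JSON object. One possible way around that is to walk the schema recursively. The following is a minimal sketch, not a tested production implementation: it assumes the Row carries a schema and uses json4s, which ships with Spark. The object name `RowJson` is made up for illustration, and types not listed (dates, timestamps, maps, binary) simply fall back to `toString`, which is a simplification.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{ArrayType, DataType, StructType}
import org.json4s.{JArray, JBool, JDecimal, JDouble, JInt, JNull, JObject, JString, JValue}
import org.json4s.jackson.JsonMethods.{compact, render}

// Hypothetical helper object; the name is an assumption for this sketch.
object RowJson {

  // Convert one value to a json4s node, using the Spark type to recurse
  // into nested structs and arrays.
  private def toJValue(value: Any, dataType: DataType): JValue =
    (value, dataType) match {
      case (null, _)                                  => JNull
      case (r: Row, st: StructType)                   => toJObject(r, st)
      case (xs: scala.collection.Seq[_], ArrayType(et, _)) =>
        JArray(xs.map(toJValue(_, et)).toList)
      case (s: String, _)                             => JString(s)
      case (b: Boolean, _)                            => JBool(b)
      case (n: Byte, _)                               => JInt(BigInt(n))
      case (n: Short, _)                              => JInt(BigInt(n))
      case (n: Int, _)                                => JInt(BigInt(n))
      case (n: Long, _)                               => JInt(BigInt(n))
      case (n: Float, _)                              => JDouble(n.toDouble)
      case (n: Double, _)                             => JDouble(n)
      case (n: java.math.BigDecimal, _)               => JDecimal(BigDecimal(n))
      // Simplification: everything else is rendered as a plain string.
      case (other, _)                                 => JString(other.toString)
    }

  // Convert a Row to a JSON object, one field per schema entry, in order.
  private def toJObject(row: Row, schema: StructType): JObject =
    JObject(schema.fields.zipWithIndex.map { case (field, i) =>
      field.name -> toJValue(row.get(i), field.dataType)
    }.toList)

  // Entry point: fails fast if the Row has no schema attached.
  def convertRowToJson(row: Row): String = {
    require(row.schema != null, "Row must carry a schema")
    compact(render(toJObject(row, row.schema)))
  }
}
```

Because this sketch uses only the Row and its schema, it does not need a SQLContext or SparkContext, so it could be called from inside a map over rows.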

I also tried Artem's suggestion, but that did not compile:

def row2DataFrame(row: Row, sqlContext: SQLContext): DataFrame = {
  val sparkContext = sqlContext.sparkContext
  import sparkContext._
  import sqlContext.implicits._
  import sqlContext._
  val rowRDD: RDD[Row] = sqlContext.sparkContext.makeRDD(row :: Nil)
  val dataFrame = rowRDD.toDF() //XXX does not compile
  dataFrame
}

7 answers
叼着烟拽天下
#2 · 2019-01-24 12:15

I combined the suggestions from Artem, KiranM and Psidom. After a lot of trial and error, I came up with this solution, which I tested on nested structures:

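import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}

def row2Json(row: Row, sqlContext: SQLContext): String = {
  // Wrap the single Row in a one-element RDD so it can become a DataFrame.
  val rowRDD: RDD[Row] = sqlContext.sparkContext.makeRDD(row :: Nil)
  // Reuse the Row's own schema so nested structs are preserved.
  val dataframe = sqlContext.createDataFrame(rowRDD, row.schema)
  // toJSON yields one JSON string per row; there is exactly one here.
  dataframe.toJSON.first
}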

This solution worked, but only when running on the driver.
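
A SQLContext and its SparkContext are only usable on the driver, so a function that builds a DataFrame per row cannot be called from code that runs on executors. If the data is still in a DataFrame, one alternative is to let Spark build the JSON column itself with `to_json` (available in `org.apache.spark.sql.functions` since Spark 2.1). The sketch below assumes the columns `name` and `meta` from the question and assumes that `score` has already been added as a column; `VenueJson` and `withJsonColumn` are hypothetical names. Note that it keeps `meta` as a nested object next to `score` rather than merging the two.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, struct, to_json}

// Hypothetical helper; names are assumptions for this sketch.
object VenueJson {

  // Returns a DataFrame with "name" and a "json" string column that
  // serializes the nested "meta" struct together with "score".
  def withJsonColumn(df: DataFrame): DataFrame =
    df.select(
      col("name"),
      to_json(struct(col("meta"), col("score"))).as("json"))
}
```

The resulting rows can then be mapped to Venue(name, json) without any per-row DataFrame creation.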
