How to set the schema on a Row in Spark?

Posted 2020-07-03 08:24

The Row Java API has row.schema(), but there is no row.set(StructType schema).

I also tried RowFactory.create(objects), but I don't know how to proceed from there.

UPDATE:

The problem is how to generate a new DataFrame when I modify the row structure in the workers. Here is an example:

DataFrame sentenceData = jsql.createDataFrame(jrdd, schema);
List<Row> resultRows = sentenceData.toJavaRDD()
        .map(new MyFunction<Row, Row>(parameters) {
            /** my map function */
            public Row call(Row row) {
                // I want to change the Row definition by adding new columns
                Row newRow = functionAddnewNewColumns(row);
                StructType newSchema = functionGetNewSchema(row.schema());

                // Here I want to attach the new schema to the row

                return newRow;
            }
        }).collect();

JavaRDD<Row> newJrdd = jsc.parallelize(resultRows);

// Here is the problem: newSchema only exists inside the map function, so I don't know
// how to get the new schema here to create the new, modified DataFrame

DataFrame newDataframe = jsql.createDataFrame(newJrdd, newSchema);

3 Answers
虎瘦雄心在
#2 · 2020-07-03 08:34

You can create a Row that carries its schema by using GenericRowWithSchema (from org.apache.spark.sql.catalyst.expressions):

Row newRow = new GenericRowWithSchema(values, newSchema);
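
For context, here is a minimal sketch of that call in a complete Java class. It assumes Spark 2.x or later on the classpath. The column names and values are made up for illustration, and GenericRowWithSchema lives in Spark's catalyst package, which is generally treated as internal, so it may change between versions.

```java
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class RowWithSchemaExample {

    // Builds a single Row whose schema() returns the StructType passed in.
    public static Row build() {
        // Illustrative schema: two string columns (names are assumptions).
        StructType newSchema = new StructType()
                .add("uuid", DataTypes.StringType, false)
                .add("metadata_id", DataTypes.StringType, true);

        // Values must be in the same order as the schema fields.
        Object[] values = new Object[] {"test-user-1", "newid"};

        return new GenericRowWithSchema(values, newSchema);
    }

    public static void main(String[] args) {
        Row newRow = build();
        // Because the row carries a schema, fields can be looked up by name.
        String metadataId = newRow.getAs("metadata_id");
        System.out.println(String.join(",", newRow.schema().fieldNames()));
        System.out.println(metadataId);
    }
}
```

A row built with RowFactory.create(...) holds only values, so name-based lookups such as getAs("metadata_id") are not available on it.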
再贱就再见
#3 · 2020-07-03 08:53

This is a pretty old thread, but I just had a use case where I needed to generate data with Spark and quickly work with data on the row level and then build a new dataframe from the rows. Took me a bit to put it together so maybe it will help someone.

Here we take a "template" row, modify some data, add a new column with the appropriate row-level schema, and then use that new row and its schema to create a new DataFrame with the appropriate new schema, so we go "bottom up" :) This builds on @Christian's answer, so I'm contributing a simplified snippet back.

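import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{StringType, StructField, StructType}

def fillTemplateRow(row: Row, newUUID:String) = {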
  var retSeq = Seq[Any]()
    (row.schema,row.toSeq).zipped.foreach(
      (s,r)=> {
        // println(s"s=${s},r=${r}")
        val retval = s.name match {
          case "uuid" => {
            newUUID
          }
          case _ => r
        }
        retSeq = retSeq :+ retval
      })

  var moreSchema = StructType(List(
    StructField("metadata_id", StringType, true)
  ))

  // Extend the schema of the row passed in (not the outer templateRow)
  var newSchema = StructType(row.schema ++ moreSchema)

  retSeq = retSeq :+ "newid"

  var retRow = new GenericRowWithSchema(
    retSeq.toArray,
    newSchema
  ): Row

  retRow
}

var newRow = fillTemplateRow(templateRow, "test-user-1")

var usersDF = spark.createDataFrame(
    spark.sparkContext.parallelize(Seq(newRow)),
    newRow.schema
)

usersDF.select($"uuid",$"metadata_id").show()
神经病院院长
#4 · 2020-07-03 08:54

You do not set a schema on a row - that makes no sense. You can, however, create a DataFrame (or, before Spark 1.3, a JavaSchemaRDD) with a given schema using the sqlContext.

DataFrame dataFrame = sqlContext.createDataFrame(rowRDD, schema);

The DataFrame will have the schema you provided.

For further information, please consult the documentation at http://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema

EDIT: In response to the updated question

You can generate new rows in your map function, which gives you a new RDD of type JavaRDD<Row>:

DataFrame sentenceData = jsql.createDataFrame(jrdd, schema);
JavaRDD<Row> newRowRDD = sentenceData
   .toJavaRDD()
   .map(row -> functionAddnewNewColumns(row)); // Assuming functionAddnewNewColumns returns a Row

You then define the new schema

StructField[] fields = new StructField[] {
   new StructField("column1",...),
   new StructField("column2",...),
   ...
};
StructType newSchema = new StructType(fields);

Create a new DataFrame from newRowRDD with newSchema as the schema:

DataFrame newDataframe = jsql.createDataFrame(newRowRDD, newSchema);
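
Putting these steps together, the following is a rough end-to-end sketch, not the original poster's code. It assumes Spark 2.x or later, where the Java API uses SparkSession and Dataset<Row> in place of SQLContext and DataFrame. The class name, the addColumn helper, the sample data and the "length" column are all hypothetical. The point it illustrates is that the new schema is derived once on the driver from the old schema, while the map function only produces the values.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class AddColumnViaRdd {

    // Hypothetical helper: copies the old values and appends one new value
    // (the length of the string in column 0).
    static Row addColumn(Row row) {
        Object[] values = new Object[row.size() + 1];
        for (int i = 0; i < row.size(); i++) {
            values[i] = row.get(i);
        }
        values[row.size()] = row.getString(0).length();
        return RowFactory.create(values);
    }

    static Dataset<Row> run(SparkSession spark) {
        // Original DataFrame with a single string column (sample data).
        StructType schema = new StructType()
                .add("sentence", DataTypes.StringType, false);
        List<Row> rows = Arrays.asList(
                RowFactory.create("hello spark"),
                RowFactory.create("rows carry values"));
        Dataset<Row> sentenceData = spark.createDataFrame(rows, schema);

        // Derive the new schema on the driver; add() returns a new StructType.
        StructType newSchema = sentenceData.schema()
                .add("length", DataTypes.IntegerType, false);

        // Workers only compute values, in the same order as newSchema.
        JavaRDD<Row> newRowRDD = sentenceData.toJavaRDD()
                .map(row -> addColumn(row));

        return spark.createDataFrame(newRowRDD, newSchema);
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[1]")
                .appName("add-column-via-rdd")
                .getOrCreate();
        run(spark).show();
        spark.stop();
    }
}
```

With this arrangement there is no need to collect() the rows to the driver and parallelize them again, and nothing has to be returned from the map function except the rows themselves.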