How to set the schema of a Row in Spark?

In the Row Java API there is a row.schema() getter, but there is no corresponding setter such as row.set(StructType schema).

I also tried RowFactory.create(objects), but I don't know how to proceed from there.

UPDATE:

The problem is how to generate a new DataFrame when I modify the row structure inside the workers. Here is the example:

DataFrame sentenceData = jsql.createDataFrame(jrdd, schema);
List<Row> resultRows = sentenceData.toJavaRDD()
        .map(new MyFunction<Row, Row>(parameters) {
            /** my map function */
            public Row call(Row row) {
                // I want to change the Row definition by adding new columns
                Row newRow = functionAddnewNewColumns(row);
                StructType newSchema = functionGetNewSchema(row.schema());

                // Here I want to insert the structure

                return newRow;
            }
        }).collect();


JavaRDD<Row> jrdd = jsc.parallelize(resultRows);

// Here is the problem: I don't know how to get the new schema here to create the new, modified DataFrame

DataFrame newDataframe = jsql.createDataFrame(jrdd, newSchema);
Kylander answered 26/11, 2015 at 9:17 Comment(1)
Welcome to SO! Please share a MCVE so we can try to help. This is a very low quality question.Audit

You can create a Row with a schema by using:

Row newRow = new GenericRowWithSchema(values, newSchema);
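Note that GenericRowWithSchema lives in the org.apache.spark.sql.catalyst.expressions package, which is internal to Spark, so it is not part of the stable public API. A minimal sketch of the one-liner above, with made-up field names and values for illustration:

```java
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class RowWithSchemaExample {
    public static void main(String[] args) {
        // Hypothetical two-column schema for illustration
        StructType schema = new StructType(new StructField[] {
            new StructField("name", DataTypes.StringType, false, Metadata.empty()),
            new StructField("age", DataTypes.IntegerType, false, Metadata.empty())
        });
        // The values array must line up positionally with the schema fields
        Row row = new GenericRowWithSchema(new Object[] { "alice", 42 }, schema);
        System.out.println(row.schema().fieldNames()[0]);
    }
}
```

Because the class is internal, it can move or change between Spark versions; for code that needs to stay stable, attaching the schema when building the DataFrame with createDataFrame(rowRDD, schema) is the safer route.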
Ambulator answered 21/2, 2017 at 8:12 Comment(0)

You do not set a schema on a row - that makes no sense. You can, however, create a DataFrame (or pre-Spark 1.3 a JavaSchemaRDD) with a given schema using the sqlContext.

DataFrame dataFrame = sqlContext.createDataFrame(rowRDD, schema);

The DataFrame will have the schema you have provided.

For further information, please consult the documentation at http://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema

EDIT: in response to the updated question.

You can generate new rows in your map function, which will give you a new RDD of type JavaRDD<Row>:

DataFrame sentenceData = jsql.createDataFrame(jrdd, schema);
JavaRDD<Row> newRowRDD = sentenceData
   .toJavaRDD()
   .map(row -> functionAddnewNewColumns(row)); // Assuming functionAddnewNewColumns returns a Row

You then define the new schema

StructField[] fields = new StructField[] {
   new StructField("column1",...),
   new StructField("column2",...),
   ...
};
StructType newSchema = new StructType(fields);

Finally, create a new DataFrame from newRowRDD with newSchema as the schema:

DataFrame newDataframe = jsql.createDataFrame(newRowRDD, newSchema);
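One way around the issue raised in the comments (the new schema being known only inside the map function on the workers) is to derive the new schema on the driver from the original one, since you know in advance which columns your function adds. A sketch under that assumption, reusing the question's jsql and sentenceData and adding a hypothetical wordCount column computed from a first string column:

```java
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Build the new schema on the driver by extending the original one,
// so it is available when creating the new DataFrame.
StructType oldSchema = sentenceData.schema();
StructField[] fields = Arrays.copyOf(oldSchema.fields(), oldSchema.fields().length + 1);
fields[fields.length - 1] =
    new StructField("wordCount", DataTypes.IntegerType, false, Metadata.empty());
StructType newSchema = new StructType(fields);

// Append the matching value to every row on the workers.
JavaRDD<Row> newRowRDD = sentenceData.toJavaRDD().map(row -> {
    Object[] values = new Object[row.length() + 1];
    for (int i = 0; i < row.length(); i++) {
        values[i] = row.get(i);
    }
    // Hypothetical new value: word count of the first (string) column
    values[row.length()] = row.getString(0).split(" ").length;
    return RowFactory.create(values);
});

DataFrame newDataFrame = jsql.createDataFrame(newRowRDD, newSchema);
```

The key point is that the schema is constructed once on the driver and the map function only has to emit rows whose values line up positionally with it.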
Redbreast answered 26/11, 2015 at 11:44 Comment(4)
thanks a lot, I just updated my question; my problem is that I am trying to modify the schema of one DataFrame, but I don't know how to return the new schema to the master to generate the new DataFrame.Kylander
My problem is that I don't know the new schema after the function map(row -> functionAddnewNewColumns(row))... the new schema is defined inside the map function (on the worker), so this information is not available in the general main scope of the application.Kylander
@Glennie "You do not set a schema on a row - that makes no sense. " -- why not? A Row has a schema: org.apache.spark.sql.Row.schema(). I don't see how it is fundamentally any different from the same question but with a DataFrame.Gyronny
Yes, a row has a schema, but it is inherited from the dataframe. You cannot have a dataframe containing rows with different schemas.Redbreast

This is a pretty old thread, but I just had a use case where I needed to generate data with Spark, quickly work with data at the row level, and then build a new dataframe from the rows. It took me a bit to put together, so maybe it will help someone.

Here we're taking a "template" row, modifying some data, adding a new column with an appropriate "row-level" schema, and then using that new row and schema to create a new DF with the appropriate "new schema", so going "bottom up" :) This builds on @Christian's answer originally, so I'm contributing a simplified snippet back.

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{StringType, StructField, StructType}

def fillTemplateRow(row: Row, newUUID: String): Row = {
  // Copy the template row's values, replacing the "uuid" column
  var retSeq = Seq[Any]()
  (row.schema, row.toSeq).zipped.foreach(
    (s, r) => {
      // println(s"s=${s},r=${r}")
      val retval = s.name match {
        case "uuid" => newUUID
        case _      => r
      }
      retSeq = retSeq :+ retval
    })

  // Schema for the extra column we are adding
  val moreSchema = StructType(List(
    StructField("metadata_id", StringType, true)
  ))

  // Extend the row's own schema with the new column
  val newSchema = StructType(row.schema ++ moreSchema)

  retSeq = retSeq :+ "newid"

  new GenericRowWithSchema(retSeq.toArray, newSchema): Row
}

val newRow = fillTemplateRow(templateRow, "test-user-1")

val usersDF = spark.createDataFrame(
  spark.sparkContext.parallelize(Seq(newRow)),
  newRow.schema
)

usersDF.select($"uuid", $"metadata_id").show()
Cavalry answered 15/10, 2019 at 18:25 Comment(0)
