How to generate a DataFrame with random content and N rows?

How can I create a Spark DataFrame in Scala with 100 rows and 3 columns that have random integer values in range (1, 100)?

I know how to create a DataFrame manually, but I cannot automate it:

val df = sc.parallelize(Seq((1,20, 40), (60, 10, 80), (30, 15, 30))).toDF("col1", "col2", "col3") 
Loach answered 7/2, 2018 at 8:38 Comment(0)

Here you go, Seq.fill is your friend:

def randomInt1to100 = scala.util.Random.nextInt(100)+1

val df = sc.parallelize(
  Seq.fill(100){(randomInt1to100,randomInt1to100,randomInt1to100)}
).toDF("col1", "col2", "col3")
Overrefinement answered 7/2, 2018 at 8:51 Comment(2)
When I test it with 50000 rows and add .withColumn("id", monotonicallyIncreasingId), the id column is not what I expect: it goes up to about 2000 and then jumps to big numbers like 17179870015. Do you know why?Loach
@Loach this is how it works (read the docs). If you want an id from 1 to 50000 then use row_number (window functions), but this is very inefficient.Overrefinement
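
For reference, a minimal sketch of the row_number approach mentioned in the comment above, assuming a Spark 2 session and the df generated in this answer (the names mono_id and withId are just for illustration):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{monotonically_increasing_id, row_number}

// monotonically_increasing_id() is only guaranteed to be increasing and unique,
// not consecutive: the upper bits encode the partition id, hence the big jumps.
// For consecutive ids 1..N, rank the rows with row_number over a global window
// (this pulls all rows into a single partition, hence the inefficiency).
val withId = df
  .withColumn("mono_id", monotonically_increasing_id())
  .withColumn("id", row_number().over(Window.orderBy("mono_id")))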

Generating the data locally and then parallelizing it is totally fine, especially if you don't have to generate a lot of data.

However, should you ever need to generate a huge dataset, you can always implement an RDD that does this for you in parallel, as in the following example.

import scala.reflect.ClassTag
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Each random partition will hold `numValues` items
final class RandomPartition[A: ClassTag](val index: Int, numValues: Int, random: => A) extends Partition {
  def values: Iterator[A] = Iterator.fill(numValues)(random)
}

// The RDD will parallelize the workload across `numSlices`
final class RandomRDD[A: ClassTag](@transient private val sc: SparkContext, numSlices: Int, numValues: Int, random: => A) extends RDD[A](sc, deps = Seq.empty) {

  // Based on the item count and the number of slices, determine how many values
  // are computed in each slice, distributing the remainder evenly (if any).
  private val valuesPerSlice = numValues / numSlices
  private val slicesWithExtraItem = numValues % numSlices

  // Just ask the partition for the data
  override def compute(split: Partition, context: TaskContext): Iterator[A] =
    split.asInstanceOf[RandomPartition[A]].values

  // Generate the partitions so that the load is as evenly spread as possible
  // e.g. 10 partitions and 22 items -> 2 slices with 3 items and 8 slices with 2
  override protected def getPartitions: Array[Partition] =
    ((0 until slicesWithExtraItem).view.map(new RandomPartition[A](_, valuesPerSlice + 1, random)) ++
      (slicesWithExtraItem until numSlices).view.map(new RandomPartition[A](_, valuesPerSlice, random))).toArray

}

Once you have this, you can use it by passing your own random data generator to get an RDD[Int]:

val rdd = new RandomRDD(spark.sparkContext, 10, 22, scala.util.Random.nextInt(100) + 1)
rdd.foreach(println)
/*
 * outputs:
 * 30
 * 86
 * 75
 * 20
 * ...
 */

or an RDD[(Int, Int, Int)]:

def rand = scala.util.Random.nextInt(100) + 1
val rdd = new RandomRDD(spark.sparkContext, 10, 22, (rand, rand, rand))
rdd.foreach(println)
/*
 * outputs:
 * (33,22,15)
 * (65,24,64)
 * (41,81,44)
 * (58,7,18)
 * ...
 */

and of course you can wrap it in a DataFrame very easily as well:

spark.createDataFrame(rdd).show()
/*
 * outputs:
 * +---+---+---+
 * | _1| _2| _3|
 * +---+---+---+
 * |100| 48| 92|
 * | 34| 40| 30|
 * | 98| 63| 61|
 * | 95| 17| 63|
 * | 68| 31| 34|
 * .............
 */

Notice how in this case the generated data is different every time the RDD/DataFrame is acted upon. By changing the implementation of RandomPartition to actually store the values instead of generating them on the fly, you can have a stable set of random items, while still retaining the flexibility and scalability of this approach.
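
As an aside, one way to get a stable set of values without materialising them on the driver (a sketch of my own, not the store-the-values variant described above) is to derive them from a fixed per-partition seed; the enclosing RandomRDD would then need a matching Random => A generator:

import scala.util.Random
import org.apache.spark.Partition

// Values are derived from a deterministic per-partition seed, so every action
// over the RDD recomputes exactly the same "random" data.
final class SeededRandomPartition[A](val index: Int, numValues: Int, gen: Random => A) extends Partition {
  def values: Iterator[A] = {
    val rng = new Random(index.toLong) // same seed -> same values on every compute
    Iterator.fill(numValues)(gen(rng))
  }
}

// e.g. new SeededRandomPartition[Int](0, 5, _.nextInt(100) + 1).values yields
// the same five integers no matter how many times it is evaluated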

One nice property of the stateless approach is that you can generate huge datasets even locally. The following ran in a few seconds on my laptop:

new RandomRDD(spark.sparkContext, 10, Int.MaxValue, 42).count
// returns: 2147483647
Lunar answered 7/2, 2018 at 12:45 Comment(1)
This gives a NotSerializableException: org.apache.spark.SparkContext. AFAICT, you can't just construct an RDD directly because it requires explicitly passing the SparkContext. When I try doing this in notebooks it just works. Not sure what the difference is.Alienee

You can simply use scala.util.Random to generate the random numbers within the range, loop over 100 rows, and finally use the createDataFrame API:

import scala.util.Random
val data = (1 to 100).map(_ => (1 + Random.nextInt(100), 1 + Random.nextInt(100), 1 + Random.nextInt(100)))

sqlContext.createDataFrame(data).toDF("col1", "col2", "col3").show(false)
Tmesis answered 7/2, 2018 at 8:51 Comment(2)
It's not for Spark 2, right? How can I run sqlContext.createDataFrame... from shell?Loach
You will have to create sqlContext for that. val sqlContext = new SQLContext(sparkContext)Tmesis
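
For what it's worth, in a Spark 2 shell the SparkSession is already available as spark, so the same idea works without creating a SQLContext (a minimal sketch):

import scala.util.Random

val data = (1 to 100).map(_ => (1 + Random.nextInt(100), 1 + Random.nextInt(100), 1 + Random.nextInt(100)))
spark.createDataFrame(data).toDF("col1", "col2", "col3").show(false)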

You can use the generic code below:

import scala.util.Random
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// no of rows required
val rows = 15
// no of columns required
val cols = 10

val spark = SparkSession.builder
  .master("local[*]")
  .appName("testApp")
  .config("spark.sql.warehouse.dir", "file:///c:/tmp/spark-warehouse")
  .getOrCreate()

import spark.implicits._

val columns = 1 to cols map (i => "col" + i)

// create the DataFrame schema with these columns (in that order)
val schema = StructType(columns.map(StructField(_, IntegerType)))

val lstrows = Seq.fill(rows * cols)(Random.nextInt(100) + 1).grouped(cols).toList.map { x => Row(x: _*) }

val rdd = spark.sparkContext.makeRDD(lstrows)
val df = spark.createDataFrame(rdd, schema)
Groceryman answered 7/2, 2018 at 11:28 Comment(0)

If you need to create a large amount of random data, Spark provides an object called RandomRDDs that can generate datasets filled with random numbers following uniform, normal, or various other distributions.

https://spark.apache.org/docs/latest/mllib-statistics.html#random-data-generation

From their example:

import org.apache.spark.mllib.random.RandomRDDs._

// Generate a random double RDD that contains 1 million i.i.d. values drawn from the
// standard normal distribution `N(0, 1)`, evenly distributed in 10 partitions.
val u = normalRDD(sc, 1000000L, 10)
// Apply a transform to get a random double RDD following `N(1, 4)`.
val v = u.map(x => 1.0 + 2.0 * x)
Precondition answered 17/10, 2018 at 2:15 Comment(0)
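
To get something closer to what the question asks for (3 integer columns in the 1 to 100 range) with this API, a possible sketch based on RandomRDDs.uniformVectorRDD; the scaling and column names are my own choices, and the usual shell sc and spark are assumed:

import org.apache.spark.mllib.random.RandomRDDs.uniformVectorRDD

// 100 rows x 3 columns of uniform doubles in [0, 1), spread over 10 partitions,
// then scaled up to integers in the range 1..100
val ints = uniformVectorRDD(sc, numRows = 100, numCols = 3, numPartitions = 10)
  .map(v => (1 + (v(0) * 100).toInt, 1 + (v(1) * 100).toInt, 1 + (v(2) * 100).toInt))

val randomDf = spark.createDataFrame(ints).toDF("col1", "col2", "col3")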

Since Spark 1.6 there's an implicit toDF that makes it simple to create a DataFrame from any Seq or Range:

val spark: SparkSession = ???
import spark.sqlContext.implicits._

val df = Range(0, 100).toDF("n")

Once created you can discard the range column and select anything else you like.

import org.apache.spark.sql.functions.{lit, rand, when}

val df = Range(0, 100).toDF().select(rand() as "x")
  .withColumn("y", when($"x" < 0.5, lit(true)).otherwise(lit(false)))
  .persist()

The persist is used just to stop the random values from being re-evaluated every time the DataFrame is acted upon.

Longitude answered 17/4 at 4:25 Comment(0)
