I am doing data preparation with PySpark; the steps include, among others, string indexing, one-hot encoding, and quantile discretization. My data frame has quite a lot of columns (1,000 in total: 500 interval, 250 categorical, and 250 binary) and 1 million rows.
My observation is that some transformations are much, much slower than others. As summarized below, some steps take around 3 hours while others finish in just a couple of minutes.
Step (execution time):
- Log10 transformation for all interval variables (00:02:22)
- Random partitioning of the data frame (00:02:48)
- Quantile discretization and vector assembling for intervals (02:31:37)
- One hot encoding and vector assembling for categoricals (03:13:51)
- String indexing and vector assembling for binaries (03:17:34)
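For reference, the two fast steps at the top of the list are plain DataFrame operations; a rough sketch of what they look like (the column names and the split ratio are placeholders, not my actual values):

```python
from pyspark.sql import functions as F

# Log10 transformation applied to every interval column (placeholder names)
interval_cols = ["int_{}".format(i) for i in range(1, 501)]
for c in interval_cols:
    df = df.withColumn(c, F.log10(F.col(c)))

# Random partitioning of the data frame (illustrative 70/30 split)
train_df, test_df = df.randomSplit([0.7, 0.3], seed=42)
```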
The worst-performing steps seem to be string indexing, one-hot encoding, quantile discretization, and vector assembling.
Could you please suggest what I should check or adjust in my Spark configuration or code to significantly improve the performance of these steps?
For the feature engineering steps above I used QuantileDiscretizer, VectorAssembler, OneHotEncoder, and StringIndexer from pyspark.ml.feature. I am sure the data was fully loaded into cluster memory (persist(StorageLevel.MEMORY_ONLY)).
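Roughly, the slow stages are built like this: one transformer per column, followed by a VectorAssembler per variable group. This is a simplified sketch; the column names and numBuckets are placeholders for the real ones:

```python
from pyspark import StorageLevel
from pyspark.ml import Pipeline
from pyspark.ml.feature import (OneHotEncoder, QuantileDiscretizer,
                                StringIndexer, VectorAssembler)

df.persist(StorageLevel.MEMORY_ONLY)

# Placeholder names standing in for the real 500/250/250 columns
interval_cols = ["int_{}".format(i) for i in range(1, 501)]
categorical_cols = ["cat_{}".format(i) for i in range(1, 251)]
binary_cols = ["bin_{}".format(i) for i in range(1, 251)]

# Quantile discretization per interval column, then one VectorAssembler
interval_stages = [QuantileDiscretizer(numBuckets=10, inputCol=c, outputCol=c + "_bin")
                   for c in interval_cols]
interval_stages.append(VectorAssembler(
    inputCols=[c + "_bin" for c in interval_cols], outputCol="interval_vec"))

# StringIndexer + OneHotEncoder per categorical column, then one VectorAssembler
categorical_stages = []
for c in categorical_cols:
    categorical_stages.append(StringIndexer(inputCol=c, outputCol=c + "_idx"))
    categorical_stages.append(OneHotEncoder(inputCol=c + "_idx", outputCol=c + "_ohe"))
categorical_stages.append(VectorAssembler(
    inputCols=[c + "_ohe" for c in categorical_cols], outputCol="categorical_vec"))

# StringIndexer per binary column, then one VectorAssembler
binary_stages = [StringIndexer(inputCol=c, outputCol=c + "_idx")
                 for c in binary_cols]
binary_stages.append(VectorAssembler(
    inputCols=[c + "_idx" for c in binary_cols], outputCol="binary_vec"))

# Each group is fitted and applied as its own pipeline (timed separately above)
for stages in (interval_stages, categorical_stages, binary_stages):
    df = Pipeline(stages=stages).fit(df).transform(df)
```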
My cluster has 7 nodes (4 cores and 16 GB RAM each). Spark version is 2.2, and I am using PySpark.
Spark configuration applied:
- spark.serializer = org.apache.spark.serializer.KryoSerializer
- spark.kryo.unsafe = true
- spark.rdd.compress = false
- master = yarn
- deploy-mode = cluster
- spark.driver.cores = 4
- driver-memory = 4G
- num-executors = 6
- executor-memory = 10G
- executor-cores = 4
- spark.yarn.maxAppAttempts = 1
- spark.sql.cbo.enabled = true
- spark.sql.constraintPropagation.enabled = false
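For completeness, the spark.* properties above are set roughly like this when the session is created (the app name is a placeholder; master, deploy mode, memory, cores, and executor counts are passed as spark-submit options):

```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("feature_prep")  # placeholder application name
         .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
         .config("spark.kryo.unsafe", "true")
         .config("spark.rdd.compress", "false")
         .config("spark.yarn.maxAppAttempts", "1")
         .config("spark.sql.cbo.enabled", "true")
         .config("spark.sql.constraintPropagation.enabled", "false")
         .getOrCreate())
```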