Low parallelism when running Apache Beam wordcount pipeline on Spark with Python SDK
I am quite experienced with Spark cluster configuration and running PySpark pipelines, but I'm just starting with Beam. So, I am trying to do an apples-to-apples comparison between PySpark and the Beam Python SDK on the Spark PortableRunner (running on top of the same small Spark cluster: 4 workers, each with 4 cores and 8GB RAM), and I've settled on a wordcount job over a reasonably large dataset, storing the results in a Parquet table.

I have thus downloaded 50GB of Wikipedia text files, split across about 100 uncompressed files, and stored them in the directory /mnt/nfs_drive/wiki_files/ (/mnt/nfs_drive is an NFS drive mounted on all workers).

First, I am running the following PySpark wordcount script:

from pyspark.sql import SparkSession, Row
from operator import add
wiki_files = '/mnt/nfs_drive/wiki_files/*'

spark = SparkSession.builder.appName("WordCountSpark").getOrCreate()

spark_counts = spark.read.text(wiki_files).rdd.map(lambda r: r['value']) \
    .flatMap(lambda x: x.split(' ')) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(add) \
    .map(lambda x: Row(word=x[0], count=x[1]))

spark.createDataFrame(spark_counts).write.parquet(path='/mnt/nfs_drive/spark_output', mode='overwrite')

The script runs perfectly well and outputs the Parquet files at the desired location in about 8 minutes. The main stage (reading and splitting tokens) is divided into a reasonable number of tasks, so the cluster is used efficiently: [Spark UI screenshot showing the main stage split across many tasks]
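
As a quick sanity check on the Spark side, the number of input partitions can also be printed directly (a minimal sketch; the exact count depends on the file sizes and on spark.sql.files.maxPartitionBytes):

# Using the same SparkSession and wiki_files pattern as above
num_partitions = spark.read.text(wiki_files).rdd.getNumPartitions()
print("input partitions:", num_partitions)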

I am now trying to achieve the same with Beam and the portable runner. First, I have started the Spark job server (on the Spark master node) with the following command:

docker run --rm --net=host -e SPARK_EXECUTOR_MEMORY=8g apache/beam_spark_job_server:2.25.0 --spark-master-url=spark://localhost:7077

Then, on the master and worker nodes, I am running the SDK Harness as follows:

 docker run --net=host -d --rm -v /mnt/nfs_drive:/mnt/nfs_drive apache/beam_python3.6_sdk:2.25.0 --worker_pool

Now that the Spark cluster is set up to run Beam pipelines, I can submit the following script:

import apache_beam as beam
import pyarrow
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import fileio

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=EXTERNAL",
    "--environment_config=localhost:50000",
    "--job_name=WordCountBeam"
])


wiki_files = '/mnt/nfs_drive/wiki_files/*'

p = beam.Pipeline(options=options)
beam_counts = (
    p
    | fileio.MatchFiles(wiki_files)
    | beam.Map(lambda x: x.path)
    | beam.io.ReadAllFromText()
    | 'ExtractWords' >> beam.FlatMap(lambda x: x.split(' '))
    | beam.combiners.Count.PerElement()
    | beam.Map(lambda x: {'word': x[0], 'count': x[1]})
)


_ = beam_counts | 'Write' >> beam.io.WriteToParquet('/mnt/nfs_drive/beam_output',
      pyarrow.schema(
          [('word', pyarrow.binary()), ('count', pyarrow.int64())]
      )
)

result = p.run().wait_until_finish()

The code is submitted successfully: I can see the job in the Spark UI, and the workers are executing it. However, even when left running for more than 1 hour, it does not produce any output!

I then wanted to make sure there was no problem with my setup, so I ran the exact same script on a smaller dataset (just 1 Wiki file). This completes successfully in about 3.5 minutes (the Spark wordcount on the same dataset takes 16s!).

I wondered how Beam could be that much slower, so I started looking at the DAG submitted to Spark by the job server for the Beam pipeline. I noticed that the Spark job spends most of its time in the following stage: [Spark UI screenshot of the stage in question]

This stage is split into just 2 tasks: [Spark UI screenshot showing the 2 tasks]

Printing debug lines shows that this stage is where the "heavy lifting" (i.e. reading lines from the wiki files and splitting tokens) is performed. However, since this happens in only 2 tasks, the work is distributed across at most 2 workers. What's also interesting is that running on the large 50GB dataset results in exactly the same DAG with exactly the same number of tasks.
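
For illustration, the debug lines can be added with something like the following (log_element is a hypothetical helper, not part of the original pipeline):

import logging

def log_element(element):
    # Log a truncated sample of each element so the executor logs show
    # which tasks are doing the actual reading and splitting work.
    logging.info("processing element: %.80s", element)
    return element

# e.g. inserted right after the read step:
# ... | beam.io.ReadAllFromText() | 'DebugLog' >> beam.Map(log_element) | ...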

I am quite unsure how to proceed further. It seems like the Beam pipeline has reduced parallelism, but I'm not sure if this is due to sub-optimal translation of the pipeline by the job server, or whether I should specify my PTransforms in some other way to increase the parallelism on Spark.

Any suggestion appreciated!

Amie asked 17/11, 2020 at 16:06 Comment(0)

It took a while, but I figured out what the issue is and found a workaround.

The underlying problem is in Beam's portable runner, specifically where the Beam job is translated into a Spark job.

The translation code (executed by the job server) splits stages into tasks based on calls to sparkContext().defaultParallelism(). The job server does not configure the default parallelism explicitly (and does not allow the user to set it through pipeline options), so in theory it falls back to deriving the parallelism from the number of cores available on the executors (see the explanation at https://spark.apache.org/docs/latest/configuration.html#execution-behavior). This seems to be the intent of the translation code when it calls defaultParallelism().

Now, in practice, it is well known that calling sparkContext().defaultParallelism() too soon when relying on this fallback can return lower numbers than expected, since executors might not have registered with the context yet. In particular, calling defaultParallelism() too early returns 2 (the scheduler backend falls back to a minimum of 2 when no executor cores have registered yet), and stages end up split into only 2 tasks.
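
This is easy to reproduce with a bare PySpark session against the same standalone master (a minimal sketch, assuming pyspark is installed on the master node):

import time
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("spark://localhost:7077").setAppName("parallelism-probe")
sc = SparkContext(conf=conf)
print(sc.defaultParallelism)  # often 2 right after the context is created
time.sleep(5)                 # give the executors time to register
print(sc.defaultParallelism)  # typically the total core count (16 here) once they have registered
sc.stop()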

My "dirty hack" workaround thus consists in modifying the source code of the job server by simply adding a delay of 3 seconds after instantiating SparkContext and before doing anything else:

$ git diff v2.25.0
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
index aa12192..faaa4d3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
@@ -95,7 +95,13 @@ public final class SparkContextFactory {
       conf.setAppName(contextOptions.getAppName());
       // register immutable collections serializers because the SDK uses them.
       conf.set("spark.kryo.registrator", SparkRunnerKryoRegistrator.class.getName());
-      return new JavaSparkContext(conf);
+      JavaSparkContext jsc = new JavaSparkContext(conf);
+      try {
+        Thread.sleep(3000);
+      } catch (InterruptedException e) {
+      }
+      return jsc;
     }
   }
 }

After recompiling the job server and launching it with this change, all the calls to defaultParallelism() happen after the executors have registered, and the stages are nicely split into 16 tasks (the same as the total number of executor cores). As expected, the job now completes much faster, since many more workers share the work (it is, however, still about 3 times slower than the pure Spark wordcount).

While this works, it is of course not a great solution. A much better solution would be one of the following:

  • change the translation engine so that it can deduce the number of tasks based on the number of available executors in a more robust way;
  • allow the user to configure, via pipeline options, the default parallelism to be used by the job server when translating jobs (this is what the Flink portable runner does; see the sketch below).
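
For comparison, this is roughly what the second option looks like with the Flink portable runner today; as far as I know, the --parallelism option below is honored by the Flink job server, and an equivalent knob for the Spark job server is exactly what is missing:

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=EXTERNAL",
    "--environment_config=localhost:50000",
    "--parallelism=16",  # respected by the Flink job server; no Spark equivalent yet
])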

Until a better solution is in place, this issue effectively prevents using the Beam Spark job server in a production cluster. I will file the issue in Beam's tracker so that a proper fix can be implemented (hopefully soon).

Amie answered 7/1, 2021 at 16:59 Comment(1)
For future Google searches... here is the Jira issue for providing a better fix in Beam: issues.apache.org/jira/browse/BEAM-11671 – Amie

The file IO part of the pipeline can be simplified by using apache_beam.io.textio.ReadFromText(file_pattern='/mnt/nfs_drive/wiki_files/*').

Fusion is another thing that can limit parallelism. The solution is to insert an apache_beam.transforms.util.Reshuffle after reading in all the files, as in the sketch below.
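
Something like this (a sketch based on the pipeline in the question; only the read and reshuffle steps change):

import apache_beam as beam
from apache_beam.transforms.util import Reshuffle

beam_counts = (
    p  # the Pipeline object from the question
    | beam.io.ReadFromText('/mnt/nfs_drive/wiki_files/*')
    | Reshuffle()  # break fusion so downstream work can be redistributed
    | 'ExtractWords' >> beam.FlatMap(lambda x: x.split(' '))
    | beam.combiners.Count.PerElement()
    | beam.Map(lambda x: {'word': x[0], 'count': x[1]})
)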

Dolabriform answered 22/11, 2020 at 1:7 Comment(4)
I agree on the first point; in fact, I was previously using ReadFromText directly, but changed to the above approach in the hope that using MatchFiles first would improve parallelism. Regarding Reshuffle: I had already given it a try and it didn't seem to help, but I will try again. However, forcing the reshuffle would move the data between workers... – Amie
Unfortunately, I changed the pipeline to beam_counts = p | beam.io.ReadFromText(textfile, validate=False, coder=CustomCoder()) | beam.Reshuffle() | beam.FlatMap(split_word) | beam.combiners.Count.PerElement() and I can confirm that neither switching to ReadFromText nor adding Reshuffle improves the parallelism of the pipeline. – Amie
Can you please confirm the Spark version: is it 2.x or 3.x? How do you launch it? Maybe there is a low parallelism setting hidden somewhere. – Physiography
The Spark version is 2.4.6 (no support for 3.x in Beam yet). The Spark cluster is launched manually in standalone mode before submitting any job, using the default parallelism. I also suspect there is some hidden low default parallelism, but it is certainly not in my cluster setup (the pure Spark job has high parallelism on the same cluster). As mentioned in #64488560, the job server configures its own Spark context, so it might be there... – Amie