Throttling a step in beam application
Asked Answered
Y

2

8

I'm using python beam on google dataflow, my pipeline looks like this:

Read image urls from file >> Download images >> Process images

The problem is that I can't let Download images step scale as much as it needs because my application can get blocked from the image server.

Is it a way that I can throttle the step ? Either on input or output per minute.

Thank you.

Yusuk answered 5/9, 2018 at 11:1 Comment(3)
That's an interesting question. I'll try to answer by Monday;)Scuppernong
@Xitrum Did you ever figure this out or did use the suggestion from below?Ogham
@Scuppernong Did you ever figure this out?Incalculable
E
3

One possibility, maybe naïve, is to introduce a sleep in the step. For this you need to know the maximum number of instances of the ParDo that can be running at the same time. If autoscalingAlgorithm is set to NONE you can obtain that from numWorkers and workerMachineType (DataflowPipelineOptions). Precisely, the effective rate will be divided by the total number of threads: desired_rate/(num_workers*num_threads(per worker)). The sleep time will be the inverse of that effective rate:

Integer desired_rate = 1; // QPS limit

if (options.getNumWorkers() == 0) { num_workers = 1; }
else { num_workers = options.getNumWorkers(); }

if (options.getWorkerMachineType() != null) { 
    machine_type = options.getWorkerMachineType();
    num_threads = Integer.parseInt(machine_type.substring(machine_type.lastIndexOf("-") + 1));
}
else { num_threads = 1; }

Double sleep_time = (double)(num_workers * num_threads) / (double)(desired_rate);

Then you can use TimeUnit.SECONDS.sleep(sleep_time.intValue()); or equivalent inside the throttled Fn. In my example, as a use case, I wanted to read from a public file, parse out the empty lines and call the Natural Language Processing API with a maximum rate of 1 QPS (I initialized desired_rate to 1 previously):

p
    .apply("Read Lines", TextIO.read().from("gs://apache-beam-samples/shakespeare/kinglear.txt"))
    .apply("Omit Empty Lines", ParDo.of(new OmitEmptyLines()))
    .apply("NLP requests", ParDo.of(new ThrottledFn()))
    .apply("Write Lines", TextIO.write().to(options.getOutput()));

The rate-limited Fn is ThrottledFn, notice the sleep function:

static class ThrottledFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {

        // Instantiates a client
        try (LanguageServiceClient language = LanguageServiceClient.create()) {

          // The text to analyze
          String text = c.element();
          Document doc = Document.newBuilder()
              .setContent(text).setType(Type.PLAIN_TEXT).build();

          // Detects the sentiment of the text
          Sentiment sentiment = language.analyzeSentiment(doc).getDocumentSentiment();                 
          String nlp_results = String.format("Sentiment: score %s, magnitude %s", sentiment.getScore(), sentiment.getMagnitude());

          TimeUnit.SECONDS.sleep(sleep_time.intValue());

          Log.info(nlp_results);
          c.output(nlp_results);
        }
    }
}

With this I get a 1 element/s rate as seen in the image below and avoid hitting quota when using multiple workers, even if requests are not really spread out (you might get 8 simultaneous requests and then 8s sleep, etc.). This was just a test, possibly a better implemention would be using guava's rateLimiter.

enter image description here

If the pipeline is using autoscaling (THROUGHPUT_BASED) then it would be more complicated and the number of workers should be updated (for example, Stackdriver Monitoring has a job/current_num_vcpus metric). Other general considerations would be controlling the number of parallel ParDos by using a dummy GroupByKey or splitting the source with splitIntoBundles, etc. I'd like to see if there are other nicer solutions.

Effeminacy answered 11/9, 2018 at 15:49 Comment(1)
Just wanted to mention that tonight I tried doing this using Guava's rate limiter (by using the code published by Spotify for scio core). It worked pretty well, except for seeing some warning messages in the logs about a step not outputting for a long period of time (which makes sense). I'm not sure whether that could cause problems. See #69082998 for details.Garboard
S
0

Probably the most straightforward way to do this is to (temporarily) reduce the parallelism of your job. You can do it using the beam.Reshuffle transformation. In your example it would look like this:

    | "Read image urls from file" >> beam.Map(read_function)
    | "Reshuffle to throttle API calls" >> beam.Reshuffle(num_buckets=3)
    | "Download images" >> beam.Map(your_download_function)
    | "Process images"  >> beam.Map(your_process_function)

The argument that you pass to num_buckets should equal to the number of concurrent API calls you can afford to make.

You might need to add another Reshuffle after the "Download images" step (with a higher value of num_buckets) to restore the parallelism of your job and open up the bottleneck you created with the first Reshuffle.

Sundried answered 26/5, 2023 at 9:49 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.