Java Executor with throttling/throughput control
6

35

I'm looking for a Java Executor that allows me to specify throttling/throughput/pacing limitations, for example, no more than say 100 tasks can be processed in a second -- if more tasks get submitted they should get queued and executed later. The main purpose of this is to avoid running into limits when hitting foreign APIs or servers.

I'm wondering whether either base Java (which I doubt, because I checked) or somewhere else reliable (e.g. Apache Commons) provides this, or if I have to write my own. Preferably something lightweight. I don't mind writing it myself, but if there's a "standard" version out there somewhere I'd at least like to look at it first.

Herbst answered 6/11, 2013 at 18:26 Comment(0)
41

Take a look at Guava's RateLimiter:

A rate limiter. Conceptually, a rate limiter distributes permits at a configurable rate. Each acquire() blocks if necessary until a permit is available, and then takes it. Once acquired, permits need not be released. Rate limiters are often used to restrict the rate at which some physical or logical resource is accessed. This is in contrast to Semaphore which restricts the number of concurrent accesses instead of the rate (note though that concurrency and rate are closely related, e.g. see Little's Law).

It's thread-safe, but still marked @Beta. Might be worth a try anyway.

You would have to wrap each call to the Executor with respect to the rate limiter. For a cleaner solution you could create some kind of wrapper for the ExecutorService.

From the javadoc:

final RateLimiter rateLimiter = RateLimiter.create(2.0); // rate is "2 permits per second"

void submitTasks(List<Runnable> tasks, Executor executor) {
  for (Runnable task : tasks) {
    rateLimiter.acquire(); // may wait
    executor.execute(task);
  }
}
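As a hedged sketch of what such a wrapper could look like: the RateLimitedExecutor name and its inline pacing logic below are illustrative stand-ins, not Guava API; with Guava on the classpath the waitForPermit() body would simply be rateLimiter.acquire().

```java
import java.util.concurrent.Executor;

// Illustrative sketch: a delegating Executor that paces hand-offs to the
// underlying executor at a fixed rate. Not Guava API -- a stand-in for it.
final class RateLimitedExecutor implements Executor {
    private final Executor delegate;
    private final long minIntervalNanos; // e.g. 10_000_000 ns for 100 tasks/sec
    private long nextFreeSlot = System.nanoTime();

    RateLimitedExecutor(Executor delegate, double permitsPerSecond) {
        this.delegate = delegate;
        this.minIntervalNanos = (long) (1_000_000_000L / permitsPerSecond);
    }

    @Override
    public void execute(Runnable task) {
        waitForPermit(); // paces the hand-off, not the task itself
        delegate.execute(task);
    }

    private synchronized void waitForPermit() {
        long now = System.nanoTime();
        long waitNanos = nextFreeSlot - now;
        if (waitNanos > 0) {
            try {
                Thread.sleep(waitNanos / 1_000_000L, (int) (waitNanos % 1_000_000L));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        nextFreeSlot = Math.max(nextFreeSlot, now) + minIntervalNanos;
    }
}
```

Note this paces submission only; as the comments below point out, it does not bound how fast already-queued tasks run.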
Vidovic answered 6/11, 2013 at 19:28 Comment(4)
Yes, thanks, this is exactly what I was looking for. The @Beta is annoying, but still this is good.Herbst
I have some concerns about the proposed solution. In the above example the rate limiter throttles submission to the executor service, but then the thread pool applies its own execution policy and may launch all the queued task executions at once.Corcovado
@Corcovado No, that situation won't arise, because we aren't submitting all tasks to the executor in one go. rateLimiter.acquire(); // may wait makes each task wait before it can be handed to the executor, so only the "permitted" number of tasks will be present in the executor queue.Bhatt
But if the first tasks take longer than 0.5 sec each, the executor queue will build up and can be executed at a higher rate (if later tasks are faster), right? So this won't guarantee a max rate of 2 per second at all times, just on average..Faddish
8

The Java Executor doesn't offer such a limitation, only limiting the number of threads, which is not what you are looking for.

In general the Executor is the wrong place to limit such actions anyway; it should happen at the moment the thread tries to call the outside server. You can do this, for example, with a limiting Semaphore that threads must acquire before they submit their requests.

Calling Thread:

public void run() {
  // ...
  requestLimiter.acquire();
  connection.send();
  // ...
 }

While at the same time you schedule a (single) secondary thread to periodically (say, every 60 seconds) release the acquired permits:

 public void run() {
  // ...
  requestLimiter.drainPermits();  // make sure not more than max are released by draining the Semaphore empty
  requestLimiter.release(MAX_NUM_REQUESTS);
  // ...
 }
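Putting the two fragments together, a self-contained sketch of the pattern (the class name WindowedLimiter is illustrative; MAX_NUM_REQUESTS and requestLimiter follow the snippets above, and the 60-second window is parameterized so it can be shortened for testing):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Sketch: at most MAX_NUM_REQUESTS calls per window, enforced by a
// Semaphore that a single scheduled thread refills at the start of
// each window.
final class WindowedLimiter {
    static final int MAX_NUM_REQUESTS = 5;
    private final Semaphore requestLimiter = new Semaphore(MAX_NUM_REQUESTS);
    private final ScheduledExecutorService refiller =
            Executors.newSingleThreadScheduledExecutor();

    WindowedLimiter(long windowMillis) { // 60_000 in the answer's example
        refiller.scheduleAtFixedRate(() -> {
            // drain first so unused permits don't accumulate across windows
            requestLimiter.drainPermits();
            requestLimiter.release(MAX_NUM_REQUESTS);
        }, windowMillis, windowMillis, TimeUnit.MILLISECONDS);
    }

    void send(Runnable request) {
        try {
            requestLimiter.acquire(); // blocks once the window's budget is spent
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        request.run();
    }

    void shutdown() {
        refiller.shutdownNow();
    }
}
```

One caveat of the fixed-window approach: up to 2x MAX_NUM_REQUESTS calls can happen in a short burst straddling a window boundary.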
Erikerika answered 6/11, 2013 at 18:42 Comment(2)
Thanks for the answer. You make a good case, but I'm not convinced that the Executor is the wrong place to impose these limits. The kind of logic you are describing is what I envision being encapsulated by the executor. Since I'm going to be using an executor anyway, I think being able to write something like new ThrottlingExecutor().setThrottlingRule(new MaxPerSecond(100)) would be a pretty clean solution.Herbst
I agree with mrip. If you use CompletableFuture the only place you can implement such rate limiting is inside the Executor.Thole
4

no more than say 100 tasks can be processed in a second -- if more tasks get submitted they should get queued and executed later

You need to look into Executors.newFixedThreadPool(int limit). This will allow you to limit the number of threads that execute simultaneously. If you submit more tasks than that, they will be queued and executed later.

ExecutorService threadPool = Executors.newFixedThreadPool(100);
Future<?> result1 =  threadPool.submit(runnable1);
Future<?> result2 = threadPool.submit(runnable2);
Future<SomeClass> result3 = threadPool.submit(callable1);  
...  

The snippet above shows how you would work with an ExecutorService that allows no more than 100 tasks to run simultaneously.

Update:
After going over the comments, here is what I have come up with (kinda stupid). How about manually keeping track of the tasks that are to be executed? How about storing them first in an ArrayList and then submitting them to the Executor based on how many have already been executed in the last second?
So, let's say 200 tasks have been submitted into our maintained ArrayList. We can iterate and add 100 to the Executor; when a second passes, we can add a few more based on how many have completed in the Executor, and so on.
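A rough sketch of that bookkeeping (BatchingThrottler, MAX_PER_SECOND, and the small numbers are illustrative, not a library API): buffer submitted tasks and let a once-per-period "drainer" hand at most a fixed batch of them to the real pool.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of the idea above: tasks go into a buffer, and a
// scheduled "drainer" moves at most MAX_PER_SECOND of them per period
// into the real thread pool.
final class BatchingThrottler {
    static final int MAX_PER_SECOND = 3; // tiny value, for illustration only
    private final BlockingQueue<Runnable> buffer = new LinkedBlockingQueue<>();
    private final ExecutorService pool = Executors.newFixedThreadPool(2);
    private final ScheduledExecutorService drainer =
            Executors.newSingleThreadScheduledExecutor();

    BatchingThrottler(long periodMillis) { // 1000 for "per second"
        drainer.scheduleAtFixedRate(() -> {
            for (int i = 0; i < MAX_PER_SECOND; i++) {
                Runnable task = buffer.poll();
                if (task == null) break; // buffer empty, wait for next period
                pool.submit(task);
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    void submit(Runnable task) {
        buffer.add(task); // producers never block
    }

    void shutdown() {
        drainer.shutdownNow();
        pool.shutdownNow();
    }
}
```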

Chinachinaberry answered 6/11, 2013 at 18:30 Comment(3)
This doesn't address the OP's question. He's not asking how to limit the number of threads, he's asking how to limit the rate at which the Executor processes tasks via whatever mechanism.Twohanded
Thanks for the answer. The problem with fixed thread pools for my purposes is that (a) I don't need a lot of threads, probably only one and (b) it still doesn't limit the number of executions per second. Particularly in cases where the tasks are very fast to execute (e.g. ask a server for some data). Neither fixed thread pools nor Java's scheduled executors seem to provide this, at least not in the natural way.Herbst
@Herbst I apologize if I posted something that did not help you. Sorry for wasting your time. Please read the update and see if it helps.Chinachinaberry
1

Depending on the scenario, and as suggested in one of the previous responses, the basic functionalities of a ThreadPoolExecutor may do the trick.

But if the thread pool is shared by multiple clients and you want to throttle each of them, making sure that one client won't use all the threads, then a BoundedExecutor will do the job.

More details can be found in the following example:

http://jcip.net/listings/BoundedExecutor.java
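The linked listing is short; a sketch of the same pattern follows. The idea is a Semaphore that caps how many tasks a client may have queued or running, so submitTask blocks once that bound is reached:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

// Sketch of the BoundedExecutor pattern from JCIP: a Semaphore caps the
// number of tasks a client may have queued or running at once.
final class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;

    BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }

    void submitTask(Runnable command) throws InterruptedException {
        semaphore.acquire(); // blocks while `bound` tasks are in flight
        try {
            exec.execute(() -> {
                try {
                    command.run();
                } finally {
                    semaphore.release(); // free a slot when the task finishes
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release(); // task never started, give the permit back
            throw e;
        }
    }
}
```

Note this bounds concurrency (tasks in flight), not rate (tasks per second), so it addresses the shared-pool scenario rather than the original pacing question.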

Saviour answered 28/6, 2016 at 16:0 Comment(0)
0

Personally I found this scenario quite interesting. In my case, I wanted to stress that the interesting phase to throttle is the consuming side, as in classical producer/consumer concurrency theory. That's the opposite of some of the answers suggested before. That is, we don't want to block the submitting thread, but to block the consuming threads based on a rate (tasks/second) policy. So, even if there are tasks ready in the queue, the executing/consuming threads may block waiting to meet the throttle policy.

That said, I think a good candidate would be Executors.newScheduledThreadPool(int corePoolSize). This way you would need a simple queue in front of the executor (a simple LinkedBlockingQueue would suit), and then schedule a periodic task to pick actual tasks from the queue (ScheduledExecutorService.scheduleAtFixedRate). So it's not a straightforward solution, but it should perform well enough if you try to throttle the consumers as discussed before.
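A sketch of that arrangement (PacedConsumer is an illustrative name; the interval would be 10 ms for roughly 100 tasks/second): producers drop tasks into the queue without blocking, and a scheduled tick consumes at most one task per interval, so the rate is enforced on the consuming side.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch: a plain queue in front of a scheduled executor. Submitting
// never blocks; the scheduled tick polls one task per interval, so the
// consuming side enforces the tasks/second policy.
final class PacedConsumer {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService scheduler =
            Executors.newScheduledThreadPool(1);

    PacedConsumer(long intervalMillis) {
        scheduler.scheduleAtFixedRate(() -> {
            Runnable task = queue.poll();
            if (task != null) task.run(); // or hand off to a worker pool
        }, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    void submit(Runnable task) {
        queue.add(task); // producer side is never throttled
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```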

Foe answered 15/10, 2015 at 16:16 Comment(0)
0

You can limit it inside the Runnable:

public static Runnable throttle (Runnable realRunner, long delay) {
    Runnable throttleRunner = new Runnable() {
        // whether is waiting to run
        private boolean _isWaiting = false;
        // target time to run realRunner
        private long _timeToRun;
        // specified delay time to wait
        private long _delay = delay;
        // Runnable that has the real task to run
        private Runnable _realRunner = realRunner;
        @Override
        public void run() {
            // current time
            long now;
            synchronized (this) {
                // another thread is waiting, skip
                if (_isWaiting) return;
                now = System.currentTimeMillis();
                // update time to run
                // do not update it each time since
                // you do not want to postpone it unlimited
                _timeToRun = now+_delay;
                // set waiting status
                _isWaiting = true;
            }
            try {
                Thread.sleep(_timeToRun-now);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // clear waiting status before run
                _isWaiting = false;
                // do the real task
                _realRunner.run();
            }
        }};
    return throttleRunner;
}

Taken from JAVA Thread Debounce and Throttle

Sundries answered 20/9, 2019 at 1:17 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.