Replicate deferred/async launch policies from C++ in Java

In C++ you can start a thread with a deferred or asynchronous launch policy. Is there a way to replicate this functionality in Java?

auto T1 = std::async(std::launch::deferred, doSomething);
auto T2 = std::async(std::launch::async, doSomething);

Descriptions of each--

Asynchronous:

If the async flag is set, then async executes the callable object f on a new thread of execution (with all thread-locals initialized) except that if the function f returns a value or throws an exception, it is stored in the shared state accessible through the std::future that async returns to the caller.

Deferred:

If the deferred flag is set, then async converts f and args... the same way as by std::thread constructor, but does not spawn a new thread of execution. Instead, lazy evaluation is performed: the first call to a non-timed wait function on the std::future that async returned to the caller will cause the copy of f to be invoked (as an rvalue) with the copies of args... (also passed as rvalues) in the current thread (which does not have to be the thread that originally called std::async). The result or exception is placed in the shared state associated with the future and only then it is made ready. All further accesses to the same std::future will return the result immediately.

See the documentation for details.

Sporting answered 26/1, 2021 at 20:52 Comment(6)
I suggest you edit your question to describe what the behavior is that you want in Java. Someone may be able to better answer this question without knowing C++ if you can explain that. Otherwise, it's possible that deferred/async launch policies might not be ubiquitous terminology that also exists in Java, which would require someone to be familiar with both languages.Donatello
For deferred just create a Supplier or Runnable and call it when you need it. Wrap a fake CompletableFuture around it if you want a future like C++ gives you.Hospitality
As a note, I'd suggest you look into modern multithreading in Java. You should not bother about those thread details. Just launch your tasks against a ScheduledExecutorService and create a task pipeline on the CompletableFuture objects, as in "do this, then that, when it's done do that, afterwards this, ...".Hospitality
@Hospitality That's the type of solution I was expecting; you should post this as a proper answer. Java has a crazy amount of threading utilities (especially compared to C++), and the idiomatic way to write clean Java code does not map nicely to idiomatic C++Donatello
If idiomatic code is what you want, you should have asked for it. Trying to translate these C++ idioms exactly is really difficult and probably not needed anyway.Infix
That's not what "thread" means. std::async(policy, f, ...) arranges for the function f to be called "asynchronously" on the given args. How and when the call actually happens depends on the policy. The concept of threads belongs to a lower abstraction level. If you're looking for a name that represents the act of calling a particular f on a particular set of args, a better word for that is "task."Colb

Future

First of all, we have to observe that std::async is a tool to execute a given task and return a std::future object that holds the result of the computation once it is available.

For example, we can call result.get() to block and wait for the result to arrive. Also, if the computation threw an exception, it is stored and rethrown to us as soon as we call result.get().

Java provides similar classes: the interface is Future, and the most relevant implementation is CompletableFuture.

std::future#get translates roughly to Future#get. Even the exceptional behavior is very similar. While C++ rethrows the exception upon calling get, Java will throw an ExecutionException which has the original exception set as cause.
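
To make the exception behavior concrete, here is a minimal sketch. The class FutureExceptionDemo and the method failingTask are hypothetical names made up for this illustration; only CompletableFuture, Future#get and ExecutionException come from the JDK.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class FutureExceptionDemo {

    // Hypothetical task that always fails, standing in for real work.
    static int failingTask() {
        throw new IllegalStateException("boom");
    }

    // Runs the failing task asynchronously and returns the exception that
    // get() reports as the cause of the ExecutionException.
    static Throwable causeOfFailure() {
        CompletableFuture<Integer> task =
            CompletableFuture.supplyAsync(FutureExceptionDemo::failingTask);
        try {
            task.get();
            return null; // not reached for a failing task
        } catch (ExecutionException e) {
            return e.getCause(); // the original exception thrown by the task
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
    }

    public static void main(String[] args) {
        Throwable cause = causeOfFailure();
        // prints "IllegalStateException: boom"
        System.out.println(cause.getClass().getSimpleName() + ": " + cause.getMessage());
    }
}
```

So where C++ code would catch the original exception type around get(), Java code catches ExecutionException and inspects getCause().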


How to obtain a Future?

In C++ you create your future object using std::async. In Java you could use one of the many static helper methods in CompletableFuture. In your case, the most relevant are runAsync, for a task that does not return a result, and supplyAsync, for a task that does.

So in order to create a future that just prints Hello World!, you could for example do

CompletableFuture<Void> task = CompletableFuture.runAsync(() -> System.out.println("Hello World!"));
/*...*/
task.get();

Java not only has lambdas but also method references. Let's say you have a method that computes a heavy math task:

class MyMath {
    static int compute() {
        // Very heavy, duh
        return (int) Math.pow(2, 5);
    }
}

Then you could create a future that returns the result once it is available as

CompletableFuture<Integer> task = CompletableFuture.supplyAsync(MyMath::compute);
/*...*/
Integer result = task.get();

async vs deferred

In C++, you have the option to specify a launch policy which dictates the threading behavior for the task. Let us put the memory promises C++ makes aside, because in Java you do not have that much control over memory.

With async, the task is scheduled immediately and executed on a separate thread. The result becomes available at some point and is computed while you continue to work in your main task. Whether a new thread is created or a cached one is reused depends on the standard library implementation.

deferred behaves completely differently. Basically nothing happens when you call std::async: no extra thread is created and the task is not computed yet. The result will not be made available in the meantime at all. However, as soon as you call get, the task is computed in your current thread and returns a result, basically as if you had called the method directly yourself, without any async utilities at all.


std::launch::async in Java

That said, let's focus on how to translate this behavior to Java. Let's start with async.

This is the simple one, as it is basically the default and intended behavior offered in CompletableFuture. So you just do runAsync or supplyAsync, depending on whether your method returns a result or not. Let me show the previous examples again:

// without result
CompletableFuture<Void> task = CompletableFuture.runAsync(() -> System.out.println("Hello World!"));
/*...*/ // the task is computed in the meantime in a different thread
task.get();

// with result
CompletableFuture<Integer> task = CompletableFuture.supplyAsync(MyMath::compute);
/*...*/
Integer result = task.get();

Note that there are also overloads of these methods that accept an Executor, which can be used if you have your own thread pool and want CompletableFuture to use that instead of its own.
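
As a rough illustration of the Executor overload, the sketch below runs a supplier on a self-made pool. The class name CustomExecutorDemo, the method runOnOwnPool and the thread name "my-pool-thread" are assumptions made up for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CustomExecutorDemo {

    // Runs a task on our own pool and returns the name of the thread that executed it.
    static String runOnOwnPool() {
        // Own pool whose threads carry a recognizable (made-up) name.
        ExecutorService pool =
            Executors.newFixedThreadPool(2, r -> new Thread(r, "my-pool-thread"));
        try {
            return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), pool)
                .join(); // wait for the result
        } finally {
            pool.shutdown(); // our own pool is not shut down automatically
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnOwnPool()); // prints "my-pool-thread"
    }
}
```

Unlike the default pool used by CompletableFuture, a pool you create yourself has to be shut down by you.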


std::launch::deferred in Java

I experimented a lot trying to mimic this behavior with CompletableFuture, but it does not seem to be possible without creating your own implementation (please correct me if I am wrong though). No matter what, it either executes directly upon creation or not at all.

So I would just propose to use the underlying task interface that you gave to CompletableFuture, for example Runnable or Supplier, directly. In our case, we might also use IntSupplier to avoid the autoboxing.

Here are the two code examples again, but this time with deferred behavior:

// without result
Runnable task = () -> System.out.println("Hello World!");
/*...*/ // the task is not computed in the meantime, no threads involved
task.run(); // the task is computed now

// with result
IntSupplier task = MyMath::compute;
/*...*/
int result = task.getAsInt();
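
If you do want a future-shaped object with deferred semantics, one possible approach is a small subclass of FutureTask that runs the task inside the untimed get(). This is only a sketch under that assumption: DeferredFuture and DeferredFutureDemo are hypothetical names and not part of the JDK. The timed get is deliberately left untouched, mirroring the C++ rule that only non-timed waits trigger the computation.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: a Future that computes its value lazily on the first get().
final class DeferredFuture<T> extends FutureTask<T> {

    DeferredFuture(Callable<T> task) {
        super(task);
    }

    @Override
    public T get() throws InterruptedException, ExecutionException {
        run();              // executes in the calling thread; FutureTask runs the task at most once
        return super.get(); // returns the stored result or throws ExecutionException
    }
}

final class DeferredFutureDemo {
    static final AtomicInteger CALLS = new AtomicInteger();

    // Stand-in for an expensive computation; counts how often it is invoked.
    static int compute() {
        CALLS.incrementAndGet();
        return 32;
    }

    // Returns { calls before get, first result, second result, calls after both gets }.
    static int[] demo() {
        CALLS.set(0);
        DeferredFuture<Integer> task = new DeferredFuture<Integer>(DeferredFutureDemo::compute);
        int before = CALLS.get(); // nothing has been computed yet
        try {
            int first = task.get();  // the task is computed now
            int second = task.get(); // the stored result is returned, no recomputation
            return new int[] { before, first, second, CALLS.get() };
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.printf("before=%d first=%d second=%d calls=%d%n", r[0], r[1], r[2], r[3]);
    }
}
```

Because the class implements Future, it can be passed to code that expects a Future, but it offers none of the chaining methods of CompletableFuture.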

Modern multithreading in Java

As a final note I would like to give you a better idea how multithreading is typically used in Java nowadays. The provided facilities are much richer than what C++ offers by default.

Ideally, you should design your system in a way that you do not have to care about such little threading details. You create an automatically managed dynamic thread pool using Executors and then launch your initial task against that (or use the default executor service provided by CompletableFuture). After that, you just set up an operation pipeline on the future object, similar to the Stream API, and then just wait on the final future object.

For example, let us suppose you have a list of file names List<String> fileNames and you want to

  1. read the file
  2. validate its content, skip it if it is invalid
  3. compress the file
  4. upload the file to some web server
  5. check the response status code

and count how many were invalid, not successful, and successful. Suppose you have some methods like

class FileUploader {
    static byte[] readFile(String name) { /*...*/ }
    static byte[] requireValid(byte[] content) throws IllegalStateException { /*...*/ }
    static byte[] compressContent(byte[] content) { /*...*/ }
    static int uploadContent(byte[] content) { /*...*/ }
}

then we can do so easily by

AtomicInteger successful = new AtomicInteger();
AtomicInteger notSuccessful = new AtomicInteger();
AtomicInteger invalid = new AtomicInteger();

// Set up the pipeline
List<CompletableFuture<Void>> tasks = fileNames.stream()
    .map(name -> CompletableFuture
        .completedFuture(name)
        .thenApplyAsync(FileUploader::readFile)
        .thenApplyAsync(FileUploader::requireValid)
        .thenApplyAsync(FileUploader::compressContent)
        .thenApplyAsync(FileUploader::uploadContent)
        .<Void>handleAsync((statusCode, exception) -> {
            AtomicInteger counter;
            if (exception == null) {
                counter = statusCode == 200 ? successful : notSuccessful;
            } else {
                // Any exception in an earlier stage ends up here
                counter = invalid;
            }
            counter.incrementAndGet();
            return null; // handleAsync takes a BiFunction, so the lambda must return a value
        })
    ).collect(Collectors.toList());

// Wait until all tasks are done
tasks.forEach(CompletableFuture::join);

// Print the results
System.out.printf("Successful %d, not successful %d, invalid %d%n", successful.get(), notSuccessful.get(), invalid.get());

The huge benefit of this is that independent files are processed in parallel, so the available hardware is put to good use. All tasks are executed dynamically and independently of each other, managed by an automatic pool of threads. And you just wait until everything is done.

Hospitality answered 27/1, 2021 at 9:3 Comment(0)

For asynchronous launch of a thread, in modern Java prefer the use of a high-level java.util.concurrent.ExecutorService.

One way to obtain an ExecutorService is through java.util.concurrent.Executors. Different behaviors are available for ExecutorServices; the Executors class provides methods for some common cases.

Once you have an ExecutorService, you can submit Runnables and Callables to it.

Future<MyReturnValue> myFuture = myExecutorService.submit(myTask);
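
A slightly fuller sketch of submitting both kinds of task follows. The class SubmitDemo and the method submitBoth are hypothetical names chosen for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class SubmitDemo {

    // Submits a Runnable and a Callable and returns the Callable's result.
    static String submitBoth() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Runnable noResult = () -> System.out.println("running without a result");
            Callable<String> withResult = () -> "hello from the pool";

            Future<?> first = executor.submit(noResult);
            Future<String> second = executor.submit(withResult);

            first.get();         // returns null once the Runnable has finished
            return second.get(); // blocks until the Callable's value is available
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitBoth());
    }
}
```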
Bultman answered 26/1, 2021 at 21:47 Comment(0)

If I understood you correctly, maybe something like this:

private static CompletableFuture<Void> deferred(Runnable run) {
   CompletableFuture<Void> future = new CompletableFuture<>();
   future.thenRun(run);
   return future;
}

private static CompletableFuture<Void> async(Runnable run) {
    return CompletableFuture.runAsync(run);
}

And then using them like:

public static void main(String[] args)  throws Exception {
    CompletableFuture<Void> def = deferred(() -> System.out.println("run"));
    def.complete(null);
    System.out.println(def.join());

    CompletableFuture<Void> async = async(() -> System.out.println("run async"));
    async.join();
}
Holder answered 26/1, 2021 at 22:19 Comment(7)
Actually, the deferred example does not work as expected. The complete(null) has to be moved into the deferred method, before calling thenRun to complete the first, incomplete stage.Hospitality
At which point CompletableFuture.completedFuture(null) is probably a better alternative.Hospitality
Actually, it seems that it is then executed immediately and not on the get() call. It seems to be surprisingly difficult to defer the execution until the get() call.Hospitality
@Hospitality I am confused about your points. Maybe it's just me, but can you make your point a bit clearer? ThxHolder
Sure, sorry. I experimented a bit and executed it, and I did not manage to set up a CompletableFuture that executes at the moment you call get or join (and in the same thread). Your trick was to set up an incomplete stage to delay execution of the second stage and then complete the stage later on. But your example will then execute upon the complete call, not on the get/join (put some sleeps in between). Also, this technique would not be of much use if the function provides a result, as complete would override the result - you would have to capture the first stage explicitly.Hospitality
At this point I am just curious whether it is even possible to set up a CompletableFuture that executes its pipeline exactly at the point where you call get/join, without extending the class and overriding behavior.Hospitality
@Hospitality yes sir, you are absolutely correct. This is a "dirty" trick that will work only for Runnables, and only when both complete and join/get will be called. Otherwise, I am on the same page with your answer and Runnable.Holder

To get something like a deferred thread, you might try running a thread at a reduced priority.

First, in Java it's often idiomatic to make a task using a Runnable first. You can also use the Callable<T> interface, which allows the thread to return a value (Runnable can't).

public class MyTask implements Runnable {
  @Override
  public void run() {
    System.out.println( "hello thread." );
  }
}

Then just create a thread. In Java threads normally wrap the task they execute.

MyTask myTask = new MyTask();
Thread t = new Thread( myTask );
t.setPriority( Thread.currentThread().getPriority()-1 );
t.start();

This should not run until there is a core available to do so, which means it shouldn't run until the current thread is blocked or has run out of things to do. However, you're at the mercy of the OS scheduler here, so the specific behavior is not guaranteed. Most OSs will guarantee that all threads run eventually, so if the current thread takes a long time without blocking, the OS will start it executing anyway.

setPriority() can throw a security exception if you're not allowed to set the priority of a thread (uncommon but possible). So just be aware of that minor inconvenience.

For an asynchronous task with a Future I would use an executor service. The helper methods in the class Executors are a convenient way to do this.

First make your task as before.

public class MyCallable implements Callable<String> {
  @Override
  public String call() {
    return "hello future thread.";
  }
}

Then use an executor service to run it:

MyCallable myCallable = new MyCallable();
ExecutorService es = Executors.newCachedThreadPool();
Future<String> f = es.submit( myCallable );

You can use the Future object to query the thread, determine its running status and get the value it returns. You will need to shut down the executor to stop all of its threads before exiting the JVM.

es.shutdown();
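
Put together, querying the Future and shutting the pool down in an orderly way might look like the sketch below. ShutdownDemo and runAndShutDown are hypothetical names, and the five-second timeout is an arbitrary choice for the example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class ShutdownDemo {

    // Submits one task, reads its result, then shuts the pool down.
    // Returns true if the pool terminated within the timeout.
    static boolean runAndShutDown() {
        ExecutorService es = Executors.newCachedThreadPool();
        try {
            Future<String> f = es.submit(() -> "hello future thread.");
            System.out.println("value: " + f.get());   // blocks until the task has finished
            System.out.println("done: " + f.isDone()); // true after get() has returned
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            es.shutdown(); // no new tasks are accepted; submitted tasks still finish
        }
        try {
            return es.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("terminated: " + runAndShutDown());
    }
}
```

Placing shutdown() in a finally block ensures the pool's threads are released even if the task fails.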

I've tried to write this code as simply as possible, without the use of lambdas or clever use of generics. The above should show you what those lambdas are actually implementing. However it's usually considered better to be a bit more sophisticated when writing code (and a bit less verbose) so you should investigate other syntax once you feel you understand the above.

Infix answered 26/1, 2021 at 23:21 Comment(1)
Please read the definition of C++ std::launch::deferred. It does not involve any extra threads. It just lazily executes the method at the moment you call get/join on the returned future, in the caller's thread. Basically as if you had just called the method directly yourself instead of using a future with get/join. So your answer, while correct, does not apply to OP's question.Hospitality
