In java How to migrate from Executors.newFixedThreadPool(MAX_THREAD_COUNT()) to Virtual Thread
Asked Answered
M

2

6

Motivation: Trying to migrate Virtual Threads.

Problem: Even though Virtual Threads are cheap, OS may find it suspicious for some processes to be stacked up at the same time, like searching for IP or ports on the network.

I am using the below code to limit the creation of threads. TS_NetworkIPUtils TS_NetworkPortUtils

var executor = useVirtualThread
                    ? Executors.newVirtualThreadPerTaskExecutor()
                    : Executors.newFixedThreadPool(MAX_THREAD_COUNT());

Is it possible to create an executor service for creating virtual threads and having a limiting feature at the same time?

Mcadams answered 4/2, 2024 at 21:46 Comment(11)
Do you want to limit the number of simultaneous tasks the Executor accepts, or do you want to limit the number of platform threads used by the virtual threads? (I doubt the latter is possible, and even if it were, it would be unwise.)Rumpus
@Rumpus I do not want to limit threads used by virtual threads, just simultaneous tasks.Mcadams
I think, rather then delegating the work to ExecutorService, combination of Semaphore and StructuredTaskScope is the new way to go.Mcadams
If you want to limit simultaneous tasks, then Semaphore might be a way to go. Also discussed here. Structured Concurrency always looked to me as a completely different ideology, which just ... "piggybacks" on virtual threads, but you know better your use cases...Galatia
It is slightly difficult to read code that forks into custom libraries and understand your goals. It would be really nice if you could come with an MRE that would explain your problem. BTW, isn't Executors.newFixedThreadPool(MAX_THREAD_COUNT(), Thread.ofVirtual().factory()); what you are looking for?Galatia
@Galatia Never pool virtual threads, per documentation.Dimeter
@TugalsanKarabacak You need a counting semaphore, not a binary semaphore. See my Answer, and see example tutorial by Cay Horstmann.Dimeter
@Basil Bourque Correct. I am working on it.Mcadams
@Galatia Yes, "Executors.newFixedThreadPool(MAX_THREAD_COUNT(), Thread.ofVirtual().factory());" might be the simplest solution, but noone likes it (per documentation).Mcadams
@BasilBourque, I am aware that it is not advised to pool virtual threads because they are cheap, just makes no sense. But has it ever been said that they must not be pooled? Probably because they might retain some state from previous use or due to some performance considerations? If there is no prohibition on reusing virtual threads, then I don't see anything wrong with pooling them if and only if it is desirable to restrict the amount of virtual threads simultaneously accessing some resource like in the case of OP.Galatia
I have updated my Thread helper (TS_ThreadAsyncAwait) class to support Semaphore. This is the shortest general purpose code, I have come up with. github.com/tugalsan/com.tugalsan.blg.semaphore/blob/main/src/…Mcadams
D
2

tl;dr

Is it possible to create an executor service for creating virtual threads and having limiting feature at the same time?

Yes.

Pile up many virtual threads, blocked while waiting for a counting semaphore to be released by any of the limited number of currently-executing tasks.

Details

I am using below code to limit the creation of threads. TS_NetworkIPUtils

No, do not limit the number of threads. Virtual threads are by design extremely “cheap”, taking little in the way system resources (memory, CPU) by default. You likely can support millions of virtual threads at a time.

When your tasks are expensive, you may need to limit the amount of simultaneous tasks. Go ahead and stack up many threads, but block their execution to wait upon a certain number of concurrent tasks busy executing. Blocking on virtual threads is extremely cheap (their raison d’être), so no problem if they pile up.

Use a counting semaphore to limit simultaneous tasks

You can use a counting semaphore to limit the number of simultaneous tasks running in virtual threads. See section Rate Limiting in tutorial Virtual Threads by Cay Horstmann.

Something like the following example. We submit 15 tasks, but limit to only 5 or less executing simultaneously. The new Semaphore ( 5 ) provides a bucket of five permits that may acquired and released. The permits are like a total of five copies of a book being borrowed from a library and eventually returned for someone else to borrow — lots of borrowers, but only five at a time.

Notice how the later tasks are waiting several seconds for a permit from our counting semaphore.

Finally, after all the submitted tasks are done, we exit our try-with-resources syntax block.

(Caveat: I just whipped out this code example, without much thought behind it. I was shooting for something simpler than the example seen in the linked Cay Horstmann tutorial.)

package work.basil.example.threading;

import java.time.Duration;
import java.time.Instant;
import java.util.SequencedCollection;
import java.util.concurrent.*;
import java.util.concurrent.locks.LockSupport;

public class LimitTasks
{
    public static void main ( String[] args )
    {
        LimitTasks app = new LimitTasks ( );
        app.demo ( );
    }

    private void demo ( )
    {
        System.out.println ( "Demo start. " + Instant.now ( ) );

        SequencedCollection < String > log = new CopyOnWriteArrayList <> ( );  // System.out.println calls across threads do *NOT* necessarily appear on console in chronological order.
        final Semaphore TASKS_PERMITS = new Semaphore ( 5 );
        log.add ( "Permits available: " + TASKS_PERMITS.availablePermits ( ) );
        try (
                ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor ( ) ;
        )
        {
            for ( int nthTask = 1 ; nthTask <= 15 ; nthTask++ )
            {
                executorService.submit ( new PrintIt ( nthTask , TASKS_PERMITS , log ) );
            }
        }
        log.add ( "After try-with-resources on ExecutorService. " + Instant.now ( ) );
        log.forEach ( System.out :: println );
        System.out.println ( "Demo done. " + Instant.now ( ) );
    }
}

class PrintIt implements Runnable
{
    private final int label;
    private final Semaphore tasksPermits;
    private final SequencedCollection < String > log;

    public PrintIt ( final int label , final Semaphore tasksPermits , final SequencedCollection < String > log )
    {
        this.label = label;
        this.tasksPermits = tasksPermits;
        this.log = log;
    }

    @Override
    public void run ( )
    {
        this.log.add ( "nthTask " + label + ". Approximately " + this.tasksPermits.getQueueLength ( ) + " tasks waiting for a permit. " + Instant.now ( ) );
        long startNanos = System.nanoTime ( );
        try
        {
            this.tasksPermits.acquire ( );
            long elapsed = System.nanoTime ( ) - startNanos ;
            this.log.add ( "nthTask " + label + " waited " + Duration.ofNanos ( elapsed ) + " for a permit. Permits available: " + this.tasksPermits.availablePermits ( ) + ". " + Instant.now ( ) );
            long low = Duration.ofSeconds ( 2 ).toNanos ( );
            long high = Duration.ofSeconds ( 10 ).toNanos ( );
            long random = ThreadLocalRandom.current ( ).nextLong ( low , high );
            LockSupport.parkNanos ( random );
            this.log.add ( "nthTask " + label + " running at " + Instant.now ( ) + ". Permits available: " + this.tasksPermits.availablePermits ( ) + "." );
        } catch ( InterruptedException e )
        {
            throw new RuntimeException ( e );
        } finally
        {
            this.tasksPermits.release ( );
        }
    }
}

When run on Java 21 on a Mac with Apple Silicon from IntelliJ IDEA 2023.3.3 (Ultimate Edition).

Demo start. 2024-02-06T04:42:12.414617Z
Permits available: 5
nthTask 3. Approximately 0 tasks waiting for a permit. 2024-02-06T04:42:12.425896Z
nthTask 4. Approximately 0 tasks waiting for a permit. 2024-02-06T04:42:12.425813Z
nthTask 7. Approximately 0 tasks waiting for a permit. 2024-02-06T04:42:12.427279Z
nthTask 1. Approximately 0 tasks waiting for a permit. 2024-02-06T04:42:12.425813Z
nthTask 2. Approximately 0 tasks waiting for a permit. 2024-02-06T04:42:12.425813Z
nthTask 9. Approximately 0 tasks waiting for a permit. 2024-02-06T04:42:12.428066Z
nthTask 10. Approximately 0 tasks waiting for a permit. 2024-02-06T04:42:12.428092Z
nthTask 8. Approximately 0 tasks waiting for a permit. 2024-02-06T04:42:12.427894Z
nthTask 5. Approximately 0 tasks waiting for a permit. 2024-02-06T04:42:12.425983Z
nthTask 6. Approximately 0 tasks waiting for a permit. 2024-02-06T04:42:12.426432Z
nthTask 11. Approximately 5 tasks waiting for a permit. 2024-02-06T04:42:12.435172Z
nthTask 12. Approximately 5 tasks waiting for a permit. 2024-02-06T04:42:12.435178Z
nthTask 13. Approximately 7 tasks waiting for a permit. 2024-02-06T04:42:12.435214Z
nthTask 14. Approximately 7 tasks waiting for a permit. 2024-02-06T04:42:12.435225Z
nthTask 15. Approximately 9 tasks waiting for a permit. 2024-02-06T04:42:12.435261Z
nthTask 3 waited PT0.0088205S for a permit. Permits available: 0. 2024-02-06T04:42:12.434720Z
nthTask 2 waited PT0.008820167S for a permit. Permits available: 1. 2024-02-06T04:42:12.434640Z
nthTask 4 waited PT0.008743834S for a permit. Permits available: 1. 2024-02-06T04:42:12.434705Z
nthTask 7 waited PT0.007267583S for a permit. Permits available: 1. 2024-02-06T04:42:12.434666Z
nthTask 1 waited PT0.008801666S for a permit. Permits available: 2. 2024-02-06T04:42:12.434619Z
nthTask 2 running at 2024-02-06T04:42:15.306355Z. Permits available: 0.
nthTask 9 waited PT2.887408834S for a permit. Permits available: 0. 2024-02-06T04:42:15.315471Z
nthTask 3 running at 2024-02-06T04:42:17.420189Z. Permits available: 0.
nthTask 10 waited PT4.9929405S for a permit. Permits available: 0. 2024-02-06T04:42:17.421040Z
nthTask 7 running at 2024-02-06T04:42:17.920249Z. Permits available: 0.
nthTask 8 waited PT5.493109666S for a permit. Permits available: 0. 2024-02-06T04:42:17.921006Z
nthTask 4 running at 2024-02-06T04:42:18.204203Z. Permits available: 0.
nthTask 5 waited PT5.779108792S for a permit. Permits available: 0. 2024-02-06T04:42:18.205082Z
nthTask 9 running at 2024-02-06T04:42:19.479390Z. Permits available: 0.
nthTask 6 waited PT7.053674S for a permit. Permits available: 0. 2024-02-06T04:42:19.480079Z
nthTask 1 running at 2024-02-06T04:42:19.800778Z. Permits available: 0.
nthTask 11 waited PT7.366630625S for a permit. Permits available: 0. 2024-02-06T04:42:19.801791Z
nthTask 10 running at 2024-02-06T04:42:20.015210Z. Permits available: 0.
nthTask 12 waited PT7.580567167S for a permit. Permits available: 0. 2024-02-06T04:42:20.015699Z
nthTask 11 running at 2024-02-06T04:42:24.640389Z. Permits available: 0.
nthTask 13 waited PT12.205975708S for a permit. Permits available: 0. 2024-02-06T04:42:24.641142Z
nthTask 8 running at 2024-02-06T04:42:26.301821Z. Permits available: 0.
nthTask 14 waited PT13.867504542S for a permit. Permits available: 0. 2024-02-06T04:42:26.302663Z
nthTask 12 running at 2024-02-06T04:42:26.407675Z. Permits available: 0.
nthTask 15 waited PT13.972995459S for a permit. Permits available: 0. 2024-02-06T04:42:26.408182Z
nthTask 5 running at 2024-02-06T04:42:27.728516Z. Permits available: 0.
nthTask 6 running at 2024-02-06T04:42:28.044373Z. Permits available: 1.
nthTask 15 running at 2024-02-06T04:42:29.974056Z. Permits available: 2.
nthTask 13 running at 2024-02-06T04:42:30.613859Z. Permits available: 3.
nthTask 14 running at 2024-02-06T04:42:33.088984Z. Permits available: 4.
After try-with-resources on ExecutorService. 2024-02-06T04:42:33.089508Z
Demo done. 2024-02-06T04:42:33.094512Z

Process finished with exit code 0

For more info on virtual threads, see the fascinating video talks by Ron Pressler, Alan Bateman, or José Paumard. And read the JEP 444.

Dimeter answered 6/2, 2024 at 3:52 Comment(0)
G
6

While the best way to limit a number of virtual threads concurrently accessing a limited resource is a Semaphore, recommended in a part Use Semaphores to Limit Concurrency of Virtual Threads manual, yet another solution, a drop-in replacement of

Executors.newFixedThreadPool(MAX_THREAD_COUNT());

construct, when migrating to virtual threads, is perfectly legal:

Executors.newFixedThreadPool(MAX_THREAD_COUNT(), Thread.ofVirtual().factory());

Although, a part Represent Every Concurrent Task as a Virtual Thread; Never Pool Virtual Threads of this manual indeed advises against pooling of virtual threads, main reason behind it is the fact that virtual threads are inexpensive, therefore their pooling makes no sense, at the same time incurring additional expenses on synchronization like organizing a queue etc. Moreover, the part Use Semaphores to Limit Concurrency, comparing Semaphore and thread pool methods, reads:

Submitting tasks to a thread pool queues them up for later execution, but the semaphore internally (or any other blocking synchronization construct for that matter) creates a queue of threads that are blocked on it that mirrors the queue of tasks waiting for a pooled thread to execute them. Because virtual threads are tasks, the resulting structure is equivalent.

In other words, synchronization expenses in Semaphore and thread pool solutions are approximately the same.

The other question that may arise about virtual threads pooling is their reusability. Indeed, if a virtual thread that ran one Runnable/Callable task, cannot correctly run next one, because of, for example, some leftover state or due to performance losses associated with re-tasking, then a pool of virtual threads is downright harmful. However, no any virtual threads' documentation, JEP, or manual says that virtual threads cannot be reused. Instead, they insist that virtual threads are just Java threads, only they are so cheap that their reusing makes no sense.

Having said that 1) synchronization expenses are not greater in a thread pool than with Semaphore and 2) reusing virtual threads is not prohibited, we could conclude that a fixed pool of virtual threads can be a viable alternative to a Semaphore-based solution in a case of an access to limited resources. Obviously, a thread state, ThreadLocals, should be cleaned up after the virtual thread usage exactly as it is done for the platform threads, borrowed from a pool.

It is worth to note that pooling of virtual threads in facts restricts an application of Structured Concurrency, for example, Scoped Values, because their binding to a virtual thread should be done at the time of its initiating/starting, which is impossible with pre-created pool-managed threads. Therefore, Semaphore or equivalent method remains more advisable as it allows to engage a full power of virtual threads and Structural Concurrency.

Galatia answered 8/2, 2024 at 16:45 Comment(0)
D
2

tl;dr

Is it possible to create an executor service for creating virtual threads and having limiting feature at the same time?

Yes.

Pile up many virtual threads, blocked while waiting for a counting semaphore to be released by any of the limited number of currently-executing tasks.

Details

I am using below code to limit the creation of threads. TS_NetworkIPUtils

No, do not limit the number of threads. Virtual threads are by design extremely “cheap”, taking little in the way system resources (memory, CPU) by default. You likely can support millions of virtual threads at a time.

When your tasks are expensive, you may need to limit the amount of simultaneous tasks. Go ahead and stack up many threads, but block their execution to wait upon a certain number of concurrent tasks busy executing. Blocking on virtual threads is extremely cheap (their raison d’être), so no problem if they pile up.

Use a counting semaphore to limit simultaneous tasks

You can use a counting semaphore to limit the number of simultaneous tasks running in virtual threads. See section Rate Limiting in tutorial Virtual Threads by Cay Horstmann.

Something like the following example. We submit 15 tasks, but limit to only 5 or less executing simultaneously. The new Semaphore ( 5 ) provides a bucket of five permits that may acquired and released. The permits are like a total of five copies of a book being borrowed from a library and eventually returned for someone else to borrow — lots of borrowers, but only five at a time.

Notice how the later tasks are waiting several seconds for a permit from our counting semaphore.

Finally, after all the submitted tasks are done, we exit our try-with-resources syntax block.

(Caveat: I just whipped out this code example, without much thought behind it. I was shooting for something simpler than the example seen in the linked Cay Horstmann tutorial.)

package work.basil.example.threading;

import java.time.Duration;
import java.time.Instant;
import java.util.SequencedCollection;
import java.util.concurrent.*;
import java.util.concurrent.locks.LockSupport;

public class LimitTasks
{
    public static void main ( String[] args )
    {
        LimitTasks app = new LimitTasks ( );
        app.demo ( );
    }

    private void demo ( )
    {
        System.out.println ( "Demo start. " + Instant.now ( ) );

        SequencedCollection < String > log = new CopyOnWriteArrayList <> ( );  // System.out.println calls across threads do *NOT* necessarily appear on console in chronological order.
        final Semaphore TASKS_PERMITS = new Semaphore ( 5 );
        log.add ( "Permits available: " + TASKS_PERMITS.availablePermits ( ) );
        try (
                ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor ( ) ;
        )
        {
            for ( int nthTask = 1 ; nthTask <= 15 ; nthTask++ )
            {
                executorService.submit ( new PrintIt ( nthTask , TASKS_PERMITS , log ) );
            }
        }
        log.add ( "After try-with-resources on ExecutorService. " + Instant.now ( ) );
        log.forEach ( System.out :: println );
        System.out.println ( "Demo done. " + Instant.now ( ) );
    }
}

class PrintIt implements Runnable
{
    private final int label;
    private final Semaphore tasksPermits;
    private final SequencedCollection < String > log;

    public PrintIt ( final int label , final Semaphore tasksPermits , final SequencedCollection < String > log )
    {
        this.label = label;
        this.tasksPermits = tasksPermits;
        this.log = log;
    }

    @Override
    public void run ( )
    {
        this.log.add ( "nthTask " + label + ". Approximately " + this.tasksPermits.getQueueLength ( ) + " tasks waiting for a permit. " + Instant.now ( ) );
        long startNanos = System.nanoTime ( );
        try
        {
            this.tasksPermits.acquire ( );
            long elapsed = System.nanoTime ( ) - startNanos ;
            this.log.add ( "nthTask " + label + " waited " + Duration.ofNanos ( elapsed ) + " for a permit. Permits available: " + this.tasksPermits.availablePermits ( ) + ". " + Instant.now ( ) );
            long low = Duration.ofSeconds ( 2 ).toNanos ( );
            long high = Duration.ofSeconds ( 10 ).toNanos ( );
            long random = ThreadLocalRandom.current ( ).nextLong ( low , high );
            LockSupport.parkNanos ( random );
            this.log.add ( "nthTask " + label + " running at " + Instant.now ( ) + ". Permits available: " + this.tasksPermits.availablePermits ( ) + "." );
        } catch ( InterruptedException e )
        {
            throw new RuntimeException ( e );
        } finally
        {
            this.tasksPermits.release ( );
        }
    }
}

When run on Java 21 on a Mac with Apple Silicon from IntelliJ IDEA 2023.3.3 (Ultimate Edition).

Demo start. 2024-02-06T04:42:12.414617Z
Permits available: 5
nthTask 3. Approximately 0 tasks waiting for a permit. 2024-02-06T04:42:12.425896Z
nthTask 4. Approximately 0 tasks waiting for a permit. 2024-02-06T04:42:12.425813Z
nthTask 7. Approximately 0 tasks waiting for a permit. 2024-02-06T04:42:12.427279Z
nthTask 1. Approximately 0 tasks waiting for a permit. 2024-02-06T04:42:12.425813Z
nthTask 2. Approximately 0 tasks waiting for a permit. 2024-02-06T04:42:12.425813Z
nthTask 9. Approximately 0 tasks waiting for a permit. 2024-02-06T04:42:12.428066Z
nthTask 10. Approximately 0 tasks waiting for a permit. 2024-02-06T04:42:12.428092Z
nthTask 8. Approximately 0 tasks waiting for a permit. 2024-02-06T04:42:12.427894Z
nthTask 5. Approximately 0 tasks waiting for a permit. 2024-02-06T04:42:12.425983Z
nthTask 6. Approximately 0 tasks waiting for a permit. 2024-02-06T04:42:12.426432Z
nthTask 11. Approximately 5 tasks waiting for a permit. 2024-02-06T04:42:12.435172Z
nthTask 12. Approximately 5 tasks waiting for a permit. 2024-02-06T04:42:12.435178Z
nthTask 13. Approximately 7 tasks waiting for a permit. 2024-02-06T04:42:12.435214Z
nthTask 14. Approximately 7 tasks waiting for a permit. 2024-02-06T04:42:12.435225Z
nthTask 15. Approximately 9 tasks waiting for a permit. 2024-02-06T04:42:12.435261Z
nthTask 3 waited PT0.0088205S for a permit. Permits available: 0. 2024-02-06T04:42:12.434720Z
nthTask 2 waited PT0.008820167S for a permit. Permits available: 1. 2024-02-06T04:42:12.434640Z
nthTask 4 waited PT0.008743834S for a permit. Permits available: 1. 2024-02-06T04:42:12.434705Z
nthTask 7 waited PT0.007267583S for a permit. Permits available: 1. 2024-02-06T04:42:12.434666Z
nthTask 1 waited PT0.008801666S for a permit. Permits available: 2. 2024-02-06T04:42:12.434619Z
nthTask 2 running at 2024-02-06T04:42:15.306355Z. Permits available: 0.
nthTask 9 waited PT2.887408834S for a permit. Permits available: 0. 2024-02-06T04:42:15.315471Z
nthTask 3 running at 2024-02-06T04:42:17.420189Z. Permits available: 0.
nthTask 10 waited PT4.9929405S for a permit. Permits available: 0. 2024-02-06T04:42:17.421040Z
nthTask 7 running at 2024-02-06T04:42:17.920249Z. Permits available: 0.
nthTask 8 waited PT5.493109666S for a permit. Permits available: 0. 2024-02-06T04:42:17.921006Z
nthTask 4 running at 2024-02-06T04:42:18.204203Z. Permits available: 0.
nthTask 5 waited PT5.779108792S for a permit. Permits available: 0. 2024-02-06T04:42:18.205082Z
nthTask 9 running at 2024-02-06T04:42:19.479390Z. Permits available: 0.
nthTask 6 waited PT7.053674S for a permit. Permits available: 0. 2024-02-06T04:42:19.480079Z
nthTask 1 running at 2024-02-06T04:42:19.800778Z. Permits available: 0.
nthTask 11 waited PT7.366630625S for a permit. Permits available: 0. 2024-02-06T04:42:19.801791Z
nthTask 10 running at 2024-02-06T04:42:20.015210Z. Permits available: 0.
nthTask 12 waited PT7.580567167S for a permit. Permits available: 0. 2024-02-06T04:42:20.015699Z
nthTask 11 running at 2024-02-06T04:42:24.640389Z. Permits available: 0.
nthTask 13 waited PT12.205975708S for a permit. Permits available: 0. 2024-02-06T04:42:24.641142Z
nthTask 8 running at 2024-02-06T04:42:26.301821Z. Permits available: 0.
nthTask 14 waited PT13.867504542S for a permit. Permits available: 0. 2024-02-06T04:42:26.302663Z
nthTask 12 running at 2024-02-06T04:42:26.407675Z. Permits available: 0.
nthTask 15 waited PT13.972995459S for a permit. Permits available: 0. 2024-02-06T04:42:26.408182Z
nthTask 5 running at 2024-02-06T04:42:27.728516Z. Permits available: 0.
nthTask 6 running at 2024-02-06T04:42:28.044373Z. Permits available: 1.
nthTask 15 running at 2024-02-06T04:42:29.974056Z. Permits available: 2.
nthTask 13 running at 2024-02-06T04:42:30.613859Z. Permits available: 3.
nthTask 14 running at 2024-02-06T04:42:33.088984Z. Permits available: 4.
After try-with-resources on ExecutorService. 2024-02-06T04:42:33.089508Z
Demo done. 2024-02-06T04:42:33.094512Z

Process finished with exit code 0

For more info on virtual threads, see the fascinating video talks by Ron Pressler, Alan Bateman, or José Paumard. And read the JEP 444.

Dimeter answered 6/2, 2024 at 3:52 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.