Java 11 HttpClient Http2 Too many streams Error
Asked Answered
E

3

12

I am using HttpClient of Java 11 to post the request to an HTTP2 server. The HttpClient Object is created as a Singleton Spring bean as shown below.

@Bean
    public HttpClient getClient() {
                return HttpClient.newBuilder().version(Version.HTTP_2).executor(Executors.newFixedThreadPool(20)).followRedirects(Redirect.NORMAL)
                .connectTimeout(Duration.ofSeconds(20)).build();
    }

I am using the sendAsync method to send the requests asynchronously.

When I try to hit the server continuously, I am receiving the error after certain time "java.io.IOException: too many concurrent streams". I used Fixed threadpool in the Client building to try to overcome this error, but it is still giving the same error.

The Exception stack is..

java.util.concurrent.CompletionException: java.io.IOException: too many concurrent streams
    at java.base/java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:367) ~[?:?]
    at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1108) ~[?:?]
    at java.base/java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2235) ~[?:?]
    at java.net.http/jdk.internal.net.http.MultiExchange.responseAsyncImpl(MultiExchange.java:345) ~[java.net.http:?]
    at java.net.http/jdk.internal.net.http.MultiExchange.lambda$responseAsync0$2(MultiExchange.java:250) ~[java.net.http:?]
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072) ~[?:?]
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705) ~[?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.base/java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: java.io.IOException: too many concurrent streams
    at java.net.http/jdk.internal.net.http.Http2Connection.reserveStream(Http2Connection.java:440) ~[java.net.http:?]
    at java.net.http/jdk.internal.net.http.Http2ClientImpl.getConnectionFor(Http2ClientImpl.java:103) ~[java.net.http:?]
    at java.net.http/jdk.internal.net.http.ExchangeImpl.get(ExchangeImpl.java:88) ~[java.net.http:?]
    at java.net.http/jdk.internal.net.http.Exchange.establishExchange(Exchange.java:293) ~[java.net.http:?]
    at java.net.http/jdk.internal.net.http.Exchange.responseAsyncImpl0(Exchange.java:425) ~[java.net.http:?]
    at java.net.http/jdk.internal.net.http.Exchange.responseAsyncImpl(Exchange.java:330) ~[java.net.http:?]
    at java.net.http/jdk.internal.net.http.Exchange.responseAsync(Exchange.java:322) ~[java.net.http:?]
    at java.net.http/jdk.internal.net.http.MultiExchange.responseAsyncImpl(MultiExchange.java:304) ~[java.net.http:?]

Can someone help me in fixing this issue?

The server is Tomcat9 and its max concurrent streams are the default.

Erastatus answered 28/2, 2019 at 3:19 Comment(0)
S
8

When I try to hit the server continuously

The server has a setting for max_concurrent_streams that is communicated to the client during the initial establishment of a HTTP/2 connection.

If you blindly "hit the server continuously" using sendAsync you are not waiting for previous requests to finish and eventually you exceed the max_concurrent_streams value and receive the error above.

The solution is to send concurrently a number of requests that is less than max_concurrent_streams; after that, you only send a new request when a previous one completes. This can easily implemented on the client using a Semaphore or something similar.

Sulfur answered 28/2, 2019 at 9:11 Comment(2)
Thank you for the clarification, I understood the issue now. I will bound the requests with some barrier. For further understanding, when I use the Fixedthreadpool instead of default Cachedthreadpool it is bounding the number of connections, where as each connection is limited to the maximum number of streams that are configured either at server or client end. am I correct?Erastatus
The thread pool does not bound the number of connections. With HTTP/2, there is probably just one connection to the server. What you may be bounding is the number of concurrent requests, but I would not rely on that because it would be highly dependent on the implementation. You upgrade the Java version and the fixed thread pool may not be limiting the concurrent requests like in previous Java versions. I would be explicit and use a Semaphore like I explained in the answer so that it will work with any Java version.Sulfur
S
6

Unfortunately, the approach with Semaphore, suggested by @sbordet, didn't work for me. I tried this:

var semaphore = semaphores.computeIfAbsent(getRequestKey(request), k -> new Semaphore(MAX_CONCURRENT_REQUESTS_NUMBER));

CompletableFuture.runAsync(semaphore::acquireUninterruptibly, WAITING_POOL)
                .thenComposeAsync(ignored -> httpClient.sendAsync(request, responseBodyHandler), ASYNC_POOL)
                .whenComplete((response, e) -> semaphore.release());

There's no guarantee that a connection stream is released by the time the execution is passed to the next CompletableFuture, where the semaphore is released. For me the approach worked in case of normal execution, however if there're any exceptions, it seems that the connection stream may be closed after semaphore.release() is invoked.

Finally, I ended up by using OkHttp. It handles the problem (it just waits until some streams are freed up if the number of concurrent streams reaches max_concurrent_streams). It also handles the GOAWAY frame. In case of Java HttpClient I had to implement retry logic to handle this as it just throws IOException if the server sends GOAWAY frame.

Savino answered 5/8, 2019 at 13:11 Comment(4)
Can you explain how the getRequestKey(request) is implemented in this example?Veracity
@Veracity I just implemented it in a similar way it's implemented in Java. Please refer to jdk.internal.net.http.Http2Connection#keyString for the details.Savino
Does that assume that a single host:port combination will have a single Semaphore (regardless of how many connections)? I am interested in the case that a single host:port has many connections, and I am not sure there is a way to create a window/Semaphore for each unique connection instance. Perhaps there is a reactive way built into the client concept, but I haven't figured that out yet.Veracity
@Veracity The question was about the HTTP 2 multiplexing and Java client specifically. I looked into this issue quite a while ago so I cannot be sure, but as I remember Java HTTP Client uses only one connection if the host supports HTTP 2 multiplexing. So using one semaphore per scheme:host:port was relevant for this case.Savino
K
2

I think @sbordet's answer is incorrect and this error does not occur because your requests per second is exceeding MAX_CONCURRENT_STREAMS, but because the number of open HTTP streams (per HTTP 2 connection?) exceeds that number.

For example, I have a server at work that has a MAX_CONCURRENT_STREAMS setting of 128: ​

$ curl -iv -H "Content-Type: application/json" https://example.local

...
* Connection state changed (MAX_CONCURRENT_STREAMS == 128)!

​ But I seem to be able to hit it with up to ~1000 requests per second without getting any errors back: ​

import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
​
public class TooManyConcurrentStreams1 {
​
    private static final int CONCURRENCY = 1000;
​
    public static void main(String[] args) {
        final var counter = new AtomicInteger();
        final var singletonHttpClient = newHttpClient();
        final var singletonRequest = newRequest();
        final var responses = new ArrayList<CompletableFuture<HttpResponse<Void>>>(CONCURRENCY);
​
        for (int i = 0; i < CONCURRENCY; i++) {
            responses.add(singletonHttpClient.sendAsync(singletonRequest, BodyHandlers.discarding()));
        }
​
        for (CompletableFuture<HttpResponse<Void>> response : responses) {
            response.thenAccept(x -> {});
            response.join();
            System.out.println(counter.incrementAndGet());
        }
​
        singletonHttpClient.executor().ifPresent(executor -> {
            if (executor instanceof ExecutorService executorService) {
                executorService.shutdown();
            }
        });
    }
​
    public static HttpRequest newRequest() {
        return HttpRequest.newBuilder()
                .uri(Constants.TEST_URI)
                .header("Content-Type", Constants.CONTENT_TYPE)
                .header("Accept", Constants.CONTENT_TYPE)
                .POST(HttpRequest.BodyPublishers.ofString(Constants.BODY))
                .build();
    }
​
    public static HttpClient newHttpClient() {
        return HttpClient.newBuilder()
                .version(HttpClient.Version.HTTP_2)
                .executor(Executors.newFixedThreadPool(CONCURRENCY))
                .build();
    }
}

​ When I increase CONCURRENCY to an absurd number like 2000, I get this error, and not java.io.IOException: too many concurrent streams: ​

Exception in thread "main" java.util.concurrent.CompletionException: java.net.SocketException: Connection reset
    at java.base/java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:368)
    at java.base/java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:377)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1152)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
    at java.net.http/jdk.internal.net.http.Stream.completeResponseExceptionally(Stream.java:1153)
    at java.net.http/jdk.internal.net.http.Stream.cancelImpl(Stream.java:1238)
    at java.net.http/jdk.internal.net.http.Stream.connectionClosing(Stream.java:1212)
    at java.net.http/jdk.internal.net.http.Http2Connection.shutdown(Http2Connection.java:710)
    at java.net.http/jdk.internal.net.http.Http2Connection$Http2TubeSubscriber.processQueue(Http2Connection.java:1323)
    at java.net.http/jdk.internal.net.http.common.SequentialScheduler$LockingRestartableTask.run(SequentialScheduler.java:205)
    at java.net.http/jdk.internal.net.http.common.SequentialScheduler$CompleteRestartableTask.run(SequentialScheduler.java:149)
    at java.net.http/jdk.internal.net.http.common.SequentialScheduler$SchedulableTask.run(SequentialScheduler.java:230)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)

​ However, I can reproduce your error with this code (I hit this error first, and then found your question here!) ​

import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
​
public class TooManyConcurrentStreams2 {
​
    public static void main(String[] args) {
        final var singletonHttpClient = newHttpClient();
        final var singletonRequest = newRequest();
        final var counter = new AtomicInteger();
​
        final var scheduler = Executors.newScheduledThreadPool(2);
​
        scheduler.schedule(scheduler::shutdown, 1, TimeUnit.HOURS);
​
        scheduler.scheduleAtFixedRate(() -> {
            final var batchSize = counter.incrementAndGet();
            final var responses = new ArrayList<CompletableFuture<HttpResponse<Void>>>(batchSize);
​
            try {
                for (int i = 0; i < batchSize; i++) {
                    responses.add(
                            singletonHttpClient.sendAsync(
                                    singletonRequest,
                                    BodyHandlers.discarding()
                            )
                    );
                }
​
                for (CompletableFuture<HttpResponse<Void>> response : responses) {
                    response.thenAccept(x -> {
                    });
                    response.join();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
​
            System.out.println("batchSize = " + batchSize);
        }, 0, 500, TimeUnit.MILLISECONDS);
    }
​
​
    public static HttpRequest newRequest() {
        return HttpRequest.newBuilder()
                .uri(Constants.TEST_URI)
                .header("Content-Type", Constants.CONTENT_TYPE)
                .header("Accept", Constants.CONTENT_TYPE)
                .POST(HttpRequest.BodyPublishers.ofString(Constants.BODY))
                .build();
    }
​
    public static HttpClient newHttpClient() {
        return HttpClient.newBuilder()
                .version(HttpClient.Version.HTTP_2)
                .build();
    }
}

​ This one fails on 128th (!) execution of my once per 500ms runnable: ​

java.util.concurrent.CompletionException: java.io.IOException: too many concurrent streams
    at java.base/java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:368)
    at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1189)
    at java.base/java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309)
    at java.net.http/jdk.internal.net.http.MultiExchange.responseAsyncImpl(MultiExchange.java:453)
    at java.net.http/jdk.internal.net.http.MultiExchange.lambda$responseAsync0$2(MultiExchange.java:341)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1150)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.IOException: too many concurrent streams

​ So the problem is not number of requests per second, but something else, which seems to be the number of concurrent open streams per http connection/client.

We can verify this by NOT sharing the same http client (and request) for all batch requests: ​

import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
​
public class TooManyConcurrentStreams2 {
​
    public static void main(String[] args) {
        final var counter = new AtomicInteger();
​
        final var scheduler = Executors.newScheduledThreadPool(2);
​
        scheduler.schedule(scheduler::shutdown, 1, TimeUnit.HOURS);
​
        scheduler.scheduleAtFixedRate(() -> {
            final var httpClient = newHttpClient();
            final var request = newRequest();
            final var batchSize = counter.incrementAndGet();
            
            final var responses = new ArrayList<CompletableFuture<HttpResponse<Void>>>(batchSize);
​
            try {
                for (int i = 0; i < batchSize; i++) {
                    responses.add(
                            httpClient.sendAsync(
                                    request,
                                    BodyHandlers.discarding()
                            )
                    );
                }
​
                for (CompletableFuture<HttpResponse<Void>> response : responses) {
                    response.thenAccept(x -> {
                    });
                    response.join();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
​
            System.out.println("batchSize = " + batchSize);
        }, 0, 500, TimeUnit.MILLISECONDS);
    }
​
​
    public static HttpRequest newRequest() {
        return HttpRequest.newBuilder()
                .uri(Constants.TEST_URI)
                .header("Content-Type", Constants.CONTENT_TYPE)
                .header("Accept", Constants.CONTENT_TYPE)
                .POST(HttpRequest.BodyPublishers.ofString(Constants.BODY))
                .build();
    }
​
    public static HttpClient newHttpClient() {
        return HttpClient.newBuilder()
                .version(HttpClient.Version.HTTP_2)
                .build();
    }
}

​ For me, this one fails at 143rd try with this error message: ​

java.util.concurrent.CompletionException: java.lang.InternalError: java.net.SocketException: Too many open files
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1159)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.InternalError: java.net.SocketException: Too many open files
    at java.net.http/jdk.internal.net.http.PlainHttpConnection.<init>(PlainHttpConnection.java:293)
    at java.net.http/jdk.internal.net.http.AsyncSSLConnection.<init>(AsyncSSLConnection.java:49)
    at java.net.http/jdk.internal.net.http.HttpConnection.getSSLConnection(HttpConnection.java:293)
    at java.net.http/jdk.internal.net.http.HttpConnection.getConnection(HttpConnection.java:279)
    at java.net.http/jdk.internal.net.http.Http2Connection.createAsync(Http2Connection.java:369)
    at java.net.http/jdk.internal.net.http.Http2ClientImpl.getConnectionFor(Http2ClientImpl.java:128)
    at java.net.http/jdk.internal.net.http.ExchangeImpl.get(ExchangeImpl.java:93)
    at java.net.http/jdk.internal.net.http.Exchange.establishExchange(Exchange.java:343)
    at java.net.http/jdk.internal.net.http.Exchange.responseAsyncImpl0(Exchange.java:475)
    at java.net.http/jdk.internal.net.http.Exchange.responseAsyncImpl(Exchange.java:380)
    at java.net.http/jdk.internal.net.http.Exchange.responseAsync(Exchange.java:372)
    at java.net.http/jdk.internal.net.http.MultiExchange.responseAsyncImpl(MultiExchange.java:408)
    at java.net.http/jdk.internal.net.http.MultiExchange.lambda$responseAsync0$2(MultiExchange.java:341)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1150)
    ... 5 more

​ This one is most likely due to my laptop's relatively low ulimit of 12544. ​

Kero answered 31/8, 2022 at 11:7 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.