I am running a Spring Boot app that uses WebClient for both non-blocking and blocking HTTP requests. After the app has run for some time, all outgoing HTTP requests seem to get stuck.
WebClient is used to send requests to multiple hosts, but as an example, here is how it is initialized and used to send requests to Telegram:
WebClientConfig:
@Bean
public ReactorClientHttpConnector httpClient() {
HttpClient.create(ConnectionProvider.builder("connectionProvider").build())
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)
.responseTimeout(Duration.ofMillis(responseTimeout));
return new ReactorClientHttpConnector(httpClient);
}
The same ReactorClientHttpConnector is used for all WebClients.
TelegramClient:
@Autowired
ReactorClientHttpConnector httpClient;
WebClient webClient;
RateLimiter rateLimiter;
@PostConstruct
public void init() {
webClient = WebClient.builder()
.clientConnector(httpClient)
.baseUrl(telegramUrl)
.build();
rateLimiter = RateLimiter.of("telegram-rate-limiter",
RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofMinutes(1))
.limitForPeriod(20)
.build());
}
public void sendMessage(@PathVariable("token") String token, @RequestParam("chat_id") long chatId, @RequestParam("text") String message) {
webClient.post().uri(String.format("/bot%s/sendMessage", token))
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromFormData("chat_id", String.valueOf(chatId))
.with("text", message))
.retrieve()
.bodyToMono(Void.class)
.transformDeferred(RateLimiterOperator.of(rateLimiter))
.block();
}
The RateLimiter is used to ensure the number of requests does not exceed 20 a minute as specified in the Telegram API.
When the app is started, all requests are resolved normally as expected. But after some time has passed, all requests seem to get stuck. The amount of time needed for this to happen can vary from a few hours to a few days. It happens for all requests to different hosts and is easily noticeable when messages from the TelegramBot stops. Once the requests get stuck, they are stuck indefinitely and I have to restart the app to get it working again.
There are no exceptions in the log that seem to have caused this. Since I maintain a queue for my telegram messages, I can see the point in time when the requests stop when the number of messages in the queue steadily increases and when errors happen in the other processes that are waiting for the requests to resolve.
It does not seem like the requests are even sent out as the connect timeout and response timeout that I have set do not take effect.
I had previously also tried setting idle time to 0 but that did not solve the problem
@Bean
public ReactorClientHttpConnector httpClient() {
HttpClient httpClient = HttpClient.create(ConnectionProvider.builder("connectionProvider").maxConnections(1000).maxIdleTime(Duration.ofSeconds(0)).build())
HttpClient httpClient = HttpClient.newConnection()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)
.responseTimeout(Duration.ofMillis(responseTimeout));
return new ReactorClientHttpConnector(httpClient);
}
Update:
I enabled metrics and viewed it using micrometer when it got stuck. Interestingly, it shows that there is one connection for Telegram, but also shows no connections on idle, pending or active.
reactor_netty_connection_provider_idle_connections{id="-1268283746",name="connectionProvider",remote_address="api.telegram.org:443",} 0.0
reactor_netty_connection_provider_pending_connections{id="-1268283746",name="connectionProvider",remote_address="api.telegram.org:443",} 0.0
reactor_netty_connection_provider_active_connections{id="-1268283746",name="connectionProvider",remote_address="api.telegram.org:443",} 0.0
reactor_netty_connection_provider_total_connections{id="-1268283746",name="connectionProvider",remote_address="api.telegram.org:443",} 1.0
Could the issue be this missing connection?
Update 2:
I thought this might be related to this other issue: Closing Reactor Netty connection on error status codes
So I updated my HttpClient to this:
@Bean
public ReactorClientHttpConnector httpClient() {
HttpClient httpClient = HttpClient.create(ConnectionProvider.builder("connectionProvider").metrics(true).build())
.doAfterResponseSuccess((r, c) -> c.dispose())
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout)
.responseTimeout(Duration.ofMillis(responseTimeout));
return new ReactorClientHttpConnector(httpClient);
}
But all that seemed to do is accelerate the occurrence of the problem. Just like before, the active, pending and idle connections do not add up to the total connections. The total is always greater than the other 3 metrics added together.
Update 3: I did a thread dump when the issue happened. There were a total of 74 threads, so I don't think the app is running out of threads.
The dump for the Telegram thread:
"TelegramBot" #20 daemon prio=5 os_prio=0 cpu=14.65ms elapsed=47154.24s tid=0x00007f6b28e73000 nid=0x1c waiting on condition [0x00007f6aed6fb000]
java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
- parking to wait for <0x00000000fa865c80> (a java.util.concurrent.CountDownLatch$Sync)
at java.util.concurrent.locks.LockSupport.park([email protected]/LockSupport.java:194)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt([email protected]/AbstractQueuedSynchronizer.java:885)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly([email protected]/AbstractQueuedSynchronizer.java:1039)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly([email protected]/AbstractQueuedSynchronizer.java:1345)
at java.util.concurrent.CountDownLatch.await([email protected]/CountDownLatch.java:232)
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:87)
at reactor.core.publisher.Mono.block(Mono.java:1707)
at com.moon.arbitrage.cm.feign.TelegramClient.sendMessage(TelegramClient.java:59)
at com.moon.arbitrage.cm.service.TelegramService.lambda$sendArbMessage$0(TelegramService.java:53)
at com.moon.arbitrage.cm.service.TelegramService$$Lambda$1092/0x000000084070f840.run(Unknown Source)
at com.moon.arbitrage.cm.service.TelegramService.task(TelegramService.java:82)
at com.moon.arbitrage.cm.service.TelegramService$$Lambda$920/0x0000000840665040.run(Unknown Source)
at java.lang.Thread.run([email protected]/Thread.java:829)
Locked ownable synchronizers:
- None
The reactor worker threads:
"reactor-http-epoll-1" #15 daemon prio=5 os_prio=0 cpu=810.44ms elapsed=47157.07s tid=0x00007f6b281c4000 nid=0x17 runnable [0x00007f6b0c46c000]
java.lang.Thread.State: RUNNABLE
at io.netty.channel.epoll.Native.epollWait0(Native Method)
at io.netty.channel.epoll.Native.epollWait(Native.java:177)
at io.netty.channel.epoll.EpollEventLoop.epollWait(EpollEventLoop.java:286)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:351)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run([email protected]/Thread.java:829)
Locked ownable synchronizers:
- None
"reactor-http-epoll-2" #16 daemon prio=5 os_prio=0 cpu=1312.16ms elapsed=47157.07s tid=0x00007f6b281c5000 nid=0x18 waiting on condition [0x00007f6b0c369000]
java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
- parking to wait for <0x00000000fa865948> (a java.util.concurrent.CompletableFuture$Signaller)
at java.util.concurrent.locks.LockSupport.park([email protected]/LockSupport.java:194)
at java.util.concurrent.CompletableFuture$Signaller.block([email protected]/CompletableFuture.java:1796)
at java.util.concurrent.ForkJoinPool.managedBlock([email protected]/ForkJoinPool.java:3128)
at java.util.concurrent.CompletableFuture.waitingGet([email protected]/CompletableFuture.java:1823)
at java.util.concurrent.CompletableFuture.get([email protected]/CompletableFuture.java:1998)
at com.moon.arbitrage.cm.service.OrderService.reconcileOrder(OrderService.java:103)
at com.moon.arbitrage.cm.service.BotService$BotTask.lambda$task$1(BotService.java:383)
at com.moon.arbitrage.cm.service.BotService$BotTask$$Lambda$1161/0x00000008400af440.accept(Unknown Source)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:171)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:400)
at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:419)
at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:473)
at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:702)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1372)
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1235)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1284)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run([email protected]/Thread.java:829)
Locked ownable synchronizers:
- None
"reactor-http-epoll-3" #17 daemon prio=5 os_prio=0 cpu=171.84ms elapsed=47157.07s tid=0x00007f6b28beb000 nid=0x19 runnable [0x00007f6b0c26a000]
java.lang.Thread.State: RUNNABLE
at io.netty.channel.epoll.Native.epollWait0(Native Method)
at io.netty.channel.epoll.Native.epollWait(Native.java:177)
at io.netty.channel.epoll.EpollEventLoop.epollWait(EpollEventLoop.java:281)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:351)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run([email protected]/Thread.java:829)
Locked ownable synchronizers:
- None
"reactor-http-epoll-4" #18 daemon prio=5 os_prio=0 cpu=188.10ms elapsed=47157.07s tid=0x00007f6b28b7d800 nid=0x1a runnable [0x00007f6b0c169000]
java.lang.Thread.State: RUNNABLE
at io.netty.channel.epoll.Native.epollWait0(Native Method)
at io.netty.channel.epoll.Native.epollWait(Native.java:177)
at io.netty.channel.epoll.EpollEventLoop.epollWait(EpollEventLoop.java:281)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:351)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run([email protected]/Thread.java:829)
Locked ownable synchronizers:
- None
Seems like one of them is blocked with another task (that isn't even from the Telegram service) but that shouldn't be an issue since the other three worker threads are runnable right?