I use AsyncHttpClient library for async non blocking requests. My case: write data to a file as it is received over the network.
For download file from remote host and save to file I used default ResponseBodyPartFactory.EAGER
and AsynchronousFileChannel
so as not to block the netty thread as data arrives. But as my measurements showed, in comparison with LAZY
the memory consumption in the Java heap increases many times over.
So I decided to go straight to LAZY
, but did not consider the consequences for the files.
This code will help to reproduce the problem.:
public static class AsyncChannelWriter {
private final CompletableFuture<Integer> startPosition;
private final AsynchronousFileChannel channel;
public AsyncChannelWriter(AsynchronousFileChannel channel) throws IOException {
this.channel = channel;
this.startPosition = CompletableFuture.completedFuture((int) channel.size());
}
public CompletableFuture<Integer> getStartPosition() {
return startPosition;
}
public CompletableFuture<Integer> write(ByteBuffer byteBuffer, CompletableFuture<Integer> currentPosition) {
return currentPosition.thenCompose(position -> {
CompletableFuture<Integer> writenBytes = new CompletableFuture<>();
channel.write(byteBuffer, position, null, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
writenBytes.complete(result);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
writenBytes.completeExceptionally(exc);
}
});
return writenBytes.thenApply(writenBytesLength -> writenBytesLength + position);
});
}
public void close(CompletableFuture<Integer> currentPosition) {
currentPosition.whenComplete((position, throwable) -> IOUtils.closeQuietly(channel));
}
}
public static void main(String[] args) throws IOException {
final String filepath = "/media/veracrypt4/files/1.jpg";
final String downloadUrl = "https://m0.cl/t/butterfly-3000.jpg";
final AsyncHttpClient client = Dsl.asyncHttpClient(Dsl.config().setFollowRedirect(true)
.setResponseBodyPartFactory(AsyncHttpClientConfig.ResponseBodyPartFactory.LAZY));
final AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get(filepath), StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE);
final AsyncChannelWriter asyncChannelWriter = new AsyncChannelWriter(channel);
final AtomicReference<CompletableFuture<Integer>> atomicReferencePosition = new AtomicReference<>(asyncChannelWriter.getStartPosition());
client.prepareGet(downloadUrl)
.execute(new AsyncCompletionHandler<Response>() {
@Override
public State onBodyPartReceived(HttpResponseBodyPart content) throws Exception {
//if EAGER, content.getBodyByteBuffer() return HeapByteBuffer, if LAZY, return DirectByteBuffer
final ByteBuffer bodyByteBuffer = content.getBodyByteBuffer();
final CompletableFuture<Integer> currentPosition = atomicReferencePosition.get();
final CompletableFuture<Integer> newPosition = asyncChannelWriter.write(bodyByteBuffer, currentPosition);
atomicReferencePosition.set(newPosition);
return State.CONTINUE;
}
@Override
public Response onCompleted(Response response) {
asyncChannelWriter.close(atomicReferencePosition.get());
return response;
}
});
}
in this case, the picture is broken. But if i use FileChannel
instead of AsynchronousFileChannel
, in both cases, the files come out normal. Can there be any nuances when working with DirectByteBuffer
(in case withLazyResponseBodyPart.getBodyByteBuffer()
) and AsynchronousFileChannel
?
What could be wrong with my code, if everything works fine with EAGER
?
UPDATE
As I noticed, if I use LAZY
, and for example,i add the line
Thread.sleep (10)
in the method onBodyPartReceived
, like this:
@Override
public State onBodyPartReceived(HttpResponseBodyPart content) throws Exception {
final ByteBuffer bodyByteBuffer = content.getBodyByteBuffer();
final CompletableFuture<Integer> currentPosition = atomicReferencePosition.get();
final CompletableFuture<Integer> newPosition = finalAsyncChannelWriter.write(bodyByteBuffer, currentPosition);
atomicReferencePosition.set(newPosition);
Thread.sleep(10);
return State.CONTINUE;
}
The file is saved to disk in non broken state.
As I understand it, the reason is that during these 10 milliseconds, the asynchronous thread in AsynchronousFileChannel
manages to write data to the disk from this DirectByteBuffer
.
It turns out that the file is broken due to the fact that this asynchronous thread uses this buffer for writing along with the netty thread.
If we take a look at source code with EagerResponseBodyPart
, then we will see the following
private final byte[] bytes;
public EagerResponseBodyPart(ByteBuf buf, boolean last) {
super(last);
bytes = byteBuf2Bytes(buf);
}
@Override
public ByteBuffer getBodyByteBuffer() {
return ByteBuffer.wrap(bytes);
}
Thus, when a piece of data arrives, it is immediately stored in the byte array. Then we can safely wrap them in HeapByteBuffer and transfer to the asynchronous thread in file channel.
But if you look at the code LazyResponseBodyPart
private final ByteBuf buf;
public LazyResponseBodyPart(ByteBuf buf, boolean last) {
super(last);
this.buf = buf;
}
@Override
public ByteBuffer getBodyByteBuffer() {
return buf.nioBuffer();
}
As you can see, we actually use in asynchronous file channel thread netty ByteBuff
(in this case always PooledSlicedByteBuf
) via method call nioBuffer
What can i do in this situation, how to safely pass DirectByteBuffer
in an async thread without copying buffer to java heap?
BodyDeferringAsyncHandler
for a simpler life? – OlvaBodyDeferringAsyncHandler
no different from my example usingEAGER
in terms of memory consumption, causeBodyDeferringAsyncHandler
usesgetBodyPartBytes
method. I'm not sure, but probably when usingBodyDeferringAsyncHandler
, the thread will block while writing to theOutputStream
. – Culpaclient.prepareGet(downloadUrl).execute
is not blocked. Keep it simple – OlvaAsynchronousFileChannel
, because java does not implement real file AIO in linux.AsyncHttpClient
threads are not blocked when data arrives over the network, and when data arrive at a methodonBodyPartRecieved
, we should not block the netty thread. However, we have moved away from the original question. – Culpa