Java AsyncHttpClient: broken file while writing from LazyResponseBodyPart to AsynchronousFileChannel

I use the AsyncHttpClient library for asynchronous, non-blocking requests. My use case: write data to a file as it is received over the network.

To download a file from a remote host and save it to disk, I used the default ResponseBodyPartFactory.EAGER together with AsynchronousFileChannel, so as not to block the Netty thread as data arrives. But my measurements showed that, compared with LAZY, EAGER increases Java heap consumption many times over.

So I decided to switch to LAZY, but did not consider the consequences for the files.

This code reproduces the problem:

public static class AsyncChannelWriter {
     private final CompletableFuture<Integer> startPosition;
     private final AsynchronousFileChannel channel;

     public AsyncChannelWriter(AsynchronousFileChannel channel) throws IOException {
         this.channel = channel;
         this.startPosition = CompletableFuture.completedFuture((int) channel.size());
     }

     public CompletableFuture<Integer> getStartPosition() {
         return startPosition;
     }

     public CompletableFuture<Integer> write(ByteBuffer byteBuffer, CompletableFuture<Integer> currentPosition) {

         return currentPosition.thenCompose(position -> {
             CompletableFuture<Integer> writtenBytes = new CompletableFuture<>();
             channel.write(byteBuffer, position, null, new CompletionHandler<Integer, ByteBuffer>() {
                 @Override
                 public void completed(Integer result, ByteBuffer attachment) {
                     writtenBytes.complete(result);
                 }

                 @Override
                 public void failed(Throwable exc, ByteBuffer attachment) {
                     writtenBytes.completeExceptionally(exc);
                 }
             });
             return writtenBytes.thenApply(writtenBytesLength -> writtenBytesLength + position);
         });
     }

     public void close(CompletableFuture<Integer> currentPosition) {
         currentPosition.whenComplete((position, throwable) -> IOUtils.closeQuietly(channel));
     }
 }

 public static void main(String[] args) throws IOException {
     final String filepath = "/media/veracrypt4/files/1.jpg";
     final String downloadUrl = "https://m0.cl/t/butterfly-3000.jpg";

     final AsyncHttpClient client = Dsl.asyncHttpClient(Dsl.config().setFollowRedirect(true)
             .setResponseBodyPartFactory(AsyncHttpClientConfig.ResponseBodyPartFactory.LAZY));
     final AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get(filepath), StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE);
     final AsyncChannelWriter asyncChannelWriter = new AsyncChannelWriter(channel);
     final AtomicReference<CompletableFuture<Integer>> atomicReferencePosition = new AtomicReference<>(asyncChannelWriter.getStartPosition());
     client.prepareGet(downloadUrl)
             .execute(new AsyncCompletionHandler<Response>() {

                 @Override
                 public State onBodyPartReceived(HttpResponseBodyPart content) throws Exception {
                     // With EAGER, content.getBodyByteBuffer() returns a HeapByteBuffer; with LAZY, a DirectByteBuffer
                     final ByteBuffer bodyByteBuffer = content.getBodyByteBuffer();
                     final CompletableFuture<Integer> currentPosition = atomicReferencePosition.get();
                     final CompletableFuture<Integer> newPosition = asyncChannelWriter.write(bodyByteBuffer, currentPosition);
                     atomicReferencePosition.set(newPosition);
                     return State.CONTINUE;
                 }

                 @Override
                 public Response onCompleted(Response response) {
                     asyncChannelWriter.close(atomicReferencePosition.get());
                     return response;
                 }
             });
}

In this case, the picture is broken. But if I use FileChannel instead of AsynchronousFileChannel, the file comes out fine in both cases. Are there any nuances when working with a DirectByteBuffer (as returned by LazyResponseBodyPart.getBodyByteBuffer()) and AsynchronousFileChannel?

What could be wrong with my code, given that everything works fine with EAGER?


UPDATE

I noticed that if I use LAZY and add, for example, the line Thread.sleep(10) to the onBodyPartReceived method, like this:

 @Override
public State onBodyPartReceived(HttpResponseBodyPart content) throws Exception {
    final ByteBuffer bodyByteBuffer = content.getBodyByteBuffer();
    final CompletableFuture<Integer> currentPosition = atomicReferencePosition.get();
    final CompletableFuture<Integer> newPosition = asyncChannelWriter.write(bodyByteBuffer, currentPosition);
    atomicReferencePosition.set(newPosition);
    Thread.sleep(10);
    return State.CONTINUE;
}

the file is saved to disk intact.

As I understand it, the reason is that during these 10 milliseconds the asynchronous thread in AsynchronousFileChannel manages to write the data from this DirectByteBuffer to disk.

So the file is corrupted because this asynchronous thread is still using the buffer for its write while the Netty thread is using the same buffer as well.
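
To illustrate the sharing described above, here is a minimal JDK-only sketch. It does not use Netty: a direct ByteBuffer and its duplicate() stand in for the pooled memory and for the view handed to the file channel, which is an assumption about how the buffer from getBodyByteBuffer() behaves with LAZY. The class and method names are made up for the illustration. The only point is that a view sees whatever the owner of the memory writes later:

```java
import java.nio.ByteBuffer;

final class SharedBufferDemo {

    // Returns the byte that a slow "writer" would read from its view
    // after the producer has reused the underlying memory.
    static byte readAfterReuse() {
        // Stands in for pooled direct memory owned by the network layer (assumption).
        ByteBuffer pooled = ByteBuffer.allocateDirect(4);
        pooled.put(0, (byte) 'A');                 // first chunk arrives

        // Stands in for the view handed to the asynchronous file write:
        // duplicate() shares content with the original buffer, it does not copy.
        ByteBuffer view = pooled.duplicate();

        pooled.put(0, (byte) 'B');                 // memory is reused for the next chunk

        return view.get(0);                        // the writer reads only now
    }

    public static void main(String[] args) {
        System.out.println((char) readAfterReuse());
    }
}
```

If the writer had read before the second put, it would have seen the first chunk, which matches the observation that a short sleep in the handler hides the problem.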

If we look at the source code of EagerResponseBodyPart, we see the following:

private final byte[] bytes;
  public EagerResponseBodyPart(ByteBuf buf, boolean last) {
    super(last);
    bytes = byteBuf2Bytes(buf);
  }

  @Override
  public ByteBuffer getBodyByteBuffer() {
    return ByteBuffer.wrap(bytes);
  }

Thus, when a piece of data arrives, it is immediately copied into a byte array. We can then safely wrap it in a HeapByteBuffer and hand it to the asynchronous thread of the file channel.

But if you look at the code of LazyResponseBodyPart:

  private final ByteBuf buf;

  public LazyResponseBodyPart(ByteBuf buf, boolean last) {
    super(last);
    this.buf = buf;
  }
  @Override
  public ByteBuffer getBodyByteBuffer() {
    return buf.nioBuffer();
  }

As you can see, the asynchronous file channel thread actually works on the Netty ByteBuf (in this case always a PooledSlicedByteBuf) through the nioBuffer method call.

What can I do in this situation? How can I safely pass a DirectByteBuffer to an asynchronous thread without copying the buffer to the Java heap?

Culpa answered 31/5, 2019 at 15:20 Comment(7)
Why not use BodyDeferringAsyncHandler for a simpler life? - Olva
@MạnhQuyếtNguyễn Because it is not efficient? I use this client to reduce memory and CPU consumption. For a simple life, I could use the synchronous Apache client. By the way, BodyDeferringAsyncHandler is no different from my EAGER example in terms of memory consumption, because BodyDeferringAsyncHandler uses the getBodyPartBytes method. I'm not sure, but when using BodyDeferringAsyncHandler the thread will probably block while writing to the OutputStream. - Culpa
FYI: The thread calling client.prepareGet(downloadUrl).execute is not blocked. Keep it simple. - Olva
@MạnhQuyếtNguyễn Of course, but the thread that processes the data will be blocked. - Culpa
There is always one thread blocked: the one that actually writes the data. - Olva
@MạnhQuyếtNguyễn If we are talking about writing to disk, then yes, this is a blocking operation, and in my code it happens inside AsynchronousFileChannel, because Java does not implement real asynchronous file I/O on Linux. AsyncHttpClient threads are not blocked when data arrives over the network, and when data arrives in the onBodyPartReceived method, we should not block the Netty thread. However, we have moved away from the original question. - Culpa
Let us continue this discussion in chat. - Culpa

I talked to the maintainer of AsyncHttpClient. You can see the discussion here.

The main problem was that I did not use the Netty ByteBuf methods retain and release. In the end, I came to two solutions.

First: write the bytes to the file sequentially, tracking the position with a CompletableFuture.

Define a wrapper class for AsynchronousFileChannel:

@Log4j2
public class AsyncChannelNettyByteBufWriter implements Closeable {
    private final AtomicReference<CompletableFuture<Long>> positionReference;
    private final AsynchronousFileChannel channel;

    public AsyncChannelNettyByteBufWriter(AsynchronousFileChannel channel) {
        this.channel = channel;
        try {
            this.positionReference = new AtomicReference<>(CompletableFuture.completedFuture(channel.size()));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public CompletableFuture<Long> write(ByteBuf byteBuffer) {
        // Retain the buffer so Netty does not recycle it before the asynchronous write completes.
        final ByteBuf byteBuf = byteBuffer.retain();
        // Chain this write after the previous one; the future carries the next file position.
        return positionReference.updateAndGet(x -> x.thenCompose(position -> {
            final CompletableFuture<Integer> writtenBytes = new CompletableFuture<>();
            channel.write(byteBuf.nioBuffer(), position, byteBuf, new CompletionHandler<Integer, ByteBuf>() {
                @Override
                public void completed(Integer result, ByteBuf attachment) {
                    // The write is finished, so the buffer can go back to the pool.
                    attachment.release();
                    writtenBytes.complete(result);
                }

                @Override
                public void failed(Throwable exc, ByteBuf attachment) {
                    attachment.release();
                    log.error(exc);
                    writtenBytes.completeExceptionally(exc);
                }
            });
            return writtenBytes.thenApply(writtenBytesLength -> writtenBytesLength + position);
        }));
    }

    @Override
    public void close() {
        positionReference.updateAndGet(x -> x.whenComplete((position, throwable) -> {
            try {
                channel.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }));
    }
}

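In fact, the AtomicReference is probably unnecessary if all writes happen in a single thread. If they come from several threads, synchronization needs much more care: updateAndGet may re-apply its function under contention, which here would submit the same write twice.

And the main usage: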
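
One detail the wrapper above relies on is that each write call transfers the whole buffer. The write contract of AsynchronousFileChannel reports the number of bytes actually written, so a defensive version can resubmit until nothing remains. Below is a minimal JDK-only sketch of that idea, not the code from the discussion with the maintainer; writeFully, writeRemaining and demo are hypothetical names, and plain heap buffers are used so that it runs without Netty:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;

final class FullWriteSketch {

    // Hypothetical helper: writes every remaining byte of the buffer starting at
    // the given position and completes with the position just past the last byte.
    static CompletableFuture<Long> writeFully(AsynchronousFileChannel channel, ByteBuffer buffer, long position) {
        CompletableFuture<Long> done = new CompletableFuture<>();
        writeRemaining(channel, buffer, position, done);
        return done;
    }

    private static void writeRemaining(AsynchronousFileChannel channel, ByteBuffer buffer,
                                       long position, CompletableFuture<Long> done) {
        channel.write(buffer, position, null, new CompletionHandler<Integer, Void>() {
            @Override
            public void completed(Integer written, Void attachment) {
                long next = position + written;
                if (buffer.hasRemaining()) {
                    // Partial write: submit the rest at the advanced position.
                    writeRemaining(channel, buffer, next, done);
                } else {
                    done.complete(next);
                }
            }

            @Override
            public void failed(Throwable exc, Void attachment) {
                done.completeExceptionally(exc);
            }
        });
    }

    // Writes two chunks one after the other into a temporary file and returns its content.
    static String demo() {
        try {
            Path file = Files.createTempFile("full-write", ".bin");
            try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(file, StandardOpenOption.WRITE)) {
                ByteBuffer first = ByteBuffer.wrap("hello ".getBytes(StandardCharsets.UTF_8));
                ByteBuffer second = ByteBuffer.wrap("world".getBytes(StandardCharsets.UTF_8));
                writeFully(channel, first, 0L)
                        .thenCompose(pos -> writeFully(channel, second, pos))
                        .join();
                return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With a Netty ByteBuf, the same loop would presumably call nioBuffer() once, reuse that ByteBuffer across resubmissions so its position keeps advancing, and release the retained ByteBuf only when the returned future completes.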

public static void main(String[] args) throws IOException {
    final String filepath = "1.jpg";
    final String downloadUrl = "https://m0.cl/t/butterfly-3000.jpg";
    final AsyncHttpClient client = Dsl.asyncHttpClient(Dsl.config().setFollowRedirect(true)
            .setResponseBodyPartFactory(AsyncHttpClientConfig.ResponseBodyPartFactory.LAZY));
    final AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get(filepath), StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE);
    final AsyncChannelNettyByteBufWriter asyncChannelNettyByteBufWriter = new AsyncChannelNettyByteBufWriter(channel);

    client.prepareGet(downloadUrl)
            .execute(new AsyncCompletionHandler<Response>() {
                @Override
                public State onBodyPartReceived(HttpResponseBodyPart content) {
                    final ByteBuf byteBuf = ((LazyResponseBodyPart) content).getBuf();
                    asyncChannelNettyByteBufWriter.write(byteBuf);
                    return State.CONTINUE;
                }

                @Override
                public Response onCompleted(Response response) {
                    asyncChannelNettyByteBufWriter.close();
                    return response;
                }
            });
}

The second solution: track the position based on the number of bytes received.

public static void main(String[] args) throws IOException {
    final String filepath = "1.jpg";
    final String downloadUrl = "https://m0.cl/t/butterfly-3000.jpg";
    final AsyncHttpClient client = Dsl.asyncHttpClient(Dsl.config().setFollowRedirect(true)
            .setResponseBodyPartFactory(AsyncHttpClientConfig.ResponseBodyPartFactory.LAZY));
    final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
    final AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get(filepath), new HashSet<>(Arrays.asList(StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE)), executorService);

    client.prepareGet(downloadUrl)
            .execute(new AsyncCompletionHandler<Response>() {

                private long position = 0;

                @Override
                public State onBodyPartReceived(HttpResponseBodyPart content) {
                    // Retain the buffer until the asynchronous write has completed.
                    final ByteBuf byteBuf = ((LazyResponseBodyPart) content).getBuf().retain();
                    // Reserve the file range for this part before submitting the write.
                    long currentPosition = position;
                    position += byteBuf.readableBytes();
                    channel.write(byteBuf.nioBuffer(), currentPosition, byteBuf, new CompletionHandler<Integer, ByteBuf>() {
                        @Override
                        public void completed(Integer result, ByteBuf attachment) {
                            attachment.release();
                            if (content.isLast()) {
                                try {
                                    channel.close();
                                } catch (IOException e) {
                                    throw new UncheckedIOException(e);
                                }
                            }
                        }

                        @Override
                        public void failed(Throwable exc, ByteBuf attachment) {
                            attachment.release();
                            try {
                                channel.close();
                            } catch (IOException e) {
                                throw new UncheckedIOException(e);
                            }
                        }
                    });
                    return State.CONTINUE;
                }
                @Override
                public Response onCompleted(Response response) {
                    return response;
                }
            });
}

In the second solution, because we do not wait until the bytes are written to the file, AsynchronousFileChannel can create a lot of threads (at least on Linux, because Java does not implement real non-blocking asynchronous file I/O there; on Windows the situation is much better).

As my measurements showed, when writing to a slow USB flash drive the number of threads can reach tens of thousands, so you need to limit the number of threads by creating your own ExecutorService and passing it to AsynchronousFileChannel.
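
A related point for the second solution: it closes the channel when the write for the last part completes, but the writes are submitted without waiting for one another and run on a pool with several threads, so it seems unsafe to rely on them completing in submission order. One way to close only after every write has finished is a pending-write counter. This is a minimal JDK-only sketch, and the PendingWrites class and its method names are hypothetical. In the real handler, writeStarted() would be called before each channel.write, writeFinished() from both completed and failed, and lastPartSubmitted() once the part with isLast() has been submitted:

```java
import java.util.concurrent.atomic.AtomicInteger;

final class PendingWrites {
    // Starts at 1: the extra token means "more parts may still arrive".
    private final AtomicInteger pending = new AtomicInteger(1);
    private final Runnable onAllDone;

    PendingWrites(Runnable onAllDone) {
        this.onAllDone = onAllDone;
    }

    void writeStarted() {
        pending.incrementAndGet();
    }

    void writeFinished() {
        release();
    }

    // Call exactly once, after the write for the last part has been submitted.
    void lastPartSubmitted() {
        release();
    }

    private void release() {
        // Only the call that brings the counter to zero runs the close action.
        if (pending.decrementAndGet() == 0) {
            onAllDone.run();
        }
    }

    // Simulates two writes where the last part completes before the first one.
    // Returns {close count after the early completion, close count at the end}.
    static int[] demo() {
        AtomicInteger closed = new AtomicInteger();
        PendingWrites tracker = new PendingWrites(closed::incrementAndGet);
        tracker.writeStarted();        // part 1 submitted
        tracker.writeStarted();        // part 2 (last) submitted
        tracker.lastPartSubmitted();
        tracker.writeFinished();       // part 2 completes first
        int closedEarly = closed.get();
        tracker.writeFinished();       // part 1 completes afterwards
        return new int[] {closedEarly, closed.get()};
    }
}
```

In the handler, the close action passed to the constructor would be the channel.close() call that currently sits in the completion handler.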

Are there obvious advantages and disadvantages of the first and second solutions? It's hard for me to say. Maybe someone can tell which is more efficient.

Culpa answered 13/6, 2019 at 17:32 Comment(2)
Won't the first solution you suggested also suffer from the same threading issue? As it's also using AsynchronousFileChannel. - Spruik
@Spruik It will, but noticeably less. In the first solution, bytes are written to the file sequentially, and a new portion of bytes is not written until the previous one has been written. In the second case, we do not wait until the previous portion has been written to the file, so when writing is slow we create a lot of threads. You can check this by emulating writes to slow storage (a cheap flash drive, for example) and counting how many threads are created. - Culpa
