How to process a large tar file entirely in memory?
I am trying to build an endpoint in Kotlin that accepts 10+ GB tar files and processes their contents one by one.

The tar file contains millions of JSON files, and I am running this application in a Docker container with a very limited disk size, so extracting the entire archive to a temporary directory is not an option.

The following approach with Apache Commons Compress:

post(...) {
    val multipart = call.receiveMultipart()
    multipart.forEachPart { part ->
        if (part is PartData.FileItem) {
            part.streamProvider().use { inputStream ->
                BufferedInputStream(inputStream).use { bufferedInputStream ->
                    TarArchiveInputStream(bufferedInputStream).use { tarInput ->

leads to a java.io.IOException: Corrupted TAR archive error, apparently because the tar data is provided as a stream instead of one huge variable containing all the bytes. I also cannot consume the entire input stream into a single ByteArray and hand that to BufferedInputStream, because I don't have 20 GB of memory.

Any help is appreciated.

  • inputStream -> java.io.InputStream
  • bufferedInputStream -> java.io.BufferedInputStream

Apart from the Ktor request handling, the example code doesn't use any types specific to Kotlin or Ktor.

Update

A longer example, sending the file directly as the POST body:

call.receiveStream().buffered().use { bufferedInputStream ->
    TarArchiveInputStream(bufferedInputStream).use { tarInput ->
        var entry = tarInput.nextEntry
        while (entry != null) {
            if (!entry.isDirectory && entry.name.endsWith(".json")) {
                scope.launch {
                    val jsonString = tarInput.bufferedReader().readText()
                    val json: Map<String, JsonElement> =
                        Json.parseToJsonElement(jsonString).jsonObject
                    // ... process the parsed JSON here ...
                }
            }
            entry = tarInput.nextEntry
        }
    }
}

Update after Answer

// any stream that implements java.io.InputStream
val bodyStream = call.receiveStream()
val elems = sequence {
    bodyStream.buffered().use { bufferedInputStream ->
        TarArchiveInputStream(bufferedInputStream).use { tarInput ->
            while (true) {
                val entry = tarInput.nextEntry ?: break
                // Read the current entry here (sequentially), yield the
                // result, and process it later outside the sequence.
                yield(tarInput.readBytes())
            }
        }
    }
}
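
For reference, a minimal sketch of consuming that sequence lazily (the println body is just a placeholder):

elems.forEach { bytes ->
    // Each element is produced on demand, so only the current entry's
    // bytes are held in memory at any point.
    println("read ${bytes.size} bytes")
}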

The problem was processing the tar asynchronously; it's explained in detail in the accepted answer.

Disencumber answered 15/8 at 7:22 Comment(9)
Do you get the error immediately after starting to read, before consuming anything, or does it happen after some time? I don't know these utils, but your provided code looks good. If the lib provides this tar input stream, then it should be able to process it by streaming.Sensualism
@Sensualism Sometimes it errors out with "Corrupted TAR archive", sometimes it doesn't error out but fills the contents with total gibberish. See: paste.com.tr/raw/vcpscorl. This doesn't happen if I load the entire TAR archive into a variable and pass it as the input stream instead (tested with a small TAR).Disencumber
I would assume this is a problem with networking/passing the data, not with the tar reader. HTTP isn't very reliable for such huge files, although 10 GB over a local network makes sense. Before you get to untarring anything, you can do something simpler, like counting the bytes read or calculating a hash, as sketched below.Sensualism
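
A minimal sketch of that sanity check, assuming the raw body is available as a java.io.InputStream (the checksum helper is made up for illustration; compare its output against sha256sum on the sending side):

import java.io.InputStream
import java.security.DigestInputStream
import java.security.MessageDigest

// Count the bytes and compute a SHA-256 digest of the received body.
fun checksum(body: InputStream): Pair<Long, String> {
    val digest = MessageDigest.getInstance("SHA-256")
    var total = 0L
    DigestInputStream(body, digest).use { stream ->
        val buffer = ByteArray(8192)
        while (true) {
            val read = stream.read(buffer)
            if (read < 0) break
            total += read
        }
    }
    return total to digest.digest().joinToString("") { "%02x".format(it) }
}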
Yes, this is a local network: curl -v -F upload=@... http:/0.0.0.0:8080/..., but I get the same "Corrupted TAR archive" error with both small and large samples.Disencumber
@Sensualism Is there any possibility that java.io.BufferedReader might brick the entire tar input? paste.com.tr/raw/rkswbpoaDisencumber
Again, I believe the problem is not in the code above. BufferedInputStream is a very common and well-tested utility used by thousands of applications every day. It is fine. Please start by making sure you actually receive the raw data that you send. My guess would be either: 1. Network problems. 2. Multipart processing. I don't know, maybe bigger files are split into multiple parts? 3. Some inconsistency between how you receive the data and how you send it. In some cases curl automatically converts newlines in the sent files, maybe this is it?Sensualism
If you don't plan to send multiple files and multipart isn't a requirement, I would consider not using multipart but sending the file directly. It will be simpler, less error-prone, and probably more performant as well, because AFAIR multipart requires sending the data as base64 (or urlencoded?), which takes more space. Your curl would use --data-binary instead of -F. You would have to adapt the server side as well.Sensualism
@Sensualism, no luck. cURL output: paste.com.tr/zdczoatz, and server code: paste.com.tr/xkuroygl. I am reading the body directly as a stream, yet I still get the "corrupted tar archive" error, and the file contents are all mixed up and gibberish again.Disencumber
I updated the question with the code sample you provided above, I hope you don't mind. And I added the answer.Sensualism
I guess the problem is caused by this line:

scope.launch {

We can't process the input stream concurrently, because whenever we call nextEntry we reuse exactly the same input stream; it just seeks to another place. So all concurrent consumers actually consume from the same position.

We can only read the input stream sequentially. If the JSON parsing takes a lot of time and you would still like to use multiple threads, you can first read the entries into byte arrays / strings sequentially, then parse them as JSON concurrently, as in the sketch below. But reading from the stream itself has to be sequential.
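
A minimal sketch of that split, assuming kotlinx.coroutines and kotlinx.serialization are on the classpath (processTar and the semaphore bound are illustrative, not part of the original answer):

import java.io.InputStream
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.jsonObject
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream

// Hypothetical helper: the tar stream is read strictly sequentially;
// only the CPU-bound JSON parsing is fanned out to worker threads.
fun processTar(body: InputStream) = runBlocking {
    val permits = Semaphore(64) // bound in-flight strings so memory stays flat
    TarArchiveInputStream(body.buffered()).use { tarInput ->
        while (true) {
            val entry = tarInput.nextEntry ?: break
            if (entry.isDirectory || !entry.name.endsWith(".json")) continue
            permits.acquire()
            // Sequential: drain the current entry before advancing.
            val jsonString = tarInput.readBytes().decodeToString()
            launch(Dispatchers.Default) {
                try {
                    // Concurrent: parsing works on a plain String and
                    // never touches the shared tar stream.
                    val json = Json.parseToJsonElement(jsonString).jsonObject
                    // ... handle the parsed object here ...
                } finally {
                    permits.release()
                }
            }
        }
    }
}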

Sensualism answered 15/8 at 9:45 Comment(1)
Thank you, thank you dude! There is no way I could have figured this out, what the hell. Fixed by removing scope.launch and processing sequentially <3 <3Disencumber
