Arrow + Java: Populate VectorSchemaRoot (from stream / file) | Memory-Ownership | Usage patterns

I'm doing very basic experiments with Apache Arrow, mostly around passing data between Java, C++ and Python using Arrow's IPC format (to file), the Parquet format (to file) and the IPC format (streamed through JNI).

C++ and Python look somewhat usable, but the Java part is really troubling me.

Sadly, the Java documentation is rather limited, but despite acknowledging the warnings in those hidden docs (not part of the TOC), I'm just trying to populate a VectorSchemaRoot from a previously written file.

Ignoring 99% of my experiment code, the following minimal example shows my problem: I create some data, write it out (it can be nicely imported in Python) and try to read it back in Java.

But data ownership seems to get in the way of what I'm trying to achieve.

Maybe the idea of keeping a VectorSchemaRoot as the core entry point (as in: all my data lives here) is the wrong usage pattern, but I'm not sure what the alternative would be. Why would I manually keep IntVectors and co. around when this class does the same and offers an API to work with them?

package arrow.test;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;

import com.google.common.collect.ImmutableList;

public class Run {

  public static void main(String[] args) {

    ImmutableList.Builder<Field> builder = ImmutableList.builder();
    
    Field intField = new Field("i", FieldType.nullable(new ArrowType.Int(32, true)), null);
    builder.add(intField);      
    
    Field boolField = new Field("b", FieldType.nullable(new ArrowType.Bool()), null);
    builder.add(boolField);     
    
    RootAllocator sourceRootAlloc = new RootAllocator(Integer.MAX_VALUE);
    Schema sourceSchema = new Schema(builder.build(), null);
    VectorSchemaRoot sourceRoot  = VectorSchemaRoot.create(sourceSchema, sourceRootAlloc);
    
    FieldVector vector = sourceRoot.getVector("i");
    IntVector intVector = (IntVector) vector;
    intVector.allocateNew(5);
    intVector.set(0, 0);
    intVector.set(1, 1);
    intVector.set(2, 2);
    intVector.set(3, 3);
    intVector.set(4, 4);
    intVector.setValueCount(5);
    
    vector = sourceRoot.getVector("b");
    BitVector bitVector = (BitVector) vector;
    bitVector.allocateNew(5);
    bitVector.set(0, 1);
    bitVector.set(1, 1);
    bitVector.set(2, 0);
    bitVector.set(3, 0);
    bitVector.set(4, 1);
    bitVector.setValueCount(5);
    
    sourceRoot.setRowCount(5);

    System.out.println("before writing");
    System.out.println(sourceRoot.contentToTSVString());        
    
    // WRITE
    // -----
    
    try {
      FileOutputStream fileOut = new FileOutputStream("out", /* append= */ false);
      ArrowFileWriter writer = new ArrowFileWriter(sourceRoot, null, fileOut.getChannel());
      writer.start();
      writer.writeBatch();
      writer.end();
      writer.close();
      fileOut.close();
    }
    catch (IOException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
    
    // READ
    // ----
    
    FileInputStream fileInputStream;
      
    RootAllocator targetRootAlloc = new RootAllocator(Integer.MAX_VALUE);
    VectorSchemaRoot targetRoot = null;
    
    try {
      fileInputStream = new FileInputStream("out");

      ArrowFileReader reader = new ArrowFileReader(fileInputStream.getChannel(), targetRootAlloc);
      
      targetRoot = reader.getVectorSchemaRoot();    
      reader.loadNextBatch();       
      
      System.out.println("before closing stream");
      System.out.println(targetRoot.contentToTSVString());

      reader.close();
      fileInputStream.close();          

      System.out.println("after closing stream");
      System.out.println(targetRoot.contentToTSVString());      
    }
    catch (IOException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }           
  }
}

Running this (with Java 11 and -Dio.netty.tryReflectionSetAccessible=true as documented) leads to:

... irrelevant warning
...
before writing
i   b
0   true
1   true
2   false
3   false
4   true

before closing stream
i   b
0   true
1   true
2   false
3   false
4   true

after closing stream
Exception in thread "main" java.lang.IndexOutOfBoundsException: index: 0, length: 1 (expected: range(0, 0))
  at io.netty.buffer.ArrowBuf.checkIndexD(ArrowBuf.java:337)
  at io.netty.buffer.ArrowBuf.chk(ArrowBuf.java:324)
  at io.netty.buffer.ArrowBuf.getByte(ArrowBuf.java:526)
  at org.apache.arrow.vector.BaseFixedWidthVector.isSet(BaseFixedWidthVector.java:776)
  at org.apache.arrow.vector.IntVector.getObject(IntVector.java:143)
  at org.apache.arrow.vector.IntVector.getObject(IntVector.java:39)
  at org.apache.arrow.vector.VectorSchemaRoot.contentToTSVString(VectorSchemaRoot.java:268)
  at arrow.test.Run.main(Run.java:102)

while Python can do it easily:

import pyarrow as pa
print(pa.__version__)

buf = pa.ipc.open_file("out")
print(buf.schema)

df = buf.read_pandas()
print(df)

outputs:

0.17.1
i: int32
b: bool
   i      b
0  0   True
1  1   True
2  2  False
3  3  False
4  4   True

Now it seems that ArrowFileReader feels responsible for cleaning up the data, despite the fact that the allocator is defined outside its scope. targetRoot.getRowCount() is correct, but each FieldVector has size 0.
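If I read the behavior right, the lifecycle is roughly this (a sketch; channel stands for the open file channel and use for any access to the data):

VectorSchemaRoot root;
try (ArrowFileReader reader = new ArrowFileReader(channel, targetRootAlloc)) {
  root = reader.getVectorSchemaRoot(); // the root is owned by the reader
  reader.loadNextBatch();
  use(root);                           // fine here: the buffers are live
}                                      // close() releases the buffers backing root
use(root);                             // fails: buffers are gone, only the row count remains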

I tried a lot of alternatives (not shown): using VectorUnloader and VectorLoader to unload from a reader-side VectorSchemaRoot (local scope), transferring ownership via batch.cloneWithTransfer(targetAlloc) (global scope) and loading into a target VectorSchemaRoot (global scope), but that did not work either, usually failing with "A buffer can only be associated between two allocators that share the same root".
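Reading that error message literally, the transfer presumably requires both allocators to be children of one and the same RootAllocator. A sketch of what I assume that would look like (untested; exception handling omitted):

RootAllocator root = new RootAllocator(Long.MAX_VALUE);
BufferAllocator readerAlloc = root.newChildAllocator("reader", 0, Long.MAX_VALUE);
BufferAllocator targetAlloc = root.newChildAllocator("target", 0, Long.MAX_VALUE);

VectorSchemaRoot targetRoot;
try (FileInputStream in = new FileInputStream("out");
     ArrowFileReader reader = new ArrowFileReader(in.getChannel(), readerAlloc)) {
  VectorSchemaRoot readRoot = reader.getVectorSchemaRoot();
  reader.loadNextBatch();

  // unload the batch from the reader's root and move its buffers to targetAlloc
  targetRoot = VectorSchemaRoot.create(readRoot.getSchema(), targetAlloc);
  try (ArrowRecordBatch batch = new VectorUnloader(readRoot).getRecordBatch();
       ArrowRecordBatch owned = batch.cloneWithTransfer(targetAlloc)) {
    new VectorLoader(targetRoot).load(owned);
  }
}
// the reader is closed here, but targetRoot's buffers now live in targetAlloc
System.out.println(targetRoot.contentToTSVString());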

Either I have misunderstood a lot about the usage patterns, or (probably not, but it feels like it) the Java part is pretty broken.

Any ideas?

Eisk answered 16/7, 2020 at 15:33

I ran into the same issue trying to load an Arrow file written out by Python and then read by Java, with the intent to ultimately stream the contents out as JSON (via Jackson).

Initially I tried a similar strategy: read the table, cart it around in memory, then eventually let Jackson pick it up with my custom serializer. I experienced the same "why are the vectors empty but the row count is full? ohhh, closing the reader closed the vectors" moment that you highlighted in your question.

I ultimately decided to hang onto only the path to the file in memory; the Jackson serializer actually opens the file and writes the JSON stream while the reader and vectors are all open. Roughly:

// adapted from Kotlin; writeLoadedVectorsToJson is my own helper, not an Arrow API
void serialize(JsonGenerator gen) throws IOException {
  gen.writeStartArray();
  try (ArrowFileReader rdr = new ArrowFileReader(
          Files.newByteChannel(path, StandardOpenOption.READ),
          allocator)) {
    while (rdr.loadNextBatch()) {
      // for each batch, write the loaded vectors' rows
      writeLoadedVectorsToJson(gen, rdr.getVectorSchemaRoot());
    }
  }
  gen.writeEndArray();
}

This wasn't my instinct at first, but it actually resulted in much cleaner, lower-memory code: I didn't need access to the values except at serialization time, and at most 1 of the N batches was in memory at any given time*. So this ultimately turned out better than what I was attempting before.

I ended up wrapping the path and reader in an AutoCloseable & Iterator<VectorSchemaRoot> implementation that I can use elsewhere if I need the contents in other ways in Java.
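A minimal sketch of what that wrapper looks like (ArrowBatchIterator is just my name for it; error handling simplified):

class ArrowBatchIterator implements Iterator<VectorSchemaRoot>, AutoCloseable {
  private final ArrowFileReader reader;
  private Boolean hasNextBatch; // null = not yet checked

  ArrowBatchIterator(Path path, BufferAllocator allocator) throws IOException {
    reader = new ArrowFileReader(
        Files.newByteChannel(path, StandardOpenOption.READ), allocator);
  }

  @Override
  public boolean hasNext() {
    if (hasNextBatch == null) {
      try {
        hasNextBatch = reader.loadNextBatch();
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
    return hasNextBatch;
  }

  @Override
  public VectorSchemaRoot next() {
    if (!hasNext()) throw new NoSuchElementException();
    hasNextBatch = null; // the returned root is only valid until the next load
    try {
      return reader.getVectorSchemaRoot();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  @Override
  public void close() throws IOException {
    reader.close();
  }
}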

I have the impression that keeping the reader open while the code that operates on the vectors is running is intentional, because you may need to, e.g., seek back to an earlier batch, or at least avoid keeping all vectors from all batches in memory at once.
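For example, ArrowFileReader exposes the file's record blocks, which should allow jumping back to an earlier batch (a sketch; reader is an open ArrowFileReader):

// after reading ahead, re-load the first batch via its block
List<ArrowBlock> blocks = reader.getRecordBlocks();
reader.loadRecordBatch(blocks.get(0));
// reader.getVectorSchemaRoot() now holds the first batch again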

*: to your point in your JIRA ticket about the docs, it's not clear without digging into the code whether the Java file reader memory-maps the file or loads batches into direct buffers, since the docs are so sparse.

Abner answered 11/3, 2021 at 14:50

I met the same problem today and am working on solving it. It seems to happen because the values cannot be written into the VectorSchemaRoot; maybe the direct memory is no longer valid, or the schema is wrong. If you have WeChat we can talk; WeChat ID: DawnWang

Mackey answered 17/7, 2020 at 6:32
Comment from Eisk: Thanks for mentioning that I'm not alone in struggling with this, although I can't follow you: did you solve the problem (nicely)? In my example, the reader must use targetRootAlloc to store the values, and printing shows that they are there. But when the reader is closed, those values disappear, although targetRootAlloc was declared outside its scope. I don't know how to make clear that it should not delete anything. I have trouble understanding "the values cannot be written into the VectorSchemaRoot; maybe the direct memory is no longer valid, or the schema is wrong", as the printout shows that the data is there.
