I'm doing very basic experiments with Apache Arrow, mostly around passing some data between Java, C++ and Python using Arrow's IPC format (to file), Parquet format (to file) and IPC format (streamed through JNI).
C++ and Python look somewhat usable, but the Java part is really troubling me.
Sadly, the Java documentation is rather limited, but despite acknowledging the warnings in those hidden docs (not part of the TOC), I'm just trying to populate a VectorSchemaRoot from a previously written file.
Ignoring 99% of my experiment code, the following minimal example shows my problem: I create some data, write it out (it can be nicely imported in Python) and try to read it back in Java.
But data ownership seems to get in the way of what I'm trying to achieve.
Maybe the idea of keeping a VectorSchemaRoot as the core entry point (as in: all my data lives here) is some kind of wrong usage, but I'm not sure what the alternative would be. Why would I manually keep IntVectors and co. around when this class does the same and serves an API to work with them?
package arrow.test;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;

import com.google.common.collect.ImmutableList;

public class Run {
    public static void main(String[] args) {
        // Schema with one nullable int32 column "i" and one nullable bool column "b"
        ImmutableList.Builder<Field> builder = ImmutableList.builder();
        Field intField = new Field("i", FieldType.nullable(new ArrowType.Int(32, true)), null);
        builder.add(intField);
        Field boolField = new Field("b", FieldType.nullable(new ArrowType.Bool()), null);
        builder.add(boolField);

        RootAllocator sourceRootAlloc = new RootAllocator(Integer.MAX_VALUE);
        Schema sourceSchema = new Schema(builder.build(), null);
        VectorSchemaRoot sourceRoot = VectorSchemaRoot.create(sourceSchema, sourceRootAlloc);

        // Populate the int column
        FieldVector vector = sourceRoot.getVector("i");
        IntVector intVector = (IntVector) vector;
        intVector.allocateNew(5);
        intVector.set(0, 0);
        intVector.set(1, 1);
        intVector.set(2, 2);
        intVector.set(3, 3);
        intVector.set(4, 4);
        intVector.setValueCount(5);

        // Populate the bool column
        vector = sourceRoot.getVector("b");
        BitVector bitVector = (BitVector) vector;
        bitVector.allocateNew(5);
        bitVector.set(0, 1);
        bitVector.set(1, 1);
        bitVector.set(2, 0);
        bitVector.set(3, 0);
        bitVector.set(4, 1);
        bitVector.setValueCount(5);

        sourceRoot.setRowCount(5);

        System.out.println("before writing");
        System.out.println(sourceRoot.contentToTSVString());

        // WRITE
        // -----
        try {
            FileOutputStream fileOut = new FileOutputStream("out", /* append= */ false);
            ArrowFileWriter writer = new ArrowFileWriter(sourceRoot, null, fileOut.getChannel());
            writer.start();
            writer.writeBatch();
            writer.end();
            writer.close();
            fileOut.close();
        } catch (IOException e) {
            e.printStackTrace();
        }

        // READ
        // ----
        FileInputStream fileInputStream;
        RootAllocator targetRootAlloc = new RootAllocator(Integer.MAX_VALUE);
        VectorSchemaRoot targetRoot = null;
        try {
            fileInputStream = new FileInputStream("out");
            ArrowFileReader reader = new ArrowFileReader(fileInputStream.getChannel(), targetRootAlloc);
            targetRoot = reader.getVectorSchemaRoot();
            reader.loadNextBatch();

            System.out.println("before closing stream");
            System.out.println(targetRoot.contentToTSVString());

            reader.close();
            fileInputStream.close();

            System.out.println("after closing stream");
            System.out.println(targetRoot.contentToTSVString()); // throws: buffers already released
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Running this (with Java 11 and -Dio.netty.tryReflectionSetAccessible=true set, as documented) leads to:
... irrelevant warning
...
before writing
i b
0 true
1 true
2 false
3 false
4 true
before closing stream
i b
0 true
1 true
2 false
3 false
4 true
after closing stream
Exception in thread "main" java.lang.IndexOutOfBoundsException: index: 0, length: 1 (expected: range(0, 0))
at io.netty.buffer.ArrowBuf.checkIndexD(ArrowBuf.java:337)
at io.netty.buffer.ArrowBuf.chk(ArrowBuf.java:324)
at io.netty.buffer.ArrowBuf.getByte(ArrowBuf.java:526)
at org.apache.arrow.vector.BaseFixedWidthVector.isSet(BaseFixedWidthVector.java:776)
at org.apache.arrow.vector.IntVector.getObject(IntVector.java:143)
at org.apache.arrow.vector.IntVector.getObject(IntVector.java:39)
at org.apache.arrow.vector.VectorSchemaRoot.contentToTSVString(VectorSchemaRoot.java:268)
at arrow.test.Run.main(Run.java:102)
while Python can do it easily:
import pyarrow as pa
print(pa.__version__)
buf = pa.ipc.open_file("out")
print(buf.schema)
df = buf.read_pandas()
print(df)
outputs:
0.17.1
i: int32
b: bool
i b
0 0 True
1 1 True
2 2 False
3 3 False
4 4 True
Now it seems that ArrowFileReader feels responsible for cleaning up the data, despite the fact that the allocator is defined outside of its scope. targetRoot.getRowCount() is correct, but each FieldVector has size 0.
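For what it's worth, probing the allocator around reader.close() seems to confirm this (a minimal check I added in the READ block above, assuming getAllocatedMemory() reports the memory held on behalf of that allocator):

// hypothetical probe around reader.close() in the READ block above
System.out.println(targetRootAlloc.getAllocatedMemory()); // non-zero while the reader is open
reader.close();
System.out.println(targetRootAlloc.getAllocatedMemory()); // back to 0: the batch buffers were released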
I tried a lot of alternatives (not shown): using VectorUnloader and VectorLoader to unload from the reader's VectorSchemaRoot (local scope), transferring ownership with batch.cloneWithTransfer(targetAlloc) (global scope) and loading into a target VectorSchemaRoot (global scope). That did not work either, usually failing with "A buffer can only be associated between two allocators that share the same root".
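One of those attempts looked roughly like this (a sketch from memory rather than my exact code; readerAlloc is a second RootAllocator, and targetRoot is assumed to have been created from the file's schema on targetRootAlloc beforehand):

// additional imports: org.apache.arrow.vector.VectorLoader,
// org.apache.arrow.vector.VectorUnloader,
// org.apache.arrow.vector.ipc.message.ArrowRecordBatch
try (ArrowFileReader reader = new ArrowFileReader(fileInputStream.getChannel(), readerAlloc)) {
    VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot(); // owned by the reader, local scope
    reader.loadNextBatch();
    VectorUnloader unloader = new VectorUnloader(readerRoot);
    try (ArrowRecordBatch batch = unloader.getRecordBatch();
         // try to move buffer ownership over to my long-lived allocator
         ArrowRecordBatch transferred = batch.cloneWithTransfer(targetRootAlloc)) {
        new VectorLoader(targetRoot).load(transferred);
    }
}
// -> IllegalArgumentException: A buffer can only be associated between
//    two allocators that share the same root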
Either I have misunderstood a lot about the usage patterns, or (probably not, but it feels like it) the Java part is pretty broken.
Any ideas?
I expected targetRootAlloc to store the values, and printing out shows that those are there. But when the reader is closed, those values disappear, although targetRootAlloc was declared outside of that scope. I don't know how to make clear that it should not delete anything. I also have trouble understanding "vectorSchemaRoot cannot write the value into it, may be the direct memory is not valid or the schema is error", as the print shows that the data is there. – Eisk