Why is Apache Orc RecordReader.searchArgument() not filtering correctly?

Here is a simple program that:

  1. Writes records into an Orc file
  2. Then tries to read the file using predicate pushdown (searchArgument)

Questions:

  1. Is this the right way to use predicate push down in Orc?
  2. The read(..) method seems to return all the records, completely ignoring the searchArguments. Why is that?

Notes:

I have not been able to find any useful unit test that demonstrates how predicate pushdown works in Orc (Orc on GitHub), nor any clear documentation of this feature. I tried looking at the Spark and Presto code, but I was not able to find anything useful.

The code below is a modified version of https://github.com/melanio/codecheese-blog-examples/tree/master/orc-examples/src/main/java/codecheese/blog/examples/orc

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf.Type;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.Reader.Options;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

public class TestRoundTrip {
    public static void main(String[] args) throws IOException {
        final String file = "tmp/test-round-trip.orc";
        new File(file).delete();

        final long highestX = 10000L;
        final Configuration conf = new Configuration();

        write(file, highestX, conf);
        read(file, highestX, conf);
    }

    private static void read(String file, long highestX, Configuration conf) throws IOException {
        Reader reader = OrcFile.createReader(
                new Path(file),
                OrcFile.readerOptions(conf)
        );

        // Search for x == highestX - 1000, so only one row should be retrieved.
        Options readerOptions = new Options(conf)
                .searchArgument(
                        SearchArgumentFactory
                                .newBuilder()
                                .equals("x", Type.LONG, highestX - 1000)
                                .build(),
                        new String[]{"x"}
                );
        RecordReader rows = reader.rows(readerOptions);
        VectorizedRowBatch batch = reader.getSchema().createRowBatch();

        // Print every row the reader returns.
        while (rows.nextBatch(batch)) {
            LongColumnVector x = (LongColumnVector) batch.cols[0];
            LongColumnVector y = (LongColumnVector) batch.cols[1];

            for (int r = 0; r < batch.size; r++) {
                long xValue = x.vector[r];
                long yValue = y.vector[r];

                System.out.println(xValue + ", " + yValue);
            }
        }
        rows.close();
    }

    private static void write(String file, long highestX, Configuration conf) throws IOException {
        TypeDescription schema = TypeDescription.fromString("struct<x:int,y:int>");
        Writer writer = OrcFile.createWriter(
                new Path(file),
                OrcFile.writerOptions(conf).setSchema(schema)
        );

        VectorizedRowBatch batch = schema.createRowBatch();
        LongColumnVector x = (LongColumnVector) batch.cols[0];
        LongColumnVector y = (LongColumnVector) batch.cols[1];
        for (int r = 0; r < highestX; ++r) {
            int row = batch.size++;
            x.vector[row] = r;
            y.vector[row] = r * 3;
            // If the batch is full, write it out and start over.
            if (batch.size == batch.getMaxSize()) {
                writer.addRowBatch(batch);
                batch.reset();
            }
        }
        if (batch.size != 0) {
            writer.addRowBatch(batch);
            batch.reset();
        }
        writer.close();
    }
}

Sakovich answered 22/6, 2017 at 6:9

I encountered the same issue, and I think it was rectified by changing

.equals("x", Type.LONG,

to

.equals("x",PredicateLeaf.Type.LONG

With this change, the reader seems to return only the batch containing the relevant rows, not just the one row we asked for.
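
For reference, a minimal sketch of the corrected builder call, assuming the Hive sarg classes from org.apache.hadoop.hive.ql.io.sarg (the buildSarg wrapper is just an illustrative name, not from the original post):

import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;

public class SargExample {
    // Builds the predicate x == value using the fully qualified enum.
    static SearchArgument buildSarg(long value) {
        return SearchArgumentFactory
                .newBuilder()
                .equals("x", PredicateLeaf.Type.LONG, value)
                .build();
    }
}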

Ligan answered 5/11, 2018 at 12:13 Comment(1)
Change highestX to something like 1,000,000 (rather than 10000L) for it to work. Why? Because the sarg only filters/skips files, stripes, and row groups; it doesn't filter individual rows. Alternatively, change the search argument to search for -100 and you will see the difference. – Indic
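
To see that effect, here is a minimal sketch on top of the question's reader (same rows and batch as in the question; the counting logic is my addition, not from the original post):

// Count how many rows the reader actually hands back.
long returned = 0;
while (rows.nextBatch(batch)) {
    returned += batch.size;
}
rows.close();

// With highestX = 10000 everything fits in a single row group, so nothing
// can be skipped and all 10000 rows come back. With highestX = 1_000_000
// the sarg can skip whole row groups and far fewer rows are returned.
System.out.println("rows returned: " + returned);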

I know this question is old, but maybe the answer is useful for someone. (I just saw that mac wrote a comment saying basically the same thing a few hours ago, but I think a separate answer is more visible.)

Orc internally separates the data into so-called "row groups" (10,000 rows each by default), where each row group has its own indexes. The search argument is only used to filter out row groups in which no row can possibly match it; it does NOT filter out individual rows. The indexes may even claim that a row group matches the search argument when not a single row in it actually does, because the row-group indexes consist mainly of the min and max values of each column in the row group.

So you will have to iterate over the returned rows and skip the ones that do not match your search criteria.
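
For illustration, a minimal sketch of that row-level filtering applied to the read loop from the question (assuming the same two-column schema; wantedX is just an illustrative name for the searched value):

long wantedX = highestX - 1000; // the value the search argument targets

while (rows.nextBatch(batch)) {
    LongColumnVector x = (LongColumnVector) batch.cols[0];
    LongColumnVector y = (LongColumnVector) batch.cols[1];

    for (int r = 0; r < batch.size; r++) {
        // The sarg only skipped non-matching row groups; re-check every row.
        if (x.vector[r] != wantedX) {
            continue;
        }
        System.out.println(x.vector[r] + ", " + y.vector[r]);
    }
}
rows.close();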

Gild answered 13/6, 2019 at 9:0 Comment(1)
It's a great answer. – Hilariahilario
