Using predicates to filter rows from pyarrow.parquet.ParquetDataset
I have a parquet dataset stored on s3, and I would like to query specific rows from the dataset. I was able to do that using petastorm but now I want to do that using only pyarrow.

Here's my attempt:

import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

dataset = pq.ParquetDataset(
    'analytics.xxx', 
    filesystem=fs, 
    validate_schema=False, 
    filters=[('event_name', '=', 'SomeEvent')]
)

df = dataset.read_pandas().to_pandas()

But that returns a pandas DataFrame as if the filter didn't work, i.e. I get rows with various values of event_name. Is there something I'm missing or something I misunderstood? I could filter after loading the pandas DataFrame, but that would use much more memory than needed.

Latchkey answered 10/6, 2019 at 8:33 Comment(0)

Note: I’ve expanded this into a comprehensive guide to Python and Parquet in this post

Parquet Format Partitions

In order to use filters you need to store your data in Parquet format using partitions. Loading just a few Parquet columns and partitions out of many can yield massive improvements in I/O performance compared with CSV. Parquet partitions files on the values of one or more fields, creating a directory tree for each unique combination of the nested values (or a single set of directories when there is only one partition column). The PySpark Parquet documentation explains how Parquet works fairly well.

A partition on gender and country would look like this:

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...

There is also row group partitioning if you need to further partition your data, but most tools only support specifying row group size and you have to do the key-->row group lookup yourself, which is ugly (happy to answer about that in another question).
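To illustrate the row group mechanics, here is a minimal sketch using plain pyarrow (the file name, column, and row group size are made up for the example; the key-to-row-group bookkeeping is still on you):

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Write a file with deliberately small row groups (hypothetical size)
table = pa.Table.from_pandas(pd.DataFrame({'event_name': ['a', 'b'] * 500}))
pq.write_table(table, 'events.parquet', row_group_size=100)

# Read back just one row group instead of the whole file
pf = pq.ParquetFile('events.parquet')
print(pf.num_row_groups)             # how many row groups the file contains
first_group = pf.read_row_group(0)   # a pyarrow.Table with only that row group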

Writing Partitions with Pandas

You need to partition your data using Parquet, and then you can load it using filters. You can write the partitioned data using PyArrow or pandas, or with Dask or PySpark for large datasets.

For example, to write partitions in pandas:

# Select the columns to keep up front; the partition columns must be
# present in the frame (DataFrame.to_parquet has no columns parameter).
df[['col1', 'col5', 'event_name', 'event_category']].to_parquet(
    path='analytics.xxx',
    engine='pyarrow',
    compression='snappy',
    partition_cols=['event_name', 'event_category']
)

This lays the files out like:

analytics.xxx/event_name=SomeEvent/event_category=SomeCategory/part-0001.c000.snappy.parquet
analytics.xxx/event_name=SomeEvent/event_category=OtherCategory/part-0001.c000.snappy.parquet
analytics.xxx/event_name=OtherEvent/event_category=SomeCategory/part-0001.c000.snappy.parquet
analytics.xxx/event_name=OtherEvent/event_category=OtherCategory/part-0001.c000.snappy.parquet
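If you would rather write with PyArrow directly instead of going through pandas, a rough equivalent is pyarrow.parquet.write_to_dataset (a sketch, assuming df is the same DataFrame as above):

import pyarrow as pa
import pyarrow.parquet as pq

# Convert the selected columns (including partition columns) to an Arrow Table
table = pa.Table.from_pandas(df[['col1', 'col5', 'event_name', 'event_category']])
pq.write_to_dataset(
    table,
    root_path='analytics.xxx',
    partition_cols=['event_name', 'event_category']
)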

Loading Parquet Partitions in PyArrow

To grab events by one property using the partition columns, you put a tuple filter in a list:

import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

dataset = pq.ParquetDataset(
    's3://analytics.xxx', 
    filesystem=fs, 
    validate_schema=False, 
    filters=[('event_name', '=', 'SomeEvent')]
)
df = dataset.read(
    columns=['col1', 'col5']
).to_pandas()

Filtering with Logical ANDs

To grab an event with two or more properties using AND, you just create a list of filter tuples:

import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

dataset = pq.ParquetDataset(
    's3://analytics.xxx', 
    filesystem=fs, 
    validate_schema=False, 
    filters=[
        ('event_name',     '=', 'SomeEvent'),
        ('event_category', '=', 'SomeCategory')
    ]
)
df = dataset.read(
    columns=['col1', 'col5']
).to_pandas()

Filtering with Logical ORs

To grab two events using OR, you need to nest the filter tuples in their own lists:

import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

dataset = pq.ParquetDataset(
    's3://analytics.xxx', 
    filesystem=fs, 
    validate_schema=False, 
    filters=[
        [('event_name', '=', 'SomeEvent')],
        [('event_name', '=', 'OtherEvent')]
    ]
)
df = dataset.read(
    columns=['col1', 'col5']
).to_pandas()

Loading Parquet Partitions with AWS Data Wrangler

As another answer mentioned, the easiest way to load only certain columns from certain partitions, wherever the data is located (locally or in the cloud), is to use the awswrangler module. If you're using S3, check out the documentation for awswrangler.s3.read_parquet() and awswrangler.s3.to_parquet(). The filtering works the same as in the examples above.

import awswrangler as wr

df = wr.s3.read_parquet(
    path="analytics.xxx",
    columns=["event_name"], 
    filters=[('event_name', '=', 'SomeEvent')]
)
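For the writing side, a minimal sketch with awswrangler.s3.to_parquet() (the bucket path is a placeholder; dataset=True tells awswrangler to lay the data out as a partitioned dataset):

import awswrangler as wr

wr.s3.to_parquet(
    df=df,
    path="s3://analytics.xxx",   # hypothetical bucket/prefix
    dataset=True,                # write as a partitioned dataset
    partition_cols=["event_name", "event_category"]
)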

Loading Parquet Partitions with pyarrow.parquet.read_table()

If you're using PyArrow, you can also use pyarrow.parquet.read_table():

import pyarrow.parquet as pq

fp = pq.read_table(
    source='analytics.xxx',
    use_threads=True,
    columns=['some_event', 'some_category'],
    filters=[('event_name', '=', 'SomeEvent')]
)
df = fp.to_pandas()

Loading Parquet Partitions with PySpark

Finally, in PySpark you can use pyspark.sql.DataFrameReader.parquet():

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]") \
                    .appName('Stack Overflow Example Parquet Column Load') \
                    .getOrCreate()

# Spark automagically uses the Parquet structure to load only the selected columns and partitions
df = spark.read.parquet('s3://analytics.xxx') \
          .select('event_name', 'event_category') \
          .filter(F.col('event_name') == 'SomeEvent')

Hopefully this helps you work with Parquet :)

Cindicindie answered 16/10, 2020 at 18:18 Comment(4)
the link to the medium post is broken – Turboprop
@JasonS fixed: blog.datasyndrome.com/… – Cindicindie
If there are needed updates, please let me know! – Cindicindie
appreciate it ... but I don't use Parquet anymore :/ – Turboprop

For anyone getting here from Google: you can now filter on rows in PyArrow when reading a Parquet file, regardless of whether you read it via pandas or pyarrow.parquet.

From the documentation:

filters (List[Tuple] or List[List[Tuple]] or None (default)) – Rows which do not match the filter predicate will be removed from scanned data. Partition keys embedded in a nested directory structure will be exploited to avoid loading files at all if they contain no matching rows. If use_legacy_dataset is True, filters can only reference partition keys and only a hive-style directory structure is supported. When setting use_legacy_dataset to False, also within-file level filtering and different partitioning schemes are supported.

Predicates are expressed in disjunctive normal form (DNF), like [[('x', '=', 0), ...], ...]. DNF allows arbitrary boolean logical combinations of single column predicates. The innermost tuples each describe a single column predicate. The list of inner predicates is interpreted as a conjunction (AND), forming a more selective and multiple column predicate. Finally, the most outer list combines these filters as a disjunction (OR).

Predicates may also be passed as List[Tuple]. This form is interpreted as a single conjunction. To express OR in predicates, one must use the (preferred) List[List[Tuple]] notation.
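As a quick illustration of the DNF form (the year column and the local path below are made up for the example): each inner list is ANDed together, and the inner lists are then ORed.

import pyarrow.parquet as pq

# (event_name == 'SomeEvent' AND year == 2020) OR (event_name == 'OtherEvent')
filters = [
    [('event_name', '=', 'SomeEvent'), ('year', '=', 2020)],
    [('event_name', '=', 'OtherEvent')],
]

df = pq.read_table('analytics.xxx', filters=filters).to_pandas()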

Pilsner answered 17/6, 2020 at 12:56 Comment(0)

Currently (at the time of writing), the filters functionality is only implemented at the file level, not yet at the row level.

So if you have a dataset as a collection of multiple, partitioned parquet files in a nested hierarchy (the type of partitioned datasets described here: https://arrow.apache.org/docs/python/parquet.html#partitioned-datasets-multiple-files), you can use the filters argument to only read a subset of the files.
But, you can't yet use it for reading only a subset of the row groups of a single file (see https://issues.apache.org/jira/browse/ARROW-1796).

That said, it would be nice to get an error message when specifying such an invalid filter. I opened an issue for that: https://issues.apache.org/jira/browse/ARROW-5572
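In the meantime, a rough workaround sketch (assuming event_name is a partition column and col1 is an ordinary column): let filters prune whole partition directories, then filter the remaining non-partition columns in pandas.

import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

# File-level pruning: only partition directories matching the filter are scanned
dataset = pq.ParquetDataset(
    'analytics.xxx',
    filesystem=fs,
    filters=[('event_name', '=', 'SomeEvent')]
)
df = dataset.read().to_pandas()

# Row-level filtering on ordinary columns happens in memory here
df = df[df['col1'] > 0]   # hypothetical condition, just for illustration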

Doviedow answered 12/6, 2019 at 12:30 Comment(2)
Ok got it! I should have thought more about how to structure the data so I could make more efficient queries. Yes, having an error message would have been nice indeed, thanks for reporting. – Latchkey
Hi, the issue you raise seems to be solved ... but I tested the code and it still does not throw an error ... issues.apache.org/jira/browse/ARROW-5572 – Zajac

For Python 3.6+, AWS has a library called aws-data-wrangler that helps with the integration between pandas, S3 and Parquet, and it allows you to filter on partitioned S3 keys.

To install, do:

pip install awswrangler

To reduce the data you read, you can filter rows based on the partition columns of your Parquet file stored on S3. To filter the rows where the partition column event_name has the value "SomeEvent", do:

For awswrangler < 1.0.0:

import awswrangler as wr

df = wr.pandas.read_parquet(
         path="s3://my-bucket/my/path/to/parquet-file.parquet",
         columns=["event_name"], 
         filters=[('event_name', '=', 'SomeEvent')]
)

For awswrangler >= 1.0.0, do:

import awswrangler as wr

df = wr.s3.read_parquet(
         path="s3://my-bucket/my/path/to/parquet-file.parquet",
         columns=["event_name"], 
         filters=[('event_name', '=', 'SomeEvent')]
)
Socialminded answered 13/1, 2020 at 9:44 Comment(5)
I just attempted to utilize this example, and I get: AttributeError: 'NoneType' object has no attribute 'filter_accepts_partition' – Brantley
it looks like there is a bug; github.com/awslabs/aws-data-wrangler/issues/267 – Socialminded
That's what I was afraid of. That was my bug report you linked to, in fact. :) – Brantley
you can try older versions for now and see what it brings? – Socialminded
Looks like it's not really a bug. The response to my bug report was: "Unfortunately AWS Data Wrangler does not support filters on physical columns, only on partitions. (Docs updated in the commit above). It's not only a matter of pass use_legacy_dataset=False, seems that the new dataset approach does not have support to receive boto3 sessions." Perhaps it would be a good idea to edit this answer to emphasize that filtering only works on partitions, not physical columns? – Brantley
