How to limit memory usage while scanning parquet from S3 and join using Polars?

As a follow-up to this question, I would like to find a way to limit memory usage when scanning, filtering and joining a large dataframe stored in S3 with a tiny local dataframe.

Suppose my code looks like the following:

import pyarrow.dataset as ds
import polars as pl
import s3fs

# S3 credentials and bucket
secret = ...
key = ...
endpoint_url = ...
s3_bucket = ...

# some tiny dataframe
sellers_df = pl.DataFrame({'seller_id': ['0332649223', '0192491683', '0336435426']})

# scan, filter and join with huge dataframe on S3
fs = s3fs.S3FileSystem(endpoint_url=endpoint_url, key=key, secret=secret)
dataset = ds.dataset(f'{s3_bucket}/benchmark_dt/dt_partitions', filesystem=fs, partitioning='hive')
scan_df = pl.scan_pyarrow_dataset(dataset) \
    .filter(pl.col('dt') >= '2023-05-17') \
    .filter(pl.col('dt') <= '2023-10-18') \
    .join(sellers_df.lazy(), on='seller_id', how='inner') \
    .collect()

And my parquet file layout looks like the following:

-- dt_partitions
    -- dt=2023-06-09
        -- data.parquet
    -- dt=2023-06-10
        -- data.parquet
    -- dt=2023-06-11
        -- data.parquet
    -- dt=2023-06-12
        -- data.parquet
    ...

When running the code I notice that Polars first loads the entire dataset for the given date range into memory and only then performs the join.
This causes severe memory problems.

Is there any way to perform the join in pre-defined batches/streaming to save memory?

Thanks in advance.

Edit:

This is the explain plan (you can see that no streaming is applied):

INNER JOIN:
LEFT PLAN ON: [col("seller_id")]

    PYTHON SCAN 
    PROJECT */3 COLUMNS
    SELECTION: ((pa.compute.field('dt') >= '2023-10-17') & (pa.compute.field('dt') <= '2023-10-18'))
RIGHT PLAN ON: [col("seller_id")]
  DF ["seller_id"]; PROJECT */1 COLUMNS; SELECTION: "None"
END INNER JOIN

However, when using is_in, the seller_id filter is pushed into the scan's selection:

PYTHON SCAN 
  PROJECT */3 COLUMNS
  SELECTION: ((pa.compute.field('seller_id')).isin(["0332649223","0192491683","0336435426","3628932648","5241104373","1414317462","4028203396","6445502649","1131069079","9027417785","6509736571","9214134975","7722199293","1617136891","8786329949","8260764409","5103636478","3444202168","9066806312","3961998994","7345385102","2756955097","7038039666","0148664533","5120870693","8843132164","6424549457","8242686761","3148647530","8329075741","0803877447","2228154163","8661602117","2544985488","3241983296","4756084729","5317176976","0658022895","3802149808","2368104663","0835399702","0806598632","9753553141","3473629988","1145080603","5731199445","7622500016","4980968502","6713967792","8469333969"]) & ((pa.compute.field('dt') >= '2023-10-17') & (pa.compute.field('dt') <= '2023-10-18')))
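For reference, the is_in variant looks roughly like this (a sketch; the actual seller list was longer than the three-row example above):

seller_ids = sellers_df['seller_id'].to_list()

scan_df = pl.scan_pyarrow_dataset(dataset) \
    .filter(pl.col('seller_id').is_in(seller_ids)) \
    .filter(pl.col('dt') >= '2023-10-17') \
    .filter(pl.col('dt') <= '2023-10-18') \
    .collect()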

I followed @Dean MacGregor's answer, added os.environ['AWS_ALLOW_HTTP'] = 'true', and it worked:

--- STREAMING
INNER JOIN:
LEFT PLAN ON: [col("seller_id")]

    Parquet SCAN s3://test-bucket/benchmark_dt/dt_partitions/dt=2023-10-17/part-0.parquet
    PROJECT */3 COLUMNS
RIGHT PLAN ON: [col("seller_id")]
  DF ["seller_id"]; PROJECT */1 COLUMNS; SELECTION: "None"
END INNER JOIN  --- END STREAMING

  • what does print(pl.scan_pyarrow_dataset(dataset).filter(pl.col('dt') >= '2023-05-17').filter(pl.col('dt') <= '2023-10-18').join(sellers_df.lazy(), on='seller_id', how='inner').explain()) say?

  • How about doing .filter(pl.col('seller_id').is_in(['0332649223', '0192491683', '0336435426']))?

  • @DeanMacGregor it says “python scan” instead of “parquet scan”, I will post the explain plan tomorrow

  • @0x26res I will check and update here

  • @0x26res Polars isn’t 100% at mapping and pushing its filters to pyarrow datasets. My early guess is to consolidate the two filters into one .filter((pl.col('dt') >= '2023-05-17') & (pl.col('dt') <= '2023-10-18')) and that the join is fine. If not that, then maybe something weird like .filter(pl.col('dt').is_in(pl.select(pl.date_range(pl.date(2023,5,17), pl.date(2023,10,18), '1d').dt.strftime('%Y-%m-%d'))['date'])), and if not that, then just doing the filtering in pyarrow before handing off to polars.

Polars doesn’t (know how to) do predicate pushdown to the pyarrow dataset. The development efforts are going toward bolstering its own cloud/hive reading, so perhaps give that a try?

The syntax for using it is a bit different from pyarrow’s.

It should look roughly like this, with possible nuance in how you enter your auth info:

import polars as pl

scan_df = pl.scan_parquet(
    f's3://{s3_bucket}/benchmark_dt/dt_partitions/**/*.parquet',
    storage_options={
        'access_key_id': key,
        'secret_access_key': secret,
    },
)

Note the difference in syntax: the path needs the “s3://” prefix and you have to use globbing patterns for the hive structure. For the auth, use the storage_options parameter with a dictionary of key/value pairs supported by object_store. It doesn’t rely on, or utilize, fsspec/s3fs as pyarrow does.

From there you can do

scan_df.filter(pl.col('dt') >= '2023-05-17') \
    .filter(pl.col('dt') <= '2023-10-18') \
    .join(sellers_df.lazy(), on='seller_id', how='inner') \
    .collect()

However, for predicate pushdown on seller_id to work, since your hive doesn’t partition by seller_id, your underlying parquet files would need row groups that are separated by seller_id and statistics that record that separation.
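As a minimal sketch of what that could look like (assuming you control how the partitions are written; the path and row-group size here are illustrative), you could rewrite each partition sorted by seller_id so the per-row-group min/max statistics let the reader skip row groups that cannot match:

import polars as pl

# Rewrite one partition ordered by seller_id so that row-group
# min/max statistics separate seller_id ranges.
df = pl.read_parquet('dt_partitions/dt=2023-06-09/data.parquet')
df.sort('seller_id').write_parquet(
    'dt_partitions/dt=2023-06-09/data.parquet',
    statistics=True,         # write per-row-group min/max statistics
    row_group_size=100_000,  # smaller row groups -> finer-grained skipping
)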

Even without predicate pushdown, it is still possible to stream the data, but you need to change collect() to collect(streaming=True).
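For example, reusing scan_df and sellers_df from above:

scan_df.filter(pl.col('dt') >= '2023-05-17') \
    .filter(pl.col('dt') <= '2023-10-18') \
    .join(sellers_df.lazy(), on='seller_id', how='inner') \
    .collect(streaming=True)  # run with the streaming engine instead of materializing everything at once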

If you need to access a local/custom S3 endpoint, then set os.environ['AWS_ALLOW_HTTP'] = 'true' to tell Object Store to connect to a non-AWS URL. Here are more environment variables that it will look for/at.
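Putting it together, a sketch for a custom endpoint (the 'endpoint_url' storage option is an assumption; check the object_store configuration keys your Polars version accepts):

import os
import polars as pl

# Allow plain-HTTP connections to a non-AWS/local endpoint.
# Set this before creating the scan so Object Store picks it up.
os.environ['AWS_ALLOW_HTTP'] = 'true'

scan_df = pl.scan_parquet(
    f's3://{s3_bucket}/benchmark_dt/dt_partitions/**/*.parquet',
    storage_options={
        'access_key_id': key,
        'secret_access_key': secret,
        'endpoint_url': endpoint_url,  # assumed key name for a custom endpoint
    },
)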
