Parquet row-group based reader and partitioning


The current Parquet reader appears to parallelize only by columns, but not by rows or row-groups.

In a test, I am reading a 4.4 GB Parquet file with 6 columns and 17 row groups into Modin. It takes 2 minutes 30 seconds to load and appears to use only 6 cores, even though many more are available.

The text file version of this data is 21 GB, loads in 30 seconds, and uses 50 or more cores in parallel. (Of note: native Pandas takes 7 minutes 30 seconds.)

I’d propose implementing a Parquet reader that reads row groups in parallel, or sets of row groups if there are more row groups than partitions. Ideally, when there are many columns, it could also split the columns into groups, so that each partition loads a roughly equal amount of data.
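To make the grouping concrete, here is a rough sketch of how row groups might be assigned to partitions. The helper names `split_row_groups` and `read_row_group_chunk` are hypothetical and not part of Modin; the only library call assumed is pyarrow's `ParquetFile.read_row_groups`.

```python
def split_row_groups(num_row_groups, num_partitions):
    """Assign contiguous row-group indices to at most num_partitions chunks.

    Hypothetical helper: chunk sizes differ by at most one row group.
    """
    num_chunks = min(num_row_groups, num_partitions)
    if num_chunks <= 0:
        return []
    base, extra = divmod(num_row_groups, num_chunks)
    chunks, start = [], 0
    for i in range(num_chunks):
        # The first `extra` chunks take one additional row group.
        size = base + (1 if i < extra else 0)
        chunks.append(list(range(start, start + size)))
        start += size
    return chunks


def read_row_group_chunk(path, row_groups, columns=None):
    """Read one chunk of row groups into a pandas DataFrame (needs pyarrow)."""
    import pyarrow.parquet as pq  # imported lazily so the helper above has no dependencies

    table = pq.ParquetFile(path).read_row_groups(row_groups, columns=columns)
    return table.to_pandas()


# 17 row groups spread over 4 partitions:
print(split_row_groups(17, 4))
# [[0, 1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]]
```

Each chunk could then be handed to a separate worker. With more partitions than row groups, every row group simply becomes its own chunk.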

One advantage of this would be to benefit from Parquet compression in input data, but still read in parallel, something that cannot be achieved with gzipped input files.

Note: see a previous version of the parquet reader that did read by row groups, but was replaced with:
I’m going to play with some of that old code. Was there anything known to be incorrect about the previous implementation, which included:
def _read_parquet_row_group(path, columns, row_group_id, kwargs={}):


Thanks @Justin_Paschall! This is great!

Modin uses block partitioning (as opposed to Spark, which uses row partitioning). Link to documentation for convenience. Your proposal makes sense: if there is more parallelism we can exploit, we should.

We use pyarrow to read parquet files. There is a way to read row groups in pyarrow. Link to Modin code

As a first pass, we can just choose between number of columns and number of row groups. The end result will still need to be split, but it should hopefully be relatively straightforward to add.
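A minimal sketch of that first-pass choice, assuming we simply pick whichever axis offers more parallelism (that rule is my assumption, not something decided here). `choose_split_axis` and `parquet_shape` are hypothetical names; `num_columns` and `num_row_groups` are attributes of pyarrow's file metadata.

```python
def choose_split_axis(num_columns, num_row_groups):
    """Pick the axis with more units of parallelism (assumed rule; ties go to columns)."""
    return "row_groups" if num_row_groups > num_columns else "columns"


def parquet_shape(path):
    """Return (num_columns, num_row_groups) from the Parquet footer (needs pyarrow)."""
    import pyarrow.parquet as pq  # imported lazily; only the footer metadata is read

    meta = pq.ParquetFile(path).metadata
    return meta.num_columns, meta.num_row_groups


# The file from the report above: 6 columns, 17 row groups.
print(choose_split_axis(6, 17))  # row_groups
```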

Long-term idea

The highest degree of parallelism would likely come from reading directly into block partitions using both columns and row groups. Letting the layout of the data on the filesystem decide the partitioning makes sense, and that is how a lot of systems use Parquet anyway. In the code here, we build the DataFrame partitions by having each Ray remote task return multiple objects (num_return_vals). We can instead use a 2D list comprehension and a remote task that reads just one row group + column group at a time.
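Roughly, the 2D version could look like the sketch below. `read_block` and `build_block_grid` are hypothetical names, not existing Modin functions, and the `submit` parameter exists only so the grid logic can be tried without Ray. By default it wraps `read_block` with `ray.remote`, which assumes Ray is installed and running.

```python
def read_block(path, row_groups, columns):
    """Read one (row-group chunk, column chunk) block as a pandas DataFrame."""
    import pyarrow.parquet as pq  # imported lazily; requires pyarrow

    return pq.ParquetFile(path).read_row_groups(row_groups, columns=columns).to_pandas()


def build_block_grid(path, row_group_chunks, column_chunks, submit=None):
    """Launch one task per block and return a 2D list of results/futures.

    Rows of the grid follow row_group_chunks, columns follow column_chunks.
    """
    if submit is None:
        import ray  # assumption: Ray is available; each task returns a single object

        submit = ray.remote(read_block).remote
    return [
        [submit(path, row_groups, columns) for columns in column_chunks]
        for row_groups in row_group_chunks
    ]
```

With Ray, each entry in the grid is an object reference for a single block, so no task has to return multiple objects.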