Ingesting large datasets into Modin/Ray

Hello,

What are Modin on Ray's expectations regarding ingesting large datasets in the dozens of terabytes and possibly more? Is there a presentation or paper discussing benchmarks at such dataset sizes?

In the Modin technical report, Dataframe Systems: Theory, Architecture, and Implementation (EECS at UC Berkeley), benchmarks are described in chapter 6, and I see two setups:

  1. A single EC2 x1.32xlarge instance with 128 cores and 1,952 GB RAM, with a dataset of up to 250 GB (1.6B rows)
  2. A single x1e.32xlarge instance with 128 CPUs and 4 TB of memory, but a dataset of only 23 GB (150M rows)

Can you provide benchmarks for larger datasets, in the dozens-of-terabytes range?

Specifically, I’m interested in understanding whether you have a recommended approach for loading several files into a single Modin DataFrame, in two cases (a naive sketch of what I mean follows the list):

  1. A small number of very large files
  2. A large number of small files
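For illustration, the naive pattern I have in mind today is just read + concat; the paths below are hypothetical, and I’m not sure this is the intended approach at this scale:

```python
import modin.pandas as pd

# Hypothetical layout: a handful of very large files (case 1) or many small shards (case 2).
large_files = ["data/part-000.csv", "data/part-001.csv"]
small_files = [f"data/shard-{i:05d}.csv" for i in range(20_000)]

# Naive approach: read each file separately and concatenate into one Modin DataFrame.
df = pd.concat([pd.read_csv(path) for path in large_files], ignore_index=True)
```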

I’m also interested in understanding the major limitations at these dataset sizes across the different readers supported in pd.read_<file> and on the I/O APIs page of the Modin documentation, in particular:

  • read_csv
  • read_parquet
  • read_sql

Hi @Cherif_Jazra !

Thanks for reading my thesis :smile:! A couple of comments on your questions here:

This is something we’re working on; let me address your two points in turn, since this is an area of active development and improvement in Modin.

  1. A small number of very large files

One thing worth understanding here is file skew and how it can impact Modin. We currently have under development a way to rebalance partitions after multiple concat operations; it is actively being worked on here: FIX-#3675: dataframe repartitioning after 'concat' operation in case actual partitions count is more than twice `NPartitions` by anmyachev · Pull Request #3720 · modin-project/modin · GitHub
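To make the skew concrete, here’s a rough sketch (file names and sizes are made up) of the situation that PR is targeting:

```python
import modin.pandas as pd

# Hypothetical: input files of very different sizes. Each read_csv produces its own
# internally partitioned DataFrame, so after concat the partition sizes mirror the
# file skew rather than being balanced across the cluster.
frames = [pd.read_csv(p) for p in ["1gb_file.csv", "200gb_file.csv", "5gb_file.csv"]]
df = pd.concat(frames, ignore_index=True)

# Until the repartitioning work lands, downstream operations on `df` can be
# bottlenecked by the partitions that came from the largest file.
```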

  2. A large number of small files

This case is related, but probably a bit easier to optimize for. With smaller partitions, which is what a large number of small files produces, it’s easier to merge them logically before we do a physical merge, so to the developer they appear as normal, generally balanced partitions.

If you’re passing a glob to read_csv_glob, there is no need to rebalance, since the glob reader can compute good partitioning with knowledge of all of the files in advance.
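As a minimal sketch (assuming the files share a schema; the path is hypothetical), the glob reader lives in the experimental namespace:

```python
# read_csv_glob is part of Modin's experimental API.
import modin.experimental.pandas as pd

# Hypothetical prefix: the reader sees all matching files up front,
# so it can plan partitioning across them instead of rebalancing afterwards.
df = pd.read_csv_glob("s3://my-bucket/small-shards/*.csv")
```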

The major limitation of read_csv on larger files is that the schema needs to be derived from the data after all of the data is loaded, which is expensive. The same is true for read_json(lines=True) and read_table.
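One partial mitigation, at the pandas API level, is to declare the schema up front so it doesn’t have to be inferred from the data (column names and dtypes below are made up; this is a sketch, not a guarantee that it removes all of the overhead inside Modin):

```python
import modin.pandas as pd

# Hypothetical schema: passing explicit dtypes means the types don't have to be
# inferred, which avoids at least part of the post-load schema derivation cost.
dtypes = {"user_id": "int64", "event": "category", "value": "float64"}
df = pd.read_csv("events.csv", dtype=dtypes)
```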

read_parquet’s limitation is that it is hard to tell how many rows are in a given file, and row-based partitioning is important at the scale you’re talking about. Nested parquet files and lots of partitions make this complex, and I don’t think our reader is currently able to generate optimal partitioning for really complex nested parquet file structures.
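For context, per-file row counts do live in each file’s footer metadata, but getting them means touching the metadata of every file, which is where this gets expensive for deeply nested datasets with lots of partitions. A small sketch with pyarrow (hypothetical path; this only illustrates where the information lives, not what Modin’s reader does internally):

```python
import pyarrow.parquet as pq

# Hypothetical file: the footer records row counts per row group, which is the
# information a reader needs in order to plan row-based partitioning.
md = pq.ParquetFile("data/part-000.parquet").metadata
print(md.num_rows, md.num_row_groups)
print([md.row_group(i).num_rows for i in range(md.num_row_groups)])
```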

read_sql in pandas doesn’t have a partition_column parameter or anything equivalent, so for some databases we end up with a syntax error in the SQL statements we generate for reading partitioned data. This is also an area we are looking at: generating database-dependent SQL clauses for splitting up the query.
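For the direction we’re heading, Modin’s experimental read_sql already exposes partitioning knobs; the sketch below uses made-up table, column, and connection names, and as noted above the generated range clauses don’t yet work for every database:

```python
# Experimental parallel SQL reader with explicit partitioning parameters.
import modin.experimental.pandas as pd

# Hypothetical query/connection: the read is split into ranges of `partition_column`
# between lower_bound and upper_bound, roughly one range per partition.
df = pd.read_sql(
    "SELECT * FROM events",
    "postgresql://user:password@host:5432/mydb",
    partition_column="id",
    lower_bound=0,
    upper_bound=10_000_000,
)
```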

Does this make sense? Happy to answer any other questions!