Export a larger-than-memory Modin dataframe to a disk format

Hi,
What are the ways to export a larger-than-memory Modin dataframe to any file format that supports it? I see that most pd.to_x methods defer to pandas.
What I have right now is something like the following, but please let me know if there is a better way, or how you do it.

def chunker(seq, size):
    # return a generator of consecutive row slices, `size` rows each
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))

for idx, i in enumerate(chunker(big_df, 100000)):
    print(f'============={idx}================')
    i.to_pickle(f"tmp_df/df_{idx}.pkl")

Hi @RAbraham, thanks for posting!

There are a couple of ways to do what you’re trying, assuming you are okay with chunking:

1.) We have a prototype for writing and reading a set of pickle files from partitions here: FEAT-#2764: experimental ray implementation of read_pickle, to_pickle functions by anmyachev · Pull Request #2626 · modin-project/modin · GitHub
2.) You can also use the partition API (which will avoid any communication, and will be more efficient than looping over chunks): Pandas Partition API — Modin 0.10.1+14.gb58663b.dirty documentation. This will allow you to write multiple files in parallel, which is safe as long as they are separate files.

Happy to answer questions about these!

Hi @devin-petersohn
Thanks for your reply :). This looks interesting
What’s a partition? Can you point me to some description? I searched the Ray site, and I understand that there can be many partitions per node? Is it partitioned by some key?

Also, just to check that I understand correctly:

For 1., would it be the below?

big_df.to_pickle_distributed("mydata*.pkl.gz") # * stands for partition?

new_df = modin_pd.read_pickle_distributed("mydata*.pkl.gz")

For 2., I actually don’t know what to do after I have access to the partitions:

import modin.pandas as pd
from modin.distributed.dataframe.pandas import unwrap_partitions, from_partitions
import numpy as np
data = np.random.randint(0, 100, size=(2 ** 10, 2 ** 8))
df = pd.DataFrame(data)
partitions = unwrap_partitions(df, axis=0, get_ip=True)
# How do I save a ref?  do a ray.get() and then save with normal python serializers like dill etc?

Regards,
Rajiv

@RAbraham Great questions, the documentation in Modin definitely could use some updating.

By partition, we mean something very (intentionally) vague. Modin’s partitioning capabilities are very flexible. Data can be partitioned by row, column, or block (as fine-grained as a cell). A partition here is just a subset of the data. We keep some metadata with each partition, so it’s possible to know the number of rows and columns, and the IP address, for each one. There’s no requirement for key-based partitioning, but it is possible.
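For example, here is a small sketch (Ray engine, mirroring your snippet above with get_ip=False for simplicity) showing what a row partition actually is once you pull it out of Modin:

import ray
import numpy as np
import modin.pandas as pd
from modin.distributed.dataframe.pandas import unwrap_partitions

df = pd.DataFrame(np.random.randint(0, 100, size=(2 ** 10, 2 ** 8)))

# axis=0 gives one object per row partition; each element is a ray.ObjectRef
# that resolves to a plain pandas DataFrame holding that slice of rows.
partitions = unwrap_partitions(df, axis=0, get_ip=False)
first_block = ray.get(partitions[0])
print(type(first_block), first_block.shape)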

Yes, * is a wildcard for storing a bunch of files. It’s not the cleanest or most explicit (what if you want mydata_{A, B, C, ...}?), but it’s a start, and any feedback on this would be helpful. The writer will store the data based on the current row-level partitioning.

partitions is a list of ray.ObjectRef objects. Now you can pass each of them to a Ray task and run custom operations (writing a file, in your case). Something like this (which I tested locally):

import ray
import modin.pandas as pd
from modin.distributed.dataframe.pandas import unwrap_partitions
import numpy as np

@ray.remote
def save_pickle_partition(partition, path):
    partition.to_pickle(path)
    return 0


data = np.random.randint(0, 100, size=(2 ** 10, 2 ** 8))
df = pd.DataFrame(data)
partitions = unwrap_partitions(df, axis=0, get_ip=False)  # swapped to False for simplicity
path_base = "my_data_{}.pkl.gz"
return_codes = [
    save_pickle_partition.remote(part, path_base.format(idx))
    for idx, part in enumerate(partitions)
]
assert all(code == 0 for code in ray.get(return_codes))  # can check to make sure it worked

After this I get a bunch of pickle files:

In [3]: !ls my_data*
my_data_0.pkl.gz   my_data_12.pkl.gz  my_data_16.pkl.gz  my_data_2.pkl.gz  my_data_6.pkl.gz
my_data_1.pkl.gz   my_data_13.pkl.gz  my_data_17.pkl.gz  my_data_3.pkl.gz  my_data_7.pkl.gz
my_data_10.pkl.gz  my_data_14.pkl.gz  my_data_18.pkl.gz  my_data_4.pkl.gz  my_data_8.pkl.gz
my_data_11.pkl.gz  my_data_15.pkl.gz  my_data_19.pkl.gz  my_data_5.pkl.gz  my_data_9.pkl.gz

This machine has 20 virtual cores, so there are 20 partitions.
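To get a Modin frame back from those files, something like this should work; it reuses the from_partitions you imported earlier, and the 20 paths are hard-coded to match the listing above:

import ray
import pandas
from modin.distributed.dataframe.pandas import from_partitions

@ray.remote
def load_pickle_partition(path):
    # each file holds a plain pandas DataFrame (one former row partition)
    return pandas.read_pickle(path)

# 20 files here because this machine produced 20 row partitions (one per core)
paths = [f"my_data_{i}.pkl.gz" for i in range(20)]
refs = [load_pickle_partition.remote(p) for p in paths]
new_df = from_partitions(refs, axis=0)  # stitch the row partitions back into a Modin frame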

Does this help?


@devin-petersohn this does help! I’ll try it out. Thanks!

How about an argument generator, of type Generator (defined below), which is either a string or a Callable? You could pass in the partition index or (hesitant suggestion) even the dataframe that’s going to be pickled. Something like:

from typing import Callable, Union

def my_generator(partition_index: int, df: DataFrame) -> str:
    # return f"data-my-{partition_index}.pkl.gz"
    # or (possibly very slow):
    # first_index = df.index[0]
    # last_index = df.index[-1]
    # return f"data-from-{first_index}-to-{last_index}.pkl.gz"
    pass

Generator = Union[str, Callable]

big_df.to_pickle_distributed(my_generator)  # of type Generator
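As a rough user-land sketch of the same idea (hypothetical helper, not an actual Modin API), built on top of unwrap_partitions and your save_pickle_partition pattern:

import ray
from modin.distributed.dataframe.pandas import unwrap_partitions

@ray.remote
def save_with_name(partition, name_fn, idx):
    # name_fn sees the partition index and the (pandas) partition itself
    partition.to_pickle(name_fn(idx, partition))
    return 0

def to_pickle_with_names(a_df, name_fn):
    parts = unwrap_partitions(a_df, axis=0, get_ip=False)
    ray.get([save_with_name.remote(p, name_fn, i) for i, p in enumerate(parts)])

# e.g. name each file by the index range it covers (big_df as in my earlier snippet)
to_pickle_with_names(big_df, lambda i, part: f"data-from-{part.index[0]}-to-{part.index[-1]}.pkl.gz")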

Just a thought :)

Hi @devin-petersohn

I was just testing performance, and I found that the partition code takes a lot more time than I expected: about 15 minutes for 4 GB of data (about 15 partitions), as opposed to 23 seconds for my naive approach in the OP. I just wanted to check whether I’m doing anything untoward. I only modified your code a bit.

# import pandas as pd

import ray
ray.init()
import modin.pandas as pd
from modin.distributed.dataframe.pandas import unwrap_partitions
import numpy as np


from pathlib import Path
import shutil
tmp_path = Path("tmp_df")

if tmp_path.exists():
    shutil.rmtree(tmp_path)

tmp_path.mkdir()


@ray.remote
def save_pickle_partition(partition, path: Path):
    partition.to_pickle(str(path))
    return 0


def save_df(a_df, folder: Path, filename_format="data_{}.pkl.gz"):
    partitions = unwrap_partitions(a_df, axis=0, get_ip=False)  # swapped to False for simplicity
    path_base = filename_format
    file_path = lambda idx: folder / path_base.format(idx)
    return_codes = [
        save_pickle_partition.remote(part, file_path(idx))
        for idx, part in enumerate(partitions)
    ]
    assert all(code == 0 for code in ray.get(return_codes))

def chunker(seq, size):
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))

def save_file(a_df, prefix, idx):
    a_df.to_pickle(f"{prefix}{idx}.pkl")

if __name__ == '__main__':
    import time
    frame_data = np.random.randint(0, 100, size=(2 ** 20, 2 ** 8))  # 2GB each
    df = pd.DataFrame(frame_data).add_prefix("col")
    big_df = pd.concat([df for _ in range(2)])  # 2x2GB frames

    start = time.time()
    save_df(big_df, tmp_path)
    # for idx, i in enumerate(chunker(big_df, 100000)): # 23 seconds
    #     print(f'============={idx}================')
    #     save_file(i, "tmp_df/df_", idx)

    end = time.time()

    print("Time elapsed:", end - start)

What’s weird is that it’s much faster (5 seconds) if I cycle through the partitions sequentially:

def save_df(a_df, folder: Path, filename_format="data_{}.pkl.gz"):
    partitions = unwrap_partitions(a_df, axis=0, get_ip=False)  # swapped to False for simplicity
    for idx, part in enumerate(partitions):
        save_file(ray.get(part), "tmp_df/df_", idx)

OK, the reason it takes 15 minutes is that that implementation was compressing the data (data_{}.pkl.gz), while the ‘quick’ implementation was just creating plain *.pkl files. I didn’t know that pandas chooses the compression based on the file name extension. Sorry for the false alarm.
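For anyone who hits the same thing, pandas picks the compression from the file extension (compression='infer' is the default for to_pickle), so a fair comparison means either matching extensions or passing compression explicitly:

import numpy as np
import pandas

df = pandas.DataFrame(np.random.randint(0, 100, size=(1000, 10)))

df.to_pickle("chunk.pkl")                        # plain pickle, fast
df.to_pickle("chunk.pkl.gz")                     # gzip-compressed because of the extension, much slower
df.to_pickle("chunk.pkl.gz", compression=None)   # no compression despite the .gz name
                                                 # (you'd also need compression=None when reading it back)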

@RAbraham No problem, thanks for the update!

Sorry for the delay in responding; I am trying to wrap up my PhD thesis!

@devin-petersohn No problem at all! Best of luck with wrapping it up! And if you are in the mood, do post a link; I’d love to skim through it.

This feature is wonderful! When will this feature be merged and released?
BRs!