Modin scalability issues

Hi all,
I am evaluating Modin and trying to test its scalability (strong scaling).

Here is a snippet of the code.

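import gc
import os
import time

import numpy as np
import ray

r = 100 * 10**6  # rows per dataframe (also the upper bound of the random values)
it = 5           # number of iterations

# RAY_PW (the cluster password) is defined elsewhere
ray.init(address='v-001:6379', _redis_password=RAY_PW, _node_ip_address='v-001')

frame_data = np.random.randint(0, r, size=(r, 2))
frame_data1 = np.random.randint(0, r, size=(r, 2))

os.environ["MODIN_ENGINE"] = "ray"  # select the Ray engine before importing Modin
import modin.pandas as pd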

for i in range(it):

    df_l = pd.DataFrame(frame_data).add_prefix("col")
    df_r = pd.DataFrame(frame_data1).add_prefix("col")
    print("data loaded", flush=True)

    t1 = time.time()
    out = df_l.merge(df_r, on='col0', how='inner')
    t2 = time.time()

    print(f"timings {r} {i} {(t2 - t1) * 1000:.0f} ms, {out.shape[0]}", flush=True)

    del df_l 
    del df_r
    del out 
    gc.collect()

I have been running this while varying the number of CPUs in the Ray cluster. I have a 10-node cluster with 16 CPUs per node, so I am testing with 1, 2, 4, …, 128, 160 CPUs (each time, the entire cluster is stopped and restarted with the new configuration).

For 100M rows, these are the average timings I got (discarding the 0th iteration).

CPUs (world size)    Average time (ms)
1                    48879.18562
2                    42504.13573
4                    38960.01136
8                    43442.16764
16                   45417.99891
32                   47339.0817
64                   45241.66238
128                  48524.96344
160                  52743.6024

It looks like I might be doing something wrong here, because I couldn’t get any speedup from adding more resources to the operation.
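To put a number on "no speedup", here is a small sketch that turns the timings above into strong-scaling speedup (single-CPU time divided by p-CPU time) and parallel efficiency (speedup divided by p). The values are copied from the table; the variable names are just for illustration.

```python
# Average merge times (ms) from the table above, keyed by CPU count.
timings_ms = {
    1: 48879.18562,
    2: 42504.13573,
    4: 38960.01136,
    8: 43442.16764,
    16: 45417.99891,
    32: 47339.0817,
    64: 45241.66238,
    128: 48524.96344,
    160: 52743.6024,
}

baseline = timings_ms[1]

# Strong-scaling speedup: fixed problem size, more CPUs.
speedup = {p: baseline / t for p, t in timings_ms.items()}

# Parallel efficiency: 1.0 would be ideal linear scaling.
efficiency = {p: s / p for p, s in speedup.items()}

best_cpus = max(speedup, key=speedup.get)

for p in timings_ms:
    print(f"{p:>4} CPUs  speedup {speedup[p]:.2f}  efficiency {efficiency[p]:.3f}")
print(f"best speedup at {best_cpus} CPUs")
```

With ideal strong scaling the speedup column would track the CPU count; here it stays close to 1 everywhere and drops below 1 at 160 CPUs.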

I tried setting NPartitions to the number of CPUs in the cluster, but that didn’t help either.

import modin.config
modin.config.NPartitions.put(w)  # w = number of CPUs in the cluster

What am I missing here?

Thanks in advance

Hi @nirandaperera, thanks for posting the details of your benchmark!

I see a couple of reasons the numbers might work out this way. A 2-column dataframe with 100M rows is on the order of 1.6 GB, and you have two of them, so I’m not sure you’re saturating the CPUs. You will probably need more complex data and a bit more of it. Can I suggest a file from the NYC Taxi dataset (TLC Trip Record Data)?
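For reference, the 1.6 GB figure is just rows times columns times bytes per value. A quick sketch of the arithmetic, assuming the arrays hold 64-bit integers (the usual default for np.random.randint on Linux; on a platform with a 32-bit default the size would be half):

```python
rows = 100 * 10**6      # r in the benchmark
cols = 2                # size=(r, 2)
bytes_per_value = 8     # assumption: int64 values

size_bytes = rows * cols * bytes_per_value
size_gb = size_bytes / 1e9  # decimal gigabytes

print(f"{size_gb:.1f} GB per dataframe")
```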

Part of the reason you see no scaling is that the data comes from a NumPy array. We use ray.put to place data in the Ray object store, and that data doesn’t actually get sent to another node, which is a Ray-ism. Effectively, all of the data in the benchmark stays on the head node (from what I can see), which should help explain why your numbers are mostly unchanged from 1 to N CPUs.

Another issue likely happening here is that Modin is probably doing a broadcast join. We have plans to implement a generic shuffle join in the near future, and we have active development on a generic shuffling mechanism.
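To illustrate the difference between the two strategies, here is a toy pure-Python sketch. It is not Modin's implementation; the function names and the "rows sent" cost model are made up for illustration. A broadcast join ships the whole right table to every worker, while a shuffle join hash-partitions both tables by key so that each worker only joins its own bucket.

```python
from collections import defaultdict

def local_join(left, right):
    """Inner hash join of two lists of (key, value) rows."""
    index = defaultdict(list)
    for key, rvalue in right:
        index[key].append(rvalue)
    return [(key, lvalue, rvalue)
            for key, lvalue in left
            for rvalue in index.get(key, [])]

def split(rows, n):
    """Round-robin split into n partitions (arbitrary initial placement)."""
    return [rows[i::n] for i in range(n)]

def broadcast_join(left_parts, right_parts):
    """Every worker receives a full copy of the right table."""
    right_full = [row for part in right_parts for row in part]
    out, rows_sent = [], 0
    for part in left_parts:
        rows_sent += len(right_full)  # one full copy per worker
        out.extend(local_join(part, right_full))
    return out, rows_sent

def shuffle_join(left_parts, right_parts):
    """Hash-partition both tables by key, then join matching buckets."""
    n = len(left_parts)

    def shuffle(parts):
        buckets = [[] for _ in range(n)]
        for part in parts:
            for key, value in part:
                buckets[hash(key) % n].append((key, value))
        return buckets

    left_buckets = shuffle(left_parts)
    right_buckets = shuffle(right_parts)
    # Upper bound: count every row as moved once.
    rows_sent = sum(len(p) for p in left_parts) + sum(len(p) for p in right_parts)
    out = []
    for lb, rb in zip(left_buckets, right_buckets):
        out.extend(local_join(lb, rb))
    return out, rows_sent

# Small demo with 4 simulated workers.
n_workers = 4
left = [(i % 50, f"l{i}") for i in range(200)]
right = [(i % 80, f"r{i}") for i in range(200)]

b_out, b_sent = broadcast_join(split(left, n_workers), split(right, n_workers))
s_out, s_sent = shuffle_join(split(left, n_workers), split(right, n_workers))
print(f"broadcast: {len(b_out)} rows out, {b_sent} rows sent")
print(f"shuffle:   {len(s_out)} rows out, {s_sent} rows sent")
```

In this simple model the broadcast cost grows with the number of workers (one copy of the right table each), whereas the shuffle cost is bounded by the total number of input rows. That is why a broadcast join only pays off when one side is small.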

Does that make sense?

Edit: order of magnitude.

@devin-petersohn
Thank you for the response.

I didn’t know about the issue with the Ray object store. In that scenario, how can we distribute a dataset? Wouldn’t the Ray object store handle the modin.config.NPartitions param? Can you point me to an example where we could distribute a dataset over N nodes?

Nevertheless, I am guessing the merge operation wouldn’t scale very well with a broadcast join (unless the broadcast table is very small compared to the other). Is there a timeline for the shuffle join feature?

Hi @nirandaperera, I left a comment on the issue in the GitHub repo, as I hadn’t seen the discussion here.

No, modin.config.NPartitions is a Modin parameter that only specifies how many partitions to use for a Modin dataframe along each axis.

I’m not sure there is a mechanism in Ray for distributing an in-memory dataset over N nodes. @devin-petersohn, any thoughts?

If you can read the data from a file on S3 or some NFS, Modin will be able to tell Ray how to distribute the workload.
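A minimal sketch of what that could look like, assuming a CSV file on storage that every node in the cluster can reach. The helper name and the path in the usage comment are hypothetical, and Modin is imported inside the function only so the engine variable is guaranteed to be set first.

```python
import os

# Select the Ray engine before Modin is imported.
os.environ.setdefault("MODIN_ENGINE", "ray")

def load_shared_csv(path):
    """Read a CSV with Modin from a location visible to all nodes.

    The read is handed to the Ray workers instead of building the
    dataframe from an array that lives on the driver.
    """
    import modin.pandas as pd  # imported lazily, after MODIN_ENGINE is set
    return pd.read_csv(path)

# Usage (hypothetical path on an NFS mount shared by all nodes):
#   df_l = load_shared_csv("/shared/nfs/left_table.csv")
#   df_r = load_shared_csv("/shared/nfs/right_table.csv")
#   out = df_l.merge(df_r, on="col0", how="inner")
```

Whether the partitions actually end up spread across nodes depends on the cluster setup, so it is worth checking the Ray dashboard while the read runs.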