Placement Group in Ray

Hi there,

Is it possible to use Placement Group in Ray?

I have a cost-saving setup for a Ray cluster (on K8s).

    CPU: 4
    memory: 32Gi
    GPU: 0
    rayResources: { "CPU": 0 }
    nodeSelector: {}
    minWorkers: 0
    maxWorkers: 6
    memory: 32Gi
    CPU: 4
    GPU: 0
    rayResources: {}
    nodeSelector: {}

As expected, when I connect to the Ray cluster, the available resources look something like this, with no CPU entry:

{'node:': 1.0, 'memory': 22548578304.0, 'object_store_memory': 9597968707.0}

However, when I tried to use Modin on the connected Ray cluster, it raised:

Traceback (most recent call last):
  File "", line 20, in <module>
    df = pd.read_parquet("gs://xxx/xxx/xxx.parquet")
  File "/home/jovyan/ray/lib/python3.8/site-packages/modin/pandas/", line 222, in read_parquet
  File "/home/jovyan/ray/lib/python3.8/site-packages/modin/config/", line 213, in subscribe
  File "/home/jovyan/ray/lib/python3.8/site-packages/modin/pandas/", line 124, in _update_engine
  File "/home/jovyan/ray/lib/python3.8/site-packages/modin/core/execution/ray/common/", line 200, in initialize_ray
    num_cpus = int(ray.cluster_resources()["CPU"])
KeyError: 'CPU'

That’s understandable: Modin apparently derives the number of CPUs from the Ray cluster’s current resources.
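To make the failure concrete, here is a minimal sketch that replays the lookup from the traceback against a resources dictionary shaped like the one above (node entry left out). The helper name `cpus_or_default` is made up for illustration, and the fallback value of 4 is an arbitrary placeholder, not something Ray or Modin prescribes.

```python
# Resources as reported before any worker exists (node entry omitted).
resources = {"memory": 22548578304.0, "object_store_memory": 9597968707.0}


def cpus_or_default(res, default=4):
    """Hypothetical helper: CPU count, or a fallback if none is advertised."""
    return int(res.get("CPU", default))


try:
    num_cpus = int(resources["CPU"])  # same lookup as in the traceback
except KeyError as err:
    missing_key = err.args[0]              # the key that was not found
    num_cpus = cpus_or_default(resources)  # fall back instead of failing
```

Using `dict.get` with a default avoids the exception, but whatever default is chosen is only a guess until a worker actually joins.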

For now, my silly workaround is to run a small remote function that forces the Ray cluster to scale up to at least one worker node before I use Modin.

def silly():
    from time import sleep
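The snippet above is cut off in the post. As a rough sketch of the waiting half of such a workaround, the helper below polls a resources callable until CPUs show up. `wait_for_cpus` and all of its parameters are hypothetical names, not part of Ray or Modin.

```python
import time


def wait_for_cpus(get_resources, min_cpus=1, timeout_s=600.0, poll_s=5.0):
    """Poll get_resources() until it reports at least min_cpus CPUs.

    Returns the CPU count seen, or raises TimeoutError once timeout_s passes.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        cpus = int(get_resources().get("CPU", 0))
        if cpus >= min_cpus:
            return cpus
        if time.monotonic() >= deadline:
            raise TimeoutError(f"only {cpus} CPUs after {timeout_s} s")
        time.sleep(poll_s)
```

With Ray, `get_resources` would presumably be `ray.cluster_resources`. Polling alone creates no demand, so something (such as the small remote function) still has to request a CPU for the autoscaler to add a worker.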

So, given this scenario, is it possible to use a Placement Group in Ray?

Hi @Andrew_Li! Thank you so much for opening this issue! I’ve been able to replicate the error locally and have opened an issue on GitHub (Modin is unable to handle ray cluster scaling from 0 · Issue #4450 · modin-project/modin · GitHub) for it. In the meantime, you can pip install directly from this branch on GitHub (GitHub - RehanSD/modin at rehan/scale_from_0), which contains a fix that sets the number of CPUs to 4 if the cluster doesn’t report any.

One important caveat: Modin uses the number of CPUs to determine how many partitions to create. With the fix, when no CPU resources are detected, it creates 4 row partitions and 4 column partitions, for 16 partitions in total. If you want to tune that based on your cluster resources, I would run

from modin.config import NPartitions

You can run this before doing any dataframe operations, in which case Modin partitions with that number from the start. You can also run it after performing some operations, in which case Modin uses the new number for any newly created DataFrames and eventually repartitions DataFrames that still have the old partitioning scheme to use the new number of partitions.
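As a back-of-the-envelope sketch of the arithmetic above: if the per-axis partition count equals the CPU count, the total block count is its square. The function name `partition_grid` is hypothetical, and the result is best read as an upper bound under that assumption, since small DataFrames may end up with fewer partitions.

```python
def partition_grid(num_cpus, fallback=4):
    """Return (row partitions, column partitions, total blocks).

    Assumes one partition per CPU along each axis, and a fallback
    count when the cluster reports no CPUs at all.
    """
    per_axis = num_cpus if num_cpus > 0 else fallback
    return per_axis, per_axis, per_axis * per_axis


print(partition_grid(0))   # (4, 4, 16): empty cluster, fallback applies
print(partition_grid(24))  # (24, 24, 576): 6 workers with 4 CPUs each
```

The gap between 16 and 576 blocks is why it matters which value is in effect when a DataFrame is created.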

It would also be helpful to get a feel for what kind of behavior you would like to see from Modin in cases like this - I’d love to chat with you about how best Modin can scale up the number of partitions in situations where the ray cluster is being dynamically scaled.

I’ve also added a PR (FIX-#4450: Ensure Modin successfully initializes when Ray cluster has no resources by RehanSD · Pull Request #4451 · modin-project/modin · GitHub) - feel free to comment there as well!

Hi @rdurrani, thanks for your work.

I would like to confirm the behavior after this fix.

In my scenario:

  1. When I start using Modin to process a dataframe (let’s call it df), there are 0 Ray workers, so the default num_cpus = 4 applies and I get 16 partitions in total.
  2. All tasks/actors will be pending while the Ray autoscaler scales up to 6 Ray workers.
  3. After 3 to 5 minutes, int(ray.cluster_resources()["CPU"]) will go up to 24.
  4. At that point, does df still use 16 partitions for subsequent processing, or will it be repartitioned using the new num_cpus = 24?

Hi @Andrew_Li!

  1. Yup! That’s how this particular fix works, although I’d love to get your input on what a good fallback would be!

  2. I’m not entirely sure on this point. The tasks will stay pending until Ray spins up workers, since you’ve disallowed scheduling on the head node, but I don’t know exactly how Ray’s autoscaler scales up resources, so I can’t say that the end result would be 6 workers. My hunch is that it will create fewer than 6 at first and then scale up as the workload increases.

  3. This should be true!

  4. When the number of available CPUs scales up, the num_partitions config (NPartitions) won’t be updated automatically. You can either make NPartitions track the number of CPUs by running the code below, or update NPartitions statically by calling NPartitions.put.

Code to make sure NPartitions always returns currently available resources:

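import ray
from modin.config import NPartitions

def num_cpus():
    # Current CPU count in the cluster, or 4 if none are reported yet.
    return int(ray.cluster_resources().get("CPU", 4))

# Replace the getter so each lookup re-reads the cluster resources.
NPartitions.get = num_cpus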

Whenever a partitioning decision needs to be made (e.g. on creation of a new dataframe), Modin calls NPartitions.get() to determine the number of partitions to use, so the above snippet ensures that it always gets the current number of CPUs, or 4 if none are available.
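To illustrate the difference between a static update and a getter that re-reads the cluster, here is a small sketch that runs without Ray or Modin. `FakeNPartitions` is a hypothetical stand-in, not Modin’s actual class, and the `cluster` dict stands in for whatever ray.cluster_resources() would return.

```python
class FakeNPartitions:
    """Hypothetical stand-in for a config value with put/get."""
    _value = 4

    @classmethod
    def put(cls, value):
        cls._value = value

    @classmethod
    def get(cls):
        return cls._value


cluster = {}  # stand-in for the resources dict; no CPUs yet


def num_cpus():
    return int(cluster.get("CPU", 4))


# Static update: the stored value stays fixed until put() is called again.
FakeNPartitions.put(num_cpus())
static_before = FakeNPartitions.get()
cluster["CPU"] = 24.0  # workers join later
static_after = FakeNPartitions.get()

# Dynamic getter: every call re-reads the current resources.
FakeNPartitions.get = num_cpus
dynamic_after = FakeNPartitions.get()
```

Here `static_after` is still 4 even though the cluster has grown, while `dynamic_after` is 24, because the replaced getter is evaluated only at the moment it is called.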

Modin currently does not repartition automatically when NPartitions is updated. We have implemented dynamic repartitioning after operations like concat, which can leave DataFrames with too many partitions, and we are working on rolling it out more broadly. So DataFrames created before NPartitions is updated won’t be repartitioned unless they hit a code path that triggers repartitioning, and even then only according to our heuristic, which decides whether a DataFrame has too few or too many partitions based on the value of NPartitions.get(). DataFrames instantiated after NPartitions is changed will use the new partitioning scheme.

One other caveat: NPartitions does not poll, so updated values only become visible to Modin when Modin asks for them, that is, when a partitioning decision needs to be made. Hope this helps!

Thank you for the details, they’re really useful.
