Distributing <class 'int'> object. This may take some time

Environment:

  • modin == 0.14.0
  • pandas == 1.4.1
  • ray == 1.12.0
  • running Ray on K8s (GKE)
    • 1.21.10-gke.2000

Deploy Ray on K8s using Helm Chart:

image: rayproject/ray:1.12.0-py38
podTypes:
  rayHeadType:
    CPU: 4
    memory: 30Gi
    GPU: 0
    rayResources: { "CPU": 0 }
  rayWorkerType:
    minWorkers: 0
    maxWorkers: 6
    memory: 30Gi
    CPU: 3
    GPU: 0

Hi there,

I have a simple job that concatenates 10 small parquet files and saves the result as one parquet file in a GCS bucket. Each small file is around 1.0~2.0 GiB and each big parquet file is around 10~20 GiB. I used a for loop to produce 10 big parquet files from 100 small parquet files.

The following pseudo-code should make what I'm doing clearer.

# file name: `my_job.py`
from typing import List

import ray
import modin.pandas as pd


runtime_env = {
    "pip": ["gcsfs", "modin"],
}
# Connect to the remote Ray cluster via Ray Client.
ray.init(
    address="ray://ray-cluster-ray-head.ray.svc.cluster.local:10001",
    namespace="andrew",
    runtime_env=runtime_env,
)


def combine_dfs(file_list: List[str], destination_path: str):
    # Read each small parquet file, concatenate them, and write the
    # combined dataframe back out as one parquet file.
    df = pd.concat([pd.read_parquet(file) for file in file_list])
    df.to_parquet(destination_path)  # to GCS bucket


file_dict = {
    'job01': ['job01-path1', 'job01-path2', 'job01-path3', '...'],
    'job02': ['job02-path1', 'job02-path2', 'job02-path3', '...'],
    'job03': ['job03-path1', 'job03-path2', 'job03-path3', '...'],
    'job04': ['job04-path1', 'job04-path2', 'job04-path3', '...'],
    'job05': ['job05-path1', 'job05-path2', 'job05-path3', '...'],
    'job06': ['job06-path1', 'job06-path2', 'job06-path3', '...'],
    'job07': ['job07-path1', 'job07-path2', 'job07-path3', '...'],
    'job08': ['job08-path1', 'job08-path2', 'job08-path3', '...'],
    'job09': ['job09-path1', 'job09-path2', 'job09-path3', '...'],
    'job10': ['job10-path1', 'job10-path2', 'job10-path3', '...']
}

bucket_name = "my-bucket"  # placeholder; the real destination bucket

for key, item in file_dict.items():
    combine_dfs(item, f"gs://{bucket_name}/{key}.parquet")

I ran this script from the terminal, and the Ray cluster's resources scaled up to the maximum (CPU=18, Mem=180 GiB). I would then see the following warning:

UserWarning: Distributing <class 'int'> object. This may take some time.

Sometimes this warning simply went away and the script moved on to the next big parquet file. However, if I was unlucky, the script would hang at this warning forever.

When the hang happened, I observed two things:

  • Ray Dashboard: all workers were IDLE and all CPU usage was very low (<2%)
  • Ray Status: usage showed 0.0/18.0 CPU

The funny thing was, when I saw this situation I could press Ctrl+C once (only once; pressing it twice would terminate the script). The terminal showed the following logs, and the script moved on to produce the next big parquet file.

UserWarning: Distributing <class 'int'> object. This may take some time.
^CException ignored in: <function ClientObjectRef.__del__ at 0x7fe6e7ea5b80>
Traceback (most recent call last):
  File "/home/jovyan/ray/lib/python3.8/site-packages/ray/util/client/common.py", line 110, in __del__
    self._worker.call_release(self.id)
  File "/home/jovyan/ray/lib/python3.8/site-packages/ray/util/client/worker.py", line 622, in call_release
    self._release_server(id)
  File "/home/jovyan/ray/lib/python3.8/site-packages/ray/util/client/worker.py", line 628, in _release_server
    self.data_client.ReleaseObject(ray_client_pb2.ReleaseRequest(ids=[id]))
  File "/home/jovyan/ray/lib/python3.8/site-packages/ray/util/client/dataclient.py", line 531, in ReleaseObject
    self._async_send(datareq)
  File "/home/jovyan/ray/lib/python3.8/site-packages/ray/util/client/dataclient.py", line 426, in _async_send
    with self.lock:
KeyboardInterrupt: 
......
# The script keeps going with the next job...

My silly workaround for now is to keep monitoring the terminal and the Ray Dashboard, and to press Ctrl+C manually when necessary.

Any ideas about what's happening here, or tips for preventing it?

Hi @Andrew_Li thanks for posting!

This is unusual, considering that UserWarning: Distributing <class 'int'> object. This may take some time. should only happen if you’re calling the DataFrame constructor.
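For illustration (this is my guess at the shape of the trigger, not your exact code), constructing a Modin object directly from local, non-distributed data is what emits it:

import modin.pandas as pd

# Building a Modin Series/DataFrame from plain local data makes Modin
# push that data into the Ray object store, which emits, e.g.:
#   UserWarning: Distributing <class 'int'> object. This may take some time.
s = pd.Series(42, index=["Index"])  # scalar data -> <class 'int'> in the message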

Would you be willing to try converting the warning into an exception (see the Python warnings docs: warnings — Warning control — Python 3.10.4 documentation) so we can get a traceback and a better understanding of what’s happening here?
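For example, a minimal sketch (placed near the top of your script) would be:

import warnings

# Escalate UserWarnings to exceptions so the traceback points at the
# exact call that emits the warning.
warnings.filterwarnings("error", category=UserWarning)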

Hi @devin-petersohn, thanks for the quick reply!

I tried to turn the UserWarning into an error and print the traceback to stdout. If I did it the wrong way, please let me know.

Basically, combine_daily_to_monthly is the combine_dfs I posted earlier.

import logging
import traceback
import warnings
from typing import List

import modin.pandas as pd

# Escalate all warnings to exceptions so we get a full traceback.
warnings.filterwarnings("error")


def get_df_mem(df):
    # DataFrame memory usage in GiB.
    return df.memory_usage(deep=True).sum() / 1024**3


def combine_daily_to_monthly(file_list: List[str], destination_path: str):
    logging.info(f"File length: {len(file_list)}")
    logging.info("Concatenating DataFrames......")
    df = pd.concat([pd.read_parquet(file) for file in file_list])
    logging.info(f"DataFrame memory usage: {get_df_mem(df):.3f} Gi")
    logging.info(f"DataFrame shape: {df.shape}")
    df.to_parquet(destination_path)
    logging.info(f"{destination_path} has been saved.")


# `key`, `item`, `source_bucket_name`, and `source_prefix` come from the
# same loop and config as in the original script.
try:
    combine_daily_to_monthly(
        item, f"gs://{source_bucket_name}/{source_prefix}/{key}.parquet"
    )
except Warning:
    traceback.print_exc()

The log is as follows:

Traceback (most recent call last):
  File "plumber/tools/combine_daily_to_monthly.py", line 85, in <module>
    combine_daily_to_monthly(
  File "plumber/tools/combine_daily_to_monthly.py", line 27, in combine_daily_to_monthly
    logging.info(f"DataFrame memory usage: {get_df_mem(df):.3f} Gi")
  File "plumber/tools/combine_daily_to_monthly.py", line 13, in get_df_mem
    return df.memory_usage(deep=True).sum() / 1024**3
  File "/home/jovyan/ray/lib/python3.8/site-packages/modin/pandas/dataframe.py", line 1367, in memory_usage
    return Series(index_value, index=["Index"]).append(result)
  File "/home/jovyan/ray/lib/python3.8/site-packages/modin/pandas/series.py", line 100, in __init__
    warnings.warn(
UserWarning: Distributing <class 'int'> object. This may take some time.
INFO:root:Elapsed time: 73.145 sec
Exception ignored in: <ssl.SSLSocket fd=16, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('10.160.4.22', 44738), raddr=('74.125.203.128', 443)>
ResourceWarning: unclosed <ssl.SSLSocket fd=16, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('10.160.4.22', 44738), raddr=('74.125.203.128', 443)>

For context: I wanted to check the memory usage after concatenating the final dataframe.

According to this log, it seems the UserWarning was triggered by df.memory_usage(), right?

Yes, but only because index=True by default and the index values are stored separately from the data. I’m not sure this is causing the issue, because the number of values is quite small (in this case a single integer is being put in the Ray object store), so it shouldn’t be hanging there. You can also pass index=False and it will ignore the index’s memory consumption. I imagine the issue is happening after this.
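For example, a minimal sketch of the index=False variant (same df as in your snippet):

# Skipping the index avoids distributing the index's memory usage
# (a single int) into the Ray object store, so no warning is emitted.
mem_gib = df.memory_usage(index=False, deep=True).sum() / 1024**3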

Could we next try capturing ray.timeline and see what the Ray scheduler is seeing?
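For reference, a minimal sketch of how to dump it (the filename is just an example):

import ray

# Write the task timeline to a Chrome-tracing-format JSON file that
# can be loaded in chrome://tracing or Perfetto.
ray.timeline(filename="timeline.json")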

Sorry for the late reply!

I uploaded the timeline.json to this gist (20220523-ray-timeline · GitHub), but I have no idea how to analyze the data; I haven’t used the ray.timeline tool before.

Although I passed index=False and no longer saw the UserWarning, I wonder: if a column’s data type is int, would I still see UserWarning: Distributing <class 'int'> object. This may take some time.?

Awesome. You can use Perfetto (https://perfetto.dev/) or chrome://tracing to visualize the JSON. I see a long gap of time between one series of computations and another. Are you running this in a Jupyter notebook or in an interactive setting (like IPython)? We should probably try to figure out what is happening there.

No, the Distributing... warning is only because of the index, and the int in that message is the memory usage of the index itself. Columns of type int will not trigger this warning, because we can process them in a distributed way.

No, I was running it from the terminal. Everything was written in one script, and I called python xxx.py on the terminal.

Thanks for the detailed explanation!

@Andrew_Li is the script something you can share?