Environment:
- modin == 0.14.0
- pandas == 1.4.1
- ray == 1.12.0
- running Ray on K8s (GKE), version 1.21.10-gke.2000
Deployed Ray on K8s using the Helm chart:

```yaml
image: rayproject/ray:1.12.0-py38
podTypes:
  rayHeadType:        # head pod
    CPU: 4
    memory: 30Gi
    GPU: 0
    rayResources: {"CPU": 0}
  rayWorkerType:      # worker pods
    minWorkers: 0
    maxWorkers: 6
    memory: 30Gi
    CPU: 3
    GPU: 0
```
Hi there,
I have a simple job that concatenates 10 small parquet files and saves them as one big parquet file in a GCS bucket. Each small file is around 1.0~2.0 GiB and each big parquet file is around 10~20 GiB. I use a for loop to produce 10 big parquet files from 100 small ones. The following pseudo-code should make it clearer:
```python
# file name: `my_job.py`
from typing import List

import ray
import modin.pandas as pd

runtime_env = {
    "pip": ["gcsfs", "modin"],
}

ray.init(
    address="ray://ray-cluster-ray-head.ray.svc.cluster.local:10001",
    namespace="andrew",
    runtime_env=runtime_env,
)

def combine_dfs(file_list: List[str], destination_path: str):
    df = pd.concat([pd.read_parquet(file) for file in file_list])
    df.to_parquet(destination_path)  # to GCS bucket

file_dict = {
    'job01': ['job01-path1', 'job01-path2', 'job01-path3', '...'],
    'job02': ['job02-path1', 'job02-path2', 'job02-path3', '...'],
    'job03': ['job03-path1', 'job03-path2', 'job03-path3', '...'],
    'job04': ['job04-path1', 'job04-path2', 'job04-path3', '...'],
    'job05': ['job05-path1', 'job05-path2', 'job05-path3', '...'],
    'job06': ['job06-path1', 'job06-path2', 'job06-path3', '...'],
    'job07': ['job07-path1', 'job07-path2', 'job07-path3', '...'],
    'job08': ['job08-path1', 'job08-path2', 'job08-path3', '...'],
    'job09': ['job09-path1', 'job09-path2', 'job09-path3', '...'],
    'job10': ['job10-path1', 'job10-path2', 'job10-path3', '...']
}

for key, item in file_dict.items():
    combine_dfs(
        item, f"gs://{bucket_name}/{key}.parquet"  # bucket_name defined elsewhere
    )
```
I ran this script from a terminal, and the Ray cluster scaled up to its maximum (CPU=18, memory=180 GiB). I would see the following warning:

```
UserWarning: Distributing <class 'int'> object. This may take some time.
```

Sometimes this warning simply went away and the script processed the next big parquet file. HOWEVER, with bad luck, the script would hang at this point forever.
When the hang-forever issue happened, I observed two things:
- Ray Dashboard: all workers were IDLE, and all CPU usage was very low (<2%).
- `ray status`: usage showed `0.0/18.0 CPU`.
The funny thing was, when I saw this situation I pressed Ctrl+C once (only once; pressing it twice would terminate the script). The terminal then showed the following logs, and the script moved on to produce the next big parquet file:
```
UserWarning: Distributing <class 'int'> object. This may take some time.
^CException ignored in: <function ClientObjectRef.__del__ at 0x7fe6e7ea5b80>
Traceback (most recent call last):
  File "/home/jovyan/ray/lib/python3.8/site-packages/ray/util/client/common.py", line 110, in __del__
    self._worker.call_release(self.id)
  File "/home/jovyan/ray/lib/python3.8/site-packages/ray/util/client/worker.py", line 622, in call_release
    self._release_server(id)
  File "/home/jovyan/ray/lib/python3.8/site-packages/ray/util/client/worker.py", line 628, in _release_server
    self.data_client.ReleaseObject(ray_client_pb2.ReleaseRequest(ids=[id]))
  File "/home/jovyan/ray/lib/python3.8/site-packages/ray/util/client/dataclient.py", line 531, in ReleaseObject
    self._async_send(datareq)
  File "/home/jovyan/ray/lib/python3.8/site-packages/ray/util/client/dataclient.py", line 426, in _async_send
    with self.lock:
KeyboardInterrupt:
......
# Keep going the next processes...
```
My silly workaround for now is to keep monitoring the terminal and the Ray Dashboard, and to press Ctrl+C manually when necessary.
Any ideas about what is going on here, and any tips for preventing it?