Alternative to DataFrame.apply(func) for accessing multiple columns as input without OOM error

Hi,
I have a use case where I need to create a new column by applying a function to two other columns. What I currently do is:

import modin.pandas as pd

def do_something(col1, col2, some_arg):
    # Some computation using col1 and col2, e.g. to create new file paths,
    # resize images, etc.
    pass

def get_col3(some_arg: str, row):
    col1 = row["col1"]
    col2 = row["col2"]
    col3 = do_something(col1, col2, some_arg)
    row["col3"] = col3
    return row

col3_function = lambda row: get_col3("Some Arg1", row)

result = df.apply(col3_function, axis=1)

I believe that since I'm passing a function, DataFrame.apply defaults to pandas, and it crashes my program when it runs out of memory.
Here is a brief gist of the error:

    self.error_threshold))
ray._private.memory_monitor.RayOutOfMemoryError: More than 95% of the memory on node 1826a980e445 is used (61.28 / 62.52 GB). The top 10 memory consumers are:

PID     MEM     COMMAND
9088    26.66GiB        ray::IDLE
9113    17.1GiB ray::IDLE
9138    7.77GiB ray::IDLE

Is there any design pattern I can use to capture this type of computation that will leverage modin(distributed, out-of-core) without OOMing?

@RAbraham I am surprised that this is defaulting to pandas. Did you get a warning?

Ray should also be spilling to disk here, even if the memory were near full capacity. This might be worth opening a GitHub issue in Modin so I can ping the Ray team.

@devin-petersohn
Sure, I’ll create a ticket. However, the documentation made me believe that this is known or I misunderstood it.
On the pd.DataFrame supported APIs page of the Modin documentation (0.10.1+15.gd03ed30.dirty), the entry for apply says to refer to agg, and for agg it mentions:

agg / aggregate

    Dictionary func parameter defaults to pandas

    Numpy operations default to pandas

I thought that because I was passing the func parameter, it defaults to pandas (which I think causes the OOM?).

I don’t know the exact nature of the warning you’re expecting, so I’ll just post a larger chunk of the error output (eliding duplicate lines) in case you’d like to inspect it.

root@1826a980e445:/layoutlm# pip freeze | grep ray
ray==1.4.1
root@1826a980e445:/layoutlm# pip freeze | grep modin
modin==0.10.1


(pid=15500) 2021-07-27 00:16:36,524     INFO worker.py:746 -- Calling ray.init() again after it has already been called.
2021-07-27 00:17:06,106 WARNING worker.py:1123 -- A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. Task ID: c6953afc4a9f69e9ffffffffffffffffffffffff01000000 Worker ID: f8463e18f93638a0a6b0659626c770924d4709c69d10e8adce5897bc Node ID: 95320272ff4c804c42a24c3fe23220eeee78f161a580304188e8fe1d Worker IP address: 172.17.0.2 Worker port: 41061 Worker PID: 15502

2021-07-27 00:20:51,663 WARNING worker.py:1123 -- The autoscaler failed with the following error:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/ray/autoscaler/_private/monitor.py", line 317, in run
    self._run()
  File "/opt/conda/lib/python3.7/site-packages/ray/autoscaler/_private/monitor.py", line 207, in _run
    self.update_load_metrics()
  File "/opt/conda/lib/python3.7/site-packages/ray/autoscaler/_private/monitor.py", line 170, in update_load_metrics
    request, timeout=4)
  File "/opt/conda/lib/python3.7/site-packages/grpc/_channel.py", line 826, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/opt/conda/lib/python3.7/site-packages/grpc/_channel.py", line 729, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
        status = StatusCode.DEADLINE_EXCEEDED
        details = "Deadline Exceeded"
        debug_error_string = "{"created":"@1627345250.576488095","description":"Deadline Exceeded","file":"src/core/ext/filters/deadline/deadline_filter.cc","file_line":69,"grpc_status":4}"
>

2021-07-27 00:20:54,019 WARNING worker.py:1123 -- A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. Task ID: 402ddcfdf56ca87affffffffffffffffffffffff01000000 Worker ID: d7527114bb49579d95e858670d29b7254117dc10d9b73173d54cc9c0 Node ID: 95320272ff4c804c42a24c3fe23220eeee78f161a580304188e8fe1d Worker IP address: 172.17.0.2 Worker port: 36575 Worker PID: 15504
(pid=15673) 2021-07-27 00:20:54,803     INFO worker.py:746 -- Calling ray.init() again after it has already been called.
(pid=15673) DecompressionBombWarning: Image size (92160000 pixels) exceeds limit of 89478485 pixels, could be decompression bomb DOS attack.
2021-07-27 00:21:00,302 ERROR worker.py:79 -- Unhandled error (suppress with RAY_IGNORE_UNHANDLED_ERRORS=1): ray::apply_list_of_funcs() (pid=15830, ip=172.17.0.2)
  File "python/ray/_raylet.pyx", line 458, in ray._raylet.execute_task
  File "/opt/conda/lib/python3.7/site-packages/ray/_private/memory_monitor.py", line 141, in raise_if_low_memory
    self.error_threshold))
ray._private.memory_monitor.RayOutOfMemoryError: More than 95% of the memory on node 1826a980e445 is used (59.91 / 62.52 GB). The top 10 memory consumers are:

PID     MEM     COMMAND
15501   19.93GiB        ray::IDLE
15505   19.67GiB        ray::IDLE
15503   19.55GiB        ray::IDLE
15673   0.36GiB ray::apply_list_of_funcs()
15831   0.17GiB ray::apply_list_of_funcs()
15830   0.06GiB ray::IDLE
15367   0.06GiB /opt/conda/bin/python /opt/conda/bin/inv extract-features -c=/host/home/rajiv/dev/layoutlm/data/vali
15468   0.01GiB /opt/conda/bin/python -u /opt/conda/lib/python3.7/site-packages/ray/new_dashboard/agent.py --node-ip
15414   0.01GiB /opt/conda/bin/python -u /opt/conda/lib/python3.7/site-packages/ray/new_dashboard/dashboard.py --hos
15442   0.0GiB  /opt/conda/lib/python3.7/site-packages/ray/core/src/ray/raylet/raylet --raylet_socket_name=/tmp/ray/

In addition, up to 0.01 GiB of shared memory is currently being used by the Ray object store.
---
--- Tip: Use the `ray memory` command to list active objects in the cluster.
--- To disable OOM exceptions, set RAY_DISABLE_MEMORY_MONITOR=1.
---

I see, it should only happen when the func parameter passed in is a dictionary for agg (e.g. df.agg({"col1": lambda x: x, "col2": lambda x: x})), or if you pass a NumPy function that can’t otherwise be interpreted.
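To illustrate the distinction, here is a minimal sketch of the dictionary-valued agg form described above. Plain pandas is used so the snippet runs standalone; with Modin you would `import modin.pandas as pd` and the call looks the same (the data and lambdas are made up for illustration):

```python
import pandas as pd  # with Modin: import modin.pandas as pd

df = pd.DataFrame({"col1": [1, 2], "col2": [3, 4]})

# Dictionary-valued func: this is the agg form that defaults to pandas in Modin.
result = df.agg({"col1": lambda x: x.max(), "col2": lambda x: x.min()})
print(result["col1"], result["col2"])  # 2 3
```

A plain callable passed to df.apply, by contrast, is not the dictionary case.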

The error message looks something like this: (screenshot omitted)


Oh, I see. I don’t see that warning message then. I’m going to try Ray 1.5 out of curiosity, and if that doesn’t work, I’ll raise a ticket. Thanks Devin.


Same issue with Ray 1.5 (caveat in the ticket): OOM on Dataframe.apply · Issue #3287 · modin-project/modin · GitHub

In the meantime, as it may take time to get to that ticket, is there a workaround that I can use?

Currently, I’m hacking around it with apply on a Series, which means I have to fit everything into a single column and apply on that. I’m not even sure that will work, but it does get a bit further.
I have some complex code, so it’ll take some work :). Any tips are appreciated.
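For reference, the single-column hack described above can be sketched like this. Plain pandas is shown so it runs standalone (Modin's API matches), and `do_something` is a stand-in for the real computation:

```python
import pandas as pd  # in the real workload: import modin.pandas as pd

def do_something(col1, col2, some_arg):
    # Stand-in for the real computation (file paths, image resizing, etc.)
    return f"{col1}/{col2}/{some_arg}"

df = pd.DataFrame({"col1": ["a", "b"], "col2": ["x", "y"]})

# Pack the two columns into a single Series of tuples, then use Series.apply
packed = pd.Series(list(zip(df["col1"], df["col2"])), index=df.index)
df["col3"] = packed.apply(lambda t: do_something(t[0], t[1], "Some Arg1"))
print(df["col3"].tolist())  # ['a/x/Some Arg1', 'b/y/Some Arg1']
```

The packing step itself materializes a tuple column, so this is only a stopgap, not a fix for the underlying memory issue.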

What about something like this:

import modin.pandas as pd

def do_something(col1, col2, some_arg):
    # Some computation using col1 and col2, e.g. to create new file paths,
    # resize images, etc.
    pass

def get_col3(some_arg: str, row):
    col1 = row["col1"]
    col2 = row["col2"]
    return do_something(col1, col2, some_arg) # return smaller data

col3_function = lambda row: get_col3("Some Arg1", row)

new_column = df[["col1", "col2"]].apply(col3_function, axis=1)
df["col3"] = new_column

I made some small edits to your example code above. Does this work?
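As a runnable sanity check of this pattern, here is a self-contained version using plain pandas (Modin's API is the same) with a made-up `do_something`:

```python
import pandas as pd  # with Modin: import modin.pandas as pd

def do_something(col1, col2, some_arg):
    # Stand-in for the real computation
    return f"{col1}-{col2}-{some_arg}"

df = pd.DataFrame({"col1": [1, 2], "col2": [3, 4]})

# Select only the needed columns and return a scalar per row, so apply
# produces one new Series instead of rebuilding every full row.
new_column = df[["col1", "col2"]].apply(
    lambda row: do_something(row["col1"], row["col2"], "Some Arg1"), axis=1
)
df["col3"] = new_column
print(df["col3"].tolist())  # ['1-3-Some Arg1', '2-4-Some Arg1']
```

Returning only the new value from the row function keeps the intermediate result small, which is the point of the suggested edit.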


I’ll give it a shot and let you know. Thanks Devin :)