Reusing the same Modin DataFrame across different processes

Once a Modin DataFrame has been created from a Python process, is it possible to reuse and access that DataFrame from another process, regardless of whether the original process is still alive?

I’ve tried serializing the _query_compiler object using dill and deserializing it from another process, but I get the following error from the Ray cluster:

[2021-10-27 22:30:37,464 C 88714 754187] core_worker.cc:1031:  Check failed: has_owner Object IDs generated randomly (ObjectID.from_random()) or out-of-band (ObjectID.from_binary(...)) cannot be passed as a task argument because Ray does not know which task will create them. If this was not how your object ID was generated, please file an issue at https://github.com/ray-project/ray/issues/
*** StackTrace Information ***
    ray::GetCallTrace()
    ray::SpdLogMessage::Flush()
    ray::SpdLogMessage::~SpdLogMessage()
    ray::RayLog::~RayLog()
    ray::core::CoreWorker::GetOwnerAddress()
    __pyx_f_3ray_7_raylet_prepare_args()
    __pyx_pw_3ray_7_raylet_10CoreWorker_53submit_task()
    cfunction_call
    _PyObject_MakeTpCall
    call_function
    _PyEval_EvalFrameDefault
    _PyEval_EvalCode
    _PyFunction_Vectorcall
    call_function
    _PyEval_EvalFrameDefault
    _PyEval_EvalCode
    _PyFunction_Vectorcall
    _PyEval_EvalFrameDefault
    _PyEval_EvalCode
    _PyFunction_Vectorcall
    method_vectorcall
    call_function
    _PyEval_EvalFrameDefault
    _PyEval_EvalCode
    _PyFunction_Vectorcall
    call_function
    _PyEval_EvalFrameDefault
    _PyEval_EvalCode
    _PyFunction_Vectorcall
    call_function
    _PyEval_EvalFrameDefault
    _PyEval_EvalCode
    _PyFunction_Vectorcall
    call_function
    _PyEval_EvalFrameDefault
    _PyEval_EvalCode
    _PyFunction_Vectorcall
    method_vectorcall
    PyVectorcall_Call
    _PyEval_EvalFrameDefault
    _PyEval_EvalCode
    _PyFunction_Vectorcall
    method_vectorcall
    PyVectorcall_Call
    _PyEval_EvalFrameDefault
    _PyEval_EvalCode
    _PyFunction_Vectorcall
    method_vectorcall
    call_function
    _PyEval_EvalFrameDefault

Hi @Cherif_Jazra, thanks for the question. You can try serializing a Modin DataFrame directly:

import pickle
import modin.pandas as pd

df = pd.DataFrame([1, 2, 3])
print(df)
#    0
# 0  1
# 1  2
# 2  3

raw_df = pickle.dumps(df)
df2 = pickle.loads(raw_df)
print(df2)
#    0
# 0  1
# 1  2
# 2  3

Please, let us know if it works.

Thanks for the answer @YarShev! Serializing and deserializing in the same process is not a problem; it is when I try to deserialize from another process that I get the above error.

I’m connecting to an existing Ray cluster using
ray.init(address='auto')

Any idea how this could be supported?

You can use Modin functionality to retrieve the partitions of the DataFrame and pass them directly to a remote function that runs in another process.

import ray
from modin.distributed.dataframe.pandas import unwrap_partitions, from_partitions

@ray.remote
def foo(partitions):
    df = from_partitions(partitions, axis=None)
    # other stuff

partitions = unwrap_partitions(df)
foo.remote(partitions)

Does this make sense?

Thanks @YarShev, I tried that and I still got a Ray error. The error is similar (Check failed: has_owner Object IDs generated randomly) but not exactly the same as above:

[2021-10-29 10:25:42,709 I 9445 1432874] core_worker.cc:146: Constructing CoreWorkerProcess. pid: 9445
[2021-10-29 10:25:42,711 I 9445 1432874] grpc_server.cc:92: driver server started, listening on port 10026.
[2021-10-29 10:25:42,716 I 9445 1432874] core_worker.cc:441: Initializing worker at address: 192.168.152.41:10026, worker ID 05000000ffffffffffffffffffffffffffffffffffffffffffffffff, raylet e37ef9391058d5861887e6890c105363a3fbe776cc9857f6ef8b3d49
[2021-10-29 10:25:42,824 I 9445 1432874] io_service_pool.cc:35: IOServicePool is running with 1 io_service.
[2021-10-29 10:25:42,825 I 9445 1433050] service_based_accessor.cc:611: Received notification for node id = e37ef9391058d5861887e6890c105363a3fbe776cc9857f6ef8b3d49, IsAlive = 1
[2021-10-29 10:27:32,101 C 9445 1432874] core_worker.cc:1059:  Check failed: has_owner Object IDs generated randomly (ObjectID.from_random()) or out-of-band (ObjectID.from_binary(...)) cannot be serialized because Ray does not know which task will create them. If this was not how your object ID was generated, please file an issue at https://github.com/ray-project/ray/issues/
*** StackTrace Information ***
    ray::GetCallTrace()
    ray::SpdLogMessage::Flush()
    ray::SpdLogMessage::~SpdLogMessage()
    ray::RayLog::~RayLog()
    ray::core::CoreWorker::GetOwnershipInfo()
    __pyx_pw_3ray_7_raylet_10CoreWorker_91serialize_and_promote_object_ref()
    method_vectorcall_O
    call_function
    _PyEval_EvalFrameDefault
    _PyEval_EvalCode
    _PyFunction_Vectorcall
    save
    save
    dump
    _pickle_Pickler_dump
    method_vectorcall_O
    call_function
    _PyEval_EvalFrameDefault
    _PyFunction_Vectorcall
    call_function
    _PyEval_EvalFrameDefault
    _PyEval_EvalCode
    _PyFunction_Vectorcall
    call_function
    _PyEval_EvalFrameDefault
    _PyFunction_Vectorcall
    call_function
    _PyEval_EvalFrameDefault
    _PyEval_EvalCode
    _PyFunction_Vectorcall
    call_function
    _PyEval_EvalFrameDefault
    __Pyx_PyFunction_FastCallNoKw()
    __Pyx_PyFunction_FastCallDict()
    __Pyx_PyObject_Call2Args()
    __pyx_f_3ray_7_raylet_prepare_args()
    __pyx_pw_3ray_7_raylet_10CoreWorker_53submit_task()
    cfunction_call
    _PyObject_MakeTpCall
    call_function
    _PyEval_EvalFrameDefault
    _PyEval_EvalCode
    _PyFunction_Vectorcall
    call_function
    _PyEval_EvalFrameDefault
    _PyEval_EvalCode
    _PyFunction_Vectorcall
    _PyEval_EvalFrameDefault
    _PyEval_EvalCode
    _PyFunction_Vectorcall

@Cherif_Jazra I think what’s happening is that you are sending a local Modin object to a remote function, but it looks like Ray doesn’t support sending Ray ObjectRefs between Ray clusters.

There are a couple of ways around this. If you can write the data to S3 (or some other filesystem), that is probably best. You could also convert a Modin DataFrame to pandas with df._to_pandas() and then pass that pandas object to Modin on the remote Ray cluster with pd.DataFrame(pandas_obj).
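
Roughly, the second option would look something like this (just a sketch; `df` here stands for your existing Modin DataFrame, and _to_pandas() is a protected/internal API used only for illustration):

import pickle
import modin.pandas as pd

# In the first process: materialize the Modin DataFrame as a plain pandas object
pandas_df = df._to_pandas()
payload = pickle.dumps(pandas_df)

# In the second process (its own Ray driver): rebuild a Modin DataFrame from the data
pandas_df2 = pickle.loads(payload)
df_again = pd.DataFrame(pandas_df2)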

Would either of those work for you? It might be worth opening an issue in Ray for this kind of support, please feel free to cc me there and link the discussion here.

@devin-petersohn thank you for following up!

Just to confirm, it is the same cluster I’m invoking but from different processes.

Unfortunately materializing the data in memory with to_pandas or using S3 would not work for me.

It does seem like Ray is not allowing access to ObjectRefs if they come from a different owner. Is that also your takeaway from the error message? I do see the following note from the Ray 1.x spec regarding ownership: “Ownership currently cannot be transferred.”

If there is currently no way to deserialize a Modin DataFrame across a process boundary without moving its data, could something be done to support this? Would the _owner field in ray.put() (API and Package Reference — Ray v1.7.0) help with this use case? Is this a question for the Ray folks?
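
For reference, my rough understanding of how the _owner argument could be used, if it helps at all (untested; the ObjectOwner actor and its "df_owner" name are just hypothetical, and _owner is marked experimental):

import ray

ray.init(address='auto')

# A detached, named actor that keeps running after the driver that created it exits
@ray.remote
class ObjectOwner:
    def ping(self):
        return "alive"

owner = ObjectOwner.options(name="df_owner", lifetime="detached").remote()

# _owner assigns ownership of the stored object to the actor, so the ObjectRef
# should stay resolvable for as long as the actor is alive
some_data = list(range(10))
ref = ray.put(some_data, _owner=owner)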

It looks like an issue in Ray. @Cherif_Jazra, would you be able to run this example in your execution environment to make sure the issue is not Modin related?

import pandas
import ray
ray.init() # please, initialize Ray in a way you do

df = pandas.DataFrame([1, 2, 3])
o_refs = [ray.put(df)]

@ray.remote
def foo(dfs):
    local_df = ray.get(dfs[0])
    # other stuff with `local_df`

foo.remote(o_refs)

@YarShev this doesn’t work either. Here is my setup:

Jupyter notebook 1

import pandas
import ray
import pickle

ray.init(address='auto', _redis_password='5241590000000000')

df = pandas.DataFrame([1, 2, 3])
o_refs = [ray.put(df)]
serialized_pd = pickle.dumps(o_refs)
# -> b'\x80\x04\x95C\x00\x00\x00\x00\x00\x00\x00]\x94\x8c\x0bray._raylet\x94\x8c\tObjectRef\x94\x93\x94C\x1c\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x01\x00\x00\x00\x01\x00\x00\x00\x94\x85\x94R\x94a.'

Jupyter notebook 2 (different python process, same Ray cluster)

import pandas
import ray
import pickle

ray.init(address='auto', _redis_password='5241590000000000')

# Copy from above
serialized_pandas = b'\x80\x04\x95C\x00\x00\x00\x00\x00\x00\x00]\x94\x8c\x0bray._raylet\x94\x8c\tObjectRef\x94\x93\x94C\x1c\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x01\x00\x00\x00\x01\x00\x00\x00\x94\x85\x94R\x94a.'

@ray.remote
def foo(dfs):
    local_df = ray.get(dfs[0])

foo.remote(pickle.loads(serialized_pandas))

The kernel in Jupyter notebook 2 crashes, with an error in Ray’s python-core-driver log that is the same as above:

[2021-11-01 10:31:03,244 C 20411 1932815] core_worker.cc:1059:  Check failed: has_owner Object IDs generated randomly (ObjectID.from_random()) or out-of-band (ObjectID.from_binary(...)) cannot be serialized because Ray does not know which task will create them. If this was not how your object ID was generated, please file an issue at https://github.com/ray-project/ray/issues/
*** StackTrace Information ***
    ray::GetCallTrace()
    ray::SpdLogMessage::Flush()
    ray::SpdLogMessage::~SpdLogMessage()
    ray::RayLog::~RayLog()
    ray::core::CoreWorker::GetOwnershipInfo()
    __pyx_pw_3ray_7_raylet_10CoreWorker_91serialize_and_promote_object_ref()
    method_vectorcall_O
    call_function
    _PyEval_EvalFrameDefault
    _PyEval_EvalCode
    _PyFunction_Vectorcall
    save
    save
    dump
    _pickle_Pickler_dump
    method_vectorcall_O
    call_function
    _PyEval_EvalFrameDefault
    _PyFunction_Vectorcall
    call_function
    _PyEval_EvalFrameDefault
    _PyEval_EvalCode
    _PyFunction_Vectorcall
    call_function
    _PyEval_EvalFrameDefault
    _PyFunction_Vectorcall
    call_function
    _PyEval_EvalFrameDefault

I agree this is not a Modin issue per se. It seems that Ray does not support calls to object refs unless they come from the same driver that created them, which ties them to the client Python process’s lifetime. It seems like it could be possible to use actors and namespaces to support calls from different processes (speculating a bit), but by design the Modin team decided not to use actors, for the reasons mentioned in Dataframe Systems: Theory, Architecture, and Implementation | EECS at UC Berkeley, Execution on Ray section (p. 79):

The actor abstraction in Ray allows for stateful computation, and while this may seem like a natural fit for data-intensive applications, we have found in practice that there are significant drawbacks to using Ray’s actor abstraction to implement a distributed dataframe. First, each actor must have a handle, or line of direct communication, to every other actor for high-performing shuffles. When a new compute node becomes available or goes down, as is common in elastic clusters or serverless environments, bringing that node up and allowing it to accept new computation becomes more expensive than with anonymous tasks. Additionally, at the time of evaluation, we found that an implementation on Ray’s actors was roughly 30% slower than anonymous tasks due to actor startup time.

From all the above, would it be fair to say that Modin expects to run from the same Python process throughout the lifetime of a data scientist’s workflow?

I wouldn’t fully agree with that statement. Modin doesn’t make any assumptions about being run from the same process; these are Ray limitations. Ray doesn’t allow you to pickle Ray ObjectRef objects and share them “out of band”, which is effectively what is being attempted here: `ray.get` hangs on `ObjectRef`s passed out-of-band · Issue #9578 · ray-project/ray · GitHub. Even actors would not help here, because Ray wants to control the flow of object refs and objects between its processes.

There’s a potential solution in that thread to use ray.cloudpickle, but I am not 100% sure it is up to date given the age of that thread. Happy to help where I can!


Thank you for the clarification @devin-petersohn and for sharing the out-of-band Ray ticket! I’m going to look further into it and will report back. I may have some follow-up questions regarding how Modin interacts with Ray.

@Cherif_Jazra , do you really need to call pickle.dumps/loads on ObjectRef objects? You can just pass them in a remote function.

How would it be possible without serialization if I need to access the df from a different process?

As an update @devin-petersohn, I have actually been able to do this with ray.cloudpickle.dumps(modin_df) instead of plain pickle. This seems to indicate to the Ray cluster that it should keep a permanent reference to the ObjectRefs stored in the Modin object instead of deleting them when the creator process exits.
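
A minimal sketch of what I mean, assuming two drivers connected to the same Ray cluster (variable names are just illustrative):

# Process 1 (driver connected to the cluster)
import ray
from ray import cloudpickle
import modin.pandas as pd

ray.init(address='auto')
df = pd.DataFrame([1, 2, 3])

# cloudpickle registers a permanent reference for the ObjectRefs inside `df`,
# so they appear to survive after this driver exits
payload = cloudpickle.dumps(df)
# ... store `payload` somewhere the second process can read it ...

# Process 2 (a different driver on the same cluster; imports and ray.init as above)
df_again = cloudpickle.loads(payload)
print(df_again)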

It’s not quite clear from the Ray documentation how this is done and when the reference is deleted, though the following is described in the Reference Count section of Ray 1.x Architecture - Google Docs:

References can also be created “out-of-band” by pickling an ObjectRef with ray.cloudpickle. In this case, a permanent reference will be added to the object’s count to prevent the object from going out of scope. Other methods of out-of-band serialization (e.g., passing the binary string that uniquely identifies an ObjectRef) are not guaranteed to work because they do not contain the owner’s address and the reference is not tracked by the owner.

I will follow up on the Ray discussion board.


A Ray ObjectRef is just a reference to the actual data. You can directly pass Ray ObjectRef objects to a remote function and access the data in the worker. Moreover, the data is not copied when passing Ray ObjectRef objects within a single node, since Ray has a Plasma object store per node.

Just to clarify, if the data doesn’t support zero-copy serialization following the pickle 5 protocol (PEP 574 – Pickle protocol 5 with out-of-band data | Python.org), the data is copied into the process memory when you call ray.get.
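
For example, a small sketch on a single node showing the zero-copy path for a numpy array (which does support pickle protocol 5 out-of-band buffers):

import numpy as np
import ray

ray.init()  # or ray.init(address='auto') for an existing cluster

arr_ref = ray.put(np.arange(1_000_000))

@ray.remote
def total(arr):
    # On the same node the worker maps the array from the shared Plasma store,
    # so `arr` arrives as a read-only view rather than a copy
    return arr.sum()

print(ray.get(total.remote(arr_ref)))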

I know I’m really late to the party, and what I’m going to say is probably not a solution for the exact problem, but…

Modin has a configuration variable named MODIN_PERSISTENT_PICKLE which controls what happens to DataFrame and Series objects when you pickle.dump() them. The default value is chosen so objects are pickled fast but keep Ray objects inside (hence they are not fully persistent across runs); you can set it to True and objects will be fully materialized.

This is essentially the same as what @devin-petersohn proposed with ._to_pandas(), but it avoids using protected API calls.
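
A minimal sketch of how that might look (the environment variable, and the modin.config.PersistentPickle option in the comment, are assumed to be the setting described above):

import os
os.environ["MODIN_PERSISTENT_PICKLE"] = "True"  # set before importing/using Modin

import pickle
import modin.pandas as pd
# Alternatively (assumed equivalent to the env variable):
# from modin.config import PersistentPickle
# PersistentPickle.put(True)

df = pd.DataFrame([1, 2, 3])

# With persistent pickling enabled, the payload should contain the materialized
# data rather than Ray ObjectRefs, so a different process/run can load it
payload = pickle.dumps(df)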