Unable to connect to external Ray cluster

Hi,
I set up a Ray 2.0.0 (dev) cluster on EC2 and confirmed that I can invoke remote calls from an external host.
Then I tried to load a data frame with the following snippet:

import ray
import ray.util
ray.util.connect('<HOST>:<PORT>')

import modin.pandas as pd
import numpy as np

frame_data = np.random.randint(0, 100, size=(2**10, 2**8))
df = pd.DataFrame(frame_data)

However, I get: Exception: Trying to start two instances of ray via client.

Any suggestions?
Shah

Hi @Shah_Hossain welcome!

Does the same issue happen when you install Modin from the GitHub master branch? See the Installation page of the Modin 0.8.3 documentation for instructions.

Hello @devin-petersohn,
Thanks for the response. After installing from the master branch, the connection error went away.

But now I get the following error when running the same snippet:

UserWarning: Distributing <class 'numpy.ndarray'> object. This may take some time.
Got Error from data channel -- shutting down: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.UNKNOWN
	details = "Exception iterating responses: 'DataFrame' object has no attribute '_data'"
	debug_error_string = "{"created":"@1612285142.705102771","description":"Error received from peer ipv4:52.43.158.36:51005","file":"src/core/lib/surface/call.cc","file_line":1067,"grpc_message":"Exception iterating responses: 'DataFrame' object has no attribute '_data'","grpc_status":2}"

@Shah_Hossain How are you using Modin? It looks like some utility or library is accessing internal attributes of a pandas DataFrame, but Modin does not have the same memory layout and internal API as pandas.

_data specifically is an internal pandas DataFrame field.
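As an illustration of what "internal field" means here, a minimal sketch (assuming pandas is installed; block_manager_attr is a hypothetical helper, not a pandas API) that reports which private attribute holds a DataFrame's block manager. In pandas 1.1 and later the manager lives at _mgr (older releases used _data), and _mgr is also the key visible in the pickled payload further down this thread. Code that reaches for either name relies on pandas internals, which Modin's DataFrame does not replicate.

```python
import pandas as pd


def block_manager_attr(df: pd.DataFrame) -> str:
    """Return the name of the private attribute holding df's block manager.

    Hypothetical helper for illustration only: both names are pandas
    internals and may change between releases.
    """
    # pandas >= 1.1 stores the manager in `_mgr`; earlier versions used `_data`.
    # On old pandas, hasattr(df, "_mgr") is False because the lookup falls
    # through to column access and raises AttributeError.
    if hasattr(df, "_mgr"):
        return "_mgr"
    return "_data"


print(block_manager_attr(pd.DataFrame({"a": [1, 2, 3]})))
```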

@devin-petersohn: here is the snippet:

import ray
import ray.util
ray.util.connect('<Host>:<Port>')

import modin
import modin.pandas as pd
import numpy as np

frame_data = np.random.randint(0, 100, size=(2**10, 2**8))
df = pd.DataFrame(frame_data)

Ray Version: 2.0.0.dev0
Modin Version: 0.8.3+22.ge99b629

Thanks @Shah_Hossain, that part looks fine. How are you using the DataFrame afterwards?

@devin-petersohn the error I mentioned earlier is raised by the final line of the snippet above (df = pd.DataFrame(frame_data)), so I never get to use the DataFrame afterwards: df is not even defined (NameError: name 'df' is not defined).
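The NameError follows directly from the constructor failing: Python evaluates the right-hand side first and only binds df if that succeeds. A minimal sketch, using a hypothetical FailingFrame class as a stand-in for pd.DataFrame that simply raises the same message seen in the trace:

```python
class FailingFrame:
    """Hypothetical stand-in for a constructor that fails part-way through."""

    def __init__(self, data):
        # Mimic the client-side error from the trace; no object is ever returned.
        raise ConnectionError("data channel shutting down")


try:
    df = FailingFrame([[1, 2], [3, 4]])
except ConnectionError as exc:
    print(f"constructor raised: {exc}")

# The assignment above never ran, so looking up `df` fails.
df_was_bound = True
try:
    df
except NameError:  # UnboundLocalError (raised inside functions) is a subclass
    df_was_bound = False
print(df_was_bound)
```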

Here is the stack trace:

UserWarning: Distributing <class 'numpy.ndarray'> object. This may take some time.
Got Error from data channel -- shutting down: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.UNKNOWN
	details = "Exception iterating responses: 'DataFrame' object has no attribute '_data'"
	debug_error_string = "{"created":"@1612300730.701408721","description":"Error received from peer ipv4:52.43.158.36:51005","file":"src/core/lib/surface/call.cc","file_line":1067,"grpc_message":"Exception iterating responses: 'DataFrame' object has no attribute '_data'","grpc_status":2}"
>
Exception in thread Thread-8:
Traceback (most recent call last):
  File "/home/ec2-user/anaconda3/envs/ray-env-v200/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/home/ec2-user/anaconda3/envs/ray-env-v200/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/home/ec2-user/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/util/client/dataclient.py", line 87, in _data_main
    raise e
  File "/home/ec2-user/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/util/client/dataclient.py", line 62, in _data_main
    for response in resp_stream:
  File "/home/ec2-user/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/grpc/_channel.py", line 416, in __next__
    return self._next()
  File "/home/ec2-user/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/grpc/_channel.py", line 803, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.UNKNOWN
	details = "Exception iterating responses: 'DataFrame' object has no attribute '_data'"
	debug_error_string = "{"created":"@1612300730.701408721","description":"Error received from peer ipv4:52.43.158.36:51005","file":"src/core/lib/surface/call.cc","file_line":1067,"grpc_message":"Exception iterating responses: 'DataFrame' object has no attribute '_data'","grpc_status":2}"
>

---------------------------------------------------------------------------
ConnectionError                           Traceback (most recent call last)
<ipython-input-4-b75db4cc0579> in <module>
      4 
      5 frame_data = np.random.randint(0, 100, size=(2**10, 2**8))
----> 6 df = pd.DataFrame(frame_data)

~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/modin/pandas/dataframe.py in __init__(self, data, index, columns, dtype, copy, query_compiler)
    155                 data=data, index=index, columns=columns, dtype=dtype, copy=copy
    156             )
--> 157             self._query_compiler = from_pandas(pandas_df)._query_compiler
    158         else:
    159             self._query_compiler = query_compiler

~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/modin/pandas/utils.py in from_pandas(df)
     61     from .dataframe import DataFrame
     62 
---> 63     return DataFrame(query_compiler=EngineDispatcher.from_pandas(df))
     64 
     65 

~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/modin/data_management/factories/dispatcher.py in from_pandas(cls, df)
     86     @classmethod
     87     def from_pandas(cls, df):
---> 88         return cls.__engine._from_pandas(df)
     89 
     90     @classmethod

~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/modin/data_management/factories/factories.py in _from_pandas(cls, df)
     69     @classmethod
     70     def _from_pandas(cls, df):
---> 71         return cls.io_cls.from_pandas(df)
     72 
     73     @classmethod

~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/modin/engines/base/io/io.py in from_pandas(cls, df)
     29     @classmethod
     30     def from_pandas(cls, df):
---> 31         return cls.query_compiler_cls.from_pandas(df, cls.frame_cls)
     32 
     33     @classmethod

~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py in from_pandas(cls, df, data_cls)
    205     @classmethod
    206     def from_pandas(cls, df, data_cls):
--> 207         return cls(data_cls.from_pandas(df))
    208 
    209     @classmethod

~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/modin/engines/base/frame/data.py in from_pandas(cls, df)
   2006         new_columns = df.columns
   2007         new_dtypes = df.dtypes
-> 2008         new_frame, new_lengths, new_widths = cls._frame_mgr_cls.from_pandas(df, True)
   2009         return cls(
   2010             new_frame,

~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/modin/engines/base/frame/partition_manager.py in from_pandas(cls, df, return_dims)
    578                 for j in range(0, len(df.columns), col_chunksize)
    579             ]
--> 580             for i in range(0, len(df), row_chunksize)
    581         ]
    582         if not return_dims:

~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/modin/engines/base/frame/partition_manager.py in <listcomp>(.0)
    578                 for j in range(0, len(df.columns), col_chunksize)
    579             ]
--> 580             for i in range(0, len(df), row_chunksize)
    581         ]
    582         if not return_dims:

~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/modin/engines/base/frame/partition_manager.py in <listcomp>(.0)
    576             [
    577                 put_func(df.iloc[i : i + row_chunksize, j : j + col_chunksize].copy())
--> 578                 for j in range(0, len(df.columns), col_chunksize)
    579             ]
    580             for i in range(0, len(df), row_chunksize)

~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/modin/engines/ray/pandas_on_ray/frame/partition.py in put(cls, obj)
    146             A `RayRemotePartition` object.
    147         """
--> 148         return PandasOnRayFramePartition(ray.put(obj), len(obj.index), len(obj.columns))
    149 
    150     @classmethod

~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/_private/client_mode_hook.py in wrapper(*args, **kwargs)
     44         global _client_hook_enabled
     45         if client_mode_enabled and _client_hook_enabled:
---> 46             return getattr(ray, func.__name__)(*args, **kwargs)
     47         return func(*args, **kwargs)
     48 

~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/util/client/api.py in put(self, *args, **kwargs)
     41             kwargs: opaque keyword arguments
     42         """
---> 43         return self.worker.put(*args, **kwargs)
     44 
     45     def wait(self, *args, **kwargs):

~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/util/client/worker.py in put(self, vals)
    191             to_put.append(vals)
    192 
--> 193         out = [self._put(x) for x in to_put]
    194         if single:
    195             out = out[0]

~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/util/client/worker.py in <listcomp>(.0)
    191             to_put.append(vals)
    192 
--> 193         out = [self._put(x) for x in to_put]
    194         if single:
    195             out = out[0]

~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/util/client/worker.py in _put(self, val)
    206         data = dumps_from_client(val, self._client_id)
    207         req = ray_client_pb2.PutRequest(data=data)
--> 208         resp = self.data_client.PutObject(req)
    209         return ClientObjectRef(resp.id)
    210 

~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/util/client/dataclient.py in PutObject(self, request, context)
    125                   context=None) -> ray_client_pb2.PutResponse:
    126         datareq = ray_client_pb2.DataRequest(put=request, )
--> 127         resp = self._blocking_send(datareq)
    128         return resp.put
    129 

~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/util/client/dataclient.py in _blocking_send(self, req)
    104             if self._in_shutdown:
    105                 raise ConnectionError(
--> 106                     f"cannot send request {req}: data channel shutting down")
    107             data = self.ready_data[req_id]
    108             del self.ready_data[req_id]

ConnectionError: cannot send request req_id: 2
put {
  data: "\200\005\225.\001\000\000\000\000\000\000\214\021pandas.core.frame\224\214\tDataFrame\224\223\224)\201\224}\224(\214\004_mgr\224\214\036pandas.core.internals.managers\224\214\014BlockManager\224\223\224)\201\224(]\224(\214\030pandas.core.indexes.base\224\214\n_new_Index\224\223\224\214\031pandas.core.indexes.range\224\214\nRangeIndex\224\223\224}\224(\214\004name\224N\214\005start\224K\000\214\004stop\224K\200\214\004step\224K\001u\206\224R\224h\rh\020}\224(h\022Nh\023K\000h\024M\000\002h\025K\001u\206\224R\224e]\224\214\022numpy.core.numeric\224\214\013_frombuffer\224\223\224(\226\000\000\010\000 
..............
..............
..............
..............
4K\000K\200K\001\207\224R\224uaust\224b\214\004_typ\224\214\tdataframe\224\214\t_metadata\224]\224\214\005attrs\224}\224ub."
}
: data channel shutting down
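The Modin frames in this traceback show where the failure surfaces: from_pandas cuts the pandas frame into a grid of row and column blocks and hands each block to put_func, which calls ray.put, so the first failing ray.put aborts the whole DataFrame constructor. A minimal sketch of that block split with plain pandas; the helper name and the chunk sizes are assumptions for illustration, since Modin derives its own chunk sizes.

```python
import numpy as np
import pandas as pd


def split_into_blocks(df, row_chunksize, col_chunksize):
    """Cut df into a 2-D grid of sub-frames, mirroring the list comprehension
    in the partition_manager.from_pandas frames of the traceback.

    In Modin each block would be passed to put_func (ray.put); here the
    blocks are returned so the split can be inspected locally.
    """
    return [
        [
            df.iloc[i : i + row_chunksize, j : j + col_chunksize].copy()
            for j in range(0, len(df.columns), col_chunksize)
        ]
        for i in range(0, len(df), row_chunksize)
    ]


# Same shape as the repro: 1024 rows x 256 columns of random integers.
frame = pd.DataFrame(np.random.randint(0, 100, size=(2**10, 2**8)))
blocks = split_into_blocks(frame, row_chunksize=256, col_chunksize=64)
print(len(blocks), len(blocks[0]), blocks[0][0].shape)  # 4 4 (256, 64)
```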

Very strange. Can you try the following?

import ray
import ray.util
ray.util.connect('<HOST>:<PORT>')

import modin.pandas as pd
import pandas
import numpy as np

x = ray.put(pandas.DataFrame(np.random.randint(0, 100, size=(2**4, 2**4))))

Does this work? From the trace, it looks like Ray is not serializing the pandas DataFrame properly: the failure happens inside ray.put, which Modin calls for each partition of the frame.
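A complementary local check is to confirm that the same DataFrame survives a serialize/deserialize round-trip inside a single Python process. The sketch below uses the standard-library pickle module as a stand-in for Ray's client serializer (an assumption: the trace shows Ray calling its own dumps_from_client, which is not used here). If the local round-trip succeeds, the problem more likely lies in the hop between the client and the cluster than in the DataFrame itself.

```python
import pickle

import numpy as np
import pandas as pd


def roundtrip(obj):
    """Serialize and deserialize obj in-process with the stdlib pickle module."""
    payload = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
    return pickle.loads(payload), len(payload)


# Same 16 x 16 frame as the ray.put repro above.
original = pd.DataFrame(np.random.randint(0, 100, size=(2**4, 2**4)))
restored, nbytes = roundtrip(original)
print(f"payload: {nbytes} bytes, identical after round-trip: {restored.equals(original)}")
```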

I am getting a similar error. Let me try to do some data manipulation with Ray without Modin.

Got Error from data channel -- shutting down: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.UNKNOWN
	details = "Exception iterating responses: 'DataFrame' object has no attribute '_data'"
	debug_error_string = "{"created":"@1612307207.723737628","description":"Error received from peer ipv4:52.43.158.36:51005","file":"src/core/lib/surface/call.cc","file_line":1067,"grpc_message":"Exception iterating responses: 'DataFrame' object has no attribute '_data'","grpc_status":2}"
>
Exception in thread Thread-8:
Traceback (most recent call last):
  File "/home/ec2-user/anaconda3/envs/ray-env-v200/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/home/ec2-user/anaconda3/envs/ray-env-v200/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/home/ec2-user/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/util/client/dataclient.py", line 87, in _data_main
    raise e
  File "/home/ec2-user/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/util/client/dataclient.py", line 62, in _data_main
    for response in resp_stream:
  File "/home/ec2-user/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/grpc/_channel.py", line 416, in __next__
    return self._next()
  File "/home/ec2-user/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/grpc/_channel.py", line 803, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.UNKNOWN
	details = "Exception iterating responses: 'DataFrame' object has no attribute '_data'"
	debug_error_string = "{"created":"@1612307207.723737628","description":"Error received from peer ipv4:52.43.158.36:51005","file":"src/core/lib/surface/call.cc","file_line":1067,"grpc_message":"Exception iterating responses: 'DataFrame' object has no attribute '_data'","grpc_status":2}"
>

---------------------------------------------------------------------------
ConnectionError                           Traceback (most recent call last)
<ipython-input-4-db5f700c182c> in <module>
      6 
      7 
----> 8 x = ray.put(pandas.DataFrame(np.random.randint(0, 100, size=(2**4, 2**4))))
      9 
     10 

~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/_private/client_mode_hook.py in wrapper(*args, **kwargs)
     44         global _client_hook_enabled
     45         if client_mode_enabled and _client_hook_enabled:
---> 46             return getattr(ray, func.__name__)(*args, **kwargs)
     47         return func(*args, **kwargs)
     48 

~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/util/client/api.py in put(self, *args, **kwargs)
     41             kwargs: opaque keyword arguments
     42         """
---> 43         return self.worker.put(*args, **kwargs)
     44 
     45     def wait(self, *args, **kwargs):

~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/util/client/worker.py in put(self, vals)
    191             to_put.append(vals)
    192 
--> 193         out = [self._put(x) for x in to_put]
    194         if single:
    195             out = out[0]

~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/util/client/worker.py in <listcomp>(.0)
    191             to_put.append(vals)
    192 
--> 193         out = [self._put(x) for x in to_put]
    194         if single:
    195             out = out[0]

~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/util/client/worker.py in _put(self, val)
    206         data = dumps_from_client(val, self._client_id)
    207         req = ray_client_pb2.PutRequest(data=data)
--> 208         resp = self.data_client.PutObject(req)
    209         return ClientObjectRef(resp.id)
    210 

~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/util/client/dataclient.py in PutObject(self, request, context)
    125                   context=None) -> ray_client_pb2.PutResponse:
    126         datareq = ray_client_pb2.DataRequest(put=request, )
--> 127         resp = self._blocking_send(datareq)
    128         return resp.put
    129 

~/anaconda3/envs/ray-env-v200/lib/python3.7/site-packages/ray/util/client/dataclient.py in _blocking_send(self, req)
    104             if self._in_shutdown:
    105                 raise ConnectionError(
--> 106                     f"cannot send request {req}: data channel shutting down")
    107             data = self.ready_data[req_id]
    108             del self.ready_data[req_id]

ConnectionError: cannot send request req_id: 2
put {

Modin was not a part of that computation.

The error is coming from Ray serialization. It is probably worth raising an issue with the Ray team, because this looks like a bug.
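Before filing, it may also help to compare package versions on the client and on the cluster, since the client pickles the DataFrame and the cluster has to unpickle it with its own pandas. A minimal sketch, assuming Python 3.8+ for importlib.metadata (the 3.7 environment in this thread would need the importlib_metadata backport) and assuming you run installed_versions() once locally and once on the cluster, for example inside a Ray task; both helper names are hypothetical and the remote collection step is not shown.

```python
from importlib import metadata


def installed_versions(packages):
    """Map each distribution name to its installed version, or None if absent."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


def diff_versions(client, cluster):
    """Return {name: (client_version, cluster_version)} for every mismatch."""
    names = sorted(set(client) | set(cluster))
    return {
        name: (client.get(name), cluster.get(name))
        for name in names
        if client.get(name) != cluster.get(name)
    }


# Versions on this machine; compare against the same dict gathered on the cluster.
local = installed_versions(["ray", "pandas", "numpy", "cloudpickle"])
print(local)
```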

@Shah_Hossain I have created an issue for your case on the Ray repo, since you were able to reproduce it without Modin: Serialization error on Ray 2.0rc with pandas DataFrames · Issue #13882 · ray-project/ray · GitHub

I posted your trace there, please take a look!

@devin-petersohn: right. I raised the issue on the Ray forum as well: Error in RPC call in client mode (Monitoring & Debugging category).

OK, great. I will link the two issues together on GitHub.