Large object, shared memory

Hey,

First-time poster and new to modin!

I’m using modin to interrogate a large graph in parallel.
I seem to be running out of memory and am wondering how the graph is being stored in memory.

My use case looks like this:

import modin.pandas as pd
import networkx

def main():
    def traverse_graph(row, graph):
        u, v = row.u, row.v
        shortest_path = networkx.shortest_path(graph, u, v)
        # do something with shortest path
        return [node for node in shortest_path if some_filtering_logic(node)]

    # create some huge graph 40gb+
    graph = networkx.DiGraph()

    # load the graph-query candidates from csv
    candidate_df = pd.read_csv('some_csv_file.csv')
    candidate_df["shortest_path_and_extras"] = candidate_df.apply(lambda row: traverse_graph(row, graph), axis=1)

I’m concerned the graph is being passed to each worker and am wondering how best to avoid that.
I tried:
graph_memory_reference = ray.put(graph) in conjunction with ray.get(graph_memory_reference), but the issue seems to persist.

Hi @zkyz, welcome!

Yes, the problem is likely that the graph object is getting replicated. Even if you do ray.put and ray.get, the object will be replicated because each worker must deserialize the object in its own local memory.
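The replication is easy to see with plain pickle, which is roughly what each ray.get does for a general Python object like a networkx graph: the single serialized copy in the object store gets deserialized into a fresh in-process copy per worker (a toy dict stands in for the graph here):

```python
import pickle

graph = {"a": ["b"], "b": ["c"], "c": []}  # toy stand-in for the 40gb+ graph

# ray.put stores one serialized copy in the shared object store...
blob = pickle.dumps(graph)

# ...but every ray.get in a worker deserializes its own in-process copy.
copy1 = pickle.loads(blob)
copy2 = pickle.loads(blob)

assert copy1 == copy2      # same contents
assert copy1 is not copy2  # distinct objects: the memory cost is paid per worker
```

(Ray can share certain buffers like numpy arrays zero-copy, but an arbitrary object graph has to be rebuilt in each worker's heap.)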

Assuming there is no way to predictably slice/subset the graph, the best way I can see to avoid replicating the object on each worker is to remove all parallelism and funnel every query through a single actor that holds the graph. It will look something like this (no guarantee that this works as-is :smile:):

import ray
import networkx
import modin.pandas as pd
from modin.distributed.dataframe.pandas import unwrap_partitions, from_partitions

@ray.remote
class GraphClass:
    def __init__(self, graph):
        self.graph = graph  # large graph 40gb+, held once inside this actor

    def traverse_graph_partition(self, partition):
        """Actor method: apply the shortest-path logic to one partition."""
        def traverse_graph(row):
            u, v = row.u, row.v
            shortest_path = networkx.shortest_path(self.graph, u, v)  # graph lives only in this actor
            # do something with shortest path
            return [node for node in shortest_path if some_filtering_logic(node)]
        # Call inner function in apply
        return partition.apply(traverse_graph, axis=1)

candidate_df = pd.read_csv("some_csv_file.csv")
partitions = unwrap_partitions(candidate_df, axis=0)  # get row partitions
graph_actor = GraphClass.remote(networkx.DiGraph())  # pass big graph once
applied_parts = [graph_actor.traverse_graph_partition.remote(partition) for partition in partitions]
result_df = from_partitions(applied_parts, axis=0)

This uses some of our newer developer tools to be able to pass Modin partitions into a Ray task or actor method.

Let me know how this goes and if it helps. Also any feedback on the developer APIs is welcome!

Hmmm,

That’s a shame; parallelism is pretty essential, as I’ll need to do millions of shortest-path queries.

I think my only option may be to run a local Flask API with the graph loaded into memory elsewhere.

This is something that’s tied to Ray’s and Dask’s distributed shared-memory model. Even with the local Flask API solution, I am not sure you would get parallelism; it would be more of a streaming solution, since one process owns the graph and answers queries one at a time. The solution I offered above would be more or less identical to that.
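For reference, the "graph behind a local API" idea can be sketched with just the Python standard library (standing in for Flask; the toy graph, the BFS helper, and the endpoint shape are all illustrative assumptions, not part of the thread):

```python
import json
import threading
import urllib.request
from collections import deque
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import urlparse, parse_qs

# Toy adjacency list standing in for the 40gb+ graph; loaded ONCE in this process.
GRAPH = {"a": ["b"], "b": ["c"], "c": []}

def shortest_path(graph, u, v):
    """Minimal BFS stand-in for networkx.shortest_path."""
    prev = {u: None}
    queue = deque([u])
    while queue:
        node = queue.popleft()
        if node == v:
            path = []
            while node is not None:
                path.append(node)
                node = prev[node]
            return path[::-1]
        for nbr in graph.get(node, []):
            if nbr not in prev:
                prev[nbr] = node
                queue.append(nbr)
    return None

class Handler(BaseHTTPRequestHandler):
    def do_GET(self):
        # Expect queries like /shortest_path?u=a&v=c
        qs = parse_qs(urlparse(self.path).query)
        path = shortest_path(GRAPH, qs["u"][0], qs["v"][0])
        body = json.dumps(path).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):  # silence per-request logging
        pass

server = HTTPServer(("127.0.0.1", 0), Handler)  # port 0 = pick a free port
threading.Thread(target=server.serve_forever, daemon=True).start()
port = server.server_address[1]

with urllib.request.urlopen(f"http://127.0.0.1:{port}/shortest_path?u=a&v=c") as resp:
    result = json.loads(resp.read())
server.shutdown()

print(result)  # ['a', 'b', 'c']
```

Note that every query still funnels through the one process holding the graph, which is exactly the streaming behavior described above; the actor version keeps that property without leaving Ray.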