Out of Core: read big csv file

Hi there,

I'm currently trying to figure out how to make Modin work with out-of-core data. The environment variable is set (export MODIN_OUT_OF_CORE=true), but the read_csv() call never finishes and gets killed. I read something about changing Ray's plasma directory, but that doesn't seem to help either.
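For completeness, when running a single script instead of exporting the flag in the shell, the variable has to be set before modin.pandas is imported, since Modin reads its configuration from the environment at import time. A minimal sketch:

```python
import os

# Equivalent to the shell export above; must happen BEFORE modin.pandas
# is imported, because Modin reads its config from the environment.
os.environ["MODIN_OUT_OF_CORE"] = "true"

# import modin.pandas as pd  # imported only after the flag is set
print(os.environ["MODIN_OUT_OF_CORE"])  # prints: true
```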

System Information:

  • Ray on Kubernetes
    • 2 nodes each with: 2 CPU, 4GB RAM, 20GB hard disk
  • ray version: 1.0.1
  • modin version: 0.8.2
  • python version: 3.7.7

Code:

import ray
ray.init(address="localhost:6379")
import modin.pandas as md

df = md.read_csv('data/big.csv') # 8 GB csv file

{'node_ip_address': '10.121.24.160', 'raylet_ip_address': '10.121.24.160', 'redis_address': '10.121.24.160:6379', 'object_store_address': '/tmp/ray/session_2020-11-14_11-28-46_853445_493/sockets/plasma_store', 'raylet_socket_name': '/tmp/ray/session_2020-11-14_11-28-46_853445_493/sockets/raylet', 'webui_url': '10.121.24.160:8265', 'session_dir': '/tmp/ray/session_2020-11-14_11-28-46_853445_493', 'metrics_export_port': 59092, 'node_id': '84d1aecbd928ff468dd73fc9d3c814c9aa142195'}

ray status:

Cluster status: 1/1 target nodes (0 pending)

  • MostDelayedHeartbeats: {'10.121.24.160': 0.16134071350097656, '10.121.14.169': 0.16130685806274414}
  • NodeIdleSeconds: Min=1367 Mean=1394 Max=1421
  • NumNodesConnected: 2
  • NumNodesUsed: 0.0
  • ResourceUsage: 0.0/2.0 CPU, 0.0 GiB/2.1 GiB memory, 0.0 GiB/0.63 GiB object_store_memory
  • TimeSinceLastHeartbeat: Min=0 Mean=0 Max=0
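For scale, the status output above already quantifies the mismatch between the file and the object store. A quick sanity check with the numbers from the output (GB vs. GiB rounding aside):

```python
# Numbers taken from the `ray status` output above.
csv_size_gib = 8.0          # size of data/big.csv
object_store_gib = 0.63     # total object_store_memory across both nodes

# Without out-of-core support (spilling to disk), the DataFrame
# partitions have to fit in the object store, so the read is doomed.
print(csv_size_gib > object_store_gib)  # True
```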

ray config file to deploy cluster:

# An unique identifier for the head node and workers of this cluster.
cluster_name: default

# The minimum number of worker nodes to launch in addition to the head
# node. This number should be >= 0.
min_workers: 1

# The maximum number of worker nodes to launch in addition to the head
# node. This takes precedence over min_workers.
max_workers: 1

# The initial number of worker nodes to launch in addition to the head
# node. When the cluster is first brought up (or when it is refreshed with a
# subsequent `ray up`) this number of nodes will be started.
initial_workers: 1

# Whether or not to autoscale aggressively. If this is enabled, whenever
#   we would start more workers, we start at least enough to bring the
#   cluster up to initial_workers.
autoscaling_mode: default

# The autoscaler will scale up the cluster to this target fraction of resource
# usage. For example, if a cluster of 10 nodes is 100% busy and
# target_utilization is 0.8, it would resize the cluster to 13. This fraction
# can be decreased to increase the aggressiveness of upscaling.
# This value must be less than 1.0 for scaling to happen.
target_utilization_fraction: 0.8

# If a node is idle for this many minutes, it will be removed.
idle_timeout_minutes: 5

# Kubernetes resources that need to be configured for the autoscaler to be
# able to manage the Ray cluster. If any of the provided resources don't
# exist, the autoscaler will attempt to create them. If this fails, you may
# not have the required permissions and will have to request them to be
# created by your cluster administrator.
provider:
    type: kubernetes

    # Exposing external IP addresses for ray pods isn't currently supported.
    use_internal_ips: true

    # Namespace to use for all resources created.
    namespace: ray

    # ServiceAccount created by the autoscaler for the head node pod that it
    # runs in. If this field isn't provided, the head pod config below must
    # contain a user-created service account with the proper permissions.
    autoscaler_service_account:
        apiVersion: v1
        kind: ServiceAccount
        metadata:
            name: autoscaler

    # Role created by the autoscaler for the head node pod that it runs in.
    # If this field isn't provided, the role referenced in
    # autoscaler_role_binding must exist and have at least these permissions.
    autoscaler_role:
        kind: Role
        apiVersion: rbac.authorization.k8s.io/v1
        metadata:
            name: autoscaler
        rules:
        - apiGroups: [""]
          resources: ["pods", "pods/status", "pods/exec"]
          verbs: ["get", "watch", "list", "create", "delete", "patch"]

    # RoleBinding created by the autoscaler for the head node pod that it runs
    # in. If this field isn't provided, the head pod config below must contain
    # a user-created service account with the proper permissions.
    autoscaler_role_binding:
        apiVersion: rbac.authorization.k8s.io/v1
        kind: RoleBinding
        metadata:
            name: autoscaler
        subjects:
        - kind: ServiceAccount
          name: autoscaler
        roleRef:
            kind: Role
            name: autoscaler
            apiGroup: rbac.authorization.k8s.io

    services:
      # Service that maps to the head node of the Ray cluster.
      - apiVersion: v1
        kind: Service
        metadata:
            # NOTE: If you're running multiple Ray clusters with services
            # on one Kubernetes cluster, they must have unique service
            # names.
            name: ray-head
        spec:
            # This selector must match the head node pod's selector below.
            selector:
                component: ray-head
            ports:
                - protocol: TCP
                  port: 8000
                  targetPort: 8000

      # Service that maps to the worker nodes of the Ray cluster.
      - apiVersion: v1
        kind: Service
        metadata:
            # NOTE: If you're running multiple Ray clusters with services
            # on one Kubernetes cluster, they must have unique service
            # names.
            name: ray-workers
        spec:
            # This selector must match the worker node pods' selector below.
            selector:
                component: ray-worker
            ports:
                - protocol: TCP
                  port: 8000
                  targetPort: 8000

# Kubernetes pod config for the head node pod.
head_node:
    apiVersion: v1
    kind: Pod
    metadata:
        # Automatically generates a name for the pod with this prefix.
        generateName: ray-head-

        # Must match the head node service selector above if a head node
        # service is required.
        labels:
            component: ray-head
    spec:
        # Change this if you altered the autoscaler_service_account above
        # or want to provide your own.
        serviceAccountName: autoscaler

        # Restarting the head node automatically is not currently supported.
        # If the head node goes down, `ray up` must be run again.
        restartPolicy: Never

        # This volume allocates shared memory for Ray to use for its plasma
        # object store. If you do not provide this, Ray will fall back to
        # /tmp, which will cause slowdowns if it is not a shared memory volume.
        volumes:
        - name: dshm
          emptyDir:
              medium: Memory

        containers:
        - name: ray-node
          imagePullPolicy: Always
          # You are free (and encouraged) to use your own container image,
          # but it should have the following installed:
          #   - rsync (used for `ray rsync` commands and file mounts)
          #   - screen (used for `ray attach`)
          #   - kubectl (used by the autoscaler to manage worker pods)
          image: rayproject/autoscaler
          # Do not change this command - it keeps the pod alive until it is
          # explicitly killed.
          command: ["/bin/bash", "-c", "--"]
          args: ["trap : TERM INT; sleep infinity & wait;"]
          ports:
              - containerPort: 6379 # Redis port.
              - containerPort: 6380 # Redis port.
              - containerPort: 6381 # Redis port.
              - containerPort: 12345 # Ray internal communication.
              - containerPort: 12346 # Ray internal communication.

          # This volume allocates shared memory for Ray to use for its plasma
          # object store. If you do not provide this, Ray will fall back to
          # /tmp, which will cause slowdowns if it is not a shared memory volume.
          volumeMounts:
              - mountPath: /dev/shm
                name: dshm
          resources:
              requests:
                  cpu: 1000m
                  memory: 512Mi
              limits:
                  # The maximum memory that this pod is allowed to use. The
                  # limit will be detected by ray and split to use 10% for
                  # redis, 30% for the shared memory object store, and the
                  # rest for application memory. If this limit is not set and
                  # the object store size is not set manually, ray will
                  # allocate a very large object store in each pod that may
                  # cause problems for other pods.
                  memory: 2Gi
          env:
              # This is used in the head_start_ray_commands below so that
              # Ray can spawn the correct number of processes. Omitting this
              # may lead to degraded performance.
              - name: MY_CPU_REQUEST
                valueFrom:
                    resourceFieldRef:
                        resource: requests.cpu

# Kubernetes pod config for worker node pods.
worker_nodes:
    apiVersion: v1
    kind: Pod
    metadata:
        # Automatically generates a name for the pod with this prefix.
        generateName: ray-worker-

        # Must match the worker node service selector above if a worker node
        # service is required.
        labels:
            component: ray-worker
    spec:
        serviceAccountName: default

        # Worker nodes will be managed automatically by the head node, so
        # do not change the restart policy.
        restartPolicy: Never

        # This volume allocates shared memory for Ray to use for its plasma
        # object store. If you do not provide this, Ray will fall back to
        # /tmp, which will cause slowdowns if it is not a shared memory volume.
        volumes:
        - name: dshm
          emptyDir:
              medium: Memory

        containers:
        - name: ray-node
          imagePullPolicy: Always
          # You are free (and encouraged) to use your own container image,
          # but it should have the following installed:
          #   - rsync (used for `ray rsync` commands and file mounts)
          image: rayproject/autoscaler
          # Do not change this command - it keeps the pod alive until it is
          # explicitly killed.
          command: ["/bin/bash", "-c", "--"]
          args: ["trap : TERM INT; sleep infinity & wait;"]
          ports:
              - containerPort: 12345 # Ray internal communication.
              - containerPort: 12346 # Ray internal communication.

          # This volume allocates shared memory for Ray to use for its plasma
          # object store. If you do not provide this, Ray will fall back to
          # /tmp, which will cause slowdowns if it is not a shared memory volume.
          volumeMounts:
              - mountPath: /dev/shm
                name: dshm
          resources:
              requests:
                  cpu: 1000m
                  memory: 512Mi
              limits:
                  # This memory limit will be detected by ray and split into
                  # 30% for plasma, and 70% for workers.
                  memory: 2Gi
          env:
              # This is used in the head_start_ray_commands below so that
              # Ray can spawn the correct number of processes. Omitting this
              # may lead to degraded performance.
              - name: MY_CPU_REQUEST
                valueFrom:
                    resourceFieldRef:
                        resource: requests.cpu

# Files or directories to copy to the head and worker nodes. The format is a
# dictionary from REMOTE_PATH: LOCAL_PATH, e.g.
file_mounts: {
#    "/path1/on/remote/machine": "/path1/on/local/machine",
#    "/path2/on/remote/machine": "/path2/on/local/machine",
}

# List of commands that will be run before `setup_commands`. If docker is
# enabled, these commands will run outside the container and before docker
# is setup.
initialization_commands: []

# List of shell commands to run to set up nodes.
setup_commands:
    - pip install s3fs
    - pip install modin
    - echo 'export MODIN_RAY_CLUSTER=True' >> ~/.bashrc
    - echo 'export MODIN_OUT_OF_CORE=true' >> ~/.bashrc
    - echo 'export MODIN_MEMORY=10000000000' >> ~/.bashrc

# Custom commands that will be run on the head node after common setup.
head_setup_commands:
    - pip install boto3==1.4.8  # 1.4.8 adds InstanceMarketOptions
    - echo 'export MODIN_REDIS_ADDRESS="localhost:6379"' >> ~/.bashrc

# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands: []

# Command to start ray on the head node. You don't need to change this.
# Note webui-host is set to 0.0.0.0 so that kubernetes can port forward.
head_start_ray_commands:
    - ray stop
    - mkdir -p /new-folder/tmp
    - echo 'export MODIN_ON_RAY_PLASMA_DIR="/new-folder/tmp"' >> ~/.bashrc
    # - echo 'export TMPDIR="$(dirname $(mktemp tmp.XXXXXXXXXX -ut))"' >> ~/.bashrc
    - echo 'export MEMORY_STORE_SIZE=7000000000' >> ~/.bashrc
    - ulimit -n 65536; ray start --head --num-cpus=$MY_CPU_REQUEST --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml --dashboard-host 0.0.0.0 --plasma-directory=$MODIN_ON_RAY_PLASMA_DIR

# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
    - ray stop
    - mkdir -p /new-folder/tmp
    - echo 'export MODIN_ON_RAY_PLASMA_DIR="/new-folder/tmp"' >> ~/.bashrc
    # - echo 'export TMPDIR="$(dirname $(mktemp tmp.XXXXXXXXXX -ut))"' >> ~/.bashrc
    - echo 'export MEMORY_STORE_SIZE=7000000000' >> ~/.bashrc
    - ulimit -n 65536; ray start --num-cpus=$MY_CPU_REQUEST --address=$RAY_HEAD_IP:6379 --object-manager-port=8076 --plasma-directory=$MODIN_ON_RAY_PLASMA_DIR

Hi @leo, thanks for posting!

Can you try adding this to the - ulimit -n 65536; line:

--object-store-memory 10000000000

And see if that resolves it? I think that ray is using the default value, which is 40% of the RAM capacity.
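Concretely, the head node start command from your config would become something like this (the worker line is analogous, minus the head-only flags). Note that a 10 GB store is larger than the pod's RAM, so it only works because --plasma-directory already points at disk:

```yaml
head_start_ray_commands:
    - ray stop
    # Same command as before, with --object-store-memory appended.
    - ulimit -n 65536; ray start --head --num-cpus=$MY_CPU_REQUEST --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml --dashboard-host 0.0.0.0 --plasma-directory=$MODIN_ON_RAY_PLASMA_DIR --object-store-memory 10000000000
```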

The MODIN_OUT_OF_CORE flag will not change Ray's object store size if the cluster already exists; I don't think we are able to do that in Ray yet. This should probably be added to the documentation, and I am in the middle of a big documentation push. Would you be able to open an issue on GitHub about the documentation for the out-of-core functionality?

Let me know if setting the object store memory solves your issue!

Hi @devin-petersohn, thanks for the response!

I tried --object-store-memory 10000000000, but then I came across this Ray issue, so I had to downgrade Ray to 0.7.3 (Python 2.7.15), which leads to a pandas dependency issue. Moreover, I tried (ray==0.7.1, modin==0.5.4), which works for pandas, but in that case Ray won't start (/ray-project/ray/issues/5119).

Hi @leo, are you limited to Python 2 only?

No. It is just that I am using the templates (/ray-project/ray/tree/ray-0.7.3/kubernetes) provided by Ray to deploy to a Kubernetes cluster. They use Python 2, and that is what I get on each node inside the cluster.