Error when loading dataframe into GPUs #4652

@9849842

Description

What happened:

When loading a dataframe onto multiple GPUs, I get a ValueError saying "bytes object is too large".

What you expected to happen:

The data should load onto the GPUs without raising an error.

Minimal Complete Verifiable Example:

from dask_cuda import LocalCUDACluster
from dask.distributed import Client
from dask import dataframe as dd
import xgboost as xgb
from xgboost.dask import DaskDMatrix
from dask.distributed import wait
import cupy as cp
import numpy as np
import pandas as pd

cluster = LocalCUDACluster()
client = Client(cluster)

xTrain = dd.from_pandas(pd.DataFrame(np.zeros(shape=(14000000, 220))), npartitions=4)
yTrain = dd.from_pandas(pd.DataFrame(np.zeros(shape=(14000000, 1))), npartitions=4)
xTrainG = xTrain.map_partitions(cp.array).persist()  # <- raises the ValueError here
yTrainG = yTrain.map_partitions(cp.array).persist()
wait([xTrainG, yTrainG])

features = [str(i) for i in range(220)]  # placeholder; `features` was undefined in the original snippet
dTrain = xgb.dask.DaskDeviceQuantileDMatrix(client=client, data=xTrainG, label=yTrainG, feature_names=features, max_bin=100)
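
Running the persist step raises the following traceback: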

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-10-b35d47d9db17> in <module>
      1 xTrain = dd.from_pandas(pd.DataFrame(np.zeros(shape=(14000000,220))),npartitions = 4)
      2 yTrain = dd.from_pandas(pd.DataFrame(np.zeros(shape=(14000000,1))),npartitions = 4)
----> 3 xTrainG = xTrain.map_partitions(cp.array).persist()
      4 yTrainG = yTrain.map_partitions(cp.array).persist()
      5 wait([xTrainG,yTrainG])

~/.conda/envs/python/lib/python3.7/site-packages/dask/base.py in persist(self, **kwargs)
    254         dask.base.persist
    255         """
--> 256         (result,) = persist(self, traverse=False, **kwargs)
    257         return result
    258 

~/.conda/envs/python/lib/python3.7/site-packages/dask/base.py in persist(*args, **kwargs)
    756                 if client.get == schedule:
    757                     results = client.persist(
--> 758                         collections, optimize_graph=optimize_graph, **kwargs
    759                     )
    760                     return repack(results)

~/.conda/envs/python/lib/python3.7/site-packages/distributed/client.py in persist(self, collections, optimize_graph, workers, allow_other_workers, resources, retries, priority, fifo_timeout, actors, **kwargs)
   2950             retries=retries,
   2951             user_priority=priority,
-> 2952             fifo_timeout=fifo_timeout,
   2953             actors=actors,
   2954         )

~/.conda/envs/python/lib/python3.7/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, workers, allow_other_workers, priority, user_priority, resources, retries, fifo_timeout, actors)
   2540             if not isinstance(dsk, HighLevelGraph):
   2541                 dsk = HighLevelGraph.from_collections(id(dsk), dsk, dependencies=())
   2542 
-> 2543             dsk = highlevelgraph_pack(dsk, self, keyset)
   2544 

~/.conda/envs/python/lib/python3.7/site-packages/dask/highlevelgraph.py in __dask_distributed_pack__(self, client, client_keys)
    962                 }
    963             )
--> 964         return dumps_msgpack({"layers": layers})
    965 
    966     @staticmethod

~/.conda/envs/python/lib/python3.7/site-packages/distributed/protocol/core.py in dumps_msgpack(msg, compression)
    161                         coll = put_in(keys[:-1], coll, holder)
    162                     holder[keys[-1]] = val
--> 163                 else:
    164                     coll = val
    165                 return coll

~/.conda/envs/python/lib/python3.7/site-packages/msgpack/__init__.py in packb(o, **kwargs)
     33     See :class:`Packer` for options.
     34     """
---> 35     return Packer(**kwargs).pack(o)
     36 
     37 

msgpack/_packer.pyx in msgpack._cmsgpack.Packer.pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer.pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer.pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

ValueError: bytes object is too large
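
If I read the traceback correctly, the failure happens while msgpack serializes the task graph, not on the GPUs themselves: dd.from_pandas embeds each pandas partition directly in the graph, and msgpack cannot pack a single bytes object larger than 2**32 - 1 bytes (about 4 GiB). With 4 partitions of float64 data, each partition here is roughly 6.2 GB, which is over that limit. Rough arithmetic (assuming 8 bytes per float64 value):

rows, cols, nparts = 14_000_000, 220, 4
bytes_per_partition = (rows // nparts) * cols * 8  # float64 = 8 bytes
msgpack_limit = 2**32 - 1                          # max size of one msgpack bin object
print(bytes_per_partition)                         # 6160000000, i.e. ~6.2 GB
print(bytes_per_partition > msgpack_limit)         # True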

Anything else we need to know?:
Amazon AWS g4dn.12xlarge instance (4x NVIDIA T4 GPUs).
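
One workaround that should sidestep the serialization limit is to construct the data lazily with dask.array instead of embedding large pandas partitions in the graph, so only a small graph travels through the client. A sketch (using da.zeros as stand-in data, continuing from the imports above):

import dask.array as da

# Build the collections lazily; the ~6 GB partitions are created on the
# workers rather than serialized from the client.
x = da.zeros((14_000_000, 220), chunks=(3_500_000, 220))
y = da.zeros((14_000_000, 1), chunks=(3_500_000, 1))

xTrain = x.to_dask_dataframe(columns=[str(i) for i in range(220)])
yTrain = y.to_dask_dataframe(columns=["label"])

xTrainG = xTrain.map_partitions(cp.array).persist()
yTrainG = yTrain.map_partitions(cp.array).persist()
wait([xTrainG, yTrainG])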

Environment:

  • Dask version: 2021.3.1
  • Python version: 3.7
  • Operating System: Ubuntu 18.04
  • Install method (conda, pip, source): pip
