[BUG] crash in deserialize when using dask.dataframe.from_delayed #7773

@wmalpica

Describe the bug
I get a crash when I call dask.dataframe.from_delayed on specific data and then compute the result.

Traceback (most recent call last):
  File "/home/william/repos/willTest/deserialize_bug.py", line 41, in <module>
    main()
  File "/home/william/repos/willTest/deserialize_bug.py", line 38, in main
    print(ddf.compute())
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/dask/base.py", line 283, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/dask/base.py", line 565, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/client.py", line 2665, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/client.py", line 1974, in gather
    return self.sync(
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/client.py", line 844, in sync
    return sync(
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/utils.py", line 353, in sync
    raise exc.with_traceback(tb)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/utils.py", line 336, in f
    result[0] = yield future
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/client.py", line 1868, in _gather
    response = await future
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/client.py", line 1919, in _gather_remote
    response = await retry_operation(self.scheduler.gather, keys=keys)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/utils_comm.py", line 384, in retry_operation
    return await retry(
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/utils_comm.py", line 369, in retry
    return await coro()
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/core.py", line 862, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/core.py", line 645, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/comm/tcp.py", line 218, in read
    msg = await from_frames(
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/comm/utils.py", line 78, in from_frames
    res = await offload(_from_frames)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/utils.py", line 1440, in offload
    return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/utils.py", line 1440, in <lambda>
    return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/comm/utils.py", line 63, in _from_frames
    return protocol.loads(
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/protocol/core.py", line 104, in loads
    return msgpack.loads(
  File "msgpack/_unpacker.pyx", line 195, in msgpack._cmsgpack.unpackb
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/protocol/core.py", line 96, in _decode_default
    return merge_and_deserialize(
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 454, in merge_and_deserialize
    return deserialize(header, merged_frames, deserializers=deserializers)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 388, in deserialize
    return loads(header, frames)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 52, in dask_loads
    return loads(header, frames)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/cudf/comm/serialize.py", line 27, in dask_deserialize_cudf_object
    return Serializable.host_deserialize(header, frames)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/cudf/core/abc.py", line 128, in host_deserialize
    obj = cls.device_deserialize(header, frames)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/cudf/core/abc.py", line 82, in device_deserialize
    obj = typ.deserialize(header, frames)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/cudf/core/dataframe.py", line 584, in deserialize
    columns = column.deserialize_columns(header["columns"], column_frames)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/cudf/core/column/column.py", line 2250, in deserialize_columns
    colobj = col_typ.deserialize(meta, frames[:col_frame_count])
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/cudf/core/column/string.py", line 4984, in deserialize
    children.append(column_type.deserialize(h, [b]))
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/cudf/core/column/column.py", line 1261, in deserialize
    data = Buffer.deserialize(header["data"], [frames[0]])
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/cudf/core/buffer.py", line 131, in deserialize
    raise ValueError(
ValueError: Recieved a `Buffer` with the wrong size. Expected (127359480,), but got (67108864,)
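
A quick check of the two sizes in the error message may help narrow this down. The received size is exactly 64 MiB, which would be consistent with a large frame having been split into fixed-size chunks in transit and only the first chunk reaching `Buffer.deserialize`. That reading is an assumption based on the numbers alone, not something confirmed in this report; the names below are illustrative.

```python
import math

# Sizes copied from the ValueError message above.
expected_nbytes = 127_359_480
received_nbytes = 67_108_864

MIB = 1024 ** 2

# The received buffer is a whole number of MiB (a power of two).
received_mib = received_nbytes / MIB

# Bytes that never made it into the buffer handed to deserialize.
shortfall = expected_nbytes - received_nbytes

# ASSUMPTION: if the payload were split into chunks of `received_nbytes`,
# this is how many chunks the expected buffer would need.
n_chunks = math.ceil(expected_nbytes / received_nbytes)

print(f"received: {received_mib} MiB")
print(f"missing:  {shortfall} bytes")
print(f"chunks needed at that chunk size: {n_chunks}")
```

If that reading is right, the string column's data buffer would have arrived as two chunks that were not merged back together before the size check ran.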

Steps/Code to reproduce bug
I can replicate the issue with the following script:

import cudf
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

import dask.dataframe
import dask_cudf

def load_file(file_path):
    return cudf.read_parquet(file_path)


def main():
    dask_client = Client(LocalCUDACluster())

    # Load one file per worker, pinning each task to a specific worker.
    workers = list(dask_client.scheduler_info()["workers"])
    files = [
        "/home/william/miniconda3/envs/deserialize_0.parquet",
        "/home/william/miniconda3/envs/deserialize_1.parquet",
    ]
    # zip stops at the shorter list, so extra workers are simply unused.
    futures = [
        dask_client.submit(load_file, file_path, workers=[worker], pure=False)
        for worker, file_path in zip(workers, files)
    ]
    ddf = dask.dataframe.from_delayed(futures)

    # The ValueError in the traceback is raised from this compute() call.
    print(ddf.compute())


if __name__ == "__main__":
    main()
    print("main done")

The data for those files can be found here:
https://drive.google.com/file/d/1cE7mVuH9If5T3zTwNyX8vjqD4Njfz1OL/view?usp=sharing

Expected behavior
ddf.compute() should complete without raising an exception.

Environment overview (please complete the following information)
Using a conda environment with the latest cudf nightlies:
cudf 0.19.0a210330 cuda_10.2_py38_g635dc9c640_309 rapidsai-nightly
dask-cuda 0.19.0a210330 py38_45 rapidsai-nightly
dask-cudf 0.19.0a210330 py38_g635dc9c640_309 rapidsai-nightly
libcudf 0.19.0a210330 cuda10.2_g635dc9c640_309 rapidsai-nightly
librmm 0.19.0a210330 cuda10.2_g9d1ba02_50 rapidsai-nightly
rmm 0.19.0a210330 cuda_10.2_py38_g9d1ba02_50 rapidsai-nightly

Additional context
We started noticing this issue in one of the end-to-end tests of BlazingSQL a couple of days ago. We did not see it when we merged a PR 4 days ago; when gpuCI ran on that PR, it was using cudf-0.19.0a210326. So whatever the cause is, it was introduced after that build.
