Describe the bug
I get a crash when I use `dask.dataframe.from_delayed` on specific data:
```
Traceback (most recent call last):
  File "/home/william/repos/willTest/deserialize_bug.py", line 41, in <module>
    main()
  File "/home/william/repos/willTest/deserialize_bug.py", line 38, in main
    print(ddf.compute())
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/dask/base.py", line 283, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/dask/base.py", line 565, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/client.py", line 2665, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/client.py", line 1974, in gather
    return self.sync(
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/client.py", line 844, in sync
    return sync(
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/utils.py", line 353, in sync
    raise exc.with_traceback(tb)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/utils.py", line 336, in f
    result[0] = yield future
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/client.py", line 1868, in _gather
    response = await future
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/client.py", line 1919, in _gather_remote
    response = await retry_operation(self.scheduler.gather, keys=keys)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/utils_comm.py", line 384, in retry_operation
    return await retry(
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/utils_comm.py", line 369, in retry
    return await coro()
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/core.py", line 862, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/core.py", line 645, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/comm/tcp.py", line 218, in read
    msg = await from_frames(
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/comm/utils.py", line 78, in from_frames
    res = await offload(_from_frames)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/utils.py", line 1440, in offload
    return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/utils.py", line 1440, in <lambda>
    return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/comm/utils.py", line 63, in _from_frames
    return protocol.loads(
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/protocol/core.py", line 104, in loads
    return msgpack.loads(
  File "msgpack/_unpacker.pyx", line 195, in msgpack._cmsgpack.unpackb
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/protocol/core.py", line 96, in _decode_default
    return merge_and_deserialize(
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 454, in merge_and_deserialize
    return deserialize(header, merged_frames, deserializers=deserializers)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 388, in deserialize
    return loads(header, frames)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 52, in dask_loads
    return loads(header, frames)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/cudf/comm/serialize.py", line 27, in dask_deserialize_cudf_object
    return Serializable.host_deserialize(header, frames)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/cudf/core/abc.py", line 128, in host_deserialize
    obj = cls.device_deserialize(header, frames)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/cudf/core/abc.py", line 82, in device_deserialize
    obj = typ.deserialize(header, frames)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/cudf/core/dataframe.py", line 584, in deserialize
    columns = column.deserialize_columns(header["columns"], column_frames)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/cudf/core/column/column.py", line 2250, in deserialize_columns
    colobj = col_typ.deserialize(meta, frames[:col_frame_count])
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/cudf/core/column/string.py", line 4984, in deserialize
    children.append(column_type.deserialize(h, [b]))
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/cudf/core/column/column.py", line 1261, in deserialize
    data = Buffer.deserialize(header["data"], [frames[0]])
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/cudf/core/buffer.py", line 131, in deserialize
    raise ValueError(
ValueError: Recieved a `Buffer` with the wrong size. Expected (127359480,), but got (67108864,)
```
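One plausible reading of the mismatch: 67108864 bytes is exactly 64 MiB, which is the default size at which distributed splits large frames for transfer before `merge_and_deserialize` is supposed to rejoin them, so it looks like a single sub-frame reached the cudf `Buffer` deserializer without being merged with its siblings. A pure-Python sketch of the split/merge invariant involved (the `split`/`merge` helpers are hypothetical illustrations, not distributed's API):

```python
FRAME_SPLIT = 2**26  # 64 MiB; matches the 67108864 bytes in the error above
assert FRAME_SPLIT == 67108864


def split(buf, n):
    """Split a buffer into at-most-n-byte chunks (illustrative helper)."""
    return [buf[i:i + n] for i in range(0, len(buf), n)]


def merge(chunks):
    """Re-join sub-frames; the deserializer must see the merged buffer."""
    return b"".join(chunks)


# Small-scale demo of the invariant, using a 10-byte "split size":
payload = b"x" * 23
chunks = split(payload, 10)
assert [len(c) for c in chunks] == [10, 10, 3]

# Handing any single chunk to the consumer instead of merge(chunks)
# reproduces the size-mismatch pattern reported in the ValueError.
assert len(merge(chunks)) == len(payload)
```

If that reading is right, the regression would be somewhere in how the sub-frames for nested (string-column) buffers are counted or merged during deserialization, but that is an assumption, not a confirmed diagnosis.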
Steps/Code to reproduce bug
I can replicate the issue with the following script:
```python
import cudf
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
import dask.dataframe
import dask_cudf


def load_file(file_path):
    return cudf.read_parquet(file_path)


def main():
    dask_client = Client(LocalCUDACluster())
    workers = list(dask_client.scheduler_info()["workers"])
    futures = []
    files = [
        '/home/william/miniconda3/envs/deserialize_0.parquet',
        '/home/william/miniconda3/envs/deserialize_1.parquet',
    ]
    counter = 0
    for worker in workers:
        futures.append(
            dask_client.submit(
                load_file, files[counter], workers=[worker], pure=False,
            )
        )
        counter += 1
    ddf = dask.dataframe.from_delayed(futures)
    print(ddf.compute())


if __name__ == "__main__":
    main()
    print("main done")
    # breakpoint()
```
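As a side note, the manual counter in the loop above just pairs each file with one worker in order; that pairing can be sketched with `zip` alone (pure Python, no cluster required; the worker addresses below are placeholders, not real ones):

```python
# Placeholder worker addresses standing in for dask_client.scheduler_info()["workers"]
workers = ["tcp://127.0.0.1:33001", "tcp://127.0.0.1:33002"]
files = ["deserialize_0.parquet", "deserialize_1.parquet"]

# zip stops at the shorter sequence, so any extra workers are simply skipped
assignments = list(zip(files, workers))
assert assignments == [
    ("deserialize_0.parquet", "tcp://127.0.0.1:33001"),
    ("deserialize_1.parquet", "tcp://127.0.0.1:33002"),
]
```

With that pairing, the submit loop becomes `for f, w in zip(files, workers): dask_client.submit(load_file, f, workers=[w], pure=False)`; the crash reproduces either way.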
The data for those files can be found here:
https://drive.google.com/file/d/1cE7mVuH9If5T3zTwNyX8vjqD4Njfz1OL/view?usp=sharing
Expected behavior
The script should print the combined DataFrame instead of crashing during deserialization.
Environment overview (please complete the following information)
Using a conda environment with the latest cudf nightlies:
```
cudf       0.19.0a210330  cuda_10.2_py38_g635dc9c640_309  rapidsai-nightly
dask-cuda  0.19.0a210330  py38_45                         rapidsai-nightly
dask-cudf  0.19.0a210330  py38_g635dc9c640_309            rapidsai-nightly
libcudf    0.19.0a210330  cuda10.2_g635dc9c640_309        rapidsai-nightly
librmm     0.19.0a210330  cuda10.2_g9d1ba02_50            rapidsai-nightly
rmm        0.19.0a210330  cuda_10.2_py38_g9d1ba02_50      rapidsai-nightly
```
Additional context
We started noticing this issue in one of BlazingSQL's end-to-end tests a couple of days ago. We did not see it when we merged a PR 4 days ago; when gpuCI ran on that PR it was using cudf-0.19.0a210326, so whatever introduced the issue landed after that nightly.