What happened:
A query in the testing framework for BlazingSQL crashed. I traced the issue to a crash in dask.dataframe.from_delayed. I have put together a reproducer which gives the following error:
```
distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/protocol/core.py", line 104, in loads
    return msgpack.loads(
  File "msgpack/_unpacker.pyx", line 195, in msgpack._cmsgpack.unpackb
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/protocol/core.py", line 96, in _decode_default
    return merge_and_deserialize(
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 454, in merge_and_deserialize
    return deserialize(header, merged_frames, deserializers=deserializers)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 388, in deserialize
    return loads(header, frames)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 75, in pickle_loads
    if writeable[i] == mv.readonly:
IndexError: tuple index out of range
```
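For context, the failing line in `pickle_loads` compares a per-frame `writeable` flag from the message header against each received buffer. The following is a minimal sketch (with hypothetical frames and flags, not the actual wire payload) of how a `writeable` tuple that is shorter than the frame list produces this exact error:

```python
# Hypothetical illustration of the check that raises in
# distributed/protocol/serialize.py pickle_loads; the real header and
# frames come off the wire, this just shows the indexing failure mode.
frames = [memoryview(b"frame0"), memoryview(b"frame1")]  # two buffers
writeable = (False,)  # header carries only one flag

try:
    for i, mv in enumerate(frames):
        # i == 1 exceeds the writeable tuple -> IndexError
        if writeable[i] == mv.readonly:
            pass
except IndexError as e:
    print(e)  # prints: tuple index out of range
```

This suggests the header's `writeable` entries and the frame list get out of sync somewhere between serialization and deserialization, which would fit a communication-layer bug rather than a reader bug.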
What you expected to happen:
I expect it not to crash :-) It works with dask=2021.3.0 and distributed=2021.3.0 but stopped working with the latest release: dask=2021.3.1 and distributed=2021.3.1.
Minimal Complete Verifiable Example:
```python
from dask.distributed import Client
import dask.dataframe
import pandas


def load_file(file_path):
    return pandas.read_parquet(file_path)


def main():
    # running dask-scheduler locally with two dask-worker
    dask_client = Client('127.0.0.1:8786')
    workers = list(dask_client.scheduler_info()["workers"])
    futures = []
    files = ['/home/william/test/deserialize_0.parquet', '/home/william/test/deserialize_1.parquet']
    counter = 0
    for worker in workers:
        futures.append(
            dask_client.submit(
                load_file, files[counter], workers=[worker], pure=False,
            )
        )
        counter = counter + 1
        if counter > 1:
            break
    ddf = dask.dataframe.from_delayed(futures)
    print(ddf.compute())


if __name__ == "__main__":
    main()
```

The data for those files can be found here:
https://drive.google.com/file/d/1cE7mVuH9If5T3zTwNyX8vjqD4Njfz1OL/view?usp=sharing
Anything else we need to know?:
This issue also happens when I use cudf and dask-cudf. I originally filed an issue in their repo (rapidsai/cudf#7773), but realized I could replicate it without cudf. The error there is very similar, which leads me to believe that it's not the parquet reader; it's something in the communication layer of dask or distributed.
Environment:
- Dask version: dask=2021.3.1, distributed=2021.3.1 (last working versions: dask=2021.3.0, distributed=2021.3.0)
- Python version: 3.8
- Operating System: Ubuntu 16.04
- Install method (conda, pip, source): conda