crash in deserialize when using dask.dataframe.from_delayed #7490

@wmalpica

Description

What happened:
A query in the testing framework for BlazingSQL crashed. I was able to trace the issue to a crash in dask.dataframe.from_delayed. I have put together a reproducer which gives the following error:

distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/protocol/core.py", line 104, in loads
    return msgpack.loads(
  File "msgpack/_unpacker.pyx", line 195, in msgpack._cmsgpack.unpackb
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/protocol/core.py", line 96, in _decode_default
    return merge_and_deserialize(
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 454, in merge_and_deserialize
    return deserialize(header, merged_frames, deserializers=deserializers)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 388, in deserialize
    return loads(header, frames)
  File "/home/william/miniconda3/envs/bsql/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 75, in pickle_loads
    if writeable[i] == mv.readonly:
IndexError: tuple index out of range
*** IndexError: tuple index out of range
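The failing line in the traceback compares a per-frame `writeable` flag against each received frame. As a hypothetical minimal illustration (not distributed's actual code), the IndexError would arise whenever the header's `writeable` tuple has fewer entries than there are frames to deserialize:

```python
# Hypothetical sketch of the pattern in pickle_loads: a tuple of
# per-frame writeable flags is indexed once per incoming frame.
writeable = (True,)  # header advertises flags for one frame...
frames = [memoryview(bytearray(b"a")), memoryview(bytearray(b"b"))]  # ...but two frames arrive

try:
    for i, mv in enumerate(frames):
        if writeable[i] == mv.readonly:  # i=1 indexes past the tuple's end
            pass
except IndexError as e:
    print(e)  # tuple index out of range
```

This suggests the frame list and the `writeable` metadata get out of sync somewhere between serialization and merging, consistent with the crash appearing in `merge_and_deserialize`.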

What you expected to happen:
I expect it not to crash :-) It works with dask=2021.3.0 / distributed=2021.3.0 but stopped working with the latest release: dask=2021.3.1 / distributed=2021.3.1

Minimal Complete Verifiable Example:

from dask.distributed import Client
import dask.dataframe
import pandas

def load_file(file_path):
    return pandas.read_parquet(file_path)


def main():
    # assumes a dask-scheduler running locally with two dask-worker processes
    dask_client = Client('127.0.0.1:8786')

    workers = list(dask_client.scheduler_info()["workers"])
    files = ['/home/william/test/deserialize_0.parquet', '/home/william/test/deserialize_1.parquet']
    # submit one file to each of the first two workers
    futures = [
        dask_client.submit(load_file, file, workers=[worker], pure=False)
        for worker, file in zip(workers, files)
    ]
    ddf = dask.dataframe.from_delayed(futures)

    print(ddf.compute())

if __name__ == "__main__":
    main()

The data for those files can be found here:
https://drive.google.com/file/d/1cE7mVuH9If5T3zTwNyX8vjqD4Njfz1OL/view?usp=sharing

Anything else we need to know?:
This issue also happens when I use cudf and dask-cudf. I originally filed an issue in their repo (rapidsai/cudf#7773), but realized I could replicate it without cudf. The error there is very similar, which leads me to believe it's not the parquet reader but something in the communication layer of dask or distributed.

Environment:

  • Dask version: works with dask=2021.3.0 / distributed=2021.3.0; broken with dask=2021.3.1 / distributed=2021.3.1
  • Python version: 3.8
  • Operating System: Ubuntu 16.04
  • Install method (conda, pip, source): conda
