
Merging using shuffle = 'disk' is 25x slower than when using shuffle='tasks' #5554

@bsesar

Description


What happened: When using shuffle='disk', merging took 50 minutes, compared to 2 minutes with shuffle='tasks'. The Dask dashboard also showed very low CPU utilization when using shuffle='disk'.

What you expected to happen: I expected merging with shuffle='disk' to be faster than with shuffle='tasks', since the load on the scheduler is supposed to be lower in that case.

Minimal Complete Verifiable Example:

from dask.distributed import Client
import dask.dataframe as dd
import pandas as pd
import numpy as np
import os
from time import time
import pandas.util.testing
import dask.config

# folder in which data will be saved
data_folder = 'path_to_folder_for_saving_data'

# enable shuffle compression
# NOTE: you need to install the python-snappy package in order to use Snappy compression!
dask.config.set({
  "dataframe.shuffle-compression": 'Snappy',
  'temporary_directory': data_folder
  })

# start a local Dask cluster
# (note: I used a single machine with 14 cores and started the cluster with n_workers=14 and threads_per_worker=1)
client = Client(local_directory=data_folder)

# create the folder for fake data for the right side of the merge
os.mkdir('%s/right_ddf' % data_folder)

# number of partitions for the right side of the merge
N_partitions_right = 40

# number of rows per partition
Nrows_right = 1038790

# create data for the right side of the merge
for partition_id in range(N_partitions_right):
  df = pd.DataFrame({'ID':pandas.util.testing.rands_array(16, Nrows_right, dtype='O'),
                     'Date':pd.Timestamp('2019-12-01')})
  df.to_parquet('%s/right_ddf/%d.parquet' % (data_folder, partition_id))
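Note that `pandas.util.testing` (and its `rands_array` helper) is deprecated in recent pandas versions. A dependency-free sketch of an equivalent ID generator, for anyone reproducing this on a newer pandas (the helper name `rand_ids` is made up here):

```python
import numpy as np
import pandas as pd

def rand_ids(n, length=16, seed=None):
    # build n random lowercase-alphanumeric strings of the given length
    rng = np.random.default_rng(seed)
    chars = np.array(list("abcdefghijklmnopqrstuvwxyz0123456789"))
    picks = rng.integers(0, len(chars), size=(n, length))
    return np.array(["".join(chars[row]) for row in picks], dtype=object)

# one partition's worth of fake right-side data, as in the loop above
df = pd.DataFrame({"ID": rand_ids(5, seed=0),
                   "Date": pd.Timestamp("2019-12-01")})
```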

# create the folder for fake data for the left side of the merge
os.mkdir('%s/left_ddf' % data_folder)

# number of partitions for the left side of the merge
N_partitions_left = 1325

# number of rows per partition
Nrows_left = 99063

# to create the left side of the merge, sample Nrows_left rows from a randomly
# selected partition on the right side
# (np.random.randint has an exclusive upper bound, so high=N_partitions_right
# allows every right-side partition to be selected)
partitions_to_sample = np.random.randint(0, high=N_partitions_right, size=N_partitions_left)
for partition_id, partition_to_load in enumerate(partitions_to_sample):
  df = pd.read_parquet('%s/right_ddf/%d.parquet' % (data_folder, partition_to_load))
  df = df.sample(Nrows_left, replace=True)
  df.to_parquet('%s/left_ddf/%d.parquet' % (data_folder, partition_id))
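Because the right-side IDs are random 16-character strings (unique with overwhelming probability) and every left-side row is sampled from the right, the inner merge should match essentially every left row exactly once. A tiny pandas sketch of that structure:

```python
import pandas as pd

# a "right" table with unique keys, and a "left" table sampled
# from it with replacement, mirroring the data generation above
right = pd.DataFrame({"ID": list("abcd"),
                      "Date": pd.Timestamp("2019-12-01")})
left = right.sample(6, replace=True, random_state=0)

# each sampled key hits exactly one right-side row, so no fan-out
merged = left.merge(right, on="ID", how="inner")
```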

# benchmark merging using tasks and disk shuffle
for shuffle_type in ['tasks', 'disk']:  
  
  # load data
  left_ddf = dd.read_parquet('%s/left_ddf' % data_folder)
  right_ddf = dd.read_parquet('%s/right_ddf' % data_folder)

  # merge
  left_ddf = left_ddf.merge(right_ddf, on='ID', how='inner', shuffle=shuffle_type)
  
  t0 = time()
  left_ddf = left_ddf.persist().head()
  elapsed_time = (time() - t0)/60.
  print('Merging using %s takes %.1f minutes' % (shuffle_type, elapsed_time))
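For wall-clock measurements like the loop above, `time.perf_counter` is a slightly more precise choice than `time.time`. A small reusable helper (the name `timed` is made up here) could wrap each run:

```python
from time import perf_counter

def timed(fn, label):
    # run fn(), report elapsed wall-clock time in minutes, and return it
    t0 = perf_counter()
    fn()
    minutes = (perf_counter() - t0) / 60.0
    print('%s took %.2f minutes' % (label, minutes))
    return minutes

# usage sketch: timed(lambda: left_ddf.persist().head(), "shuffle='tasks'")
elapsed = timed(lambda: sum(range(10**6)), 'toy workload')
```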

EDIT: If the above MCVE is too heavy for your machine, use Nrows_right = 1038, Nrows_left = 99, and N_partitions_left = 600. Merging with shuffle='tasks' then takes 0.4 minutes and merging with shuffle='disk' takes 2.4 minutes, a factor-of-6 difference (with n_workers=14 and threads_per_worker=1).

Environment:

  • Dask version: 2021.11.2
  • Python version: 3.8.12
  • Operating System: Windows Server 2016 64 bits
  • Install method (conda, pip, source): conda
