**What happened**: When using shuffle = 'disk', merging took 50 minutes compared to 2 minutes when using shuffle = 'tasks'. Also, the Dask dashboard showed very low CPU utilization when using shuffle = 'disk'.
**What you expected to happen**: I expected merging with shuffle = 'disk' to be faster than merging with shuffle = 'tasks', since the load on the scheduler is supposed to be lower in that case.
**Minimal Complete Verifiable Example**:
```python
from dask.distributed import Client
import dask.dataframe as dd
import pandas as pd
import numpy as np
import os
from time import time
import pandas.util.testing
import dask.config

# folder in which data will be saved
data_folder = 'path_to_folder_for_saving_data'

# enable shuffle compression
# NOTE: you need to install the python-snappy package in order to use Snappy compression!
dask.config.set({
    'dataframe.shuffle-compression': 'Snappy',
    'temporary_directory': data_folder
})

# start a local Dask cluster
# (note: I used a single machine with 14 cores and started the cluster
# with n_workers=14 and threads_per_worker=1)
client = Client(local_directory=data_folder)

# create the folder for fake data for the right side of the merge
os.mkdir('%s/right_ddf' % data_folder)
# number of partitions for the right side of the merge
N_partitions_right = 40
# number of rows per partition
Nrows_right = 1038790

# create data for the right side of the merge
for partition_id in range(N_partitions_right):
    df = pd.DataFrame({'ID': pandas.util.testing.rands_array(16, Nrows_right, dtype='O'),
                       'Date': pd.Timestamp('2019-12-01')})
    df.to_parquet('%s/right_ddf/%d.parquet' % (data_folder, partition_id))

# create the folder for fake data for the left side of the merge
os.mkdir('%s/left_ddf' % data_folder)
# number of partitions for the left side of the merge
N_partitions_left = 1325
# number of rows per partition
Nrows_left = 99063

# to create the left side of the merge, sample Nrows_left rows from a randomly
# selected partition on the right side
# (note: randint's upper bound is exclusive, so pass N_partitions_right itself
# to make every partition eligible for selection)
partitions_to_sample = np.random.randint(0, high=N_partitions_right, size=N_partitions_left)
for partition_id, partition_to_load in enumerate(partitions_to_sample):
    df = pd.read_parquet('%s/right_ddf/%d.parquet' % (data_folder, partition_to_load))
    df = df.sample(Nrows_left, replace=True)
    df.to_parquet('%s/left_ddf/%d.parquet' % (data_folder, partition_id))

# benchmark merging using tasks and disk shuffle
for shuffle_type in ['tasks', 'disk']:
    # load data
    left_ddf = dd.read_parquet('%s/left_ddf' % data_folder)
    right_ddf = dd.read_parquet('%s/right_ddf' % data_folder)
    # merge
    left_ddf = left_ddf.merge(right_ddf, on='ID', how='inner', shuffle=shuffle_type)
    t0 = time()
    left_ddf = left_ddf.persist().head()
    elapsed_time = (time() - t0) / 60.
    print('Merging using %s takes %.1f minutes' % (shuffle_type, elapsed_time))
```

EDIT: If the above MCVE is too heavy for your machine, use Nrows_right = 1038, Nrows_left = 99, and N_partitions_left = 600. Merging with shuffle='tasks' then takes 0.4 minutes and merging with shuffle='disk' takes 2.4 minutes, a factor-of-6 difference (with n_workers=14 and threads_per_worker=1).
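For reference, the shape of the join can be checked in miniature with plain pandas (the sizes here are illustrative, not those from the benchmark): because every left-side row is sampled from right-side IDs, the inner merge preserves all left rows when the right-side IDs are unique.

```python
import pandas as pd

# right side: unique IDs (a stand-in for the random 16-character IDs in the MCVE)
right = pd.DataFrame({
    'ID': ['id-%013d' % i for i in range(1000)],
    'Date': pd.Timestamp('2019-12-01'),
})

# left side: rows sampled with replacement from the right side,
# mirroring how the MCVE builds left_ddf from right-side partitions
left = right.sample(5000, replace=True, random_state=0).reset_index(drop=True)

merged = left.merge(right, on='ID', how='inner')
# every left ID exists exactly once on the right, so the inner join
# keeps exactly one output row per left row
assert len(merged) == len(left)
```

This also shows why the benchmark's output size is predictable: the join is effectively a many-to-one lookup from left to right.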
**Environment**:
- Dask version: 2021.11.2
- Python version: 3.8.12
- Operating System: Windows Server 2016 64 bits
- Install method (conda, pip, source): conda