Behaviour of map_partitions with a pandas DataFrame as argument: incorrect merge in this case #6357

@odovad

Description

I want to merge a Dask DataFrame with a pandas DataFrame, but the pandas DataFrame I pass as an argument seems to be partitioned in a way that does not give the expected result.
I wanted to keep the index of the Dask DataFrame, which is why I tried map_partitions instead of calling merge on the Dask DataFrame directly.

I think this behaviour is explained by the following sentence in the API documentation: "DataFrame-like args (both dask and pandas) will be repartitioned to align (if necessary) before applying the function."

import pandas as pd
import dask.dataframe as dd

food_stores = pd.DataFrame({'foodstore_id':[1,2,3], 
                            'seller_firstname':['Bob', 'Alice', 'Monique'], 
                            'seller_lastname':['Marchand', 'Boul', 'Mul'],  
                            'online_status':[1,1,0]})
records = pd.DataFrame({'id':[1,2,3,4,5,6,7,8,9,10], 
                        'name':['product1', 'product2', 'product3', 'product4', 'product5', 
                                'product6', 'product7', 'product8', 'product9', 'product10'],
                        'seller_firstname':['Bob', 'Alice', 'Monique', 'Alice', 'Bob', 
                                            'Alice', 'Monique', 'Bob', 'Alice', 'Bob'],
                        'seller_lastname':['Marchand', 'Boul', 'Mul', 'Boul', 'Marchand', 
                                           'Boul', 'Mul', 'Marchand', 'Boul', 'Marchand'],
                        })
records = records.set_index('id')
records = dd.from_pandas(records, npartitions=3)

def mapping(part, pandas_df):
    pandas_df = pandas_df.set_index(['seller_firstname', 'seller_lastname'])    
    return part.merge(pandas_df, how='left', 
                      left_on=['seller_firstname', 'seller_lastname'], 
                      right_on=['seller_firstname', 'seller_lastname'], right_index=True)

print(records.map_partitions(mapping, food_stores).compute())
id  name       seller_firstname  seller_lastname  foodstore_id  online_status
1   product1   Bob               Marchand         nan           nan
2   product2   Alice             Boul             nan           nan
3   product3   Monique           Mul              3             0
4   product4   Alice             Boul             nan           nan
5   product5   Bob               Marchand         nan           nan
6   product6   Alice             Boul             nan           nan
7   product7   Monique           Mul              nan           nan
8   product8   Bob               Marchand         nan           nan
9   product9   Alice             Boul             nan           nan
10  product10  Bob               Marchand         nan           nan
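
The output above is consistent with index-based alignment of the two frames. The following pandas-only sketch simulates that reading; it is not Dask's actual implementation. It assumes that `records` has divisions (1, 5, 9, 10), that the pandas DataFrame is treated as a single partition spanning its index 0 to 2, and that both frames are then split on the union of those boundaries. The `boundaries` list and the `slice_by_index` helper are hypothetical, and the `name` column is omitted for brevity.

```python
import pandas as pd

KEYS = ["seller_firstname", "seller_lastname"]

food_stores = pd.DataFrame({
    "foodstore_id": [1, 2, 3],
    "seller_firstname": ["Bob", "Alice", "Monique"],
    "seller_lastname": ["Marchand", "Boul", "Mul"],
    "online_status": [1, 1, 0],
})  # default RangeIndex: 0, 1, 2

records = pd.DataFrame({
    "id": list(range(1, 11)),
    "seller_firstname": ["Bob", "Alice", "Monique", "Alice", "Bob",
                         "Alice", "Monique", "Bob", "Alice", "Bob"],
    "seller_lastname": ["Marchand", "Boul", "Mul", "Boul", "Marchand",
                        "Boul", "Mul", "Marchand", "Boul", "Marchand"],
}).set_index("id")

# ASSUMPTION: union of the divisions of `records` (1, 5, 9, 10)
# and the index range of `food_stores` (0, 2).
boundaries = [0, 1, 2, 5, 9, 10]


def slice_by_index(df, lo, hi, closed_right):
    """Rows whose index lies in [lo, hi), or [lo, hi] for the last slice."""
    upper = (df.index <= hi) if closed_right else (df.index < hi)
    return df[(df.index >= lo) & upper]


pieces = []
for i, (lo, hi) in enumerate(zip(boundaries[:-1], boundaries[1:])):
    last = i == len(boundaries) - 2
    left = slice_by_index(records, lo, hi, last)
    right = slice_by_index(food_stores, lo, hi, last)
    if left.empty:
        continue  # no records fall in this index range
    # Each slice of `records` only sees the food_stores rows whose own
    # index falls in the same range, so most keys find no match.
    merged = left.reset_index().merge(right, how="left", on=KEYS)
    pieces.append(merged.set_index("id"))

simulated = pd.concat(pieces)
matched_ids = simulated.index[simulated["foodstore_id"].notna()].tolist()
print(matched_ids)
```

Under these assumptions only the row with id 3 finds its store, which is the same pattern as in the reported output.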

I do get the result I expect with the following, where the pandas DataFrame is captured in a lambda instead of being passed as an argument to map_partitions:

food_stores = pd.DataFrame({'foodstore_id':[1,2,3], 
                            'seller_firstname':['Bob', 'Alice', 'Monique'], 
                            'seller_lastname':['Marchand', 'Boul', 'Mul'],  
                            'online_status':[1,1,0]})
records = pd.DataFrame({'id':[1,2,3,4,5,6,7,8,9,10], 
                        'name':['product1', 'product2', 'product3', 'product4', 'product5', 
                                'product6', 'product7', 'product8', 'product9', 'product10'],
                        'seller_firstname':['Bob', 'Alice', 'Monique', 'Alice', 'Bob', 'Alice', 
                                            'Monique', 'Bob', 'Alice', 'Bob'],
                        'seller_lastname':['Marchand', 'Boul', 'Mul', 'Boul', 'Marchand', 'Boul', 
                                           'Mul', 'Marchand', 'Boul', 'Marchand'],
                        })
records = records.set_index('id')
records = dd.from_pandas(records, npartitions=3)

def mapping(part, pandas_df):
    pandas_df = pandas_df.set_index(['seller_firstname', 'seller_lastname'])    
    return part.merge(pandas_df, how='left', 
                      left_on=['seller_firstname', 'seller_lastname'], 
                      right_on=['seller_firstname', 'seller_lastname'], right_index=True)

records.map_partitions(lambda part: mapping(part, food_stores)).compute()
id  name       seller_firstname  seller_lastname  foodstore_id  online_status
1   product1   Bob               Marchand         1             1
2   product2   Alice             Boul             2             1
3   product3   Monique           Mul              3             0
4   product4   Alice             Boul             2             1
5   product5   Bob               Marchand         1             1
6   product6   Alice             Boul             2             1
7   product7   Monique           Mul              3             0
8   product8   Bob               Marchand         1             1
9   product9   Alice             Boul             2             1
10  product10  Bob               Marchand         1             1
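
If the only goal is to keep the `id` index through the merge, a per-partition function could also move the index into a column, merge on the key columns, and restore the index afterwards. The sketch below shows this in plain pandas on a single hand-built partition; `merge_keep_index` is a hypothetical helper name, not a Dask or pandas API, and `part` only stands in for one partition of `records`.

```python
import pandas as pd

KEYS = ["seller_firstname", "seller_lastname"]

food_stores = pd.DataFrame({
    "foodstore_id": [1, 2, 3],
    "seller_firstname": ["Bob", "Alice", "Monique"],
    "seller_lastname": ["Marchand", "Boul", "Mul"],
    "online_status": [1, 1, 0],
})

# Stand-in for one partition of `records`, indexed by `id`.
part = pd.DataFrame({
    "id": [1, 2, 3, 4],
    "name": ["product1", "product2", "product3", "product4"],
    "seller_firstname": ["Bob", "Alice", "Monique", "Alice"],
    "seller_lastname": ["Marchand", "Boul", "Mul", "Boul"],
}).set_index("id")


def merge_keep_index(part, lookup):
    """Left-merge `lookup` onto `part` by KEYS, preserving part's index."""
    index_name = part.index.name
    # A column-on-column merge discards the index, so carry it along
    # as an ordinary column and restore it afterwards.
    merged = part.reset_index().merge(lookup, how="left", on=KEYS)
    return merged.set_index(index_name)


result = merge_keep_index(part, food_stores)
print(result)
```

Such a function could be handed to map_partitions through a lambda, as in the working example above, so that the pandas DataFrame is not passed as a direct argument.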

Environment:

  • Dask version: 2.16.0 (same with latest - 2.19)
  • Python version: 3.7.3
  • Operating System: Ubuntu 18.04.3 LTS
  • Install method (conda, pip, source): image daskdev/dask-notebook 2.16.0
