Description
I want to perform a merge between a Dask DataFrame and a pandas DataFrame, and it seems that the pandas DataFrame I pass as an argument gets partitioned in a way that does not give the result we expect in the end.
I wanted to keep the index of the Dask DataFrame, which is why I tried to use `map_partitions` instead of calling `merge` on the Dask DataFrame directly.
I think the reason for this behaviour is explained by this sentence in the `map_partitions` API documentation: "DataFrame-like args (both dask and pandas) will be repartitioned to align (if necessary) before applying the function."
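For context on why the index matters here: in plain pandas, merging on columns builds a fresh `RangeIndex`, whereas a left join of `left_on` columns against the right frame's index (`right_index=True`) keeps the left frame's index. A minimal pandas-only sketch; the three-row `records` subset is an illustrative assumption, not the data from the report:

```python
import pandas as pd

food_stores = pd.DataFrame({'foodstore_id': [1, 2, 3],
                            'seller_firstname': ['Bob', 'Alice', 'Monique'],
                            'seller_lastname': ['Marchand', 'Boul', 'Mul'],
                            'online_status': [1, 1, 0]})
# Illustrative subset of the records table, indexed by id
records = pd.DataFrame({'id': [4, 7, 10],
                        'name': ['product4', 'product7', 'product10'],
                        'seller_firstname': ['Alice', 'Monique', 'Bob'],
                        'seller_lastname': ['Boul', 'Mul', 'Marchand']}).set_index('id')

keys = ['seller_firstname', 'seller_lastname']

# Merging on columns: pandas discards the left index and returns a RangeIndex
on_columns = records.merge(food_stores, how='left', on=keys)

# Left join of columns against the right index: records' index is kept
on_index = records.merge(food_stores.set_index(keys), how='left',
                         left_on=keys, right_index=True)

print(list(on_columns.index))  # [0, 1, 2]
print(list(on_index.index))    # [4, 7, 10]
```

Both calls find the same matches; only the resulting index differs.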
```python
import pandas as pd
import dask.dataframe as dd

food_stores = pd.DataFrame({'foodstore_id': [1, 2, 3],
                            'seller_firstname': ['Bob', 'Alice', 'Monique'],
                            'seller_lastname': ['Marchand', 'Boul', 'Mul'],
                            'online_status': [1, 1, 0]})

records = pd.DataFrame({'id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
                        'name': ['product1', 'product2', 'product3', 'product4', 'product5',
                                 'product6', 'product7', 'product8', 'product9', 'product10'],
                        'seller_firstname': ['Bob', 'Alice', 'Monique', 'Alice', 'Bob',
                                             'Alice', 'Monique', 'Bob', 'Alice', 'Bob'],
                        'seller_lastname': ['Marchand', 'Boul', 'Mul', 'Boul', 'Marchand',
                                            'Boul', 'Mul', 'Marchand', 'Boul', 'Marchand'],
                        })
records = records.set_index('id')
records = dd.from_pandas(records, npartitions=3)


def mapping(part, pandas_df):
    # Index the lookup table by the join keys so the merge keeps part's index
    pandas_df = pandas_df.set_index(['seller_firstname', 'seller_lastname'])
    return part.merge(pandas_df, how='left',
                      left_on=['seller_firstname', 'seller_lastname'],
                      right_on=['seller_firstname', 'seller_lastname'], right_index=True)


# food_stores is passed to map_partitions as an extra argument
print(records.map_partitions(mapping, food_stores).compute())
```

| id | name | seller_firstname | seller_lastname | foodstore_id | online_status |
|---|---|---|---|---|---|
| 1 | product1 | Bob | Marchand | nan | nan |
| 2 | product2 | Alice | Boul | nan | nan |
| 3 | product3 | Monique | Mul | 3 | 0 |
| 4 | product4 | Alice | Boul | nan | nan |
| 5 | product5 | Bob | Marchand | nan | nan |
| 6 | product6 | Alice | Boul | nan | nan |
| 7 | product7 | Monique | Mul | nan | nan |
| 8 | product8 | Bob | Marchand | nan | nan |
| 9 | product9 | Alice | Boul | nan | nan |
| 10 | product10 | Bob | Marchand | nan | nan |
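This output is consistent with the documentation sentence quoted above. The pandas-only sketch below mimics that alignment under stated assumptions: that `dd.from_pandas(records, npartitions=3)` yields divisions `(1, 5, 9, 10)`, that `food_stores` is wrapped as a single partition with divisions `(0, 2)` (its default `RangeIndex`), and that alignment cuts every frame on the union of those boundaries. The helper `slice_division` is hypothetical, written only for this illustration. Under these assumptions each `records` partition only sees the rows of `food_stores` whose row labels (0, 1, 2) fall in the same index range, so only `product3` finds its seller:

```python
import pandas as pd

food_stores = pd.DataFrame({'foodstore_id': [1, 2, 3],
                            'seller_firstname': ['Bob', 'Alice', 'Monique'],
                            'seller_lastname': ['Marchand', 'Boul', 'Mul'],
                            'online_status': [1, 1, 0]})
records = pd.DataFrame({
    'id': range(1, 11),
    'name': [f'product{i}' for i in range(1, 11)],
    'seller_firstname': ['Bob', 'Alice', 'Monique', 'Alice', 'Bob',
                         'Alice', 'Monique', 'Bob', 'Alice', 'Bob'],
    'seller_lastname': ['Marchand', 'Boul', 'Mul', 'Boul', 'Marchand',
                        'Boul', 'Mul', 'Marchand', 'Boul', 'Marchand'],
}).set_index('id')


def mapping(part, pandas_df):
    # Same per-partition function as in the report
    pandas_df = pandas_df.set_index(['seller_firstname', 'seller_lastname'])
    return part.merge(pandas_df, how='left',
                      left_on=['seller_firstname', 'seller_lastname'],
                      right_index=True)


def slice_division(df, lo, hi, is_last):
    # Hypothetical helper: rows with index in [lo, hi), or [lo, hi] for the last interval
    upper = (df.index <= hi) if is_last else (df.index < hi)
    return df[(df.index >= lo) & upper]


# Assumed divisions: records from from_pandas(npartitions=3), food_stores as one partition
records_divisions = (1, 5, 9, 10)
food_stores_divisions = (0, 2)
divisions = sorted(set(records_divisions) | set(food_stores_divisions))  # [0, 1, 2, 5, 9, 10]

pieces = []
intervals = list(zip(divisions[:-1], divisions[1:]))
for i, (lo, hi) in enumerate(intervals):
    is_last = i == len(intervals) - 1
    part = slice_division(records, lo, hi, is_last)
    lookup = slice_division(food_stores, lo, hi, is_last)
    if not part.empty:
        # Each records piece is merged with only its aligned slice of food_stores
        pieces.append(mapping(part, lookup))

result = pd.concat(pieces)
print(result[['foodstore_id', 'online_status']])
```

For example, the interval `[2, 5)` pairs products 2 to 4 with the single `food_stores` row labelled 2 (Monique), which is why only product 3 is matched.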
I do get the result I expect with the following, where `food_stores` is captured by a lambda instead of being passed to `map_partitions` as an argument:
```python
import pandas as pd
import dask.dataframe as dd

food_stores = pd.DataFrame({'foodstore_id': [1, 2, 3],
                            'seller_firstname': ['Bob', 'Alice', 'Monique'],
                            'seller_lastname': ['Marchand', 'Boul', 'Mul'],
                            'online_status': [1, 1, 0]})

records = pd.DataFrame({'id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
                        'name': ['product1', 'product2', 'product3', 'product4', 'product5',
                                 'product6', 'product7', 'product8', 'product9', 'product10'],
                        'seller_firstname': ['Bob', 'Alice', 'Monique', 'Alice', 'Bob', 'Alice',
                                             'Monique', 'Bob', 'Alice', 'Bob'],
                        'seller_lastname': ['Marchand', 'Boul', 'Mul', 'Boul', 'Marchand', 'Boul',
                                            'Mul', 'Marchand', 'Boul', 'Marchand'],
                        })
records = records.set_index('id')
records = dd.from_pandas(records, npartitions=3)


def mapping(part, pandas_df):
    # Index the lookup table by the join keys so the merge keeps part's index
    pandas_df = pandas_df.set_index(['seller_firstname', 'seller_lastname'])
    return part.merge(pandas_df, how='left',
                      left_on=['seller_firstname', 'seller_lastname'],
                      right_on=['seller_firstname', 'seller_lastname'], right_index=True)


# food_stores is captured by the lambda instead of being passed as an argument
records.map_partitions(lambda part: mapping(part, food_stores)).compute()
```

| id | name | seller_firstname | seller_lastname | foodstore_id | online_status |
|---|---|---|---|---|---|
| 1 | product1 | Bob | Marchand | 1 | 1 |
| 2 | product2 | Alice | Boul | 2 | 1 |
| 3 | product3 | Monique | Mul | 3 | 0 |
| 4 | product4 | Alice | Boul | 2 | 1 |
| 5 | product5 | Bob | Marchand | 1 | 1 |
| 6 | product6 | Alice | Boul | 2 | 1 |
| 7 | product7 | Monique | Mul | 3 | 0 |
| 8 | product8 | Bob | Marchand | 1 | 1 |
| 9 | product9 | Alice | Boul | 2 | 1 |
| 10 | product10 | Bob | Marchand | 1 | 1 |
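For comparison, a pandas-only sketch of what the lambda version amounts to: `food_stores` is captured in the closure rather than passed as one of the DataFrame-like arguments that the documentation says get repartitioned, so every partition is merged against the full lookup table. The three positional chunks (4, 4 and 2 rows) are an assumption standing in for the partitions of `records`; the data and `mapping` are copied from the report:

```python
import pandas as pd

food_stores = pd.DataFrame({'foodstore_id': [1, 2, 3],
                            'seller_firstname': ['Bob', 'Alice', 'Monique'],
                            'seller_lastname': ['Marchand', 'Boul', 'Mul'],
                            'online_status': [1, 1, 0]})
records = pd.DataFrame({
    'id': range(1, 11),
    'name': [f'product{i}' for i in range(1, 11)],
    'seller_firstname': ['Bob', 'Alice', 'Monique', 'Alice', 'Bob',
                         'Alice', 'Monique', 'Bob', 'Alice', 'Bob'],
    'seller_lastname': ['Marchand', 'Boul', 'Mul', 'Boul', 'Marchand',
                        'Boul', 'Mul', 'Marchand', 'Boul', 'Marchand'],
}).set_index('id')


def mapping(part, pandas_df):
    # Same per-partition function as in the report
    pandas_df = pandas_df.set_index(['seller_firstname', 'seller_lastname'])
    return part.merge(pandas_df, how='left',
                      left_on=['seller_firstname', 'seller_lastname'],
                      right_index=True)


# Assumed partitioning of records: positional chunks of 4, 4 and 2 rows
chunks = [records.iloc[0:4], records.iloc[4:8], records.iloc[8:10]]

# Each chunk sees the whole food_stores table, as with the lambda closure
result = pd.concat([mapping(chunk, food_stores) for chunk in chunks])
print(result)
```

Because no chunk's lookup table is cut down, every seller is found and the `id` index survives, matching the second table.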
Environment:
- Dask version: 2.16.0 (same behaviour with the latest release, 2.19)
- Python version: 3.7.3
- Operating System: Ubuntu 18.04.3 LTS
- Install method (conda, pip, source): Docker image daskdev/dask-notebook 2.16.0