
Cannot call compute() on a dask dataframe when inside a task #3791

@pborgen


Below is code that reproduces the problem. I am running the latest versions, listed below.

I have a dedicated VM for the scheduler and separate VMs for the workers. I have seen this pass when only one worker was running, but it seems to always fail with two workers.

Related Issue:
#2336

Version:
dask==2.16.0
distributed==2.16.0

How I start the scheduler:
dask-scheduler

How I start my workers:
dask-worker tcp://x.x.x.x:8786 --nanny-port 8001 --worker-port 8002 --no-dashboard

from distributed import Client, get_client
import dask

if __name__ == '__main__':

    def getCompute(id):
        ddf = get_client().get_dataset("foo")

        #filter
        return ddf[ddf.id == id].compute()

    daskClient = Client('x.x.x.x:8786')
    daskClient.restart()

    ddf = dask.datasets.timeseries()
    ddf = ddf.persist()
    daskClient.publish_dataset(foo=ddf)

    #get id's
    id_list = ddf['id'].unique().compute()

    futures = []
    try:
        for id in id_list:
            futures.append(daskClient.submit(getCompute, id))
    finally:
        daskClient.unpublish_dataset("foo")

    results = daskClient.gather(futures)

Traceback (most recent call last):
  File "D:/dev/code/netsense.support/facts_ingester/facts_ingester/test.py", line 29, in <module>
    results = daskClient.gather(futures)
  File "D:\dev\code\netsense.support\facts_ingester\facts_ingester\venv\lib\site-packages\distributed\client.py", line 1961, in gather
    return self.sync(
  File "D:\dev\code\netsense.support\facts_ingester\facts_ingester\venv\lib\site-packages\distributed\client.py", line 815, in sync
    return sync(
  File "D:\dev\code\netsense.support\facts_ingester\facts_ingester\venv\lib\site-packages\distributed\utils.py", line 347, in sync
    raise exc.with_traceback(tb)
  File "D:\dev\code\netsense.support\facts_ingester\facts_ingester\venv\lib\site-packages\distributed\utils.py", line 331, in f
    result[0] = yield future
  File "D:\dev\code\netsense.support\facts_ingester\facts_ingester\venv\lib\site-packages\tornado\gen.py", line 735, in run
    value = future.result()
  File "D:\dev\code\netsense.support\facts_ingester\facts_ingester\venv\lib\site-packages\distributed\client.py", line 1826, in _gather
    raise exception.with_traceback(traceback)
  File "D:/dev/code/netsense.support/facts_ingester/facts_ingester/test.py", line 10, in getCompute
    return ddf[ddf.id == id].compute()
  File "c:\python38\lib\site-packages\dask\base.py", line 166, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "c:\python38\lib\site-packages\dask\base.py", line 444, in compute
    results = schedule(dsk, keys, **kwargs)
  File "c:\python38\lib\site-packages\distributed\client.py", line 2646, in get
    used as an optimization to avoid recomputation.
  File "c:\python38\lib\site-packages\distributed\client.py", line 2543, in _graph_to_futures
    
ValueError: Inputs contain futures that were created by another client.
