
Specify resources for dask builtin functions #2127

@kkraus14

I'm trying to specify resources for builtin dask functions such as dd.read_csv, with the end goal of running certain functions on "CPU workers" and other functions on "GPU workers". Here's a minimal example of trying to force a builtin dask function to run only on my "CPU worker":

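import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

# Single in-process worker, tagged as the "CPU worker"
cluster = LocalCluster(processes=False)
cpu_worker = cluster.workers[0]
cpu_worker.name = 'cpu'
cpu_worker.set_resources(CPU=80)
client = Client(cluster)
pdf = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
test_df = dd.from_pandas(pdf, npartitions=2)
# Request 1 unit of the CPU resource for every task key of test_df
test_df.compute(resources={tuple(test_df.__dask_keys__()): {'CPU': 1}})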

This returns the following:

distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/protocol/core.py", line 96, in loads
    msg = loads_msgpack(small_header, small_payload)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/protocol/core.py", line 173, in loads_msgpack
    return msgpack.loads(payload, encoding='utf8')
  File "msgpack/_unpacker.pyx", line 200, in msgpack._unpacker.unpackb
TypeError: unhashable type: 'list'
distributed.scheduler - ERROR - unhashable type: 'list'
Traceback (most recent call last):
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/scheduler.py", line 1929, in handle_client
    msgs = yield comm.read()
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/asyncio/futures.py", line 294, in result
    raise self._exception
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/comm/tcp.py", line 203, in read
    msg = yield from_frames(frames, deserialize=self.deserialize)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/asyncio/futures.py", line 294, in result
    raise self._exception
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/tornado/gen.py", line 315, in wrapper
    yielded = next(result)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/comm/utils.py", line 75, in from_frames
    res = _from_frames()
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/comm/utils.py", line 61, in _from_frames
    return protocol.loads(frames, deserialize=deserialize)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/protocol/core.py", line 96, in loads
    msg = loads_msgpack(small_header, small_payload)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/protocol/core.py", line 173, in loads_msgpack
    return msgpack.loads(payload, encoding='utf8')
  File "msgpack/_unpacker.pyx", line 200, in msgpack._unpacker.unpackb
TypeError: unhashable type: 'list'
distributed.utils - ERROR - unhashable type: 'list'
Traceback (most recent call last):
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/utils.py", line 622, in log_errors
    yield
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/client.py", line 921, in _handle_report
    six.reraise(*clean_exception(**msg))
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/six.py", line 692, in reraise
    raise value.with_traceback(tb)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/asyncio/futures.py", line 294, in result
    raise self._exception
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/comm/tcp.py", line 203, in read
    msg = yield from_frames(frames, deserialize=self.deserialize)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/asyncio/futures.py", line 294, in result
    raise self._exception
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/tornado/gen.py", line 315, in wrapper
    yielded = next(result)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/comm/utils.py", line 75, in from_frames
    res = _from_frames()
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/comm/utils.py", line 61, in _from_frames
    return protocol.loads(frames, deserialize=deserialize)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/protocol/core.py", line 96, in loads
    msg = loads_msgpack(small_header, small_payload)
  File "/home/appuser/Miniconda3/envs/gpu_accel_ml/lib/python3.5/site-packages/distributed/protocol/core.py", line 173, in loads_msgpack
    return msgpack.loads(payload, encoding='utf8')
  File "msgpack/_unpacker.pyx", line 200, in msgpack._unpacker.unpackb
TypeError: unhashable type: 'list'
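
One plausible reading of the traceback, not confirmed anywhere in this issue: the resources dict uses a tuple of task keys as its dictionary key, and a decoder that returns every array as a list would then try to use a list as a dict key. The sketch below reproduces that mechanism with the standard library only. The helper names and the task keys are hypothetical, and the list-returning decoder is an assumption standing in for the real msgpack call.

```python
def decode_like_array_decoder(obj):
    """Mimic a decoder that returns every sequence as a list (assumed behaviour)."""
    if isinstance(obj, (tuple, list)):
        return [decode_like_array_decoder(item) for item in obj]
    return obj


def rebuild_mapping(pairs):
    """Rebuild a dict from (key, value) pairs, decoding each key first."""
    result = {}
    for key, value in pairs:
        # A decoded tuple key is now a list, which cannot be hashed.
        result[decode_like_array_decoder(key)] = value
    return result


# Hypothetical task keys shaped like those of a two-partition dataframe.
keys = (("from_pandas-abc", 0), ("from_pandas-abc", 1))
resources = {keys: {"CPU": 1}}

try:
    rebuild_mapping(resources.items())
    error_message = None
except TypeError as exc:
    error_message = str(exc)

print(error_message)
```

If this reading is right, the dict is valid on the client, where the key is still a tuple, and only becomes invalid once it is decoded on the scheduler side.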

It would be great if you could specify resources when creating tasks rather than when computing them, similar to what client.submit allows, i.e.:

test_df = dd.from_pandas(pdf, npartitions=2, resources={'CPU': 1})
