Refactor scheduler to use TaskState objects rather than dictionaries #1594

Merged
pitrou merged 68 commits into dask:master from pitrou:scheduler_state_refactor on Dec 11, 2017

Conversation

@pitrou (Member) commented Nov 29, 2017

This refactors the scheduler to use TaskState, WorkerState, and ClientState objects rather than a forest of dictionaries. It was originally planned in order to make Dask more amenable to acceleration with compilers like PyPy and Cython, but is also probably a more accessible organization for new developers.

See #854
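For readers skimming the diff, the gist of the change can be sketched as plain Python classes. This is a rough sketch only, with attribute names taken from the snippets quoted in this thread; the real definitions in distributed/scheduler.py carry many more fields:

```python
class TaskState:
    """Sketch: one object per task, replacing parallel per-field dicts."""
    __slots__ = ("key", "state", "dependencies", "dependents",
                 "waiters", "who_wants", "exception_blame")

    def __init__(self, key):
        self.key = key
        self.state = "released"
        self.dependencies = set()   # TaskState objects this task needs
        self.dependents = set()     # TaskState objects that need this task
        self.waiters = set()        # dependents still waiting on this task
        self.who_wants = set()      # ClientState objects wanting the result
        self.exception_blame = None

# Dependencies now link objects directly instead of going through keys:
a, b = TaskState("a"), TaskState("b")
b.dependencies.add(a)
a.dependents.add(b)
```

Transitions then mutate these objects in place instead of updating half a dozen dictionaries keyed by task key.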

@mrocklin (Member) left a comment

In general what is here seems sensible to me. I suspect that it will improve readability for others as well.

Comment thread distributed/scheduler.py Outdated
@@ -239,78 +474,115 @@ def __init__(
        # Communication state
        self.loop = loop or IOLoop.current()
        self.worker_comms = dict()
        # XXX rename to client_comms?
Member:

+1

Comment thread distributed/scheduler.py Outdated
  self.log_event(['all', address], {'action': 'remove-worker',
                                    'worker': address,
-                                   'processing-tasks': self.processing[address]})
+                                   'processing-tasks': ws.processing})
Member:

It occurs to me that we should make a copy of the dict here (unrelated to this PR though).
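A minimal, self-contained illustration of why a copy would matter here (made-up values; the real call logs `ws.processing`):

```python
# Without dict(...), the logged event would alias the live mapping and
# keep changing after the event is recorded.
processing = {"task-1": 1.5, "task-2": 0.3}     # made-up task durations
event = {"action": "remove-worker",
         "processing-tasks": dict(processing)}  # snapshot copy
processing.clear()                              # scheduler moves on
assert event["processing-tasks"] == {"task-1": 1.5, "task-2": 0.3}
```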

Comment thread distributed/scheduler.py Outdated
recommendations[dts.key] = 'released'

if not ts.waiters and not ts.who_wants:
# XXX what about 'fire-and-forget'?
Member:

Fire-and-forget should just be another client in ts.who_wants, no?

Member Author:

Well, yes, but fire-and-forget doesn't require the task to remain alive.

Comment thread distributed/scheduler.py
"""
Transform a dict of {task state: value} into a dict of {task key: value}.
"""
return {ts.key: value for ts, value in task_dict.items()}
Member:

I suppose that these will eventually go away as you work through more of the peripheral modules?

Member Author:

Ideally yes. Some, though, are used a lot in tests, so that will require a fair bit of manual fixing (unless there's a way to automate the changes).
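A self-contained sketch of what these compatibility helpers do, with TaskState reduced to a stub and a hypothetical helper name:

```python
class TaskState:                 # stub for illustration only
    def __init__(self, key):
        self.key = key

def task_key_dict(task_dict):    # hypothetical name for the helper quoted above
    """Transform a dict of {task state: value} into {task key: value}."""
    return {ts.key: value for ts, value in task_dict.items()}

d = {TaskState("x"): 1, TaskState("y"): 2}
assert task_key_dict(d) == {"x": 1, "y": 2}
```

Legacy callers (and tests) that expect key-indexed dicts can keep working against such shims until they are ported to TaskState objects.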

Comment thread distributed/bokeh/scheduler_html.py Outdated
processing = 0
waiting = 0
waiting_data = 0
for ts in scheduler.task_states.values():
Member:

This is a little unfortunate. I'm curious to see in what other situations we lose constant time diagnostics.

Member Author:

AFAIR, diagnostics and bokeh are the only places.

@mrocklin (Member) commented Dec 7, 2017

@pitrou in your opinion what remains to be done here?

Some questions:

  1. Are you satisfied with current scheduling state documentation? If so then I may take a pass over it.
  2. Are there legacy mappings that we should add back in for a release or two?

@pitrou (Member, Author) commented Dec 7, 2017

> Are you satisfied with current scheduling state documentation? If so then I may take a pass over it.

Yes, but I'd appreciate your reviewing it.

> Are there legacy mappings that we should add back in for a release or two?

Yes, I removed some of them when I had finished suppressing all usage of them, but that was probably a mistake.

> what remains to be done here?

Other than the above, nothing IMHO.

Comment thread docs/source/scheduling-state.rst Outdated

These are the values that will eventually be sent to a worker when the task
is ready to run.
.. class:: TaskState
Member:

Would it be possible to move this documentation to the class docstring?

Member Author:

I'll try to. Hopefully that will render correctly.

@mrocklin (Member) commented Dec 7, 2017

> Yes, I removed some of them when I had finished suppressing all usage of them but that was probably a mistake.

If you have time to add these back in that would be helpful.

I'm proposing a micro-release by the end of the day. After that I'm comfortable merging this. I might take another pass through tests to see if there are other things to clean up.

@pitrou (Member, Author) commented Dec 7, 2017

> If you have time to add these back in that would be helpful.

I can do that on Monday.

@mrocklin mrocklin changed the title Scheduler state refactor Refactor scheduler to use TaskState objects rather than dictionaries Dec 7, 2017
@mrocklin (Member) commented Dec 8, 2017

In the future if memory becomes an issue then we might consider using tuples or lists instead of sets for some of the TaskState attributes. In the common case the number of dependencies/dependents can be quite small and our most common operation is iterating over the collection.
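For scale, the per-collection overhead is easy to check with `sys.getsizeof` (CPython container overhead only; exact numbers vary by version and platform):

```python
import sys

deps = ["task-%d" % i for i in range(3)]   # a typically small dependency list
set_size = sys.getsizeof(set(deps))
tuple_size = sys.getsizeof(tuple(deps))

# Sets pay for a sparse hash table; tuples store just the item pointers,
# at the cost of O(n) membership tests and no cheap add/discard.
print(set_size, tuple_size)
```

For a handful of dependencies, iterating a tuple is as fast as iterating a set, so the trade-off only bites if add/discard/membership operations are frequent.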

@pitrou (Member, Author) commented Dec 8, 2017

That's a possibility. Though if you look at the measurements above I wonder if other things come into play - 10 kB per task doesn't really match the basic size of a TaskState... and the sets were already there before (they were dict values rather than attribute values).

@mrocklin (Member) commented Dec 8, 2017

Do you have any suggestions on how to determine the origin of memory costs?

@pitrou (Member, Author) commented Dec 8, 2017

First I would disable or minimize any sort of persistent logging (transition_log etc.). Then perhaps tracemalloc can help diagnose what is going on.
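A minimal `tracemalloc` session along those lines (the workload here is a stand-in, not the scheduler):

```python
import tracemalloc

tracemalloc.start()
# Stand-in for the workload under investigation,
# e.g. running update_graph() on a large graph.
data = [{"key": "task-%d" % i} for i in range(10000)]
snapshot = tracemalloc.take_snapshot()
tracemalloc.stop()

# Show the top allocation sites by total size.
for stat in snapshot.statistics("lineno")[:3]:
    print(stat)
```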

@pitrou (Member, Author) commented Dec 8, 2017

Btw, disabling stealing seems to make runtimes much more predictable and memory consumption lower as well.

@pitrou (Member, Author) commented Dec 8, 2017

If I monitor memory consumption just before and just after update_graph(), I get the following for 65535 tasks:

  • before: 194 MB
  • after: 342 MB (and that number doesn't really grow afterwards)

so each task actually costs 2.3 kB in the scheduler itself (notwithstanding stealing and other stuff).

More generally, I think any potentially costly operation could be wrapped in CPU and memory measurements, so that we have the option of logging what happens.

Update: with N=32768, the benchmark creates 65536 tasks not 32767 :-)
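One way to sketch the "wrap costly operations in CPU and memory measurements" idea using only the stdlib (a hypothetical helper, not part of this PR):

```python
import contextlib
import time
import tracemalloc

@contextlib.contextmanager
def measured(label):
    """Log wall time and peak allocated memory for a block of code."""
    tracemalloc.start()
    t0 = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - t0
        _, peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()
        print("%s: %.3f s, peak %.1f KiB" % (label, elapsed, peak / 1024))

# Stand-in for wrapping something like update_graph():
with measured("build 65536 task dicts"):
    tasks = [{"key": i} for i in range(65536)]
```

Such a wrapper could be enabled conditionally so that the cost of measurement itself stays out of production runs.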

@mrocklin (Member):
OK to merge from me.

@pitrou pitrou merged commit 8404684 into dask:master Dec 11, 2017
@pitrou pitrou deleted the scheduler_state_refactor branch December 11, 2017 15:14
azjps added a commit to azjps/dask-drmaa that referenced this pull request Mar 15, 2018
In dask/distributed#1594, the scheduler's
internal maps of task objects were changed from using their keys to
using TaskState objects. However, dask_drmaa.Adaptive was still
querying for keys, causing new workers to never find the memory
resource constraints for pending tasks and consequently tasks
to never find workers with sufficient resources. This was causing
the unit test test_adaptive_memory to wait indefinitely.

Try to fix this to support both distributed pre- and post- 1.21.0,
and un-skip test_adaptive_memory.
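The pre-/post-1.21.0 compatibility can be sketched as follows (hypothetical helper; the actual fix lives in dask_drmaa.Adaptive):

```python
def task_key(task):
    """Return the plain key whether the scheduler hands us a string key
    (distributed < 1.21.0) or a TaskState object (>= 1.21.0)."""
    return getattr(task, "key", task)

class FakeTaskState:             # stand-in for distributed's TaskState
    def __init__(self, key):
        self.key = key

assert task_key("x") == "x"                  # pre-1.21.0 style
assert task_key(FakeTaskState("x")) == "x"   # post-1.21.0 style
```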
jakirkham pushed a commit to dask/dask-drmaa that referenced this pull request Mar 24, 2018
* Compatibility fixes with distributed 1.21.3

- Support passing kwargs to distributed.Adaptive.__init__, which now
  takes keyword arguments like minimum and maximum [number of workers].
- Add an optional workers argument to _retire_workers() to match
  dask/distributed#1797 -- currently Adaptive
  raises a TypeError.

* Adaptive memory resource compatibility fix for distributed==1.21.0

(Commit body identical to the azjps commit message quoted above.)

* basestring -> six.string_types

(Was testing on python2, switching to python2/3-compatible)

* Add six to requirements.txt

Also a couple of miscellaneous comments, including
Windows-specific comment for running docker-based tests.

* Undo windows comments (moving to a separate PR)

* Drop support for distributed < 1.21.0

Update requirements.txt to require distributed >= 1.21.0,
since there are some internal changes in the way tasks
are stored. Also drop the corresponding backwards-
compatibility fixes. Feel free to revert if
distributed 1.20.x support is desired.
Comment thread distributed/scheduler.py
for ts in touched_tasks:
    for dts in ts.dependencies:
        if dts.exception_blame:
            ts.exception_blame = dts.exception_blame
            recommendations[key] = 'erred'
Member:

Guessing key here should now be ts.key. Fixing in PR (#1900).
