Refactor scheduler to use TaskState objects rather than dictionaries #1594
pitrou merged 68 commits into dask:master from …eduler_state_refactor
Conversation
mrocklin left a comment:
In general what is here seems sensible to me. I suspect that it will improve readability for others as well.
    @@ -239,78 +474,115 @@ def __init__(
            # Communication state
            self.loop = loop or IOLoop.current()
            self.worker_comms = dict()
            # XXX rename to client_comms?

            self.log_event(['all', address], {'action': 'remove-worker',
                                              'worker': address,
    -                                         'processing-tasks': self.processing[address]})
    +                                         'processing-tasks': ws.processing})
It occurs to me that we should make a copy of the dict here (unrelated to this PR though).
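The concern above can be shown with a small sketch (the names here are stand-ins, not the scheduler's actual internals): if a live, mutable dict is stored in the event log by reference, later mutations retroactively change the logged event, so a copy should be taken at log time.

```python
# Stand-in for ws.processing and the scheduler's event log.
processing = {"task-1": 1.5}
event_log = []

# Logging the live dict by reference:
event_log.append({"action": "remove-worker", "processing-tasks": processing})
processing.clear()  # the scheduler keeps mutating the dict afterwards

# The logged event now shows an empty dict -- the snapshot is lost.
assert event_log[0]["processing-tasks"] == {}

# Copying at log time preserves the snapshot:
processing = {"task-1": 1.5}
event_log.append({"action": "remove-worker",
                  "processing-tasks": dict(processing)})
processing.clear()
assert event_log[1]["processing-tasks"] == {"task-1": 1.5}
```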
                recommendations[dts.key] = 'released'

        if not ts.waiters and not ts.who_wants:
            # XXX what about 'fire-and-forget'?
Fire-and-forget should just be another client in ts.who_wants, no?
Well, yes, but fire-and-forget doesn't require the task to remain alive.
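The release check being discussed can be sketched as follows (a simplified, hypothetical model, not the scheduler's actual code): a task is a candidate for release only when no other task is waiting on it and no client wants it; a fire-and-forget key is wanted by a special client, so it shows up in who_wants like any other.

```python
class TaskState:
    """Simplified stand-in for the scheduler's TaskState."""
    def __init__(self, key):
        self.key = key
        self.waiters = set()    # TaskStates waiting on this task's result
        self.who_wants = set()  # ClientStates that asked for this key

def maybe_release(ts):
    """Return True if the task is a candidate for release."""
    return not ts.waiters and not ts.who_wants

ts = TaskState("x")
assert maybe_release(ts)

ts.who_wants.add("fire-and-forget")  # stand-in for the special client
assert not maybe_release(ts)
```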
| """ | ||
| Transform a dict of {task state: value} into a dict of {task key: value}. | ||
| """ | ||
| return {ts.key: value for ts, value in task_dict.items()} |
I suppose that these will eventually go away as you work through more of the peripheral modules?
Ideally yes. Some, though, are used a lot in tests, so that will require a fair bit of manual fixing (unless there's a way to automate the changes).
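The boundary helper shown in the diff can be exercised in isolation; this is a minimal sketch (the helper's actual name in the PR is not shown here, so `task_key_dict` is assumed for illustration). Peripheral modules and tests still expect `{key: value}` mappings, so the scheduler converts its internal `{TaskState: value}` dicts at the boundary.

```python
class TaskState:
    """Minimal stand-in with only the attribute the helper needs."""
    def __init__(self, key):
        self.key = key

def task_key_dict(task_dict):
    """Transform a dict of {task state: value} into {task key: value}."""
    return {ts.key: value for ts, value in task_dict.items()}

a, b = TaskState("a"), TaskState("b")
assert task_key_dict({a: 1, b: 2}) == {"a": 1, "b": 2}
```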
        processing = 0
        waiting = 0
        waiting_data = 0
        for ts in scheduler.task_states.values():
This is a little unfortunate. I'm curious to see in what other situations we lose constant time diagnostics.
AFAIR, diagnostics and bokeh are the only places.
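The loss of constant-time diagnostics mentioned above can be sketched like this (simplified stand-ins, not the real diagnostics code): with per-state dicts, a count was just `len(...)` of the relevant dict; after the refactor, diagnostics walk all tasks and tally their states.

```python
class TaskState:
    """Simplified stand-in carrying only key and state."""
    def __init__(self, key, state):
        self.key = key
        self.state = state

task_states = {t.key: t for t in [
    TaskState("a", "processing"),
    TaskState("b", "waiting"),
    TaskState("c", "processing"),
]}

# O(number of tasks) on every diagnostics refresh:
counts = {"processing": 0, "waiting": 0}
for ts in task_states.values():
    counts[ts.state] += 1

assert counts == {"processing": 2, "waiting": 1}
```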
@pitrou in your opinion what remains to be done here? Some questions:

Yes, but I'd appreciate your reviewing it.
Yes, I removed some of them when I had finished removing all usage of them, but that was probably a mistake.
Other than the above, nothing IMHO.
        These are the values that will eventually be sent to a worker when the task
        is ready to run.

    .. class:: TaskState
Would it be possible to move this documentation to the class docstring?
I'll try to. Hopefully that will render correctly.
If you have time to add these back in, that would be helpful. I'm proposing a micro-release by the end of the day. After that I'm comfortable merging this. I might take another pass through tests to see if there are other things to clean up.

I can do that on Monday.
In the future, if memory becomes an issue, we might consider using tuples or lists instead of sets for some of the TaskState attributes. In the common case the number of dependencies/dependents can be quite small, and our most common operation is iterating over the collection.
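The tuple-vs-set trade-off suggested above can be checked with a quick sketch. Exact sizes vary by Python version and platform, so only the ordering is asserted: a small tuple is a flat array of pointers, while a set carries a hash table.

```python
import sys

deps = ["a", "b", "c"]  # a typical small dependency collection
as_set = set(deps)
as_tuple = tuple(deps)

# The tuple is smaller per instance than the set:
assert sys.getsizeof(as_tuple) < sys.getsizeof(as_set)

# Iteration -- the common operation -- works the same over either:
assert sorted(as_tuple) == sorted(as_set) == ["a", "b", "c"]
```

The cost of the switch would be O(n) membership tests and discards, which the discussion suggests is acceptable when n is usually tiny.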
That's a possibility. Though if you look at the measurements above, I wonder if other things come into play: 10 kB per task doesn't really match the basic size of a TaskState... and the sets were already there before (they were dict values rather than attribute values).
Do you have any suggestions on how to determine the origin of memory costs?
First I would disable or minimize any sort of persistent logging (
Btw, disabling stealing seems to make runtimes much more predictable and memory consumption lower as well.
If I monitor memory consumption just before and just after update_graph(), I get the following for 65535 tasks:

so each task actually costs 2.3 kB in the scheduler itself (notwithstanding stealing and other stuff). More generally, I think any potentially costly operation could be wrapped in CPU and memory measurements, so that we have the option of logging what happens. Update: with N=32768, the benchmark creates 65536 tasks, not 32767 :-)
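The "wrap costly operations in measurements" idea can be sketched with a hypothetical helper (this is not the scheduler's actual instrumentation; `tracemalloc` tracks Python-level allocations only, so it underestimates full RSS cost, but it is portable and deterministic).

```python
import tracemalloc

def measure_memory(func, *args, **kwargs):
    """Run func and return (result, net bytes allocated while it ran)."""
    tracemalloc.start()
    before, _ = tracemalloc.get_traced_memory()
    result = func(*args, **kwargs)
    after, _ = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    return result, after - before

def build_tasks(n):
    # Stand-in for update_graph(): allocate n small task-like records.
    return [{"key": i, "deps": []} for i in range(n)]

tasks, cost = measure_memory(build_tasks, 10_000)
assert len(tasks) == 10_000
assert cost > 0
print(f"~{cost / len(tasks):.0f} bytes per task (Python-level only)")
```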
OK to merge from me.
In dask/distributed#1594, the scheduler's internal maps of task objects were changed from using their keys to using TaskState objects. However, dask_drmaa.Adaptive was still querying for keys, causing new workers to never find the memory resource constraints for pending tasks and consequently tasks to never find workers with sufficient resources. This was causing the unit test test_adaptive_memory to wait indefinitely. Try to fix this to support both distributed pre- and post- 1.21.0, and un-skip test_adaptive_memory.
* Compatibility fixes with distributed 1.21.3
  - Support passing kwargs to distributed.Adaptive.__init__, which now takes keyword arguments like minimum and maximum [number of workers].
  - Add an optional workers argument to _retire_workers() to match dask/distributed#1797 -- currently Adaptive raises a TypeError.
* Adaptive memory resource compatibility fix for distributed==1.21.0
  (Same fix as described above: support both distributed pre- and post-1.21.0, and un-skip test_adaptive_memory.)
* basestring -> six.string_types (was testing on python2, switching to python2/3-compatible)
* Add six to requirements.txt
  Also a couple of miscellaneous comments, including a Windows-specific comment for running docker-based tests.
* Undo windows comments (moving to a separate PR)
* Drop support for distributed < 1.21.0
  Update requirements.txt to require distributed >= 1.21.0, since there are some internal changes in the way tasks are stored. Also drop the corresponding backwards-compatibility fixes. Feel free to revert if distributed 1.20.x support is desired.
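The compatibility problem described in these commit messages can be sketched as a shim (names here are assumed for illustration, not dask_drmaa's actual code): before distributed 1.21.0 the scheduler's pending collections held plain key strings; after #1594 they hold TaskState objects carrying attributes such as resource restrictions.

```python
class TaskState:
    """Stand-in for the post-1.21.0 scheduler object."""
    def __init__(self, key, resource_restrictions):
        self.key = key
        self.resource_restrictions = resource_restrictions

def pending_resource_restrictions(unrunnable):
    """Yield (key, restrictions) for both pre- and post-1.21.0 layouts."""
    for task in unrunnable:
        if isinstance(task, str):
            # pre-1.21.0: plain task keys; restrictions lived in a
            # separate scheduler-side mapping (elided here).
            yield task, {}
        else:
            # post-1.21.0: TaskState objects carry their own attributes.
            yield task.key, task.resource_restrictions

old_style = {"x"}
new_style = {TaskState("x", {"memory": 4e9})}

assert dict(pending_resource_restrictions(old_style)) == {"x": {}}
assert dict(pending_resource_restrictions(new_style)) == {"x": {"memory": 4e9}}
```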
        for ts in touched_tasks:
            for dts in ts.dependencies:
                if dts.exception_blame:
                    ts.exception_blame = dts.exception_blame
                    recommendations[key] = 'erred'
Guessing key here should now be ts.key. Fixing in PR #1900.
This refactors the scheduler to use TaskState, WorkerState, and ClientState objects rather than a forest of dictionaries. It was originally planned in order to make Dask more amenable to acceleration with compilers like PyPy and Cython, but is also probably a more accessible organization for new developers.
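An illustrative sketch of the organization described above (attribute names chosen from the discussion in this thread; the real classes carry many more fields): explicit state classes with fixed attributes instead of a forest of parallel dictionaries. Using __slots__ keeps per-instance memory down and gives the fixed layout that compilers like Cython and PyPy optimize well.

```python
class TaskState:
    __slots__ = ("key", "state", "dependencies", "dependents",
                 "waiters", "who_wants", "who_has")

    def __init__(self, key):
        self.key = key
        self.state = "released"
        self.dependencies = set()   # TaskStates this task needs
        self.dependents = set()     # TaskStates that need this task
        self.waiters = set()
        self.who_wants = set()      # ClientStates wanting this key
        self.who_has = set()        # WorkerStates holding the result

class WorkerState:
    __slots__ = ("address", "processing", "has_what")

    def __init__(self, address):
        self.address = address
        self.processing = dict()    # TaskState -> expected duration
        self.has_what = set()       # TaskStates in this worker's memory

# Linking tasks by object reference replaces repeated key lookups:
a, b = TaskState("a"), TaskState("b")
b.dependencies.add(a)
a.dependents.add(b)
assert next(iter(b.dependencies)).key == "a"
```

One consequence of this design is that traversals (e.g. walking a task's dependents) follow pointers directly instead of going through key-indexed dicts on every hop.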
See #854