Skip to content

Commit 8404684

Browse files
authored
Refactor scheduler to use TaskState objects rather than dictionaries (#1594)
1 parent 2d90193 commit 8404684

33 files changed

Lines changed: 2628 additions & 2479 deletions

distributed/bokeh/scheduler.py

Lines changed: 23 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -87,39 +87,6 @@ def update(source, data):
8787
source.data.update(d)
8888

8989

90-
class StateTable(DashboardComponent):
91-
""" Currently running tasks """
92-
93-
def __init__(self, scheduler):
94-
self.scheduler = scheduler
95-
96-
names = ['Tasks', 'Stored', 'Processing', 'Waiting', 'No Worker',
97-
'Erred', 'Released']
98-
self.source = ColumnDataSource({name: [] for name in names})
99-
100-
columns = {name: TableColumn(field=name, title=name)
101-
for name in names}
102-
103-
table = DataTable(
104-
source=self.source, columns=[columns[n] for n in names],
105-
height=70,
106-
)
107-
self.root = table
108-
109-
def update(self):
110-
with log_errors():
111-
s = self.scheduler
112-
d = {'Tasks': [len(s.tasks)],
113-
'Stored': [len(s.who_has)],
114-
'Processing': ['%d / %d' % (len(s.rprocessing), s.total_ncores)],
115-
'Waiting': [len(s.waiting)],
116-
'No Worker': [len(s.unrunnable)],
117-
'Erred': [len(s.exceptions)],
118-
'Released': [len(s.released)]}
119-
120-
update(self.source, d)
121-
122-
12390
class Occupancy(DashboardComponent):
12491
""" Occupancy (in time) per worker """
12592

@@ -157,24 +124,23 @@ def __init__(self, scheduler, **kwargs):
157124

158125
def update(self):
159126
with log_errors():
160-
o = self.scheduler.occupancy
161-
workers = list(self.scheduler.workers)
127+
workers = list(self.scheduler.workers.values())
162128

163129
bokeh_addresses = []
164-
for worker in workers:
165-
addr = self.scheduler.get_worker_service_addr(worker, 'bokeh')
130+
for ws in workers:
131+
addr = self.scheduler.get_worker_service_addr(ws.worker_key, 'bokeh')
166132
bokeh_addresses.append('%s:%d' % addr if addr is not None else '')
167133

168134
y = list(range(len(workers)))
169-
occupancy = [o[w] for w in workers]
135+
occupancy = [ws.occupancy for ws in workers]
170136
ms = [occ * 1000 for occ in occupancy]
171137
x = [occ / 500 for occ in occupancy]
172138
total = sum(occupancy)
173139
color = []
174-
for w in workers:
175-
if w in self.scheduler.idle:
140+
for ws in workers:
141+
if ws in self.scheduler.idle:
176142
color.append('red')
177-
elif w in self.scheduler.saturated:
143+
elif ws in self.scheduler.saturated:
178144
color.append('green')
179145
else:
180146
color.append('blue')
@@ -188,7 +154,7 @@ def update(self):
188154

189155
if occupancy:
190156
result = {'occupancy': occupancy,
191-
'worker': workers,
157+
'worker': [ws.worker_key for ws in workers],
192158
'ms': ms,
193159
'color': color,
194160
'bokeh_address': bokeh_addresses,
@@ -223,7 +189,7 @@ def __init__(self, scheduler, **kwargs):
223189
color='blue')
224190

225191
def update(self):
226-
L = list(map(len, self.scheduler.processing.values()))
192+
L = [len(ws.processing) for ws in self.scheduler.workers.values()]
227193
counts, x = np.histogram(L, bins=40)
228194
self.source.data.update({'left': x[:-1],
229195
'right': x[1:],
@@ -258,7 +224,7 @@ def __init__(self, scheduler, **kwargs):
258224
color='blue')
259225

260226
def update(self):
261-
nbytes = np.asarray(list(self.scheduler.worker_bytes.values()))
227+
nbytes = np.asarray([ws.nbytes for ws in self.scheduler.workers.values()])
262228
counts, x = np.histogram(nbytes, bins=40)
263229
d = {'left': x[:-1], 'right': x[1:], 'top': counts}
264230
self.source.data.update(d)
@@ -337,32 +303,31 @@ def __init__(self, scheduler, width=600, **kwargs):
337303

338304
def update(self):
339305
with log_errors():
340-
processing = valmap(len, self.scheduler.processing)
341-
workers = list(self.scheduler.workers)
306+
workers = list(self.scheduler.workers.values())
342307

343308
bokeh_addresses = []
344-
for worker in workers:
345-
addr = self.scheduler.get_worker_service_addr(worker, 'bokeh')
309+
for ws in workers:
310+
addr = self.scheduler.get_worker_service_addr(ws.worker_key, 'bokeh')
346311
bokeh_addresses.append('%s:%d' % addr if addr is not None else '')
347312

348313
y = list(range(len(workers)))
349-
nprocessing = [processing[w] for w in workers]
314+
nprocessing = [len(ws.processing) for ws in workers]
350315
processing_color = []
351-
for w in workers:
352-
if w in self.scheduler.idle:
316+
for ws in workers:
317+
if ws in self.scheduler.idle:
353318
processing_color.append('red')
354-
elif w in self.scheduler.saturated:
319+
elif ws in self.scheduler.saturated:
355320
processing_color.append('green')
356321
else:
357322
processing_color.append('blue')
358323

359-
nbytes = [self.scheduler.worker_bytes[w] for w in workers]
324+
nbytes = [ws.nbytes for ws in workers]
360325
nbytes_text = [format_bytes(nb) for nb in nbytes]
361326
nbytes_color = []
362327
max_limit = 0
363-
for w, nb in zip(workers, nbytes):
328+
for ws, nb in zip(workers, nbytes):
364329
try:
365-
limit = self.scheduler.worker_info[w]['memory_limit']
330+
limit = self.scheduler.worker_info[ws.worker_key]['memory_limit']
366331
except KeyError:
367332
limit = 16e9
368333
if limit > max_limit:
@@ -386,7 +351,7 @@ def update(self):
386351
'nbytes-color': nbytes_color,
387352
'nbytes_text': nbytes_text,
388353
'bokeh_address': bokeh_addresses,
389-
'worker': workers,
354+
'worker': [ws.worker_key for ws in workers],
390355
'y': y}
391356

392357
self.nbytes_figure.title.text = 'Bytes stored: ' + format_bytes(sum(nbytes))
@@ -890,33 +855,28 @@ def update(self):
890855

891856
def systemmonitor_doc(scheduler, extra, doc):
892857
with log_errors():
893-
table = StateTable(scheduler)
894858
sysmon = SystemMonitor(scheduler, sizing_mode='scale_width')
895859
doc.title = "Dask Scheduler Internal Monitor"
896-
doc.add_periodic_callback(table.update, 500)
897860
doc.add_periodic_callback(sysmon.update, 500)
898861

899-
doc.add_root(column(table.root, sysmon.root,
900-
sizing_mode='scale_width'))
862+
doc.add_root(column(sysmon.root, sizing_mode='scale_width'))
901863
doc.template = template
902864
doc.template_variables['active_page'] = 'system'
903865
doc.template_variables.update(extra)
904866

905867

906868
def stealing_doc(scheduler, extra, doc):
907869
with log_errors():
908-
table = StateTable(scheduler)
909870
occupancy = Occupancy(scheduler, height=200, sizing_mode='scale_width')
910871
stealing_ts = StealingTimeSeries(scheduler, sizing_mode='scale_width')
911872
stealing_events = StealingEvents(scheduler, sizing_mode='scale_width')
912873
stealing_events.root.x_range = stealing_ts.root.x_range
913874
doc.title = "Dask Workers Monitor"
914-
doc.add_periodic_callback(table.update, 500)
915875
doc.add_periodic_callback(occupancy.update, 500)
916876
doc.add_periodic_callback(stealing_ts.update, 500)
917877
doc.add_periodic_callback(stealing_events.update, 500)
918878

919-
doc.add_root(column(table.root, occupancy.root, stealing_ts.root,
879+
doc.add_root(column(occupancy.root, stealing_ts.root,
920880
stealing_events.root,
921881
sizing_mode='scale_width'))
922882

distributed/bokeh/scheduler_html.py

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -93,23 +93,47 @@ def get(self, key):
9393

9494
class CountsJSON(RequestHandler):
9595
def get(self):
96+
scheduler = self.server
97+
erred = 0
98+
nbytes = 0
99+
ncores = 0
100+
memory = 0
101+
processing = 0
102+
released = 0
103+
waiting = 0
104+
waiting_data = 0
105+
106+
for ts in scheduler.tasks.values():
107+
if ts.exception_blame is not None:
108+
erred += 1
109+
elif ts.state == 'released':
110+
released += 1
111+
if ts.waiting_on:
112+
waiting += 1
113+
if ts.waiters:
114+
waiting_data += 1
115+
for ws in scheduler.workers.values():
116+
ncores += ws.ncores
117+
memory += len(ws.has_what)
118+
nbytes += ws.nbytes
119+
processing += len(ws.processing)
120+
96121
response = {
97-
'bytes': sum(self.server.nbytes.values()),
98-
'clients': len(self.server.wants_what),
99-
'cores': sum(self.server.ncores.values()),
100-
'erred': len(self.server.exceptions_blame),
101-
'hosts': len(self.server.host_info),
102-
'idle': len(self.server.idle),
103-
'memory': len(self.server.who_has),
104-
'processing': len(self.server.rprocessing),
105-
'ready': len(self.server.ready),
106-
'released': len(self.server.released),
107-
'saturated': len(self.server.saturated),
108-
'tasks': len(self.server.tasks),
109-
'unrunnable': len(self.server.unrunnable),
110-
'waiting': len(self.server.waiting),
111-
'waiting_data': len(self.server.waiting_data),
112-
'workers': len(self.server.workers),
122+
'bytes': nbytes,
123+
'clients': len(scheduler.clients),
124+
'cores': ncores,
125+
'erred': erred,
126+
'hosts': len(scheduler.host_info),
127+
'idle': len(scheduler.idle),
128+
'memory': memory,
129+
'processing': processing,
130+
'released': released,
131+
'saturated': len(scheduler.saturated),
132+
'tasks': len(scheduler.tasks),
133+
'unrunnable': len(scheduler.unrunnable),
134+
'waiting': waiting,
135+
'waiting_data': waiting_data,
136+
'workers': len(scheduler.workers),
113137
}
114138
self.write(response)
115139

distributed/bokeh/task_stream.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ def __init__(self, scheduler):
2020

2121
def transition(self, key, start, finish, *args, **kwargs):
2222
if start == 'processing':
23-
if key not in self.scheduler.task_state:
23+
if key not in self.scheduler.tasks:
2424
return
2525
kwargs['key'] = key
2626
if finish == 'memory' or finish == 'erred':

distributed/bokeh/templates/task.html

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,35 @@
11
{% extends main.html %}
22
{% block content %}
3-
<h1 class="title"> Task: {{ Task }} </h1>
3+
4+
{% set ts = tasks[Task] %}
5+
<h1 class="title"> Task: {{ ts.key }} </h1>
46

57
<table class="table">
68
<tr>
79
<th> status </th>
8-
<td> {{ task_state[Task] }} </td>
10+
<td> {{ ts.state }} </td>
911
</tr>
10-
{% if Task in rprocessing %}
12+
{% if ts.processing_on %}
1113
<tr>
1214
<th> processing on </th>
13-
<td> {{ rprocessing.get(Task) }} </td>
15+
<td> {{ ts.processing_on.worker_key }} </td>
1416
</tr>
1517
<tr>
1618
<th> call stack </th>
1719
<td><a class="button is-primary" href="/info/call-stack/{{ url_escape(Task) }}.html">Call Stack</a></td>
1820
</tr>
1921
{% end %}
20-
{% if Task in nbytes %}
22+
{% if ts.nbytes %}
2123
<tr>
2224
<th> bytes in memory </th>
23-
<td> {{ nbytes.get(Task) }} </td>
25+
<td> {{ ts.nbytes }} </td>
2426
</tr>
2527
{% end %}
26-
{% if task_state[Task] == 'waiting' %}
27-
{% for dep in waiting[dep] %}
28+
{% if ts.state == 'waiting' %}
29+
{% for dts in ts.waiting_on %}
2830
<tr>
2931
<td> waiting on </td>
30-
<td><a href="/info/task/{{ url_escape(dep) }}.html">{{dep}}</a> </td>
32+
<td><a href="/info/task/{{ url_escape(dts.key) }}.html">{{dts.key}}</a> </td>
3133
</tr>
3234
{% end %}
3335
{% end %}
@@ -41,10 +43,10 @@ <h3 class="title is-5"> Dependencies </h3>
4143
<th> Key </th>
4244
<th> State </th>
4345
</thead>
44-
{% for dep in dependencies.get(Task, ()) %}
46+
{% for dts in ts.dependencies %}
4547
<tr>
46-
<td> <a href="/info/task/{{ url_escape(dep) }}.html">{{dep}}</a> </td>
47-
<td> {{ task_state[dep] }} </td>
48+
<td> <a href="/info/task/{{ url_escape(dts.key) }}.html">{{dts.key}}</a> </td>
49+
<td> {{ dts.state }} </td>
4850
</tr>
4951
{% end %}
5052
</table>
@@ -57,32 +59,32 @@ <h3 class="title is-5"> Dependents </h3>
5759
<th> Key </th>
5860
<th> State </th>
5961
</thead>
60-
{% for dep in dependents.get(Task, ()) %}
62+
{% for dts in ts.dependents %}
6163
<tr>
62-
<td> <a href="/info/task/{{ url_escape(dep) }}.html">{{dep}}</a> </td>
63-
<td> {{ task_state[dep] }} </td>
64+
<td> <a href="/info/task/{{ url_escape(dts.key) }}.html">{{dts.key}}</a> </td>
65+
<td> {{ dts.state }} </td>
6466
</tr>
6567
{% end %}
6668
</table>
6769
</div><!-- #dependents -->
6870
</div><!-- #dependencies-dependents -->
6971

7072
<div id="workers-clients" class="columns">
71-
{% if task_state[Task] == 'memory' %}
73+
{% if ts.state == 'memory' %}
7274
<div id="workers" class="column">
7375
<h3 class="title is-5"> Workers with data </h3>
74-
{% set workers = who_has[Task] %}
76+
{% set worker_list = ts.who_has %}
7577
{% include "worker-table.html" %}
7678
</div><!-- #workers -->
7779
{% end %}
7880

79-
{% if Task in who_wants %}
81+
{% if ts.who_wants %}
8082
<div id="clients" class="column">
8183
<h3 class="title is-5"> Clients with future </h3>
8284
<div class="content">
8385
<ul>
84-
{% for client in who_wants.get(Task, ()) %}
85-
<li> <a href="/info/client/{{ url_escape(client) }}.html">{{client}}</a></li>
86+
{% for cs in ts.who_wants %}
87+
<li> <a href="/info/client/{{ url_escape(cs.client_key) }}.html">{{cs.client_key}}</a></li>
8688
{% end %}
8789
</ul>
8890
</div>

distributed/bokeh/templates/worker-table.html

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,22 @@
1010
<th> Services</th>
1111
<th> Logs </th>
1212
</tr>
13-
{% for worker in workers %}
13+
{% for ws in worker_list %}
14+
{% set wi = worker_info[ws.worker_key] %}
1415
<tr>
15-
<td><a href="/info/worker/{{ url_escape(worker) }}.html">{{worker}}</a></td>
16-
<td> {{ ncores[worker] }} </td>
17-
<td> {{ format_bytes(worker_info[worker]['memory_limit']) }} </td>
18-
<td> <progress class="progress" value="{{ worker_info[worker].get('memory-rss') }}" max="{{ worker_info[worker]['memory_limit'] }}"></progress> </td>
19-
<td> {{ format_time(occupancy[worker]) }} </td>
20-
<td> {{ len(processing[worker]) }} </td>
21-
<td> {{ len(has_what[worker]) }} </td>
22-
{% if 'bokeh' in worker_info[worker]['services'] %}
23-
<td> <a href="http://{{ worker_info[worker]['host'] }}:{{ worker_info[worker]['services']['bokeh'] }}">bokeh</a> </td>
16+
<td><a href="/info/worker/{{ url_escape(ws.worker_key) }}.html">{{ws.worker_key}}</a></td>
17+
<td> {{ ws.ncores }} </td>
18+
<td> {{ format_bytes(wi['memory_limit']) }} </td>
19+
<td> <progress class="progress" value="{{ wi.get('memory-rss') }}" max="{{ wi['memory_limit'] }}"></progress> </td>
20+
<td> {{ format_time(ws.occupancy) }} </td>
21+
<td> {{ len(ws.processing) }} </td>
22+
<td> {{ len(ws.has_what) }} </td>
23+
{% if 'bokeh' in wi['services'] %}
24+
<td> <a href="http://{{ wi['host'] }}:{{ wi['services']['bokeh'] }}">bokeh</a> </td>
2425
{% else %}
2526
<td> </td>
2627
{% end %}
27-
<td> <a href="/info/logs/{{ url_escape(worker) }}.html">logs</a></td>
28+
<td> <a href="/info/logs/{{ url_escape(ws.worker_key) }}.html">logs</a></td>
2829
</tr>
2930
{% end %}
3031
</table>

0 commit comments

Comments
 (0)