@@ -87,39 +87,6 @@ def update(source, data):
8787 source .data .update (d )
8888
8989
90- class StateTable (DashboardComponent ):
91- """ Currently running tasks """
92-
93- def __init__ (self , scheduler ):
94- self .scheduler = scheduler
95-
96- names = ['Tasks' , 'Stored' , 'Processing' , 'Waiting' , 'No Worker' ,
97- 'Erred' , 'Released' ]
98- self .source = ColumnDataSource ({name : [] for name in names })
99-
100- columns = {name : TableColumn (field = name , title = name )
101- for name in names }
102-
103- table = DataTable (
104- source = self .source , columns = [columns [n ] for n in names ],
105- height = 70 ,
106- )
107- self .root = table
108-
109- def update (self ):
110- with log_errors ():
111- s = self .scheduler
112- d = {'Tasks' : [len (s .tasks )],
113- 'Stored' : [len (s .who_has )],
114- 'Processing' : ['%d / %d' % (len (s .rprocessing ), s .total_ncores )],
115- 'Waiting' : [len (s .waiting )],
116- 'No Worker' : [len (s .unrunnable )],
117- 'Erred' : [len (s .exceptions )],
118- 'Released' : [len (s .released )]}
119-
120- update (self .source , d )
121-
122-
12390class Occupancy (DashboardComponent ):
12491 """ Occupancy (in time) per worker """
12592
@@ -157,24 +124,23 @@ def __init__(self, scheduler, **kwargs):
157124
158125 def update (self ):
159126 with log_errors ():
160- o = self .scheduler .occupancy
161- workers = list (self .scheduler .workers )
127+ workers = list (self .scheduler .workers .values ())
162128
163129 bokeh_addresses = []
164- for worker in workers :
165- addr = self .scheduler .get_worker_service_addr (worker , 'bokeh' )
130+ for ws in workers :
131+ addr = self .scheduler .get_worker_service_addr (ws . worker_key , 'bokeh' )
166132 bokeh_addresses .append ('%s:%d' % addr if addr is not None else '' )
167133
168134 y = list (range (len (workers )))
169- occupancy = [o [ w ] for w in workers ]
135+ occupancy = [ws . occupancy for ws in workers ]
170136 ms = [occ * 1000 for occ in occupancy ]
171137 x = [occ / 500 for occ in occupancy ]
172138 total = sum (occupancy )
173139 color = []
174- for w in workers :
175- if w in self .scheduler .idle :
140+ for ws in workers :
141+ if ws in self .scheduler .idle :
176142 color .append ('red' )
177- elif w in self .scheduler .saturated :
143+ elif ws in self .scheduler .saturated :
178144 color .append ('green' )
179145 else :
180146 color .append ('blue' )
@@ -188,7 +154,7 @@ def update(self):
188154
189155 if occupancy :
190156 result = {'occupancy' : occupancy ,
191- 'worker' : workers ,
157+ 'worker' : [ ws . worker_key for ws in workers ] ,
192158 'ms' : ms ,
193159 'color' : color ,
194160 'bokeh_address' : bokeh_addresses ,
@@ -223,7 +189,7 @@ def __init__(self, scheduler, **kwargs):
223189 color = 'blue' )
224190
225191 def update (self ):
226- L = list ( map ( len , self .scheduler .processing .values ()))
192+ L = [ len ( ws . processing ) for ws in self .scheduler .workers .values ()]
227193 counts , x = np .histogram (L , bins = 40 )
228194 self .source .data .update ({'left' : x [:- 1 ],
229195 'right' : x [1 :],
@@ -258,7 +224,7 @@ def __init__(self, scheduler, **kwargs):
258224 color = 'blue' )
259225
260226 def update (self ):
261- nbytes = np .asarray (list ( self .scheduler .worker_bytes .values ()) )
227+ nbytes = np .asarray ([ ws . nbytes for ws in self .scheduler .workers .values ()] )
262228 counts , x = np .histogram (nbytes , bins = 40 )
263229 d = {'left' : x [:- 1 ], 'right' : x [1 :], 'top' : counts }
264230 self .source .data .update (d )
@@ -337,32 +303,31 @@ def __init__(self, scheduler, width=600, **kwargs):
337303
338304 def update (self ):
339305 with log_errors ():
340- processing = valmap (len , self .scheduler .processing )
341- workers = list (self .scheduler .workers )
306+ workers = list (self .scheduler .workers .values ())
342307
343308 bokeh_addresses = []
344- for worker in workers :
345- addr = self .scheduler .get_worker_service_addr (worker , 'bokeh' )
309+ for ws in workers :
310+ addr = self .scheduler .get_worker_service_addr (ws . worker_key , 'bokeh' )
346311 bokeh_addresses .append ('%s:%d' % addr if addr is not None else '' )
347312
348313 y = list (range (len (workers )))
349- nprocessing = [processing [ w ] for w in workers ]
314+ nprocessing = [len ( ws . processing ) for ws in workers ]
350315 processing_color = []
351- for w in workers :
352- if w in self .scheduler .idle :
316+ for ws in workers :
317+ if ws in self .scheduler .idle :
353318 processing_color .append ('red' )
354- elif w in self .scheduler .saturated :
319+ elif ws in self .scheduler .saturated :
355320 processing_color .append ('green' )
356321 else :
357322 processing_color .append ('blue' )
358323
359- nbytes = [self . scheduler . worker_bytes [ w ] for w in workers ]
324+ nbytes = [ws . nbytes for ws in workers ]
360325 nbytes_text = [format_bytes (nb ) for nb in nbytes ]
361326 nbytes_color = []
362327 max_limit = 0
363- for w , nb in zip (workers , nbytes ):
328+ for ws , nb in zip (workers , nbytes ):
364329 try :
365- limit = self .scheduler .worker_info [w ]['memory_limit' ]
330+ limit = self .scheduler .worker_info [ws . worker_key ]['memory_limit' ]
366331 except KeyError :
367332 limit = 16e9
368333 if limit > max_limit :
@@ -386,7 +351,7 @@ def update(self):
386351 'nbytes-color' : nbytes_color ,
387352 'nbytes_text' : nbytes_text ,
388353 'bokeh_address' : bokeh_addresses ,
389- 'worker' : workers ,
354+ 'worker' : [ ws . worker_key for ws in workers ] ,
390355 'y' : y }
391356
392357 self .nbytes_figure .title .text = 'Bytes stored: ' + format_bytes (sum (nbytes ))
@@ -890,33 +855,28 @@ def update(self):
890855
891856def systemmonitor_doc (scheduler , extra , doc ):
892857 with log_errors ():
893- table = StateTable (scheduler )
894858 sysmon = SystemMonitor (scheduler , sizing_mode = 'scale_width' )
895859 doc .title = "Dask Scheduler Internal Monitor"
896- doc .add_periodic_callback (table .update , 500 )
897860 doc .add_periodic_callback (sysmon .update , 500 )
898861
899- doc .add_root (column (table .root , sysmon .root ,
900- sizing_mode = 'scale_width' ))
862+ doc .add_root (column (sysmon .root , sizing_mode = 'scale_width' ))
901863 doc .template = template
902864 doc .template_variables ['active_page' ] = 'system'
903865 doc .template_variables .update (extra )
904866
905867
906868def stealing_doc (scheduler , extra , doc ):
907869 with log_errors ():
908- table = StateTable (scheduler )
909870 occupancy = Occupancy (scheduler , height = 200 , sizing_mode = 'scale_width' )
910871 stealing_ts = StealingTimeSeries (scheduler , sizing_mode = 'scale_width' )
911872 stealing_events = StealingEvents (scheduler , sizing_mode = 'scale_width' )
912873 stealing_events .root .x_range = stealing_ts .root .x_range
913874 doc .title = "Dask Workers Monitor"
914- doc .add_periodic_callback (table .update , 500 )
915875 doc .add_periodic_callback (occupancy .update , 500 )
916876 doc .add_periodic_callback (stealing_ts .update , 500 )
917877 doc .add_periodic_callback (stealing_events .update , 500 )
918878
919- doc .add_root (column (table . root , occupancy .root , stealing_ts .root ,
879+ doc .add_root (column (occupancy .root , stealing_ts .root ,
920880 stealing_events .root ,
921881 sizing_mode = 'scale_width' ))
922882
0 commit comments