Review functionality: add worker resources on the fly #6677

@crusaderky

These methods (Scheduler.add_resources and Worker.set_resources) were added in #857 and have not been functionally iterated upon since:

def add_resources(self, worker: str, resources=None):
    ws: WorkerState = self.workers[worker]
    if resources:
        ws.resources.update(resources)
    ws.used_resources = {}
    for resource, quantity in ws.resources.items():
        ws.used_resources[resource] = 0
        dr = self.resources.get(resource, None)
        if dr is None:
            self.resources[resource] = dr = {}
        dr[worker] = quantity
    return "OK"

async def set_resources(self, **resources) -> None:
    for r, quantity in resources.items():
        if r in self.total_resources:
            self.state.available_resources[r] += quantity - self.total_resources[r]
        else:
            self.state.available_resources[r] = quantity
        self.total_resources[r] = quantity
    await retry_operation(
        self.scheduler.set_resources,
        resources=self.total_resources,
        worker=self.contact_address,
    )
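To make the scheduler-side semantics concrete, here is a minimal sketch that runs the body of add_resources against stand-in objects. FakeScheduler and FakeWorkerState are hypothetical names invented for this illustration and model only the attributes the method touches; they are not the real distributed classes.

```python
from dataclasses import dataclass, field


@dataclass
class FakeWorkerState:
    # Hypothetical stand-in for WorkerState: only the two attributes
    # that add_resources mutates are modelled.
    resources: dict = field(default_factory=dict)
    used_resources: dict = field(default_factory=dict)


class FakeScheduler:
    # Hypothetical stand-in holding just the state add_resources mutates.
    def __init__(self):
        self.workers = {}
        self.resources = {}

    def add_resources(self, worker, resources=None):
        # Body copied from the snippet quoted in this issue.
        ws = self.workers[worker]
        if resources:
            ws.resources.update(resources)
        ws.used_resources = {}
        for resource, quantity in ws.resources.items():
            ws.used_resources[resource] = 0
            dr = self.resources.get(resource, None)
            if dr is None:
                self.resources[resource] = dr = {}
            dr[worker] = quantity
        return "OK"


s = FakeScheduler()
s.workers["w1"] = FakeWorkerState()
s.add_resources("w1", {"GPU": 2})
s.workers["w1"].used_resources["GPU"] = 1  # pretend one task holds a GPU
s.add_resources("w1", {"GPU": 1})

print(s.workers["w1"].resources)       # {'GPU': 1}: replaced, not 2 + 1
print(s.workers["w1"].used_resources)  # {'GPU': 0}: usage counter was reset
```

In this stand-in, despite its name, the method behaves like a setter: the second call overwrites the quantity rather than adding to it, and every call zeroes the usage counters.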

They are affected by several problems:

  • It seems to be public API (it's not used anywhere internally), but it's undocumented
  • It will cause a deadlock if you remove resources below what an enqueued task requires
  • It should trigger _ensure_computing when increasing resources
  • It's prone to race conditions, where the scheduler sends a ComputeTaskEvent for a task but the necessary resources are no longer available
  • The matching method in Scheduler is ambiguous: it's called add_resources, but its RPC comms handler is called set_resources
  • The Scheduler handler should use batched comms, not RPC
  • Scheduler.add_resources is also invoked to replace (not add to) the worker resources at every heartbeat, which is unnecessary
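The deadlock and bookkeeping concerns in the list above can be sketched without a cluster. The example below reproduces only the arithmetic of set_resources, with no scheduler RPC. FakeWorker and can_run are hypothetical, and the admission rule in can_run (a task starts only when every required resource is available) is an assumption made for the illustration, not the actual worker state machine.

```python
class FakeWorker:
    # Hypothetical stand-in: mirrors only the arithmetic of set_resources.
    def __init__(self, **resources):
        self.total_resources = dict(resources)
        self.available_resources = dict(resources)

    def set_resources(self, **resources):
        for r, quantity in resources.items():
            if r in self.total_resources:
                self.available_resources[r] += quantity - self.total_resources[r]
            else:
                self.available_resources[r] = quantity
            self.total_resources[r] = quantity

    def can_run(self, required):
        # Assumed admission rule: every required resource must be available.
        return all(
            self.available_resources.get(r, 0) >= q for r, q in required.items()
        )


# Case 1: a task needing 2 GPUs is enqueued, then the total drops to 1.
idle = FakeWorker(GPU=2)
queued_task = {"GPU": 2}
idle.set_resources(GPU=1)
print(idle.can_run(queued_task))  # False: the task can never start on this worker

# Case 2: a running task already holds both GPUs when the total drops to 1.
busy = FakeWorker(GPU=2)
busy.available_resources["GPU"] -= 2  # simulate the running task's allocation
busy.set_resources(GPU=1)
print(busy.available_resources)  # {'GPU': -1}
```

Under these assumptions, nothing ever re-evaluates the queued task in case 1, so it waits indefinitely, and in case 2 the available count goes negative until the running task releases its resources.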

CC @mrocklin (who originally wrote #857), @hendrikmakait
