
Document communication protocol #3357

@Kobzol

Hi! Together with @spirali we are attempting to rewrite the Dask scheduler in Rust (#3139). We had some initial success, but when we moved to more advanced Dask programs (for example distributed pandas), the communication protocol between the clients/workers and the scheduler became very difficult to handle.

Right now we are facing two issues:

  1. The serialization format of the protocol is somewhat unfriendly to statically typed languages, as described here.
  2. We haven't found complete documentation of the Dask communication protocol (which messages are exchanged between client/scheduler and worker/scheduler, what their parameters are, which parameters are optional, etc.). It is difficult to reimplement the scheduler without the protocol being documented. There is some information at https://distributed.dask.org/en/latest/protocol.html, but it covers only a very small part of the protocol.

For example, we thought that the compute-task message which is sent from the scheduler to workers needs to have function and args parameters which contain the necessary code and data to run on the worker. However, when running the following Python script:

import dask
from dask.distributed import Client

client = Client("tcp://localhost:8786")
df = dask.datasets.timeseries(start="2020-01-30", end="2020-01-31")
print(len(df))

df.groupby("name")["x"].mean().compute()
df[(df["x"] > 0) | (df["y"] < 0)].compute()

the scheduler sends some compute-task messages to workers which do not contain the function and args keys:

{
    'duration': 0.5,
    'key': "('series-groupby-count-chunk-series-groupby-count-agg-24c2448278b10581d563ee8a2bb5c45b', 0)",
    'nbytes': {"('make-timeseries-11817facbee00a90e53448d7973e9de2', 0)": 8205680},
    'op': 'compute-task',
    'priority': (0, 1, 1),
    'who_has': {"('make-timeseries-11817facbee00a90e53448d7973e9de2', 0)": ['tcp://127.0.0.1:45605']}
}
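
To make the mismatch concrete, here is a minimal sketch of how a compute-task message like the one above could be modeled when function and args may be absent. The field list comes only from that one observed message plus the two keys we expected. Which fields are truly required is an assumption (only key is treated as mandatory here), and the `ComputeTask` class and `parse_compute_task` helper are hypothetical names, not part of Dask.

```python
from dataclasses import dataclass, field, fields
from typing import Any, Dict, List, Optional, Tuple


@dataclass
class ComputeTask:
    # Assumption: only 'key' is treated as mandatory in this sketch.
    key: str
    # Keys seen in the observed message; modeled as optional to be safe.
    priority: Optional[Tuple[int, ...]] = None
    duration: Optional[float] = None
    nbytes: Dict[str, int] = field(default_factory=dict)
    who_has: Dict[str, List[str]] = field(default_factory=dict)
    # Keys we expected, but which some messages do not carry.
    function: Optional[Any] = None
    args: Optional[Any] = None
    # Anything undocumented ends up here instead of being dropped.
    extra: Dict[str, Any] = field(default_factory=dict)


def parse_compute_task(msg: Dict[str, Any]) -> ComputeTask:
    """Build a ComputeTask from an already-decoded message dictionary."""
    if msg.get("op") != "compute-task":
        raise ValueError(f"not a compute-task message: {msg.get('op')!r}")
    known = {f.name for f in fields(ComputeTask)} - {"extra"}
    kwargs = {k: v for k, v in msg.items() if k in known}
    extra = {k: v for k, v in msg.items() if k not in known and k != "op"}
    return ComputeTask(**kwargs, extra=extra)
```

In a statically typed scheduler every one of these optional fields would need an explicit "may be missing" representation, which is why knowing the exact set of optional parameters matters.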

Another problem is that the definition of the task itself (inside the tasks dictionary in update-graph messages) may be serialized. Even if it only uses msgpack, the serialized format can be quite complex and we are not sure how to reimplement the (de)serialization in Rust. Without proper documentation of the format we can only guess how to implement the scheduler correctly.
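
Until such documentation exists, one way to make the guessing more systematic is to log decoded messages and tally, per op, which keys always appear and which appear only sometimes. The following is a rough sketch of that idea, not a description of how Dask works. `summarize_keys` is a hypothetical helper, and it assumes the messages have already been decoded into plain dictionaries with an op key.

```python
from collections import Counter, defaultdict
from typing import Any, Dict, Iterable, Set, Tuple


def summarize_keys(
    messages: Iterable[Dict[str, Any]],
) -> Dict[str, Tuple[Set[str], Set[str]]]:
    """Return {op: (keys seen in every message, keys seen only in some)}."""
    totals: Counter = Counter()
    key_counts: Dict[str, Counter] = defaultdict(Counter)
    for msg in messages:
        op = msg.get("op", "<missing op>")
        totals[op] += 1
        for name in msg:
            if name != "op":
                key_counts[op][name] += 1

    summary = {}
    for op, total in totals.items():
        always = {k for k, n in key_counts[op].items() if n == total}
        sometimes = set(key_counts[op]) - always
        summary[op] = (always, sometimes)
    return summary
```

A key that lands in the "always" set is only a candidate for being required: it was present in every message captured so far, which a larger sample may still contradict.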

I suspect that there may be some legacy cruft hidden inside the Dask communication protocol (both the serialization format and the message API itself), and it may be worth documenting it properly and possibly simplifying it. If this documentation already exists somewhere, please let us know. What do you think?
