Skip to content

Ensure shard config option is actually configurable, #5061

Closed
ian-r-rose wants to merge 3 commits intodask:mainfrom
ian-r-rose:ensure-shard-size-is-configurable
Closed

Ensure shard config option is actually configurable, #5061
ian-r-rose wants to merge 3 commits intodask:mainfrom
ian-r-rose:ensure-shard-size-is-configurable

Conversation

@ian-r-rose
Copy link
Collaborator

@ian-r-rose ian-r-rose commented Jul 14, 2021

Since function defaults are only evaluated once, this isn't actually configurable after distributed is imported.

  • Tests added / passed
  • Passes black distributed / flake8 distributed / isort distributed

@ian-r-rose ian-r-rose changed the title Ensure this config option is actually configurable, Ensure shard config option is actually configurable, Jul 14, 2021
Copy link
Member

@jrbourbeau jrbourbeau left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @ian-r-rose! I left a couple of small comments, but overall this looks good 👍


from ..utils import nbytes

BIG_BYTES_SHARD_SIZE = dask.utils.parse_bytes(dask.config.get("distributed.comm.shard"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we need to inline this elsewhere. We're getting

E   ImportError: cannot import name 'BIG_BYTES_SHARD_SIZE' from 'distributed.protocol.utils' (/home/runner/work/distributed/distributed/distributed/protocol/utils.py)

in CI



def test_frame_split_is_configurable():
frame = b"1234abcd" * (2 ** 20) # 8 MiB
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a small assert here to ensure/confirm that frame is 8 MiB?

In [1]: frame = b"1234abcd" * (2 ** 20) # 8 MiB

In [2]: from dask.utils import parse_bytes

In [3]: from dask.sizeof import sizeof

In [4]: assert sizeof(frame) == parse_bytes("8 MiB")

Copy link
Collaborator Author

@ian-r-rose ian-r-rose Jul 14, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, happy to do that. I sort of figured that the test itself functioned as an assert (3*3 ~ 8 and 2*5 ~ 8), but an assertion is more explicit

>>> frame_split_size([b'12345', b'678'], n=3) # doctest: +SKIP
[b'123', b'45', b'678']
"""
n = n or dask.utils.parse_bytes(dask.config.get("distributed.comm.shard"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This happens several times on every message sent. There may be thousands of messages per second. It may be worth considering performance here

In [1]: import dask, distributed

In [2]: %timeit dask.utils.parse_bytes(dask.config.get("distributed.comm.shard"))
2.2 µs ± 8.69 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

In [3]: %timeit dask.config.get("distributed.comm.shard") # maybe we choose to add an lru cache on parse_bytes
755 ns ± 4.08 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

This makes me slightly nervous

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To solve our immediate pain my preference would be something like #5052

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, that's a good point. Perhaps we can read the config somewhat higher in the stack and pass it down to the protocol machinery

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Passing down the shard size is also done in the linked PR.

Doing this per-message would be good, and probably reduce the pain by 4x or so. It's still at a level that we might want to avoid though.

This just isn't that common of a use case. I don't think that people are likely to be dynamically changing shard size. I think that we want to set a conservative default for websockets, and then use environment variables / config options stored in yaml for other cases.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fair. I agree it's not a common use-case, and it's not worth incurring the performance hit.

The semi-configurability of the shard size did cause me some pain today when I was testing some things, and for a while was running in circles trying to figure out why the config changes wasn't taking effect.

@ian-r-rose ian-r-rose closed this Jul 14, 2021
@mrocklin
Copy link
Member

mrocklin commented Jul 14, 2021 via email

mrocklin pushed a commit that referenced this pull request Jul 15, 2021
Supersedes #5052 . In addition to making the default websocket maximum-frame-size smaller, this makes the specific value configurable. It's somewhat redundant with distributed.comm.shard, but the constraints on websockets are sufficiently different that a separate config seems okay.

This does not implement the fix in #5061, as that would read a config value for every frame, which is costly. So the config value will in general not be changed after import time.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants