@jrbourbeau @mrocklin @jakirkham, this is ready for the first round of reviews :)
try:
    with cache_dumps_lock:
        result = cache_dumps[func]
except KeyError:
I still don't have a good feeling for what timescales we're currently optimizing, or whether this is a particularly performance-critical section, so this comment might be irrelevant.
However, exception handling is relatively expensive, and if we encounter a lot of cache misses an `in` membership check should be faster. That's ns-level optimization; I could imagine the pickling is usually an order of magnitude slower and this doesn't matter at all.
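For illustration, the two lookup styles under discussion might look like this (a simplified sketch with a plain dict standing in for distributed's LRU cache; not the actual distributed code):

```python
import pickle
import threading

cache_dumps = {}  # plain dict standing in for distributed's LRU cache
cache_dumps_lock = threading.Lock()

def dumps_cached_eafp(func):
    # Current style: cheap on hits, but each miss pays for raising
    # and catching a KeyError.
    try:
        with cache_dumps_lock:
            return cache_dumps[func]
    except KeyError:
        result = pickle.dumps(func, protocol=4)
        with cache_dumps_lock:
            cache_dumps[func] = result
        return result

def dumps_cached_lbyl(func):
    # Suggested style: a membership test skips the exception machinery
    # on misses; either way this is ns-level next to pickling itself.
    with cache_dumps_lock:
        if func in cache_dumps:
            return cache_dumps[func]
    result = pickle.dumps(func, protocol=4)
    with cache_dumps_lock:
        cache_dumps[func] = result
    return result
```

Both return the same bytes; the difference only shows up in miss-heavy workloads, where the membership test avoids the exception overhead.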
    with cache_dumps_lock:
        result = cache_dumps[func]
except KeyError:
    result = pickle.dumps(func, protocol=4)
Any reason why protocol=4 is hard coded?
I am curious too :)
This is taken directly from `distributed/worker.py` (line 3527 in fa5d993).
@jakirkham do you know?
That line comes from @mrocklin's PR ( #4019 ), which allowed connections to dynamically determine what compression and pickle protocols are supported and then use them in communication. In a few places I think Matt found it easier to simply force pickle protocol 4 than allow it to be configurable. So if this is coming from that worker code, that is the history
        result = cache_dumps[func]
except KeyError:
    result = pickle.dumps(func, protocol=4)
    if len(result) < 100000:
I think we can be a bit more generous with the cache size. Currently we're at 100 (LRU maxsize) * 100_000 B (result) ~ 10 MB. Considering how much stuff we're logging without taking size into account too much, I would suggest being more generous with this upper limit, since large results are the juicy cache hits.
Related: in loads_function, what if we used hash(bytes_object) as the key instead of bytes_object itself? Then we wouldn't have to hang onto references to those large bytestrings that we won't look at again.
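A minimal sketch of that idea (hypothetical, not the actual distributed code; note that `hash()` collisions on different bytestrings would silently return the wrong function, so a real implementation would need to guard against that):

```python
import pickle

cache_loads = {}  # stand-in for the LRU cache, keyed by hash instead of bytes

def loads_function(bytes_object):
    # Keying on the hash means the cache no longer keeps the (possibly
    # large) bytestring alive just to serve as a dict key.
    key = hash(bytes_object)
    try:
        return cache_loads[key]
    except KeyError:
        result = pickle.loads(bytes_object)
        cache_loads[key] = result
        return result
```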
It sounds like that was just copied and moved over from `distributed/worker.py` (line 3528 in fa5d993).
Perhaps we can make a new issue and revisit?
> Perhaps we can make a new issue and revisit?
My plan is to remove worker.dumps_function() completely, it shouldn't be required to call it explicitly.
Ah ok in that case I don't think the protocol=4 bit above will be needed
def loads_function(bytes_object):
    """ Load a function from bytes, cache bytes """
    if len(bytes_object) < 100000:
Personal preference: I would put the size of the cache into a constant s.t. the two functions don't drift apart.
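i.e., something along these lines (a sketch; the constant and helper names are made up, not distributed's API):

```python
# Hypothetical shared constant; both dumps_function and loads_function
# would reference this instead of repeating the literal 100000.
FUNC_CACHE_SIZE_LIMIT = 100_000  # bytes

def should_cache(payload: bytes) -> bool:
    # Cache only payloads below the shared threshold.
    return len(payload) < FUNC_CACHE_SIZE_LIMIT
```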
@fjetter good points! When we settle on an overall design I will incorporate your suggestions. Right now I am waiting on @jrbourbeau @mrocklin to review the overall design before continuing :)
gjoseph92 left a comment:
This does seem a bit cleaner and simpler to me, without being a fundamental change, which is nice. I haven't thought more carefully about the implications yet though.
dask_deserialize = dask.utils.Dispatch("dask_deserialize")

_cached_allowed_modules = {}
non_list_collection_types = (tuple, set, frozenset)
Are set and frozenset necessary here, since they can't contain lists or dicts, even recursively within tuples?
>>> {([])}
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: unhashable type: 'list'

and (
    "pickle" not in serializers
    or serializers.index("pickle") > serializers.index("msgpack")
)
Do we still care about whether pickle is used or not, now that we have msgpack_persist_lists?
Related: what happens if a MsgpackList gets pickled? Won't it be passed on (in a task, say) as a MsgpackList, not a plain list? Whereas msgpack_decode_default returns them as plain lists.
    return {"__Set__": True, "as-tuple": tuple(obj)}

if typ is MsgpackList:
    return {"__MsgpackList__": True, "as-tuple": tuple(obj.data)}
-    return {"__MsgpackList__": True, "as-tuple": tuple(obj.data)}
+    return {"__MsgpackList__": True, "as-tuple": obj.data}
What would happen if we did this instead? obj.data should already be a list, so I'm wondering if the extra copy to a tuple is necessary.
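One consideration: dropping the `tuple(...)` copy would make the encoded message alias the original mutable list, so a mutation after the encode hook runs could leak into the payload; whether that matters depends on when msgpack walks the structure. A tiny pure-Python illustration (hypothetical stand-ins for the real classes):

```python
# Hypothetical stand-ins for the real classes in distributed/protocol.
class MsgpackList:
    def __init__(self, data):
        self.data = data

def encode_no_copy(obj):
    # Variant without the tuple(...) copy: the returned dict aliases the
    # original mutable list instead of snapshotting it.
    return {"__MsgpackList__": True, "as-tuple": obj.data}

ml = MsgpackList([1, 2, 3])
encoded = encode_no_copy(ml)
ml.data.append(4)
# The payload sees mutations made after encoding was requested:
assert encoded["as-tuple"] == [1, 2, 3, 4]
```

If msgpack consumes the structure immediately, the copy is indeed redundant and skipping it saves an allocation.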
mrocklin left a comment:
In general things here seem ok. There are issues around passing through the list of serializers. We need to make sure that we can turn pickle off.
# With <https://github.com/dask/distributed/pull/4699>,
# deserialization is done as part of communication.
@jrbourbeau I think that you might want to be aware of this change
if typ in (Serialized, SerializedCallable):
    sub_header, sub_frames = obj.header, obj.frames
elif callable(obj):
    sub_header, sub_frames = {"callable": dumps_function(obj)}, []
Functions can be quite large sometimes, for example if users close over large variables out of function scope. Msgpack may not handle this well in some cases
x = np.arange(1000000000)

def f(y):
    return y + x.sum()

Obviously users shouldn't do this, but they will.
It looks like we're bypassing the list of serializers here. This lets users get past configurations where pickle has been specifically turned off.
sub_header, sub_frames = serialize_and_split(
    obj, serializers=serializers, on_error=on_error, context=context
)
_inplace_compress_frames(sub_header, sub_frames)
The inplace stuff always makes me uncomfortable. Thoughts on making new header/frames dict/lists here instead?
For reference, it was these sorts of inplace operations that previously caused us to run into the msgpack tuple vs list difference. I think that avoiding them when we can is useful, unless there is a large performance boost (which I wouldn't expect here).
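A non-mutating variant could return fresh objects instead. A sketch with illustrative names (`zlib` stands in for whatever compressor is configured; this is not the actual distributed API):

```python
import zlib

def compress_frames(header, frames):
    # Build fresh header/frames rather than mutating the inputs in place,
    # so callers can never observe a half-updated message.
    new_frames = [zlib.compress(bytes(f)) for f in frames]
    new_header = {**header, "compression": ["zlib"] * len(new_frames)}
    return new_header, new_frames
```

The extra dict/list allocations are small next to the compression work itself, which supports the "no large performance boost expected" point above.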
if deserialize == "delay-exception":
    return DelayedExceptionRaise(e)
I am confused about when this is necessary and why it wasn't before. I'm wary of creating new systems like this if we can avoid it.
I think I understand this now that I've seen the c.submit(identity, Foo()) test below
# `__MsgpackList__`, we decode it here explicitly. This way
# we can delay the conversion to a regular `list` until it
# gets to a worker.
if "__MsgpackList__" in obj:
What is the type of obj here? Is `in` the right test here, or is this special value in a more specific place?
header, frames = serialize([[[x]]])
assert "dask" in str(header)
assert len(frames) == 1
assert x.data == np.frombuffer(frames[0]).data
I'm curious, why did we drop this test?
function, args, kwargs = await c._recreate_error_locally(f)
assert f.status == "error"
assert function.__name__ == "div"
assert args == (1, 0)
I'm curious, what happened here?
distributed/tests/test_client.py (outdated)
assert results == list(map(inc, range(10)))
assert a.data and b.data
Hrm, you mentioned this in meeting a couple of weeks ago. I see now how this is unfortunate.
I would expect this test to now be written as

with pytest.raises(CancelledError):
    await c.submit(identity, Foo())

I wouldn't expect the other lines here to be indented. In general, seeing assert statements under a raises context manager is a sign that something is unclean :)
I have changed it to make it more clear what is going on:

# Notice, because serialization is delayed until `distributed.batched`
# we don't get an exception immediately. The exception is raised and logged
# when the ongoing communication between the client and the scheduler encounters
# the `Foo` class. Before <https://github.com/dask/distributed/pull/4699>
# the serialization happened immediately in `submit()`, which would raise the
# `MyException`.
with captured_logger("distributed") as caplog:
    future = c.submit(identity, Foo())
    # We sleep to make sure that a `BatchedSend.interval` has passed.
    await asyncio.sleep(c.scheduler_comm.interval)
# Check that the serialization error was logged
assert "Failed to serialize" in caplog.getvalue()

I'm curious, what do you think of the approach? We cannot easily catch the exception because it happens as part of the ongoing communication and not in the submit() call, but at least we log the exception.
distributed/tests/test_client.py (outdated)
with pytest.raises(TypeError):
    await c.run_on_scheduler(lambda: inc)
await c.run(lambda: inc)
await c.run_on_scheduler(lambda: inc)
If the user has specified that they don't want to allow serialization with pickle then these should continue to fail. Probably we need to feed the list of serializers down wherever serialize is being called. I expect that this might be awkward to do when going through the msgpack machinery. Maybe there is some global that we can misuse?
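One way such a "global" might be kept tidy is a `contextvars.ContextVar` holding the active serializer list, consulted deep inside the msgpack hooks. A sketch under that assumption (all names here are hypothetical, not distributed's API):

```python
import contextvars
import pickle
from contextlib import contextmanager

# Hypothetical: the serializer list active for the current (de)serialization
# pass; "msgpack" and "pickle" mirror the names used in distributed's config.
_allowed_serializers = contextvars.ContextVar(
    "allowed_serializers", default=("msgpack", "pickle")
)

@contextmanager
def serializer_context(serializers):
    # Install the caller's serializer list for the duration of the pass.
    token = _allowed_serializers.set(tuple(serializers))
    try:
        yield
    finally:
        _allowed_serializers.reset(token)

def dumps_function_checked(func):
    # Deep inside a msgpack default hook we can consult the context
    # variable instead of an argument threaded through every call.
    if "pickle" not in _allowed_serializers.get():
        raise TypeError("pickle is disallowed by the active serializer list")
    return pickle.dumps(func, protocol=4)
```

Unlike a module-level global, a ContextVar stays correct across asyncio tasks, which matters since serialization happens inside the comm machinery.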
distributed/tests/test_core.py (outdated)
async with rpc(server.address, serializers=["msgpack"]) as r:
    with pytest.raises(TypeError):
        await r.echo(x=to_serialize(inc))
    await r.echo(x=to_serialize(inc))
These sorts of changes are probably not ok. They fundamentally change the intent of the test, which is to ensure that things like this can be disallowed.
cache_dumps = LRU(maxsize=100)
cache_loads = LRU(maxsize=100)
cache_dumps_lock = threading.Lock()
cache_loads_lock = threading.Lock()
Since we are discussing getting rid of dumps_function, will these still be needed or will they go away as well?
This PR streamlines serialization in Distributed by relying on msgpack, and only refers to the `serialize()`/`deserialize()` infrastructure when encountering objects not supported by msgpack.