Conversation
Unit Test Results
See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.
15 files ±0   15 suites ±0   6h 30m 0s ⏱️ -5m 44s
For more details on these failures, see this check.
Results for commit d5320b9. ± Comparison against base commit 69a5709.
♻️ This comment has been updated with latest results.
As a heads-up I may not have time to review this during the next couple of days (prepping presentations, travelling, delivering presentations). The opening comment seems not-scary to me though. I wouldn't block on my review if you're feeling confident. |
hendrikmakait
left a comment
Generally, I like this change, the new separation of concerns looks like a great improvement. I have a few questions about minor parts of this PR as well as a number of small nits and suggestions.
distributed/shuffle/_buffer.py
Outdated
    sizes: defaultdict[str, int]
    _exception: None | Exception

    _queues: weakref.WeakKeyDictionary = weakref.WeakKeyDictionary()
From what I understand, we do not need this anymore.
distributed/shuffle/_buffer.py
Outdated
    _queues: weakref.WeakKeyDictionary = weakref.WeakKeyDictionary()

    @property
    def _instances(self) -> set:
Should we implement this or mark as # TODO?
Dead code; it should also be removed.
    return {
        "memory": self.bytes_memory,
        "total": self.bytes_total,
        "buckets": len(self.shards),
Are self.shards shards or buckets?
shards is a mapping from buckets to shards. buckets are basically the partition IDs
Let's maybe rename self.shards to self.bucketed_shards or self.partitioned_shards to highlight that.
IMO this is OK. We're not naming every single mapping as key_value; context is typically sufficient to infer this. In this case I think the shards mapping is fine, and a more verbose attribute name makes the code harder to read.
Together with the new docstrings I agree this is now fine.
    self.bytes_memory -= size

    async def _process(self, id: str, shards: list[ShardType]) -> None:
        raise NotImplementedError()
For my understanding: Do we (not) want to utilize abc.ABC to mark ShardsBuffer as an abstract base class?
What would be the benefit of this?
Marking ShardsBuffer as an abc.ABC as well as decorating _process with @abc.abstractmethod feels mildly cleaner since ShardsBuffer should not be initialized and any concrete subclass needs to implement _process[1]. This would allow mypy to catch errors such as trying to instantiate ShardsBuffer or a subclass that did not implement _process. Similarly, Python would throw somewhat more informative errors at runtime.
I guess this is mainly a question about conventions: If a class is an abstract class, should it always be marked as such or do we not care about that. If it is the latter, why do we not care? (We don't see any value in this case is a perfectly valid answer).
Feel free to shut this discussion down, the case at hand revolves around an abstract class that will have two concrete subclasses for the foreseeable future and is in a private module, so I'm rather interested in whether we have some conventions/guidelines around marking ABCs, not splitting hairs about this specific case.
[1] read not so much, which leads me off-trail to asking whether read belongs here in the class hierarchy, but that's a fringe problem that requires more changes and I don't want to deal with this right now.
This is a first shot. These classes will still change over time significantly and I'm not worried about anybody inheriting from this class since it's private.
If this is something we want to do we can do this later. This PR is definitely too big to have this conversation. I also don't want to block because of this
    async def shuffle_inputs_done(self, shuffle_id: ShuffleId) -> None:
        """
        Hander: Inform the extension that all input partitions have been handed off to extensions.
Driveby, suggested change:
    - Hander: Inform the extension that all input partitions have been handed off to extensions.
    + Handler: Inform the extension that all input partitions have been handed off to extensions.
(same in L327)
    worker_for_mapping = {}

    for part in range(npartitions):
nit for readability, suggested change:
    - for part in range(npartitions):
    + for output_partition in range(npartitions):
(requires similar adjustments below)
    total_bytes_recvd += metrics["disk"]["total"]
    total_bytes_recvd_shuffle += s.total_recvd

    assert total_bytes_recvd_shuffle == total_bytes_sent
Suggested change:
    - assert total_bytes_recvd_shuffle == total_bytes_sent
    + assert total_bytes_recvd_shuffle == total_bytes_sent == total_bytes_recvd
From what I understand, we probably want to test that as well.
These are not identical: there appears to be a small drift between sizes when comparing pyarrow buffers and bytes directly. Therefore, what the comms measure is slightly different from what is actually received. This is very ugly, but I'm not willing to fix it right now; refactoring the serialization parts is out of scope.
Let's highlight this in the test then or drop total_bytes_recvd altogether. Right now it looks like we just forgot to do something useful with total_bytes_recvd.
distributed/shuffle/_comms.py
Outdated
    queue: asyncio.Queue
        A queue holding tokens used to limit concurrency

Suggested change (remove these lines):
    - queue: asyncio.Queue
    -     A queue holding tokens used to limit concurrency
Co-authored-by: Hendrik Makait <hendrik.makait@gmail.com>
hendrikmakait
left a comment
Thanks for adding the additional documentation, @fjetter. This looks good to me, my remaining questions/nits can be safely ignored.
This builds on #7195 which I was not able to finish because I ran into concurrency issues due to leaky tests. Particularly the global memory_limits on the MultiFile and MultiComm classes caused problems.
I went ahead and rewrote major parts of the concurrency model
- Both classes got renamed, to CommShardsBuffer and DiskShardsBuffer respectively.
- There is now a common base class ShardsBuffer that implements the concurrency and limiting of both classes.
- Shuffle needed to be rewritten slightly and uses another offload (specifically in Shuffle.add_partitions). We're still doing all the compute on a thread, but it's a different thread now. The actual worker thread is mostly idle now.
- Introduced a ResourceLimiter primitive. This is a fairly simple class that allows one to acquire as much of a given resource as one wants, but lets us wait until the acquired amount drops below a specified level. This behavior represents our buffer usage pretty well: blocking on acquire is not applicable since the data is already in memory; instead, after acquiring we'd like to wait for memory usage to calm down before releasing again. This is effectively the same backpressure protocol as on main. It also allows for more flexible control over the buffer size (e.g. the comm and disk buffers could share the same limit if we wanted to) and makes testing much safer by no longer leaking global state.

I haven't touched serialization, worker dispatching, etc. This is all merely about control flow and responsibility. In follow-up PRs I intend to modify actual behavior (e.g. fail on leaving workers).
cc @hendrikmakait @mrocklin