[Object manager] don't abort entire pull request on race condition in concurrent chunk receive#18955
[Object manager] don't abort entire pull request on race condition in concurrent chunk receive#18955ericl merged 8 commits intoray-project:masterfrom
Conversation
|
Instead of retrying at a higher level, which can increase load in 1->many broadcast situations, why not fix the race condition in the first place? E.g., if the chunk already exists (but is unsealed), don't raise an IOError (e.g., comment out the exception). |
|
Yeah we should not reduce the retry interval. This will increase the load to systems a lot as our retry policy is pretty naive. I think there are 2 approaches here.
|
|
Thanks @ericl and @rkooo567 for the suggestions. I went with a solution similar to Eric's and Sang's original approach. The difference is instead of allowing multiple inflight create buffer ops, only 1 inflight create buffer ops is allowed. Because if we ignore already exists errors during buffer creation, there still needs to be a way for operations to wait until their corresponding The new logic can be different from the previous logic in thread and memory usages, under race condition. These behaviors should be similar to the solution of ignoring already existed errors. |
src/ray/common/ray_config_def.h
Outdated
|
|
||
| /// Timeout, in milliseconds, to wait before retrying a failed pull in the | ||
| /// ObjectManager. | ||
| /// The object manager's global timer interval in milliseconds. |
There was a problem hiding this comment.
Please revert unrelated changes.
ericl
left a comment
There was a problem hiding this comment.
cc @iycheng for a more detailed review
| RAY_CHECK(lock.owns_lock()); | ||
|
|
||
| // Buffer for object_id already exists. | ||
| if (create_buffer_state_.contains(object_id)) return ray::Status::OK(); |
There was a problem hiding this comment.
I think our style convention is to always put returns on a new line with braces, and never inline.
| cond_var->wait( | ||
| lock, [this, object_id]() { return !create_buffer_ops_.contains(object_id); }); | ||
| // Buffer already created. | ||
| if (create_buffer_state_.contains(object_id)) return ray::Status::OK(); |
| const uint64_t default_chunk_size_; | ||
|
|
||
| /// Mutex to protect create_buffer_ops_ and create_buffer_state_. | ||
| mutable std::mutex pool_mutex_; |
There was a problem hiding this comment.
Can you change this to use absl condition var support instead of std::condition_variable? https://abseil.io/docs/cpp/guides/synchronization
|
@rkooo567 , solution 2 isn't solving the fundamental issue. Solution 1 is simple, faster, and solves the root "bug" here. |
| ray::Status EnsureBufferExists(const ObjectID &object_id, | ||
| const rpc::Address &owner_address, uint64_t data_size, | ||
| uint64_t metadata_size, uint64_t chunk_index) | ||
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(pool_mutex_); |
There was a problem hiding this comment.
Btw I think we are omitting the ABSL_ prefix on annotations.
ericl
left a comment
There was a problem hiding this comment.
Looks good, but please change the annotations to be consistent with others (no ABSL_ prefix).
|
Windows build broken in master |
…ition in concurrent chunk receive (ray-project#18955)" This reverts commit d12e35c.
Why are these changes needed?
See #18062 for investigation and background.
This change ensures there is at most 1 inflight operation to create buffer for an object, when handling multiple chunks pushed from the object. This avoids the race condition where multiple operations race to create the buffer for the object and fail, forcing pulling to be retried.
Test from #18143 is pull into this change.
Related issue number
#18062
Checks
scripts/format.shto lint the changes in this PR.