[Object spilling] Queue failed object creation requests until objects have been spilled #11796
Conversation
Before reviewing the PR: why don't we add this test? #11673 (comment). Is there any other blocker that makes this still fail?
I already added that test in #11673. |
rkooo567 left a comment
I wonder if we need some timeout or max_requeue to avoid hanging forever when there's an issue with spilling itself.
```cpp
void PlasmaStore::ProcessCreateRequests(size_t num_bytes_space) {
  for (auto request = create_request_queue_.begin();
```

Suggested change:

```cpp
  for (auto request_it = create_request_queue_.begin();
```
```cpp
ray::SpaceReleasedCallback OnSpaceReleased() {
  return [this](size_t num_bytes_required) {
    main_service_.post([this, num_bytes_required]() {
```
Seems this hangs for me, is it expected?
ericl left a comment
Looks pretty good, left some comments. Another thought was that now that we have a creation queue, we should (later) change the spilling mechanism to calculate spilling based on the creation queue size / contents, rather than have an explicit Spill(n) call. Then the object creation flow becomes pretty simple:
```
enqueue(createRequest)
TriggerSpillingIfNeeded()
ProcessCreateRequests()
```
```cpp
RAY_RETURN_NOT_OK(SendCreateReply(client, object_id, &object, error_code, mmap_size));
if (error_code == PlasmaError::OK && device_num == 0) {
  RAY_RETURN_NOT_OK(client->SendFd(object.store_fd));
auto status = HandleCreateObjectRequest(client, message);
```
I think we also need to queue further requests if create_request_queue_.size() > 0, to prevent starvation. No objects should be allowed to be created directly while spilling is in progress.
Can you explain more? I think it's actually okay for other objects to get created, such as if they're smaller than the objects that couldn't fit.
Well, if there is a flood of small objects, that can prevent enough space from ever being created for large objects (e.g., a 10GB chunk).
While it may make sense to allow those to get created in the future as an optimization, how about we simplify the queueing behavior by enforcing FIFO priority for all objects?
Anyway, I think your suggestion about just queuing all requests will address this, right?
```cpp
/// spilled, we will attempt to process these requests again and respond to
/// the client if successful or out of memory. If more objects must be
/// spilled, the request will be replaced at the head of the queue.
/// TODO(swang): We should also queue objects here even if there is no room
```
This TODO is already done, right? Since you're deferring responses.
Maybe it's not clear from the comment, but there are now two types of OutOfMemory errors, transient (but we expect there to be space once spilling is done) or definite (unless some objects get released, there's definitely no space). Right now we only queue requests for the former.
Can we remove the TODO then? I think it doesn't make sense to queue the latter (should just raise OOM if only mmaped objects are in the store).
Actually I think it does make sense to eventually queue the latter too. Right now we have to retry from the client on OOM, but that can add unnecessary delays in cases where other objects will actually get released soon. If we queue on the plasma store (with a timeout), we can create the object as soon as there's enough space. Also we could potentially reduce false positives if the plasma store makes the decision instead of the client.
Right yeah, I think eventually we want all decision making to be centralized in this queue (including trigger of global gc, etc.).
The latter is possible after we make create asynchronous right?
```cpp
RAY_RETURN_NOT_OK(client->SendFd(object.store_fd));
auto status = HandleCreateObjectRequest(client, message);
if (status.IsTransientObjectStoreFull()) {
  create_request_queue_.push_back({client, message});
```
Would the code be simplified if we always pushed new requests into the queue and called ProcessCreateRequests(), rather than only enqueueing conditionally? The overhead of pushing into a list and immediately popping should be very small.
Sure, I can try that.
Btw, this is incorrect, right? IIUC, requests get queued indefinitely until space is available (which is probably fine, unless we can detect that no further spilling is possible).
Probably because the default number of workers is 1 (#11789).
No, the requests will get queued until spilling either succeeds or fails. In both cases, the raylet will call back to the plasma store. From there, the plasma store will try the creation request again, and then there are three possibilities:
Where's the code that determines "even after eviction"? It seems to me the retry of the creation request is the same the second time so it would get queued up again even in that case.
I believe the current code doesn't re-queue; the request stays at the head of the queue, since we break out of the loop before calling erase(). Is this not the case?
This still fails (10 io workers).
Isn't this expected to fail? We can't fit both of the ids in the plasma store at the same time.
Ah, that's right. I meant that the request stays at the head of the queue but will get retried later, once a round of spilling succeeds or fails.
Yep good catch.
Got it. I forgot the spill decision is within the plasma create call itself.
```cpp
/// \param num_bytes_space A lower bound on the number of bytes of space that
/// have been made newly available, since the last time this method was
/// called.
void ProcessCreateRequests(size_t num_bytes_space);
```
This argument isn't actually used, can we remove it?
It turns out that Travis is failing because of the same problem that I came across earlier in #11673:
I think we can fix this by moving the OutOfMemory client-side timer into the plasma store, like we talked about in this PR. @ekl, @rkooo567, are you guys okay with just disabling that unit test for now, and I can get started on a more comprehensive refactor/fix next? I'm worried about adding it to this PR because we're already adding a lot of logic to the plasma store that isn't getting unit-tested at all.
Sounds good to me. I think we can add this logic (and disable core worker retry) when we develop the diagram we discussed today?
Yeah, I think we should just do the refactor to pull the queue out of the PlasmaStore and remove the client-side retry in the same PR.
Why are these changes needed?
This adds a queue for object creation requests at the plasma store, so that the store can choose to respond to the client later. Currently, requests are added to the queue if there is likely to be enough space for the object once pending object spills to external storage complete. Requests are removed from the queue once spilling completes, whether successful or not. The plasma store will respond to the client once creation succeeds or fails (because the object store still does not have enough space after both eviction and spilling). In the latter case, the client will follow the existing codepath to retry the request after some time or throw an OutOfMemory error to the application.
Eventually, we should think about removing client retries entirely and handling failed object creation requests in the plasma store, since the plasma store and raylet threads have a better idea of when space may become available than the client.
Related issue number
Closes #11772.
Checks
- I've run `scripts/format.sh` to lint the changes in this PR.

Unfortunately, the plasma store does not have any existing unit tests, so this PR is a bit hard to test without a major refactor of the plasma store. This PR relies on the Python tests for now.