[Object spilling] Queue failed object creation requests until objects have been spilled#11796

Merged
stephanie-wang merged 9 commits into ray-project:master from stephanie-wang:async-spill
Nov 6, 2020

Conversation

@stephanie-wang
Contributor

Why are these changes needed?

This adds a queue for object creation requests at the plasma store, so that the store can choose to respond to the client later. Currently, requests are added to the queue if there is likely to be enough space for the object once pending object spills to external storage complete. Requests are removed from the queue once spilling completes, whether successful or not. The plasma store will respond to the client once creation succeeds or fails (because the object store still does not have enough space after both eviction and spilling). In the latter case, the client will follow the existing codepath to retry the request after some time or throw an OutOfMemory error to the application.

Eventually, we should think about removing client retries entirely and handling failed object creation requests in the plasma store, since the plasma store and raylet threads have a better idea of when space may become available than the client.
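The queueing behavior described above can be sketched roughly as follows. This is an illustrative Python sketch, not the actual C++ plasma store code; the names (`CreateRequestQueue`, `try_create`, `on_spill_complete`) and the status strings are hypothetical stand-ins for the real error codes.

```python
from collections import deque

class CreateRequestQueue:
    """Sketch of a queue for object creation requests at the plasma store."""

    def __init__(self, store):
        self.store = store    # exposes try_create(request) -> status string
        self.queue = deque()  # pending (client, request) pairs, FIFO

    def add_request(self, client, request):
        # Queue the request; the store replies to the client later.
        self.queue.append((client, request))
        self.process_requests()

    def process_requests(self):
        # Retry queued requests in FIFO order.
        while self.queue:
            client, request = self.queue[0]
            status = self.store.try_create(request)
            if status == "OK":
                self.queue.popleft()
                client.reply(request, "OK")
            elif status == "TransientOutOfMemory":
                # Spilling is in flight; leave the request at the head of
                # the queue and retry once spilling finishes.
                break
            else:  # "OutOfMemory": no space even after eviction + spilling
                self.queue.popleft()
                client.reply(request, "OutOfMemory")

    def on_spill_complete(self):
        # Called back by the raylet whether spilling succeeded or failed.
        self.process_requests()
```

Keeping the blocked request at the head of the queue (rather than re-appending it) is what gives FIFO priority and avoids starving large requests behind a flood of small ones.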

Related issue number

Closes #11772.

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Unfortunately, the plasma store does not have any existing unit tests, so this PR is a bit hard to test without a major refactor of the plasma store. This PR relies on the Python tests for now.

@rkooo567
Contributor

rkooo567 commented Nov 4, 2020

Before reviewing the PR: why don't we add this test? #11673 (comment). Is there any other blocker that makes it still fail?

@stephanie-wang
Contributor Author

> Before reviewing the PR: why don't we add this test? #11673 (comment). Is there any other blocker that makes it still fail?

I already added that test in #11673.

@rkooo567 (Contributor) left a comment

I wonder if we need some timeout or max_requeue to avoid hanging forever when there's an issue with spilling itself.

}

void PlasmaStore::ProcessCreateRequests(size_t num_bytes_space) {
for (auto request = create_request_queue_.begin();
Contributor

Suggested change
for (auto request = create_request_queue_.begin();
for (auto request_it = create_request_queue_.begin();


ray::SpaceReleasedCallback OnSpaceReleased() {
return [this](size_t num_bytes_required) {
main_service_.post([this, num_bytes_required]() {
Contributor

Great idea!

@ericl
Contributor

ericl commented Nov 4, 2020

Seems this hangs for me, is it expected?

import json
import numpy as np
import ray

ray.init(
    num_cpus=1,
    object_store_memory=100 * 1024 * 1024,
    _system_config={
        "automatic_object_spilling_enabled": True,
        "object_spilling_config": json.dumps(
            {"type": "filesystem", "params": {"directory_path": "/tmp/spill"}},
            separators=(",", ":")
        )
    },
)

@ray.remote
def f():
    return np.zeros(10 * 1024 * 1024)

ids = []
for _ in range(2):
    x = f.remote()
    ids.append(x)

print("GET")
print(ray.get(ids))

@ericl (Contributor) left a comment

Looks pretty good, left some comments. Another thought was that now that we have a creation queue, we should (later) change the spilling mechanism to calculate spilling based on the creation queue size / contents, rather than have an explicit Spill(n) call. Then the object creation flow becomes pretty simple:

enqueue(createRequest)
TriggerSpillingIfNeeded()
ProcessCreateRequests()

RAY_RETURN_NOT_OK(SendCreateReply(client, object_id, &object, error_code, mmap_size));
if (error_code == PlasmaError::OK && device_num == 0) {
RAY_RETURN_NOT_OK(client->SendFd(object.store_fd));
auto status = HandleCreateObjectRequest(client, message);
Contributor

I think we also need to queue further requests if create_request_queue_.size() > 0, to prevent starvation. No objects should be allowed to be created directly while spilling is in progress.

Contributor Author

Can you explain more? I think it's actually okay for other objects to get created, such as if they're smaller than the objects that couldn't fit.

Contributor

Well, if there is a flood of small objects, that can prevent enough space from ever being created for large objects (e.g., a 10GB chunk).

While it may make sense to allow those to get created in the future as an optimization, how about we simplify the queueing behavior by enforcing FIFO priority for all objects?

Contributor Author

Anyway, I think your suggestion about just queuing all requests will address this, right?

/// spilled, we will attempt to process these requests again and respond to
/// the client if successful or out of memory. If more objects must be
/// spilled, the request will be replaced at the head of the queue.
/// TODO(swang): We should also queue objects here even if there is no room
Contributor

This TODO is already done right? Since you're deferring responses

Contributor Author

Maybe it's not clear from the comment, but there are now two types of OutOfMemory errors, transient (but we expect there to be space once spilling is done) or definite (unless some objects get released, there's definitely no space). Right now we only queue requests for the former.

Contributor

Can we remove the TODO then? I think it doesn't make sense to queue the latter (should just raise OOM if only mmaped objects are in the store).

Contributor Author

Actually I think it does make sense to eventually queue the latter too. Right now we have to retry from the client on OOM, but that can add unnecessary delays in cases where other objects will actually get released soon. If we queue on the plasma store (with a timeout), we can create the object as soon as there's enough space. Also we could potentially reduce false positives if the plasma store makes the decision instead of the client.

Contributor

Right yeah, I think eventually we want all decision making to be centralized in this queue (including trigger of global gc, etc.).

Contributor

The latter is possible after we make create asynchronous right?

RAY_RETURN_NOT_OK(client->SendFd(object.store_fd));
auto status = HandleCreateObjectRequest(client, message);
if (status.IsTransientObjectStoreFull()) {
create_request_queue_.push_back({client, message});
Contributor

Would the code be simplified if we always pushed new requests into the queue and called ProcessCreateRequests(), rather than only enqueueing conditionally? The overhead of pushing into a list and immediately popping should be very small.

Contributor Author

Sure, I can try that.

@ericl added the @author-action-required label ("The PR author is responsible for the next step. Remove tag to send back to the reviewer.") Nov 4, 2020
@ericl
Contributor

ericl commented Nov 4, 2020

> The plasma store will respond to the client once creation succeeds or fails (because the object store still does not have enough space after both eviction and spilling)

Btw this is incorrect right? IIUC requests get queued indefinitely until space is available (which is probably fine, unless we can detect that no further spilling is possible)

@stephanie-wang
Contributor Author

> Seems this hangs for me, is it expected?

Probably because the default number of workers is 1 (#11789).

@stephanie-wang
Contributor Author

> The plasma store will respond to the client once creation succeeds or fails (because the object store still does not have enough space after both eviction and spilling)
>
> Btw this is incorrect right? IIUC requests get queued indefinitely until space is available (which is probably fine, unless we can detect that no further spilling is possible)

No, the requests will get queued until spilling either succeeded or failed. In both cases, the raylet will callback to the plasma store. From there, the plasma store will try the creation request again, and then there are three possibilities:

  1. Creation succeeds.
  2. Not enough space even after eviction and considering objects that could be spilled.
  3. Trigger more spilling, which would create enough space for the request once complete. Requeue the request and repeat.

@ericl
Contributor

ericl commented Nov 4, 2020

> Not enough space even after eviction and considering objects that could be spilled.

Where's the code that determines "even after eviction"? It seems to me the retry of the creation request is the same the second time so it would get queued up again even in that case.

> Trigger more spilling, which would create enough space for the request once complete. Requeue the request and repeat.

I believe the current code doesn't re-queue; the request stays at the head of the queue since we break out of the loop before calling erase(). Is this not the case?

@ericl
Copy link
Copy Markdown
Contributor

ericl commented Nov 4, 2020

import json
import numpy as np
import ray

ray.init(
    num_cpus=1,
    object_store_memory=100 * 1024 * 1024,
    _system_config={
        "automatic_object_spilling_enabled": True,
        "max_io_workers": 10,
        "object_spilling_config": json.dumps(
            {"type": "filesystem", "params": {"directory_path": "/tmp/spill"}},
            separators=(",", ":")
        )
    },
)

@ray.remote
def f():
    return np.zeros(10 * 1024 * 1024)

ids = []
for _ in range(2):
    x = f.remote()
    ids.append(x)

print("GET")
print(ray.get(ids))

This still fails (10 io workers).

@stephanie-wang
Contributor Author

> This still fails (10 io workers).

Isn't this expected to fail? We can't fit both of the ids in the plasma store at the same time.
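The arithmetic bears this out: each task returns an array of 10 Mi float64 elements, i.e. 80 MiB per object, so two objects cannot coexist in a 100 MiB store. A quick check (assuming NumPy's default float64 dtype, as in the repro script above):

```python
import numpy as np

arr = np.zeros(10 * 1024 * 1024)    # 10 Mi elements, float64 by default
per_object = arr.nbytes             # 8 bytes per element
store_capacity = 100 * 1024 * 1024  # object_store_memory from ray.init

assert per_object == 80 * 1024 * 1024   # 80 MiB per object
assert 2 * per_object > store_capacity  # both cannot fit at once
```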

@stephanie-wang
Contributor Author

> Not enough space even after eviction and considering objects that could be spilled.
>
> Where's the code that determines "even after eviction"? It seems to me the retry of the creation request is the same the second time so it would get queued up again even in that case.

  1. Creation succeeds.
  2. Not enough space even after eviction and considering objects that could be spilled. => This will return an OutOfMemory error code, which the plasma store returns to the client. The client can either retry after some time or throw the exception.
  3. Trigger more spilling, which would create enough space for the request once complete. Requeue the request and repeat. => This will return a TransientOutOfMemory error. The plasma store will then break and wait for objects to get spilled before retrying the request.

> Trigger more spilling, which would create enough space for the request once complete. Requeue the request and repeat.
>
> I believe the current code doesn't re-queue; the request stays at the head of the queue since we break out of the loop before calling erase(). Is this not the case?

Ah that's right, I meant that the request stays at the head of the queue but will get retried later once a round of spilling succeeds or fails.

@ericl
Contributor

ericl commented Nov 4, 2020

> Isn't this expected to fail? We can't fit both of the ids in the plasma store at the same time.

Yep good catch.

> Trigger more spilling, which would create enough space for the request once complete.

Got it. I forgot the spill decision is within the plasma create call itself.

/// \param num_bytes_space A lower bound on the number of bytes of space that
/// have been made newly available, since the last time this method was
/// called.
void ProcessCreateRequests(size_t num_bytes_space);
Contributor

This argument isn't actually used, can we remove it?

@stephanie-wang
Contributor Author

It turns out that Travis is failing because of the same problem that I came across earlier in #11673:

Since there are multiple clients trying to create an object at the same time, only one of them will succeed after an object has been spilled. But then since all the clients retry at around the same time, the others retry while the one that succeeded is still creating its object (so its object can't be evicted or spilled). I managed to get this to work by resetting the normal OutOfMemory retries if the client ever receives the transient error, but I don't think that will work in all cases.

I think we can fix this by moving the OutOfMemory client-side timer into the plasma store, like we talked about in this PR.

@ekl, @rkooo567 are you guys okay with just disabling that unit test for now, and I can get started on a more comprehensive refactor/fix next? I'm worried about adding it to this PR because we're already adding a lot of logic to the plasma store that isn't getting unit-tested at all.
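The retry-reset workaround described above can be sketched as follows. This is purely illustrative, not Ray's actual client code; the function name, status strings, and retry budget are hypothetical.

```python
import time

def create_with_retries(try_create, max_retries=5, delay_s=0.1):
    """Retry object creation, resetting the OOM budget on transient errors."""
    retries_left = max_retries
    while True:
        status = try_create()
        if status == "OK":
            return status
        if status == "TransientOutOfMemory":
            # Spilling is in progress: reset the OutOfMemory retry budget,
            # since space is expected to free up once spilling completes.
            retries_left = max_retries
        elif status == "OutOfMemory":
            retries_left -= 1
            if retries_left < 0:
                raise MemoryError("object store is out of memory")
        time.sleep(delay_s)
```

As noted above, this reset heuristic does not cover all cases (e.g. many clients retrying at once), which is why moving the timer into the plasma store is the longer-term fix.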

@rkooo567
Contributor

rkooo567 commented Nov 5, 2020

Sounds good to me. I think we can add this logic (and disable core worker retry) when we develop the diagram we discussed today?

@stephanie-wang
Contributor Author

> Sounds good to me. I think we can add this logic (and disable core worker retry) when we develop the diagram we discussed today?

Yeah, I think we should just do the refactor to pull the queue out of the PlasmaStore and remove the client-side retry in the same PR.

@stephanie-wang stephanie-wang merged commit 61e4125 into ray-project:master Nov 6, 2020
@stephanie-wang stephanie-wang deleted the async-spill branch November 6, 2020 23:22


Development

Successfully merging this pull request may close these issues.

[Object Spilling] Avoid client starvation when spilling

3 participants