Caching object store improvements: chunk coalescing #514
Conversation
In particular, optimize for chunk batching and minimizing the outgoing call count.
```rust
// Entry is fresh; we need to evaluate it in the upcoming batch...
chunk_batches.push((key.clone(), tx));
// ... and also create a new sender/receiver for the next pending entry
(tx, rx) = channel(Ok(Bytes::new()));
```
We end up with one orphan channel here at the end that's never used, right? (since we create one in the beginning, assign it if we need to do a fetch + create a new one for the next fetch)
If it's one channel per chunk, do we hit weird scalability issues if we're trying to download a few thousand 1MB ranges?
Correct on the orphan channel comment, but avoiding it would add even more complexity. Not sure about scalability issues; I haven't tested this against anything more involved than TPC-H SF10.
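A minimal sketch of the loop shape under discussion, assuming a `tokio::sync::watch` channel and simplifying the payload to plain `Bytes` (the real code sends a `Result`); names are illustrative:

```rust
use bytes::Bytes;
use tokio::sync::watch;

fn build_batches(keys: Vec<String>) -> Vec<(String, watch::Sender<Bytes>)> {
    // One channel pair is created up front...
    let (mut tx, mut rx) = watch::channel(Bytes::new());
    let mut chunk_batches = Vec::new();
    for key in keys {
        // ...assigned to the batch whenever a fetch is needed...
        chunk_batches.push((key, tx));
        // ...and replaced with a fresh pair for the next pending entry.
        (tx, rx) = watch::channel(Bytes::new());
    }
    // The pair created on the last iteration (or up front, if no fetch
    // happened at all) is never handed out: the orphan channel.
    drop((tx, rx));
    chunk_batches
}
```

Since the orphan is dropped when the function returns, it costs one extra allocation per call rather than a leak.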
```rust
// Evaluate the accumulated chunk batch if either the current entry is not fresh
// (meaning we cannot extend the batch anymore and keep it contiguous), or
// we've reached the last chunk (meaning there are no more chunks to be batched).
```
As discussed, it's possible that DF already does this pre-coalescing for us (https://github.com/apache/arrow-rs/blob/a999fb86764e9310bb4822c7e7c6551f247e0e0b/object_store/src/lib.rs#L573), but we'll need to look at some read stats to figure out what we need to do besides just providing a cache.
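For reference, a hedged sketch of leaning on that default coalescing via `get_ranges`; the in-memory store and ranges are illustrative, and the range element type varies across `object_store` versions:

```rust
use object_store::{memory::InMemory, path::Path, ObjectStore};

async fn fetch_chunks(store: &InMemory, location: &Path) -> object_store::Result<()> {
    let ranges = [0..1024, 1024..2048, 4096..8192];
    // The default trait implementation may merge nearby ranges into fewer
    // outgoing requests before splitting the results back apart.
    let chunks = store.get_ranges(location, &ranges).await?;
    assert_eq!(chunks.len(), ranges.len()); // one `Bytes` per requested range
    Ok(())
}
```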
```rust
let file_manager = self.file_manager.clone();
tokio::task::spawn(async move {
    // Run pending tasks to avert eviction races.
    cache.run_pending_tasks().await;
```
Should there be some kind of critical section/lock here? Intuitively, we're in a concurrent context, so whatever state we put the cache in after this run_pending_tasks might not stay the same by the time we reach the next line. What does the eviction race look like here?
> What does the eviction race look like here?

It looks like:

- moka starts the eviction future
- a new entry comes in and gets persisted to disk
- the eviction future completes and deletes the file backing that new entry (but the file pointer in the cache stays)
- the cached file read errors out

There are some nuances above (different outcomes depending on whether we error out on writing the cache file or not).
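To make the interleaving concrete, a hedged sketch assuming a `moka` cache mapping keys to file paths, with an eviction listener that removes the backing file (the key/path mapping and listener behavior are assumptions based on this thread, not the actual code):

```rust
use std::path::PathBuf;
use moka::future::Cache;

async fn eviction_race(cache: Cache<String, PathBuf>, key: String, path: PathBuf) {
    // 1. moka starts the eviction future for `key`, capturing `path`.
    // 2. Before it runs, a fresh entry for the same key is persisted:
    tokio::fs::write(&path, b"fresh bytes").await.unwrap();
    cache.insert(key.clone(), path.clone()).await;
    // 3. The in-flight eviction future now completes and removes the file,
    //    while the cache still maps `key` -> `path`:
    tokio::fs::remove_file(&path).await.ok();
    // 4. The next lookup follows the stale pointer and the read errors out.
    if let Some(stale) = cache.get(&key).await {
        assert!(tokio::fs::read(&stale).await.is_err());
    }
}
```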
```rust
// there should be no need to wrap them in an `Arc`.
result.put(batch_data.clone());

// TODO: execute this in a separate task
```
If this -> memory -> disk state transition is in a separate task, we don't need another series of task::spawn to do the disk writes, right?
We do, since we'd need to join/await on the byte range fetches before returning and don't want to block on the file writes as well.
Really? I thought this meant we'd spawn multiple tasks that each have a (key, tx, shared pointer to batch_data) and:

- insert into the cache as Memory
- send the data to tx
- write to disk
- update the cache entry to Disk

Spawning these tasks doesn't block us from returning the result of the byte range fetch, right, since the result is ready at this point?
Ah, fair point (I thought the question was about spawning the outgoing get_range call as well). Yeah, it's true that this can be spawned off to a separate task.
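For concreteness, a sketch of that per-chunk finishing task; the `CacheValue` variants, the watch-channel payload, and the cache file path are assumptions for illustration, not the PR's actual types:

```rust
use std::{path::PathBuf, sync::Arc};
use bytes::Bytes;
use moka::future::Cache;
use tokio::sync::watch;

enum CacheValue {
    Memory(Bytes),
    File(PathBuf),
}

async fn finish_chunk(
    cache: Cache<String, Arc<CacheValue>>,
    key: String,
    tx: watch::Sender<Bytes>,
    batch_data: Bytes,
) {
    // Make the bytes visible to other readers right away...
    cache
        .insert(key.clone(), Arc::new(CacheValue::Memory(batch_data.clone())))
        .await;
    // ...wake anyone waiting on the pending channel...
    let _ = tx.send(batch_data.clone());
    // ...and only then do the slow disk write, off the request path.
    let path = PathBuf::from(format!("/tmp/cache/{key}"));
    if tokio::fs::write(&path, &batch_data).await.is_ok() {
        cache.insert(key, Arc::new(CacheValue::File(path))).await;
    }
}
```

Spawning this with `tokio::task::spawn` doesn't delay returning the fetched bytes, since `batch_data` is already in hand at that point.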
Closing for now in favor of #515. Can be revisited later if needed.
This PR introduces a new caching mechanism for fetching byte ranges:
- Each outgoing fetch first inserts a `Pending` cache value, containing a channel over which the actual bytes will be sent.
- When a concurrent request finds a `Pending` value, it takes the receiver and waits on it with a timeout.
- Once the bytes arrive, they replace the entry with a `Memory` variant and trigger persisting to disk (`File` value variant).

The timeout is important to break some hanging issues I've noticed (notably, TPC-H SF10 q18 and q21).
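As a rough sketch of the value states and the timeout wait described above (the enum shape, channel payload, and timeout value are assumptions, not the PR's actual types):

```rust
use std::{path::PathBuf, time::Duration};
use bytes::Bytes;
use tokio::sync::watch;

enum CacheValue {
    // A fetch is in flight; readers subscribe to this channel.
    Pending(watch::Receiver<Bytes>),
    // The fetched bytes, immediately available.
    Memory(Bytes),
    // The bytes have been persisted to a local cache file.
    File(PathBuf),
}

// A request that finds a `Pending` entry waits on the receiver, but only
// up to a timeout, to break the hangs mentioned above.
async fn await_pending(mut rx: watch::Receiver<Bytes>) -> Option<Bytes> {
    match tokio::time::timeout(Duration::from_secs(10), rx.changed()).await {
        Ok(Ok(())) => Some(rx.borrow().clone()),
        _ => None, // timed out or the sender was dropped; fall back to fetching
    }
}
```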