
Conversation

@gruuya gruuya commented Apr 15, 2024

This PR introduces a new caching mechanism for fetching byte ranges:

  • If a chunk is not present in the cache, greedily extend the range to fetch by coalescing adjacent chunks that are also missing from the cache, so as to minimize the number of outgoing requests.
  • For each such chunk, put a Pending cache value containing a channel over which the actual bytes will be sent.
  • If a task runs into a Pending value, it takes the receiver and waits on it with a timeout.
  • Once no more chunks can be coalesced, issue a get_range request for the extended range.
  • If that errors out, send the error value to any awaiting task; otherwise cache the bytes as the Memory variant and trigger persisting to disk (the File value variant).

The timeout is important to break some hanging issues I've noticed (notably, TPC-H SF10 q18 and q21).
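To make the flow concrete, here is a rough sketch of the consumer side of the scheme, assuming the bytes are delivered over a tokio watch channel; the CacheValue enum, the PENDING_TIMEOUT constant, and the error strings are illustrative rather than the PR's actual definitions:

```rust
use std::time::Duration;

use bytes::Bytes;
use tokio::sync::watch;
use tokio::time::timeout;

// Hypothetical cache value: bytes already resident in memory, or a channel
// over which the fetching task will eventually publish the bytes.
#[derive(Clone)]
enum CacheValue {
    Memory(Bytes),
    Pending(watch::Receiver<Option<Bytes>>),
}

// Illustrative timeout used to break out of the hangs mentioned above.
const PENDING_TIMEOUT: Duration = Duration::from_secs(10);

async fn resolve(value: CacheValue) -> Result<Bytes, &'static str> {
    match value {
        CacheValue::Memory(bytes) => Ok(bytes),
        CacheValue::Pending(mut rx) => {
            // Wait for the fetching task to publish the bytes, but give up
            // after PENDING_TIMEOUT instead of hanging indefinitely.
            let changed = timeout(PENDING_TIMEOUT, rx.changed()).await;
            match changed {
                Ok(Ok(())) => rx.borrow().clone().ok_or("sender sent no bytes"),
                Ok(Err(_)) => Err("fetching task dropped the sender"),
                Err(_) => Err("timed out waiting for the pending chunk"),
            }
        }
    }
}
```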

@gruuya gruuya requested a review from mildbyte April 15, 2024 10:47
// Entry is fresh; we need to evaluate it in the upcoming batch...
chunk_batches.push((key.clone(), tx));
// ... and also create a new sender/receiver for the next pending entry
(tx, rx) = channel(Ok(Bytes::new()));
Contributor

We end up with one orphan channel here at the end that's never used, right? (since we create one in the beginning, assign it if we need to do a fetch + create a new one for the next fetch)

If it's one channel per chunk, do we hit weird scalability issues if we're trying to download a few thousand 1MB ranges?

Contributor Author

Correct on the orphan channel comment, but avoiding it would add even more complexity.

Not sure about the scalability issues; I haven't tested it against anything more involved than TPC-H SF10.

Comment on lines +330 to +332
// Evaluate the accumulated chunk batch if either the current entry is not fresh
// (meaning we cannot extend the batch anymore and keep it continuous), or
// we've reached the last chunk (meaning there are no more chunks to be batched).
Contributor

As discussed, it's possible that DF already does this pre-coalescing for us (https://github.com/apache/arrow-rs/blob/a999fb86764e9310bb4822c7e7c6551f247e0e0b/object_store/src/lib.rs#L573), but we'll need to look at some read stats to figure out what we need to do besides just providing a cache.
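For reference, a minimal usage sketch of that upstream path, assuming the object_store crate as of this PR (where the default ObjectStore::get_ranges implementation coalesces nearby ranges before issuing requests); the store, path, and ranges below are made up:

```rust
use object_store::{memory::InMemory, path::Path, ObjectStore};

async fn read_chunks(store: &InMemory) -> object_store::Result<()> {
    let location = Path::from("data/file.parquet");
    // The default `get_ranges` implementation merges nearby ranges into fewer
    // underlying requests while still returning one buffer per requested range.
    let ranges = vec![0..1024, 1024..2048, 4096..8192];
    let chunks = store.get_ranges(&location, &ranges).await?;
    assert_eq!(chunks.len(), ranges.len());
    Ok(())
}
```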

let file_manager = self.file_manager.clone();
tokio::task::spawn(async move {
    // Run pending tasks to avert eviction races.
    cache.run_pending_tasks().await;
Contributor

Should there be some kind of critical section/lock? Intuitively, we're in a concurrent context, so whatever state we put the cache in after this run_pending_tasks might not stay the same by the time we reach the next line. What does the eviction race look like here?

Contributor Author

What does the eviction race look like here?

It looks like:

  • moka starts the eviction future
  • a new entry comes in and gets persisted to disk
  • the eviction future completes and deletes the above new entry (but the file pointer in the cache stays)
  • the cached file read then errors out

There are some nuances above (different outcomes depending on whether we error out on writing the cache file or not).
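For illustration, the ordering that call aims to establish looks roughly like this; moka's future::Cache does expose run_pending_tasks, but the helper name, key/value types, and path here are made up:

```rust
use moka::future::Cache;

// Hypothetical helper; in this sketch the cache maps a chunk key to its on-disk path.
async fn persist_chunk(
    cache: &Cache<String, String>,
    key: String,
    bytes: &[u8],
) -> std::io::Result<()> {
    // Drain any already-queued eviction work first, so a previously scheduled
    // eviction cannot complete later and delete the file we are about to write.
    cache.run_pending_tasks().await;

    let path = format!("/tmp/cache/{key}");
    tokio::fs::write(&path, bytes).await?;
    cache.insert(key, path).await;
    Ok(())
}
```

As the question above points out, without a critical section the cache state can still change between these steps, so this narrows the window for the race rather than strictly eliminating it.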

// there should be no need to wrap them in an `Arc`.
result.put(batch_data.clone());

// TODO: execute this in a separate task
Contributor

If this -> memory -> disk state transition is in a separate task, we don't need another series of task::spawn to do the disk writes, right?

Contributor Author

We do, since we'd need to join/await on the byte range fetches before returning and don't want to block on the file writes as well.

Contributor

Really? I thought this meant we'd spawn multiple tasks that each have a (key, tx, shared pointer to batch_data) and:

  • insert into cache as Memory
  • send data to tx
  • write to disk
  • update in cache as Disk

Spawning these tasks doesn't block us from returning the result of the byte range fetch, right, since the result is ready at this point?

Contributor Author

Ah, fair point (I thought the question was about spawning the outgoing get_range call as well).

Yeah, that's true, this can be spawned off to a separate task.
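A rough sketch of such a per-chunk task; the CacheValue variants, the watch channel, and the path are illustrative rather than the PR's actual definitions (and the Pending handling is elided):

```rust
use bytes::Bytes;
use moka::future::Cache;
use tokio::sync::watch;

#[derive(Clone)]
enum CacheValue {
    Memory(Bytes),
    File(String), // path of the chunk once it has been persisted to disk
}

fn spawn_persist(
    cache: Cache<String, CacheValue>,
    key: String,
    tx: watch::Sender<Option<Bytes>>,
    data: Bytes, // `Bytes` is already a cheap, shareable handle to the batch data
) {
    tokio::task::spawn(async move {
        // 1. Make the chunk available to other tasks immediately.
        cache
            .insert(key.clone(), CacheValue::Memory(data.clone()))
            .await;
        // 2. Unblock any task waiting on the corresponding Pending entry.
        let _ = tx.send(Some(data.clone()));
        // 3. Persist to disk, then flip the cache entry to the File variant.
        let path = format!("/tmp/cache/{key}");
        if tokio::fs::write(&path, &data).await.is_ok() {
            cache.insert(key, CacheValue::File(path)).await;
        }
    });
}
```

None of this blocks returning the fetched result, since the data is already in hand when the task is spawned.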


gruuya commented Apr 16, 2024

Closing for now in favor of #515.

Can be revisited later if needed.

@gruuya gruuya closed this Apr 16, 2024