feat: last cache implementation#25109

Merged
hiltontj merged 28 commits into main from hiltontj/lastcache-impl on Jul 9, 2024

Conversation

Contributor

hiltontj commented Jun 27, 2024

Part of #25093

Last-N-Value Cache Implementation

This PR introduces the core implementation for the Last-N-Value cache (see #25091 for detailed requirements).

At the highest level, the write buffer has access to a LastCacheProvider, which holds 0-to-many LastCaches for any database and table in the database. The API of the LastCacheProvider is fairly limited in this PR: a create API, as well as an API to get RecordBatches out of a given cache. The latter is only for testing purposes, as querying the cache will be done via DataFusion trait implementations, which will need to be linked to the query executor in a future PR (see #25095).

Each LastCache has a hierarchical structure based on its key columns. @pauldix gave a good description of how this works in a comment below (see #25109 (review)). Each cache is a multi-level nested HashMap, where there is a level for each key column in the cache. At the lowest level, the value columns for each unique combination of key column values are stored in individual LastCacheStores, which use a ring buffer of size N to store the last N values for each column being cached.
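As a rough sketch of that nesting (using plain std collections and made-up simplified types, not the PR's actual ones — one enum level per key column, with leaves holding a ring buffer per cached value column):

```rust
use std::collections::{HashMap, VecDeque};

// Hypothetical simplified stand-ins for the PR's types.
#[derive(Debug)]
enum CacheNode {
    // key-column value -> next level of the hierarchy
    Key(HashMap<String, CacheNode>),
    // value-column name -> last-N values (newest at the front)
    Store(HashMap<String, VecDeque<f64>>),
}

// Build a cache keyed on `t1`, holding the last two values of field `f1`.
fn example_cache() -> CacheNode {
    let mut store = HashMap::new();
    store.insert("f1".to_string(), VecDeque::from([3.0, 1.0]));
    let mut level = HashMap::new();
    level.insert("asdf".to_string(), CacheNode::Store(store));
    CacheNode::Key(level)
}

fn main() {
    let cache = example_cache();
    let CacheNode::Key(level) = &cache else { unreachable!() };
    let CacheNode::Store(store) = &level["asdf"] else { unreachable!() };
    assert_eq!(store["f1"].front(), Some(&3.0)); // newest value first
    println!("last f1 values for t1=asdf: {:?}", store["f1"]);
}
```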

Writes to the Cache

Writes to the cache are done from the write buffer flusher, after batches have been written to the WAL, and just prior to when they are written into segments in the write buffer. Only batches from the current/latest segment are written to the cache. Internally, the cache will only accept rows that have newer timestamps than the most recent value, and rows that do not include all key columns for a given cache will be ignored.
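A minimal sketch of that acceptance logic, with a hypothetical single-column store standing in for the real multi-column LastCacheStore:

```rust
use std::collections::VecDeque;

// Hypothetical single-column store: keeps the last `n` values, newest first,
// and rejects rows that are not strictly newer than the latest timestamp seen.
struct LastCacheStore {
    n: usize,
    latest_time: i64,
    buf: VecDeque<(i64, f64)>,
}

impl LastCacheStore {
    fn new(n: usize) -> Self {
        Self { n, latest_time: i64::MIN, buf: VecDeque::with_capacity(n) }
    }

    fn push(&mut self, time: i64, value: f64) {
        if time <= self.latest_time {
            return; // only accept rows newer than the most recent value
        }
        self.latest_time = time;
        if self.buf.len() == self.n {
            self.buf.pop_back(); // evict the oldest to keep only the last N
        }
        self.buf.push_front((time, value));
    }
}

fn main() {
    let mut store = LastCacheStore::new(2);
    store.push(123, 1.0);
    store.push(444, 3.0);
    store.push(100, 9.9); // older than 444: ignored
    assert_eq!(store.buf.front(), Some(&(444, 3.0)));
    assert_eq!(store.buf.len(), 2);
}
```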

Time-to-live (TTL)

Each LastCache is constructed with a time-to-live (TTL), which represents the lifetime of elements in the cache. Elements that outlive this duration will be evicted. Currently, eviction is invoked on the write buffer's flush interval.
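A sketch of what that eviction pass might look like for a single column buffer, using plain integer second timestamps instead of the code's Instants so the example is deterministic:

```rust
use std::collections::VecDeque;

// Hypothetical eviction pass: values are stored newest-first, so rows that
// have outlived the TTL accumulate at the back of the buffer.
fn remove_expired(buf: &mut VecDeque<(i64, f64)>, now: i64, ttl: i64) {
    while let Some((pushed_at, _)) = buf.back() {
        if now - pushed_at >= ttl {
            buf.pop_back(); // outlived the TTL: evict
        } else {
            break; // everything closer to the front is newer
        }
    }
}

fn main() {
    let mut buf = VecDeque::from([(1000, 2.0), (100, 1.0)]); // newest first
    remove_expired(&mut buf, 1050, 600); // TTL of 600s, "now" = t=1050
    assert_eq!(buf.len(), 1); // the value pushed at t=100 was evicted
    assert_eq!(buf.front(), Some(&(1000, 2.0)));
}
```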

Not included in this PR

The only core requirement not satisfied by this PR is having new fields cached for caches that do not have a specified list of value columns. That is, for caches that are created with the default for value columns, i.e., all non-key columns: as new fields are written to the table, those fields should be added to the cache and back-filled with nulls.

I think this PR is pretty large at this point, so I would like to add this in a follow-on PR.

hiltontj added 5 commits June 27, 2024 12:01
Each last cache holds a ring buffer for each column in an index map, which
preserves the insertion order for faster record batch production.

The ring buffer uses a custom type to handle the different supported
data types that we can have in the system.
LastCacheProvider is the API used to create last caches and write
table batches to them. It uses a two-layer RwLock/HashMap: the first for
the database, and the second layer for the table within the database.

This allows for table-level locks when writing in buffered data, and only
gets a database-level lock when creating a cache (and in future, when
removing them as well).
Added basic APIs on the write buffer to access the last cache, and a test in the last_cache module showing that it works with a simple example.
hiltontj self-assigned this Jun 27, 2024
Member

pauldix left a comment

Some comments at specific points, but a broader comment here. I think there might be a little confusion about the desired behavior of the cache. An example might be helpful here and you can tell me if this structure will handle it, or if you're leaving it for later, or if this is clarifying.

Say we have a table foo with tags t1 and t2 and field f1. Then say we create a last cache on the table with key columns of [t1] and value columns of [t2, f1]. Then we have this data come in:

foo,t1=asdf,t2=bar f1=1 123
foo,t1=jkl,t2=bar f1=2 123
foo,t1=asdf,t2=xyz f1=3 444

The last value cache would have two collections of last values, one for each unique value of t1 (the key column).

t1=asdf [(t2=bar f1=1 123), (t2=xyz, f1=3 444)]
t1=jkl [(t2=bar f1=2 123)]

Now say that we instead had a last value cache on key columns [t1, t2] and value columns [f1]. And then say we have this data instead:

foo,t1=asdf,t2=bar f1=1 123
foo,t1=asdf,t2=zoo f1=2 123
foo,t1=jkl,t2=bar f1=3 123
foo,t1=jkl,t2=bar f1=4 444

The last value cache would have three collections of last values, one for each unique t1/t2 combination that we saw. Further, the relationship is hierarchical. So we'd have:

t1=asdf
    t2=bar [(f1=1 123)]
    t2=zoo [(f1=2 123)]
t1=jkl
    t2=bar [(f1=4 444)]

Note that it makes no sense to put the key columns in the value columns as every unique value in the key column will appear in the last cache. When we look up t1=asdf we should see there are two entries in there for t2 with bar and zoo.

So I would expect the last cache VecDeques to be in a nested HashMap whose depth is the same as however many key columns were specified.
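Concretely, the hierarchical shape for the [t1, t2] example above can be sketched with nested std HashMaps (hypothetical simplified types, N = 1 so each leaf holds only the last row):

```rust
use std::collections::HashMap;

// (f1 value, timestamp) pairs; with N = 1, each leaf keeps only the last row.
type Leaf = (f64, i64);

fn main() {
    // cache: t1 value -> t2 value -> last (f1, time)
    let mut cache: HashMap<&str, HashMap<&str, Leaf>> = HashMap::new();

    // Apply the four example writes in order; a newer row for the same
    // t1/t2 combination replaces the previous last value.
    for (t1, t2, f1, time) in [
        ("asdf", "bar", 1.0, 123),
        ("asdf", "zoo", 2.0, 123),
        ("jkl", "bar", 3.0, 123),
        ("jkl", "bar", 4.0, 444),
    ] {
        cache.entry(t1).or_default().insert(t2, (f1, time));
    }

    // Three unique t1/t2 combinations, arranged hierarchically:
    assert_eq!(cache["asdf"].len(), 2); // t2=bar and t2=zoo
    assert_eq!(cache["jkl"]["bar"], (4.0, 444)); // the newest row won
}
```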

_key_columns: HashSet<String>,
schema: SchemaRef,
// use an IndexMap to preserve insertion order:
cache: IndexMap<String, CacheColumn>,
Member

It's not clear to me why this is being used here. What's the insertion order being preserved here?

Contributor Author

Since I insert columns into the cache using the ordering of fields in the schema (here), then when producing a record batch out of the cache (here), IndexMap allows iterating over the map directly while producing the correct order of columns for the schema.

Contributor Author

The links in that comment look to be out-of-date. But I have added docs/comments in the code to help explain this.

Contributor Author

@pauldix - while working on #25125 to enable caches that add new fields, I have found that the insertion order guarantee falls apart in scenarios where writes come in with different fields/orders for different key values. So, we can't rely on that, and I will be removing the comments about insertion order.

I still like the use of IndexMap because it gives fast iteration over keys, as well as fast lookups (see here), but if we want to avoid the dependency, or just optimize for lookup speed, then we could use a HashMap here.

Member

pauldix commented Jun 28, 2024

I also realized while writing out the PR feedback that there's another setting we'll need for the last values cache, which is an age-out. Since it'll keep last value caches for each unique key column combination seen, ephemeral key column values (i.e., ephemeral series) will continue taking up space in the cache until they're cleared out.

So that timeout duration should be an additional parameter in the settings (like count). A sensible default might be 4 hours. So every once in a while, the last value cache should be walked so that any key set that hasn't seen a new value in that time is cleared from the cache completely.

Contributor Author

@pauldix thank you for the feedback!

I think there might be a little confusion about the desired behavior of the cache. An example might be helpful here and you can tell me if this structure will handle it, or if you're leaving it for later, or if this is clarifying.

My understanding was definitely off, especially w.r.t. the key columns (literally keys in the cache), but your example clears that up for me. I can re-work the implementation to satisfy that behaviour.

Contributor Author

I also realized while writing out the PR feedback that there's another setting we'll need for the last values cache, which is an age out. [...]

Good call, I can add this to the requirements in the related issues.

Addressed three parts of PR feedback:

1. Remove double-lock on cache map
2. Re-order the get when writing to the cache to be outside the loop
3. Move the time check into the cache itself
hiltontj added 7 commits July 3, 2024 10:48
This refactors the last cache to use a nested caching structure, where
the key columns for a given cache are used to create a hierarchy of
nested maps, terminating in the actual store for the values in the cache.

Access to the cache is done via a set of predicates which can optionally
specify the key column values at any level in the cache hierarchy to only
gather record batches from children of that node in the cache.

Some todos:
- Need to handle the TTL
- Need to move the TableProvider impl up to the LastCache type
This re-writes the datafusion TableProvider implementation on the correct
type, i.e., the LastCache, and adds conversion from the filter Expr's to
the Predicate type for the cache.
Last caches will have expired entries walked when writes come in.
Changed key columns so that they do not accept null values, i.e., rows pushed with missing key column values will be ignored.

When producing record batches for a cache, if not all key columns are
used in the predicate, then this change makes it so that the non-predicate
key columns are produced as columns in the outputted record batches.

A test with a few cases showing this was added.
Ensure key columns in the last cache that are not included in the
predicate are emitted in the RecordBatches as a column.

Cleaned up and added comments to the new test.
Contributor Author

hiltontj commented Jul 3, 2024

My recent set of commits addresses nearly all of the feedback so far. I refactored the cache to have a hierarchical structure based on the key columns specified, and those are used to evaluate predicates when producing record batches.

I still have several things to do, however:

Implementation

  • Handle null values for non-key columns in the cache
  • Handle * caches, i.e., those that have new fields added to them when such fields are written after the cache has been initialized with a certain set of value columns (see Support adding new fields to a cache #25124)

Testing

  • Using fields as key columns
  • Checking the series key is used as default key columns (or the tag set) when key columns are not specified
  • Test with caches bigger than the default size (1)
  • Test cache TTLs
  • Test when using invalid predicates, i.e., for columns that are not key columns or that do not exist, or for column values that do not exist
  • Test null values in the cache
  • Test adding new fields to * caches (See Support adding new fields to a cache #25124)

Documentation

  • Need to add a lot of Rust doc comments
  • Update PR description with overview of the refactored design

hiltontj force-pushed the hiltontj/lastcache-impl branch from 9d1db84 to 979f717 on July 3, 2024
hiltontj added 3 commits July 4, 2024 11:11
Added two tests, as per commit title. Also moved the eviction process
to a separate function so that it was not being done on every write to
the cache, which could be expensive, and this ensures that entries are
evicted regardless of whether writes are coming in or not.
hiltontj force-pushed the hiltontj/lastcache-impl branch from 00bf6cd to eb6ed24 on July 4, 2024
/// Newtype wrapper around a [`VecDeque`] whose values are paired with an [`Instant`] that
/// represents the time at which each value was pushed into the buffer.
#[derive(Debug)]
struct ColumnBuffer<T>(VecDeque<(Instant, T)>);
Contributor Author

I think that since all the column buffers for a cache get new values pushed at the same time, a separate VecDeque of Instants could be stored, vs. having to store an Instant next to each value in each column buffer. Each Instant is 16 bytes (see here).
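A sketch of that layout (hypothetical types, assuming every row writes a value to every cached column so the index positions stay aligned): one shared deque of Instants per store, instead of an Instant paired with every value in every column buffer.

```rust
use std::collections::{HashMap, VecDeque};
use std::time::Instant;

// Hypothetical store layout: one push time per row, shared across all
// columns, rather than a 16-byte Instant beside each value in each column.
struct LastCacheStore {
    instants: VecDeque<Instant>,
    columns: HashMap<String, VecDeque<f64>>,
}

impl LastCacheStore {
    fn push_row(&mut self, row: &[(&str, f64)]) {
        self.instants.push_front(Instant::now());
        for (col, val) in row {
            self.columns.entry(col.to_string()).or_default().push_front(*val);
        }
    }
}

fn main() {
    let mut store = LastCacheStore { instants: VecDeque::new(), columns: HashMap::new() };
    store.push_row(&[("f1", 1.0), ("f2", 2.0)]);
    store.push_row(&[("f1", 3.0), ("f2", 4.0)]);
    // One Instant per row, shared by both columns:
    assert_eq!(store.instants.len(), 2);
    assert_eq!(store.columns["f1"].len(), 2);
    assert_eq!(store.columns["f1"].front(), Some(&3.0));
}
```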

Member

Yeah, we should only be storing a single instant for each row in the last cache.

hiltontj marked this pull request as ready for review July 5, 2024
hiltontj requested review from mgattozzi and pauldix July 5, 2024
Member

pauldix left a comment

Just a few comments, but they can be addressed in a follow on PR. Otherwise looks really good!

format!("{tbl_name}_{keys}_last_cache", keys = key_columns.join("_"))
});

// reject creation if there is already a cache with specified database, table, and cache name
Member

The behavior I like to see on creation of things is that if the user tries to create something with all the same arguments, it returns ok; if the arguments are different, then it returns an error that it already exists. This might be handled higher up the stack in the API layer, but I'm not sure that this error would give the caller enough information to make the determination.

The reason I opt for this behavior is that it makes automation easier: callers can invoke create as many times as they want, and it will always succeed as long as the settings are the same.
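Those semantics could be sketched roughly like this (hypothetical types and error shape, not the PR's actual API):

```rust
use std::collections::HashMap;

// Hypothetical cache settings; the real create API takes more parameters.
#[derive(Clone, PartialEq, Debug)]
struct CacheConfig {
    key_columns: Vec<String>,
    count: usize,
}

#[derive(Debug, PartialEq)]
enum CreateError {
    AlreadyExistsWithDifferentConfig,
}

/// Ok(true) = newly created; Ok(false) = an identical cache already existed.
fn create_cache(
    caches: &mut HashMap<String, CacheConfig>,
    name: &str,
    config: CacheConfig,
) -> Result<bool, CreateError> {
    match caches.get(name) {
        Some(existing) if *existing == config => Ok(false), // idempotent success
        Some(_) => Err(CreateError::AlreadyExistsWithDifferentConfig),
        None => {
            caches.insert(name.to_string(), config);
            Ok(true)
        }
    }
}

fn main() {
    let mut caches = HashMap::new();
    let cfg = CacheConfig { key_columns: vec!["t1".into()], count: 1 };
    assert_eq!(create_cache(&mut caches, "foo_t1_last_cache", cfg.clone()), Ok(true));
    // Same arguments again: still succeeds, which keeps automation simple.
    assert_eq!(create_cache(&mut caches, "foo_t1_last_cache", cfg), Ok(false));
    // Different settings under the same name: an error.
    let other = CacheConfig { key_columns: vec!["t2".into()], count: 1 };
    assert!(create_cache(&mut caches, "foo_t1_last_cache", other).is_err());
}
```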

.find(|f| f.name == *key)
.map(|f| KeyValue::from(&f.value))
else {
// ignore the row if it does not contain all key columns
Member

We might want to represent null key column values in the structure? I'm not sure about this yet though. Something to think about and discuss.

Contributor Author

When I first refactored this, I had the LastCacheKey hold a HashMap<Option<KeyValue>, LastCacheState> to handle null key values, but switched to this behaviour to simplify it.

I think it just needs to do that, i.e., use Option&lt;KeyValue&gt; instead of KeyValue, and then store the data type in the LastCacheKey, because 1) key columns will always have a fixed data type, and 2) you can't rely on the KeyValue alone to get the data type when it is None (for creating RecordBatches).

I can open up an issue for this.

Member

Yeah, still not totally sure we want to bother with this, so maybe log an issue to see if anyone cares. For now I'd leave it as is.

fn remove_expired(&mut self) {
self.value_map
.iter_mut()
.for_each(|(_, m)| m.remove_expired());
Member

If all rows have been expired, this key should be removed from the map. Otherwise, key values that stop sending data (think ephemeral things like container IDs, etc.) will blow up the size of the cache over time with a bunch of empty map entries. (I'm assuming I'm reading this correctly and only the values are getting removed.)

This should be true walking up the tree. Each key/value should be removed from the map if all children are empty.
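That walk can be sketched recursively (hypothetical simplified types, integer timestamps for determinism): each level drops children that report themselves empty, and reports its own emptiness upward so its parent can do the same.

```rust
use std::collections::HashMap;

// Hypothetical node: either another level of key values, or a leaf store of
// (timestamp, value) rows.
enum Node {
    Key(HashMap<String, Node>),
    Store(Vec<(i64, f64)>),
}

/// Evict expired rows; returns true if this node is now empty, so the parent
/// can remove the corresponding key entry as well.
fn remove_expired(node: &mut Node, cutoff: i64) -> bool {
    match node {
        Node::Store(rows) => {
            rows.retain(|(t, _)| *t >= cutoff);
            rows.is_empty()
        }
        Node::Key(children) => {
            children.retain(|_, child| !remove_expired(child, cutoff));
            children.is_empty()
        }
    }
}

fn main() {
    // t1=ephemeral holds only stale rows; t1=live still has a fresh one.
    let mut root = Node::Key(HashMap::from([
        ("ephemeral".to_string(), Node::Store(vec![(100, 1.0)])),
        ("live".to_string(), Node::Store(vec![(100, 1.0), (500, 2.0)])),
    ]));
    remove_expired(&mut root, 400);
    let Node::Key(children) = &root else { unreachable!() };
    // The fully-expired key was removed from the map entirely:
    assert!(!children.contains_key("ephemeral"));
    assert!(children.contains_key("live"));
}
```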

Contributor Author

Yes, good call, this only removes the values and is not walking up and cleaning up the maps. I'll address that with the other immediate issues in a follow-on PR.

Contributor Author

hiltontj commented Jul 9, 2024

Going to open a follow-on PR to address the following:

  • Make the cache creation more idempotent, i.e., do not fail when attempting to create a cache that already exists with the same parameters
  • Store a single set of Instants for each cache, vs. an Instant beside each value in each column buffer
  • Clean up the key column maps when their descendants are empty

hiltontj merged commit 53e5c5f into main Jul 9, 2024
hiltontj deleted the hiltontj/lastcache-impl branch July 9, 2024
Contributor Author

hiltontj commented Jul 9, 2024

@mgattozzi - I am going to merge this and #25125 to keep things rolling, but if you have any feedback I am happy to open follow-on PRs to address.

Contributor

Yeah, keep things rolling. I haven't had time to look, but I'm not worried about your implementations.
