Conversation
Each last cache holds a ring buffer for each column in an index map, which preserves the insertion order for faster record batch production. The ring buffer uses a custom type to handle the different supported data types that we can have in the system.
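A minimal sketch of such a column type (the names and the set of supported data types shown here are illustrative, not the actual implementation):

```rust
use std::collections::VecDeque;

/// Illustrative value as it arrives for a single column of a row.
#[derive(Debug)]
enum CacheValue {
    I64(i64),
    F64(f64),
    String(String),
    Bool(bool),
}

/// One cached column: a fixed-capacity ring buffer whose element type
/// depends on the column's data type.
#[derive(Debug)]
struct CacheColumn {
    size: usize,
    data: CacheColumnData,
}

/// One variant per supported data type, each holding its own ring buffer.
#[derive(Debug)]
enum CacheColumnData {
    I64(VecDeque<i64>),
    F64(VecDeque<f64>),
    String(VecDeque<String>),
    Bool(VecDeque<bool>),
}

impl CacheColumn {
    /// Push the newest value to the front, evicting the oldest once the
    /// buffer is full; mismatched types are ignored in this sketch.
    fn push(&mut self, value: CacheValue) {
        match (&mut self.data, value) {
            (CacheColumnData::I64(buf), CacheValue::I64(v)) => {
                if buf.len() >= self.size {
                    buf.pop_back();
                }
                buf.push_front(v);
            }
            // remaining type pairs elided for brevity
            _ => {}
        }
    }
}
```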
LastCacheProvider is the API used to create last caches and write table batches to them. It uses a two-layer RwLock/HashMap: the first layer for the database, and the second for the table within the database. This allows for table-level locks when writing buffered data, and only takes a database-level lock when creating a cache (and, in future, when removing one as well).
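As a rough illustration of that layout (the field name and exact nesting here are guesses, not the actual influxdb3_write definitions):

```rust
use std::collections::HashMap;
use std::sync::RwLock;

/// Stand-in for the per-table cache type; purely illustrative.
struct LastCache;

/// Sketch of the two-layer RwLock/HashMap described above: the outer lock
/// guards the database layer and the inner lock guards the tables within a
/// database, so cache creation and writes of buffered data can lock at
/// different granularities rather than always locking the whole provider.
struct LastCacheProvider {
    cache_map: RwLock<HashMap<String, RwLock<HashMap<String, LastCache>>>>,
    //                        ^ database name         ^ table name
}
```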
Added basic APIs on the write buffer to access the last cache, and added a test to the last_cache module to check that it works with a simple example.
pauldix
left a comment
Some comments at specific points, but a broader comment here. I think there might be a little confusion about the desired behavior of the cache. An example might be helpful here and you can tell me if this structure will handle it, or if you're leaving it for later, or if this is clarifying.
Say we have a table foo with tags t1 and t2 and field f1. Then say we create a last cache on the table with key columns of [t1] and value columns of [t2, f1]. Then we have this data come in:
foo,t1=asdf,t2=bar f1=1 123
foo,t1=jkl,t2=bar f1=2 123
foo,t1=asdf,t2=xyz f1=3 444
The last value cache would have two collections of last values, one for each unique value of t1 (the key column).
t1=asdf [(t2=bar f1=1 123), (t2=xyz, f1=3 444)]
t1=jkl [(t2=bar f1=2 123)]
Now say that we instead had a last value cache on key columns [t1, t2] and value columns [f1]. And then say we have this data instead:
foo,t1=asdf,t2=bar f1=1 123
foo,t1=asdf,t2=zoo f1=2 123
foo,t1=jkl,t2=bar f1=3 123
foo,t1=jkl,t2=bar f1=4 444
The last value cache would have three collections of last values, for each unique t1/t2 combination that we saw. Further, the relationship is hierarchical. So we'd have:
t1=asdf
t2=bar [(f1=1 123)]
t2=zoo [(f1=2 123)]
t1=jkl
t2=bar [(f1=4 444)]
Note that it makes no sense to put the key columns in the value columns as every unique value in the key column will appear in the last cache. When we look up t1=asdf we should see there are two entries in there for t2 with bar and zoo.
So I would expect the last cache VecDeques to be in a nested HashMap whose depth is the same as however many key columns were specified.
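As a concrete sketch of that nesting (using the LastCacheKey/LastCacheState/LastCacheStore names that show up later in this PR, with simplified value types):

```rust
use std::collections::{HashMap, VecDeque};

/// One level of the hierarchy: either another key column below this one, or
/// the terminal store holding the last N values for each value column.
#[derive(Debug)]
enum LastCacheState {
    Key(LastCacheKey),
    Store(LastCacheStore),
}

/// Maps each value seen for a given key column (e.g. "asdf" or "jkl" for t1)
/// to the next level down.
#[derive(Debug)]
struct LastCacheKey {
    column_name: String,
    value_map: HashMap<String, LastCacheState>,
}

/// Terminal node: a ring buffer of the last N values per value column,
/// shown here with string values only for brevity.
#[derive(Debug)]
struct LastCacheStore {
    cache: HashMap<String, VecDeque<String>>,
}
```

For the second example above, the top-level LastCacheKey would be for t1, its value_map entries for "asdf" and "jkl" would each hold a LastCacheKey for t2, and the stores under those would hold the last f1 values.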
influxdb3_write/src/last_cache.rs
    _key_columns: HashSet<String>,
    schema: SchemaRef,
    // use an IndexMap to preserve insertion order:
    cache: IndexMap<String, CacheColumn>,
It's not clear to me why this is being used here. What's the insertion order being preserved here?
The links in that comment look to be out-of-date. But I have added docs/comments in the code to help explain this.
@pauldix - while working on #25125 to enable caches that add new fields, I have found that the insertion order guarantee falls apart in scenarios where writes come in with different fields/orders for different key values. So, we can't rely on that, and I will be removing the comments about insertion order.
I still like the use of IndexMap because it gives fast iteration over keys, as well as fast lookups (see here), but if we want to avoid the dependency, or just optimize for lookup speed, then we could use a HashMap here.
I also realized while writing out the PR feedback that there's another setting we'll need for the last values cache, which is an age-out. Since it'll keep last values caches for each unique key column combination seen, it means that for ephemeral key column values (i.e. ephemeral series), they'll continue taking up space in the cache until they're cleared out. So that timeout duration should be an additional parameter on the settings (like count). A sensible default might be 4 hours. So every once in a while, the last value cache should be walked so that any key set that hasn't seen a new value in that time is cleared from the cache completely.
@pauldix thank you for the feedback!
My understanding was definitely off, especially w.r.t. the key columns (literally keys in the cache), but your example clears that up for me. I can re-work the implementation to satisfy that behaviour.
Good call, I can add this to the requirements in the related issues.
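As a rough illustration of the age-out setting discussed above (the struct and field names are hypothetical, not the actual API):

```rust
use std::time::Duration;

/// Hypothetical creation settings for a last value cache, with the age-out
/// duration sitting alongside the existing count parameter.
#[derive(Debug)]
struct LastCacheSettings {
    key_columns: Vec<String>,
    value_columns: Vec<String>,
    /// How many last values to keep per unique key column combination.
    count: usize,
    /// Evict any key combination that has not seen a new value in this long;
    /// the suggested default above is 4 hours.
    ttl: Duration,
}
```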
Addressed three parts of PR feedback:
1. Remove double-lock on cache map
2. Re-order the get when writing to the cache to be outside the loop
3. Move the time check into the cache itself
This refactors the last cache to use a nested caching structure, where the key columns for a given cache are used to create a hierarchy of nested maps, terminating in the actual store for the values in the cache. Access to the cache is done via a set of predicates which can optionally specify the key column values at any level in the cache hierarchy to only gather record batches from children of that node in the cache. Some todos:
- Need to handle the TTL
- Need to move the TableProvider impl up to the LastCache type
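A minimal sketch of what such a predicate might look like (the type and fields are illustrative, not the actual implementation):

```rust
/// Illustrative read-path predicate: constrain one key column to an exact
/// value. Levels of the cache hierarchy with no matching predicate simply
/// fan out to all of their children when gathering record batches.
#[derive(Debug)]
struct Predicate {
    key_column: String,
    value: String,
}

fn example_predicates() -> Vec<Predicate> {
    // e.g. for key columns [t1, t2], this would gather record batches from
    // every t2 child that sits under t1 = "asdf"
    vec![Predicate {
        key_column: "t1".to_string(),
        value: "asdf".to_string(),
    }]
}
```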
This re-writes the datafusion TableProvider implementation on the correct type, i.e., the LastCache, and adds conversion from the filter Expr's to the Predicate type for the cache.
Last caches will have expired entries walked when writes come in.
Changed key columns so that they do not accept null values, i.e., pushed rows that are missing key column values will be ignored. When producing record batches for a cache, if not all key columns are used in the predicate, this change makes it so that the non-predicate key columns are produced as columns in the output record batches. A test with a few cases showing this was added.
Ensure key columns in the last cache that are not included in the predicate are emitted in the RecordBatches as a column. Cleaned up and added comments to the new test.
My recent set of commits addresses pretty well all of the feedback so far. I refactored the cache to have a hierarchical structure based on the key columns specified, and those are used to evaluate predicates when producing record batches. I still have several things to do, however:
- Implementation
- Testing
- Documentation
Force-pushed 9d1db84 to 979f717
Added two tests, as per commit title. Also moved the eviction process to a separate function so that it was not being done on every write to the cache, which could be expensive, and this ensures that entries are evicted regardless of whether writes are coming in or not.
Force-pushed 00bf6cd to eb6ed24
CacheAlreadyExists errors were based only on the database and table names, and did not include the cache name, which was not correct.
This also adds explicit support for series key columns to distinguish them from normal tags in terms of nullability. A test was added to check that nulls work.
    /// Newtype wrapper around a [`VecDeque`] whose values are paired with an [`Instant`] that
    /// represents the time at which each value was pushed into the buffer.
    #[derive(Debug)]
    struct ColumnBuffer<T>(VecDeque<(Instant, T)>);
I think that since all the column buffers for a cache get new values pushed at the same time, a separate VecDeque of Instants could be stored, vs. having to store an Instant next to each value in each column buffer. Each Instant is 16 bytes (see here).
Yeah, we should only be storing a single instant for each row in the last cache.
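A sketch of that alternative, with one deque of Instants per store kept index-aligned with plain value buffers (names and the single value type are simplified here):

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Plain value buffer; one per cached column, with no per-value Instant.
#[derive(Debug)]
struct ColumnBuffer<T>(VecDeque<T>);

/// Store for one key combination: a single buffer of insertion times shared
/// by all column buffers, so the 16-byte Instant is paid once per row rather
/// than once per value per column.
#[derive(Debug)]
struct LastCacheStore {
    /// Newest entries at the front, matching the order of the column buffers.
    instants: VecDeque<Instant>,
    columns: Vec<ColumnBuffer<i64>>,
}

impl LastCacheStore {
    /// Count expired rows from the back (oldest first); the same count can
    /// then be truncated from every column buffer to keep them aligned.
    fn expired_row_count(&self, ttl: Duration) -> usize {
        self.instants
            .iter()
            .rev()
            .take_while(|t| t.elapsed() > ttl)
            .count()
    }
}
```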
pauldix
left a comment
Just a few comments, but they can be addressed in a follow-on PR. Otherwise looks really good!
| format!("{tbl_name}_{keys}_last_cache", keys = key_columns.join("_")) | ||
| }); | ||
|
|
||
| // reject creation if there is already a cache with specified database, table, and cache name |
The behavior I like to see on creation of things is: if the user tries to create something with all the same arguments, it returns OK; if the arguments are different, it returns an error that it already exists. This might be handled higher up the stack in the API layer, but I'm not sure that this error would give the caller enough information to make the determination.
The reason I opt for this behavior is that it makes automation easier where they can call create as many times as they want and it'll always succeed as long as the settings are the same.
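A sketch of that behaviour (the config type and function here are hypothetical, not the actual create API): compare the requested settings against whatever already exists and only error on a mismatch.

```rust
/// Hypothetical cache configuration used to compare a create request
/// against an existing cache.
#[derive(Debug, PartialEq)]
struct LastCacheConfig {
    key_columns: Vec<String>,
    value_columns: Vec<String>,
    count: usize,
}

#[derive(Debug)]
enum CreateError {
    /// A cache with this name exists but was created with different settings.
    CacheAlreadyExists,
}

/// Idempotent create: re-creating with identical settings succeeds as a
/// no-op, so automation can call create repeatedly; only differing settings
/// surface an error.
fn create_cache(
    existing: Option<&LastCacheConfig>,
    requested: LastCacheConfig,
) -> Result<LastCacheConfig, CreateError> {
    match existing {
        Some(current) if *current == requested => Ok(requested),
        Some(_) => Err(CreateError::CacheAlreadyExists),
        None => Ok(requested),
    }
}
```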
        .find(|f| f.name == *key)
        .map(|f| KeyValue::from(&f.value))
    else {
        // ignore the row if it does not contain all key columns
We might want to represent null key column values in the structure? I'm not sure about this yet though. Something to think about and discuss.
When I first refactored this, I had the LastCacheKey hold a HashMap<Option<KeyValue>, LastCacheState> to handle null key values, but switched to this behaviour to simplify it.
I think it just needs to do that, i.e., use Option<KeyValue> instead of KeyValue, and then store the data type in the LastCacheKey, because 1) key columns will always have a fixed data type, and 2) you can't rely on the KeyValue alone to get the data type when it is None (for creating RecordBatches).
I can open up an issue for this.
Yeah, still not totally sure we want to bother with this, so maybe log an issue to see if anyone cares. For now I'd leave it as is.
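Should that change ever be picked up, a rough sketch of the shape being described (with a stand-in for the data type enum) might look like:

```rust
use std::collections::HashMap;

/// Illustrative key-column value; the real KeyValue covers all supported
/// key column data types.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum KeyValue {
    String(String),
    Int(i64),
}

/// Stand-in for the column's data type (e.g. an Arrow DataType), kept on
/// the key node so RecordBatch columns can be built even for None keys.
#[derive(Debug)]
enum KeyDataType {
    Utf8,
    Int64,
}

#[derive(Debug)]
struct LastCacheState; // next level of the hierarchy, elided here

#[derive(Debug)]
struct LastCacheKey {
    column_name: String,
    data_type: KeyDataType,
    /// None holds rows where this key column was null.
    value_map: HashMap<Option<KeyValue>, LastCacheState>,
}
```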
    fn remove_expired(&mut self) {
        self.value_map
            .iter_mut()
            .for_each(|(_, m)| m.remove_expired());
If all rows have been expired, this key should be removed from the map. Otherwise, key values that stop sending data (think ephemeral things like container IDs, etc.) will blow up the size of the cache over time with a bunch of empty map entries. (I'm assuming I'm reading this correctly and only the values are getting removed.)
This should be true walking up the tree. Each key/value should be removed from the map if all children are empty.
Yes, good call, this only removes the values and is not walking up and cleaning up the maps. I'll address that with the other immediate issues in a follow-on PR.
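A sketch of that follow-on behaviour, where remove_expired reports emptiness back up the tree so each level can prune its value_map (the types mirror the names quoted above, but the details are illustrative):

```rust
use std::collections::HashMap;

/// Terminal store of cached values; eviction of rows older than the TTL is
/// elided in this sketch.
struct LastCacheStore {
    rows: usize,
}

impl LastCacheStore {
    /// Evict expired rows (elided) and report whether the store is now empty.
    fn remove_expired(&mut self) -> bool {
        self.rows == 0
    }
}

enum LastCacheState {
    Key(LastCacheKey),
    Store(LastCacheStore),
}

impl LastCacheState {
    fn remove_expired(&mut self) -> bool {
        match self {
            Self::Key(key) => key.remove_expired(),
            Self::Store(store) => store.remove_expired(),
        }
    }
}

struct LastCacheKey {
    value_map: HashMap<String, LastCacheState>,
}

impl LastCacheKey {
    /// Evict expired rows below this key column and drop any child whose
    /// subtree is now completely empty, so ephemeral key values (container
    /// IDs, etc.) do not linger in the map. Returns true when this node is
    /// itself empty and can be removed by its parent.
    fn remove_expired(&mut self) -> bool {
        self.value_map.retain(|_, child| !child.remove_expired());
        self.value_map.is_empty()
    }
}
```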
Going to open a follow-on PR to address the following:
@mgattozzi - I am going to merge this and #25125 to keep things rolling, but if you have any feedback I am happy to open follow-on PRs to address.
Yeah keep things rolling. I haven't had time to look but I'm not worried about your implementations.
Part of #25093
Last-N-Value Cache Implementation
This PR introduces the core implementation for the Last-N-Value cache (see #25091 for detailed requirements).
At the highest level, the write buffer has access to a LastCacheProvider, which holds 0-to-many LastCaches for any database and table in the database. The API to the LastCacheProvider is fairly limited in this PR to a create API, as well as an API to get RecordBatches out of a given cache. The latter is only for testing purposes, as querying the cache will be done via DataFusion trait implementations and needs to be linked to the query executor in a future PR (see #25095).

Each LastCache has a hierarchical structure based on its key columns. @pauldix gave a good description of how this works in a comment below (see #25109 (review)). Each cache is a multi-level nested HashMap, where there is a level for each key column in the cache. At the lowest level, the value columns for each unique combination of key column values are stored in individual LastCacheStores, which use a ring buffer of size N to store the last N values for each column being cached.

Writes to the Cache
Writes to the cache are done from the write buffer flusher, after batches have been written to the WAL, and just prior to when they are written into segments in the write buffer. Only batches from the current/latest segment are written to the cache. Internally, the cache will only accept rows that have newer time stamps than the most recent value, and rows that do not include all key columns for a given cache will be ignored.
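As a small illustration of those acceptance rules (the row representation here is simplified and hypothetical):

```rust
/// Simplified row as it arrives from the write buffer flusher.
struct Row<'a> {
    time: i64,
    /// (column name, value) pairs, flattened for this sketch.
    fields: &'a [(String, String)],
}

/// Sketch of the acceptance rules described above: skip rows that are not
/// newer than the latest cached timestamp or that are missing any of the
/// cache's key columns.
fn should_cache(row: &Row<'_>, last_cached_time: i64, key_columns: &[String]) -> bool {
    if row.time <= last_cached_time {
        return false;
    }
    key_columns
        .iter()
        .all(|key| row.fields.iter().any(|(name, _)| name == key))
}
```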
Time-to-live (TTL)
Each LastCache is constructed with a time-to-live (TTL), which represents the lifetime of elements in the cache. Elements that outlive this duration will be evicted. Currently, eviction is invoked on the write buffer's flush interval.

Not included in this PR

The only core requirement not satisfied by this PR is to have new fields be cached for caches that do not have a specified list of value columns. That is, for caches that are created with the default for value columns, i.e., all non-key columns, as new fields are written to the table, those fields should be added to the cache and back-filled with nulls.

I think this PR is pretty large at this point, so I would like to add this in a follow-on PR.