Summary
Store the data in the datastore as dense chunks to drastically reduce ingestion cost and RAM overhead, at the cost of slower queries in the unhappy path (amortized by a query cache).
For the happy path (in-order data), queries should be about as fast as in the current store.
For the unhappy path (out-of-order data) we have a parameter to tweak to either optimize for query time or memory use. By default we optimize for the latter, and rely on the query cache to amortize the former.
It should be pretty straightforward to implement, taking maybe 2-4 weeks.
Details
The proposal is to arrange the data into dense chunks (similar to e.g. Parquet).
Each chunk contains N rows of data for a single Entity.
- Each temporal batch becomes a single chunk
- The SDK-side batcher outputs chunks (e.g. every 8ms)
- The `DataStore` contains an index of chunks
During ingestion, each chunk is just inserted straight into the DataStore, which keeps a per-timeline index of the chunks.
The chunks can have overlapping time ranges. Each chunk can also be unsorted. This is what makes queries slower.
Because chunks can overlap in time, we need to use something like an interval tree to index them. We should still have O(log N) insertions and queries into this tree.
In the future we can add compaction and chunk splitting to the data store in order to optimize storage and/or lookups, but we don't need it for the first iteration.
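As a rough sketch of the ingestion path, using the types from the Pseudo-code section further down (the `IntervalTree::insert` call, `ChunkId::new()`, and the `Map` entry API are assumptions for illustration, not final API):

```rust
impl DataStore {
    /// Ingestion: store the chunk once, and register it in the interval tree of
    /// every timeline it contains. No sorting, splitting, or copying happens here.
    fn insert_chunk(&mut self, entity_path: EntityPath, chunk: Arc<Chunk>) {
        let stream = self.entity_streams.entry(entity_path).or_default();
        let chunk_id = ChunkId::new();
        for (timeline, chunk_timeline) in &chunk.timelines {
            stream
                .timelines
                .entry(*timeline)
                .or_default()
                .insert(chunk_timeline.range, chunk_id); // O(log N) interval-tree insert
        }
        stream.chunks.insert(chunk_id, chunk);
    }
}
```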
The SDK-side batcher appends data to a chunk until one of the following conditions is met (see the sketch after this list):
- The chunk is old enough (e.g. 8ms, our current default)
- It gets too large (e.g. >1MB)
- If any timeline is unsorted (out-of-order), we split after ~256 rows to bound the cost of linear searches at query time
- If the chunk is large and an insertion would make a timeline unsorted, split before the new insert
- If a timeline has been added or removed
- If any component has a different datatype (e.g. it was a `TensorData`, but is now a `Promise`)
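A hedged sketch of how the batcher could encode these split conditions (the `PendingChunk`/`PendingRow` types, the thresholds, and all helper methods are illustrative assumptions, not a final API):

```rust
use std::time::Duration;

// Illustrative defaults; the real values would be configurable.
const MAX_CHUNK_AGE: Duration = Duration::from_millis(8);
const MAX_CHUNK_BYTES: u64 = 1024 * 1024; // ~1MB
const MAX_UNSORTED_ROWS: usize = 256;

impl PendingChunk {
    /// Should the batcher close this chunk (and start a new one) before appending `row`?
    fn should_split_before(&self, row: &PendingRow) -> bool {
        self.age() >= MAX_CHUNK_AGE
            || self.size_bytes() >= MAX_CHUNK_BYTES
            // Cap unsorted chunks so the query-time linear searches stay cheap:
            || (self.any_timeline_unsorted() && self.num_rows() >= MAX_UNSORTED_ROWS)
            // Don't let a large, still-sorted chunk become unsorted:
            || (self.is_large() && self.would_unsort_any_timeline(row))
            // Schema changes always force a new chunk:
            || self.timeline_set_differs(row)
            || self.component_datatypes_differ(row)
    }
}
```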
Pseudo-code
```rust
struct DataStore {
    entity_streams: Map<EntityPath, EntityStream>,
}

struct EntityStream {
    /// Each chunk is found at most once on each timeline.
    timelines: Map<Timeline, IntervalTree<TimeInt, ChunkId>>,

    chunks: Map<ChunkId, Arc<Chunk>>,
}

/// Dense storage of N rows of data for a specific EntityPath.
struct Chunk {
    /// Always sorted.
    row_ids: Vec<RowId>,

    /// The time columns.
    timelines: Map<Timeline, ChunkTimeline>,

    /// Each `Box<dyn Array>` is effectively a `Vec<Option<Vec<T>>>` backed as raw Arrow data.
    /// The outer option is there so that we can log e.g. only a `Position` one time, but not a `Color`.
    /// The `dyn Array` also holds the data type.
    components: BTreeMap<ComponentName, Box<dyn Array>>,
}

struct ChunkTimeline {
    /// Hopefully sorted, but not necessarily.
    times: Vec<TimeInt>,

    /// Is [`Self::times`] sorted?
    sorted: bool,

    /// (min, max) of [`Self::times`].
    range: TimeRange,

    // Future improvement: keep a shuffle-index if unsorted.
}
```
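A hedged sketch of how a `ChunkTimeline` could keep `sorted` and `range` up to date as rows are appended (the `TimeRange` min/max fields and this helper are assumptions for illustration):

```rust
impl ChunkTimeline {
    fn push(&mut self, time: TimeInt) {
        if let Some(&last) = self.times.last() {
            if time < last {
                // One out-of-order time is enough to mark the whole column as
                // unsorted; only a future re-sort/compaction pass can undo that.
                self.sorted = false;
            }
        }
        self.range.min = self.range.min.min(time);
        self.range.max = self.range.max.max(time);
        self.times.push(time);
    }
}
```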
---------------------------------------------------
```rust
type RangeQueryOutput = Vec<(Arc<Chunk>, Range<usize>)>;

impl EntityStream {
    fn range_query(&self, component: ComponentName, timeline: Timeline, range: TimeRange) -> RangeQueryOutput {
        self.timelines[timeline]
            .overlaps(range)
            .flat_map(|chunk_id| self.chunks[chunk_id].range_query(component, timeline, range))
            .sorted_by_key(|(_, row_range)| row_range.start)
            .collect()
    }
}

impl Chunk {
    fn range_query(self: &Arc<Self>, component: ComponentName, timeline: Timeline, range: TimeRange) -> RangeQueryOutput {
        let chunk_timeline = &self.timelines[timeline];
        if chunk_timeline.sorted {
            // Fast path: find the contiguous row range with a binary search.
            let row_range = chunk_timeline.times.binary_search(range);
            vec![(self.clone(), row_range)]
        } else {
            // Slow path: linear scan over the unsorted time column, yielding
            // one single-row range per match.
            chunk_timeline.times.iter().enumerate()
                .filter_map(|(row_idx, row_time)| {
                    range.contains(row_time).then(|| (self.clone(), row_idx..row_idx + 1))
                })
                .collect()
        }
    }
}
```
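A hypothetical call site, to show what the output shape buys us (the component/timeline names and the `TimeRange`/`Timeline` constructors are placeholders, not the actual API):

```rust
// `stream` is the EntityStream for some entity path.
let component = ComponentName::from("Position");
let results = stream.range_query(component.clone(), Timeline::from("frame"), TimeRange::new(0, 100));
for (chunk, row_range) in results {
    // Happy path (sorted chunk): one contiguous row range per chunk, sliced
    // straight out of the Arrow column.
    // Unhappy path (unsorted chunk): many single-row ranges, whose cost the
    // query cache is expected to amortize across repeated queries.
    let column = &chunk.components[&component];
    // …slice the rows in `row_range` out of `column` here…
}
```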
Future improvements
- More efficient encoding of `RowId` and `TimeInt`, using some sort of time-series encoding, e.g. delta-encoding + RLE (see the sketch below)
- Chunk compaction in the `DataStore`
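To illustrate the kind of encoding meant here (a minimal standalone sketch, not a proposed format): consecutive `RowId`s/`TimeInt`s usually increase by a constant step, so delta-encoding followed by run-length encoding collapses a whole column into a handful of (delta, run-length) pairs.

```rust
/// Minimal sketch: delta + run-length encode a (mostly) monotonic i64 column.
fn delta_rle_encode(values: &[i64]) -> Vec<(i64, u32)> /* (delta, run length) */ {
    let mut runs: Vec<(i64, u32)> = Vec::new();
    let mut prev = 0;
    for &value in values {
        let delta = value - prev;
        prev = value;
        match runs.last_mut() {
            Some((d, n)) if *d == delta => *n += 1,
            _ => runs.push((delta, 1)),
        }
    }
    runs
}

// e.g. times [10, 11, 12, 13, 20] encode to [(10, 1), (1, 3), (7, 1)].
```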