Skip to content

Commit b3b1940

Browse files
feat(db,electric,query): separate cursor expressions for flexible pagination (#960)
* feat(db,electric,query): separate cursor expressions from where clause in loadSubset - Add CursorExpressions type with whereFrom, whereCurrent, and lastKey - LoadSubsetOptions.where no longer includes cursor - passed separately via cursor property - Add offset to LoadSubsetOptions for offset-based pagination support - Electric sync layer makes two parallel requestSnapshot calls when cursor present - Query collection serialization includes offset for query key generation This allows sync layers to choose between cursor-based or offset-based pagination, and Electric can efficiently handle tie-breaking with targeted requests. test(react-db): update useLiveInfiniteQuery test mock to handle cursor expressions The test mock's loadSubset handler now handles the new cursor property in LoadSubsetOptions by combining whereCurrent (ties) and whereFrom (next page) data, deduplicating by id, and re-sorting. fix(electric): make cursor requestSnapshot calls sequential Changed parallel requestSnapshot calls to sequential to avoid potential issues with concurrent snapshot requests that may cause timeouts in CI. fix(electric): combine cursor expressions into single requestSnapshot Instead of making two separate requestSnapshot calls (one for whereFrom, one for whereCurrent), combine them using OR into a single request. This avoids potential issues with multiple sequential snapshot requests that were causing timeouts in CI. The combined expression (whereFrom OR whereCurrent) matches the original behavior where cursor was combined with the where clause. wip working? update changeset fix query test * update docs * ci: apply automated fixes * fixups --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
1 parent e418426 commit b3b1940

17 files changed

Lines changed: 777 additions & 131 deletions

File tree

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
---
2+
'@tanstack/db': patch
3+
'@tanstack/electric-db-collection': patch
4+
'@tanstack/query-db-collection': patch
5+
---
6+
7+
Enhanced LoadSubsetOptions with separate cursor expressions and offset for flexible pagination.
8+
9+
**⚠️ Breaking Change for Custom Sync Layers / Query Collections:**
10+
11+
`LoadSubsetOptions.where` no longer includes cursor expressions for pagination. If you have a custom sync layer or query collection that implements `loadSubset`, you must now handle pagination separately:
12+
13+
- **Cursor-based pagination:** Use the new `cursor` property (`cursor.whereFrom` and `cursor.whereCurrent`) and combine them with `where` yourself
14+
- **Offset-based pagination:** Use the new `offset` property
15+
16+
Previously, cursor expressions were baked into the `where` clause. Now they are passed separately so sync layers can choose their preferred pagination strategy.
17+
18+
**Changes:**
19+
20+
- Added `CursorExpressions` type with `whereFrom`, `whereCurrent`, and optional `lastKey` properties
21+
- Added `cursor` to `LoadSubsetOptions` for cursor-based pagination (separate from `where`)
22+
- Added `offset` to `LoadSubsetOptions` for offset-based pagination support
23+
- Electric sync layer now makes two parallel `requestSnapshot` calls when cursor is present:
24+
- One for `whereCurrent` (all ties at boundary, no limit)
25+
- One for `whereFrom` (rows after cursor, with limit)
26+
- Query collection serialization now includes `offset` for query key generation
27+
- Added `truncate` event to collections, emitted when synced data is truncated (e.g., after `must-refetch`)
28+
- Fixed `setWindow` pagination: cursor expressions are now correctly built when paging through results
29+
- Fixed offset tracking: `loadNextItems` now passes the correct window offset to prevent incorrect deduplication
30+
- `CollectionSubscriber` now listens for `truncate` events to reset cursor tracking state
31+
32+
**Benefits:**
33+
34+
- Sync layers can choose between cursor-based or offset-based pagination strategies
35+
- Electric can efficiently handle tie-breaking with two targeted requests
36+
- Better separation of concerns between filtering (`where`) and pagination (`cursor`/`offset`)
37+
- `setWindow` correctly triggers backend loading for subsequent pages in multi-column orderBy queries
38+
- Cursor state is properly reset after truncation, preventing stale cursor data from being used

docs/collections/query-collection.md

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,7 @@ All direct write methods are available on `collection.utils`:
521521

522522
## QueryFn and Predicate Push-Down
523523

524-
When using `syncMode: 'on-demand'`, the collection automatically pushes down query predicates (where clauses, orderBy, and limit) to your `queryFn`. This allows you to fetch only the data needed for each specific query, rather than fetching the entire dataset.
524+
When using `syncMode: 'on-demand'`, the collection automatically pushes down query predicates (where clauses, orderBy, limit, and offset) to your `queryFn`. This allows you to fetch only the data needed for each specific query, rather than fetching the entire dataset.
525525

526526
### How LoadSubsetOptions Are Passed
527527

@@ -530,9 +530,13 @@ LoadSubsetOptions are passed to your `queryFn` via the query context's `meta` pr
530530
```typescript
531531
queryFn: async (ctx) => {
532532
// Extract LoadSubsetOptions from the context
533-
const { limit, where, orderBy } = ctx.meta.loadSubsetOptions
533+
const { limit, offset, where, orderBy } = ctx.meta.loadSubsetOptions
534534

535535
// Use these to fetch only the data you need
536+
// - where: filter expression (AST)
537+
// - orderBy: sort expression (AST)
538+
// - limit: maximum number of rows
539+
// - offset: number of rows to skip (for pagination)
536540
// ...
537541
}
538542
```
@@ -572,7 +576,7 @@ const productsCollection = createCollection(
572576
syncMode: 'on-demand', // Enable predicate push-down
573577

574578
queryFn: async (ctx) => {
575-
const { limit, where, orderBy } = ctx.meta.loadSubsetOptions
579+
const { limit, offset, where, orderBy } = ctx.meta.loadSubsetOptions
576580

577581
// Parse the expressions into simple format
578582
const parsed = parseLoadSubsetOptions({ where, orderBy, limit })
@@ -605,6 +609,11 @@ const productsCollection = createCollection(
605609
params.set('limit', String(parsed.limit))
606610
}
607611

612+
// Add offset for pagination
613+
if (offset) {
614+
params.set('offset', String(offset))
615+
}
616+
608617
const response = await fetch(`/api/products?${params}`)
609618
return response.json()
610619
},
@@ -629,6 +638,7 @@ const affordableElectronics = createLiveQueryCollection({
629638

630639
// This triggers a queryFn call with:
631640
// GET /api/products?category=electronics&price_lt=100&sort=price:asc&limit=10
641+
// When paginating, offset is included: &offset=20
632642
```
633643

634644
### Custom Handlers for Complex APIs
@@ -731,10 +741,11 @@ queryFn: async (ctx) => {
731741
Convenience function that parses all LoadSubsetOptions at once. Good for simple use cases.
732742

733743
```typescript
734-
const { filters, sorts, limit } = parseLoadSubsetOptions(ctx.meta?.loadSubsetOptions)
744+
const { filters, sorts, limit, offset } = parseLoadSubsetOptions(ctx.meta?.loadSubsetOptions)
735745
// filters: [{ field: ['category'], operator: 'eq', value: 'electronics' }]
736746
// sorts: [{ field: ['price'], direction: 'asc', nulls: 'last' }]
737747
// limit: 10
748+
// offset: 20 (for pagination)
738749
```
739750

740751
#### `parseWhereExpression(expr, options)`

packages/db/src/collection/events.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,19 @@ export interface CollectionLoadingSubsetChangeEvent {
4343
loadingSubsetTransition: `start` | `end`
4444
}
4545

46+
/**
47+
* Event emitted when the collection is truncated (all data cleared)
48+
*/
49+
export interface CollectionTruncateEvent {
50+
type: `truncate`
51+
collection: Collection<any, any, any, any, any>
52+
}
53+
4654
export type AllCollectionEvents = {
4755
'status:change': CollectionStatusChangeEvent
4856
'subscribers:change': CollectionSubscribersChangeEvent
4957
'loadingSubset:change': CollectionLoadingSubsetChangeEvent
58+
truncate: CollectionTruncateEvent
5059
} & {
5160
[K in CollectionStatus as `status:${K}`]: CollectionStatusEvent<K>
5261
}
@@ -56,6 +65,7 @@ export type CollectionEvent =
5665
| CollectionStatusChangeEvent
5766
| CollectionSubscribersChangeEvent
5867
| CollectionLoadingSubsetChangeEvent
68+
| CollectionTruncateEvent
5969

6070
export type CollectionEventHandler<T extends keyof AllCollectionEvents> = (
6171
event: AllCollectionEvents[T],

packages/db/src/collection/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,7 @@ export class CollectionImpl<
365365
lifecycle: this._lifecycle,
366366
changes: this._changes,
367367
indexes: this._indexes,
368+
events: this._events,
368369
})
369370
this._sync.setDeps({
370371
collection: this, // Required for passing to config.sync callback

packages/db/src/collection/state.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import type { CollectionImpl } from './index.js'
1111
import type { CollectionLifecycleManager } from './lifecycle'
1212
import type { CollectionChangesManager } from './changes'
1313
import type { CollectionIndexesManager } from './indexes'
14+
import type { CollectionEventsManager } from './events'
1415

1516
interface PendingSyncedTransaction<
1617
T extends object = Record<string, unknown>,
@@ -37,6 +38,7 @@ export class CollectionStateManager<
3738
public lifecycle!: CollectionLifecycleManager<TOutput, TKey, TSchema, TInput>
3839
public changes!: CollectionChangesManager<TOutput, TKey, TSchema, TInput>
3940
public indexes!: CollectionIndexesManager<TOutput, TKey, TSchema, TInput>
41+
private _events!: CollectionEventsManager
4042

4143
// Core state - make public for testing
4244
public transactions: SortedMap<string, Transaction<any>>
@@ -79,11 +81,13 @@ export class CollectionStateManager<
7981
lifecycle: CollectionLifecycleManager<TOutput, TKey, TSchema, TInput>
8082
changes: CollectionChangesManager<TOutput, TKey, TSchema, TInput>
8183
indexes: CollectionIndexesManager<TOutput, TKey, TSchema, TInput>
84+
events: CollectionEventsManager
8285
}) {
8386
this.collection = deps.collection
8487
this.lifecycle = deps.lifecycle
8588
this.changes = deps.changes
8689
this.indexes = deps.indexes
90+
this._events = deps.events
8791
}
8892

8993
/**
@@ -525,6 +529,12 @@ export class CollectionStateManager<
525529
for (const key of changedKeys) {
526530
currentVisibleState.delete(key)
527531
}
532+
533+
// 4) Emit truncate event so subscriptions can reset their cursor tracking state
534+
this._events.emit(`truncate`, {
535+
type: `truncate`,
536+
collection: this.collection,
537+
})
528538
}
529539

530540
for (const operation of transaction.operations) {

packages/db/src/collection/subscription.ts

Lines changed: 64 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ type RequestLimitedSnapshotOptions = {
3434
limit: number
3535
/** All column values for cursor (first value used for local index, all values for sync layer) */
3636
minValues?: Array<unknown>
37+
/** Row offset for offset-based pagination (passed to sync layer) */
38+
offset?: number
3739
}
3840

3941
type CollectionSubscriptionOptions = {
@@ -63,6 +65,12 @@ export class CollectionSubscription
6365
// Keep track of the keys we've sent (needed for join and orderBy optimizations)
6466
private sentKeys = new Set<string | number>()
6567

68+
// Track the count of rows sent via requestLimitedSnapshot for offset-based pagination
69+
private limitedSnapshotRowCount = 0
70+
71+
// Track the last key sent via requestLimitedSnapshot for cursor-based pagination
72+
private lastSentKey: string | number | undefined
73+
6674
private filteredCallback: (changes: Array<ChangeMessage<any, any>>) => void
6775

6876
private orderByIndex: IndexInterface<string | number> | undefined
@@ -258,6 +266,7 @@ export class CollectionSubscription
258266
orderBy,
259267
limit,
260268
minValues,
269+
offset,
261270
}: RequestLimitedSnapshotOptions) {
262271
if (!limit) throw new Error(`limit is required`)
263272

@@ -354,77 +363,75 @@ export class CollectionSubscription
354363
keys = index.take(valuesNeeded(), biggestObservedValue, filterFn)
355364
}
356365

366+
// Track row count for offset-based pagination (before sending to callback)
367+
// Use the current count as the offset for this load
368+
const currentOffset = this.limitedSnapshotRowCount
369+
357370
this.callback(changes)
358371

359-
// Build the WHERE filter for sync layer loadSubset
360-
// buildCursor handles both single-column and multi-column cases
361-
let whereWithValueFilter = where
372+
// Update the row count and last key after sending (for next call's offset/cursor)
373+
this.limitedSnapshotRowCount += changes.length
374+
if (changes.length > 0) {
375+
this.lastSentKey = changes[changes.length - 1]!.key
376+
}
377+
378+
// Build cursor expressions for sync layer loadSubset
379+
// The cursor expressions are separate from the main where clause
380+
// so the sync layer can choose cursor-based or offset-based pagination
381+
let cursorExpressions:
382+
| {
383+
whereFrom: BasicExpression<boolean>
384+
whereCurrent: BasicExpression<boolean>
385+
lastKey?: string | number
386+
}
387+
| undefined
388+
362389
if (minValues !== undefined && minValues.length > 0) {
363-
const cursor = buildCursor(orderBy, minValues)
364-
if (cursor) {
365-
whereWithValueFilter = where ? and(where, cursor) : cursor
390+
const whereFromCursor = buildCursor(orderBy, minValues)
391+
392+
if (whereFromCursor) {
393+
const { expression } = orderBy[0]!
394+
const minValue = minValues[0]
395+
396+
// Build the whereCurrent expression for the first orderBy column
397+
// For Date values, we need to handle precision differences between JS (ms) and backends (μs)
398+
// A JS Date represents a 1ms range, so we query for all values within that range
399+
let whereCurrentCursor: BasicExpression<boolean>
400+
if (minValue instanceof Date) {
401+
const minValuePlus1ms = new Date(minValue.getTime() + 1)
402+
whereCurrentCursor = and(
403+
gte(expression, new Value(minValue)),
404+
lt(expression, new Value(minValuePlus1ms)),
405+
)
406+
} else {
407+
whereCurrentCursor = eq(expression, new Value(minValue))
408+
}
409+
410+
cursorExpressions = {
411+
whereFrom: whereFromCursor,
412+
whereCurrent: whereCurrentCursor,
413+
lastKey: this.lastSentKey,
414+
}
366415
}
367416
}
368417

369418
// Request the sync layer to load more data
370419
// don't await it, we will load the data into the collection when it comes in
371-
const loadOptions1: LoadSubsetOptions = {
372-
where: whereWithValueFilter,
420+
// Note: `where` does NOT include cursor expressions - they are passed separately
421+
// The sync layer can choose to use cursor-based or offset-based pagination
422+
const loadOptions: LoadSubsetOptions = {
423+
where, // Main filter only, no cursor
373424
limit,
374425
orderBy,
426+
cursor: cursorExpressions, // Cursor expressions passed separately
427+
offset: offset ?? currentOffset, // Use provided offset, or auto-tracked offset
375428
subscription: this,
376429
}
377-
const syncResult = this.collection._sync.loadSubset(loadOptions1)
430+
const syncResult = this.collection._sync.loadSubset(loadOptions)
378431

379432
// Track this loadSubset call
380-
this.loadedSubsets.push(loadOptions1)
381-
382-
// Make parallel loadSubset calls for values equal to minValue and values greater than minValue
383-
const promises: Array<Promise<void>> = []
384-
385-
// First promise: load all values equal to minValue
386-
if (typeof minValue !== `undefined`) {
387-
const { expression } = orderBy[0]!
388-
389-
// For Date values, we need to handle precision differences between JS (ms) and backends (μs)
390-
// A JS Date represents a 1ms range, so we query for all values within that range
391-
let exactValueFilter
392-
if (minValue instanceof Date) {
393-
const minValuePlus1ms = new Date(minValue.getTime() + 1)
394-
exactValueFilter = and(
395-
gte(expression, new Value(minValue)),
396-
lt(expression, new Value(minValuePlus1ms)),
397-
)
398-
} else {
399-
exactValueFilter = eq(expression, new Value(minValue))
400-
}
401-
402-
const loadOptions2: LoadSubsetOptions = {
403-
where: exactValueFilter,
404-
subscription: this,
405-
}
406-
const equalValueResult = this.collection._sync.loadSubset(loadOptions2)
407-
408-
// Track this loadSubset call
409-
this.loadedSubsets.push(loadOptions2)
410-
411-
if (equalValueResult instanceof Promise) {
412-
promises.push(equalValueResult)
413-
}
414-
}
415-
416-
// Second promise: load values greater than minValue
417-
if (syncResult instanceof Promise) {
418-
promises.push(syncResult)
419-
}
420-
421-
// Track the combined promise
422-
if (promises.length > 0) {
423-
const combinedPromise = Promise.all(promises).then(() => {})
424-
this.trackLoadSubsetPromise(combinedPromise)
425-
} else {
426-
this.trackLoadSubsetPromise(syncResult)
427-
}
433+
this.loadedSubsets.push(loadOptions)
434+
this.trackLoadSubsetPromise(syncResult)
428435
}
429436

430437
// TODO: also add similar test but that checks that it can also load it from the collection's loadSubset function

packages/db/src/query/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ export {
6565
minusWherePredicates,
6666
isOrderBySubset,
6767
isLimitSubset,
68+
isOffsetLimitSubset,
6869
isPredicateSubset,
6970
} from './predicate-utils.js'
7071

0 commit comments

Comments
 (0)