KV Store / Archival indexer refactor + schema changes#25143

Merged
nickvikeras merged 7 commits into main from nickv/kv-pipelines on Feb 3, 2026

Conversation

nickvikeras (Contributor) commented Jan 28, 2026

Description

  • Splitting into pipeline-per-table
  • Making (backward-compatible) schema changes
    • Write txn signature and data as separate columns (keep writing old column)
    • Write epoch start/end as separate columns (keep writing old column)
    • New object type table (this data is already in object table, but this allows us to avoid loading the entire object for rendering type information in grpc/graphql's txn apis)
    • Continuing to write old watermark (as min of all pipeline watermarks)
  • Use Bytes instead of Vec in the prost-generated code, because the framework effectively forces you to clone all of your data in the commit path to handle retries (cloning Bytes is a cheap reference-count bump rather than a copy).

This can be deployed without breaking any readers. The reader updates to read these new columns/watermarks will follow in a separate PR, and then we can finally stop writing and delete the old columns.
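
The Bytes switch described above is a codegen setting rather than a code change per field. As a sketch (assuming prost-build drives the codegen; the proto path is hypothetical), the build script could look like this, where the `"."` path applies the setting to every `bytes` field:

```rust
// build.rs -- illustrative sketch, not the actual build script from this PR.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut config = prost_build::Config::new();
    // Generate `bytes::Bytes` instead of `Vec<u8>` for all `bytes` fields,
    // so clones on the retry/commit path are cheap refcount bumps.
    config.bytes(["."]);
    // "proto/kv_store.proto" is a placeholder path for illustration.
    config.compile_protos(&["proto/kv_store.proto"], &["proto/"])?;
    Ok(())
}
```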

Test plan

  • New test that uses a mocked grpc server to test partial write failures.
  • New emulator-based test (had to install the gcloud cli on the CI container to make this work).

Add three new pipelines (EpochStart, EpochEnd, ObjectTypes) and extend
the transaction schema with separate data/signatures columns and
balance_changes/unchanged_loaded fields. Decode logic is unchanged
and will be updated in a follow-up.
pub signatures: AuthorityStrongQuorumSignInfo,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
Contributor:

Do we actually serialize/deserialize these types you've defined here? If we don't (I think we just serialize/deserialize the individual parts of these structs), then we should remove the impls (and maybe add a comment indicating that we shouldn't implement Serialize/Deserialize on these types).

nickvikeras (Contributor, author) replied Feb 3, 2026:

I didn't add these types, I just reorganized the files to match the indexing & RPC team's style preferences when refactoring. But I can try deleting the serde stuff and see if it still compiles. I don't think we actually serialize/deserialize it, so you're right that it shouldn't be there.

Comment on lines +139 to +140
pub end_timestamp_ms: Option<u64>,
pub end_checkpoint: Option<u64>,
Contributor:

For indexing these shouldn't ever be None right?

nickvikeras (Contributor, author):

Yeah good catch, there is no reason these need to be Options.

nickvikeras (Contributor, author):

Actually, if we are storing each field as its own column in Bigtable, I think we do want these to be Options so that we can query just a subset of the columns.

Contributor:

Yep, I agree; my comment only makes sense if we keep the current schema.

Comment on lines +17 to +18
pub const START: &str = "start";
pub const END: &str = "end";
Contributor:

Do we want to break out the individual components into their own columns, vs. having start and end columns whose data can't be expanded and evolved in the future?

nickvikeras (Contributor, author):

Ah yeah, I can split it up. My brain for some reason still hasn't fully registered that BCS isn't extensible. I was just looking at the query in the RPC API and saw the data was small, and wasn't sure if it made sense to split it up into multiple columns to read separately.

let timestamp_ms = checkpoint.summary.timestamp_ms;
let mut entries = Vec::with_capacity(checkpoint.object_set.len());

for object in checkpoint.object_set.iter() {
Contributor:

Doing this, we'll end up with some redundant writes, since this includes input and loaded unchanged objects as well. But this should be effectively idempotent, so maybe not worth the optimization yet?

Contributor:

The efficient implementation of this is fairly simple:

let deleted_objects = checkpoint
    .eventually_removed_object_refs_post_version()
    .into_iter()
    .map(|(id, version, _)| {
        Ok(StoredObject {
            object_id: id.to_vec(),
            object_version: version.value() as i64,
            serialized_object: None,
        })
    });
let created_objects = checkpoint.transactions.iter().flat_map(|txn| {
    txn.output_objects(&checkpoint.object_set).map(|o| {
        let id = o.id();
        let version = o.version();
        Ok(StoredObject {
            object_id: id.to_vec(),
            object_version: version.value() as i64,
            serialized_object: Some(bcs::to_bytes(o).with_context(|| {
                format!("Serializing object {id} version {}", version.value())
            })?),
        })
    })
});
deleted_objects
    .chain(created_objects)
    .collect::<Result<Vec<_>, _>>()

(the kv_objects pipeline also records a sentinel for deleted objects, but that can be ignored).

.map(tables::transactions::encode_key)
.collect(),
Some(RowFilter {
filter: Some(Filter::ColumnQualifierRegexFilter(
Contributor:

Is it always a regex, or is there a way to just provide exact columns?

amnn (Contributor) left a comment:

Apologies for the post-land comments -- two main things are:

  • schema alignment on end-of-epoch data so that GraphQL can use this data-set.
  • not introducing a new place where we depend on an object's type not changing.

Contributor:

The Postgres handler for epoch end information is indexing a lot more stuff, which we would need in order to switch over to the archival store instead of relying on this index in GraphQL:

diesel::table! {
    kv_epoch_ends (epoch) {
        epoch -> Int8,
        cp_hi -> Int8,
        tx_hi -> Int8,
        end_timestamp_ms -> Int8,
        safe_mode -> Bool,
        total_stake -> Nullable<Int8>,
        storage_fund_balance -> Nullable<Int8>,
        storage_fund_reinvestment -> Nullable<Int8>,
        storage_charge -> Nullable<Int8>,
        storage_rebate -> Nullable<Int8>,
        stake_subsidy_amount -> Nullable<Int8>,
        total_gas_fees -> Nullable<Int8>,
        total_stake_rewards_distributed -> Nullable<Int8>,
        leftover_storage_fund_inflow -> Nullable<Int8>,
        epoch_commitments -> Bytea,
    }
}

Contributor:

This works well schema-wise for GraphQL, but does it write a record for genesis (epoch 0)?

nickvikeras (Contributor, author):

Yeah it does; I confirmed the record is there in my test DB, which started out empty and ran from genesis.

Contributor:

IIUC, this pipeline assumes that an object's type cannot change once it is created. We make this assumption in one other place -- the obj_info pipeline of sui-indexer-alt -- but this is one of the reasons why we're getting rid of that pipeline:

This assumption prevents us from allowing people to reclaim derived object IDs (because it would allow the same UID to be re-used under a different type).

For that reason, we should not introduce new pipelines that rely on this assumption. In this case, I think the fix is pretty simple: adapt the pipeline to check whether the object was created, or whether it was mutated and the mutation involves a type change.

Note that even today, dynamic fields may fail this test: a dynamic field's ID is derived from the parent ID, the name type, and the name content. The value type and content don't affect the ID, which means a transaction can modify a field to change its value's type and it will be treated as a field mutation. If you then query the type of the Field<..., ...> object corresponding to the dynamic field, you will get a stale response.


pub use crate::handlers::set_max_mutations;

/// All pipeline names registered by the indexer. Single source of truth used for:
/// - Pipeline registration in `BigTableIndexer::new()`
Contributor:

This at least doesn't seem to be true (this const is not used for pipeline registration). Is it needed, and if so, is it meant to be here in this file? It's flanked on either side by imports.
