Data replication interface #527
Conversation
src/frontend/flight/handler.rs
Outdated
debug!("Processing data change with {num_rows} rows for url {url}");
// TODO: make timeout configurable
let accepted =
Come to think of it, we might also need to reject the write if it'll put us above some "hard" memory limit. Maybe we should have a "low watermark" (flush when we're using more RAM than this) and a "high watermark" (reject writes so we never go above this limit). We'll find out after load testing I guess.
Maybe? I guess if we're over the low watermark we're going to be flushing syncs into storage and reducing the memory usage anyway; it's not like we're going to continue accumulating stuff in memory.
True, if the flushing happens in the same process as the Flight handler. If it happens in the background, we might keep accumulating data even though we started the flush? Guess we'll find out with load testing
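A minimal sketch of the two-watermark idea, assuming simple byte-based accounting; the names here (`MemoryLimits`, `lo_watermark`, `hi_watermark`) are hypothetical and not part of this PR:

```rust
// Hypothetical admission control with two memory watermarks: above the
// low watermark we kick off a flush, above the high watermark we reject
// the incoming write outright.
struct MemoryLimits {
    lo_watermark: usize, // start flushing above this many bytes
    hi_watermark: usize, // never accept writes beyond this many bytes
}

impl MemoryLimits {
    /// Should a flush be triggered given the current in-memory size?
    fn should_flush(&self, current_bytes: usize) -> bool {
        current_bytes >= self.lo_watermark
    }

    /// Should an incoming sync of `incoming_bytes` be accepted?
    fn admit(&self, current_bytes: usize, incoming_bytes: usize) -> bool {
        current_bytes + incoming_bytes <= self.hi_watermark
    }
}
```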
use arrow_flight::sql::{Any, ProstMessageExt};
use prost::Message;

tonic::include_proto!("clade.sync");
Why are we doing this and not just letting the proto be in plain Rust?
Fair question; I think part of the answer is that we're using Flight as the actual interface here, and it is protobuf-based, so this is too by extension.
On the more practical side, we may want to enable clients in different languages to use this, and protobuf would facilitate that.
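For context, `tonic::include_proto!("clade.sync")` splices in Rust code generated at build time for the `clade.sync` package; the typical pairing is a build script like the following (the exact proto path is an assumption):

```rust
// build.rs: generate Rust for the proto so that
// tonic::include_proto!("clade.sync") can include it.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::compile_protos("clade/proto/sync.proto")?;
    Ok(())
}
```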
// for that table. Special care is taken to deduce the correct durable
// sequence to report back to the caller.
//
//          │sync #1│sync #2│sync #3│sync #4│sync #5│
sync here refers to the result of an RPC? a batch basically?
Yes, sync corresponds to a single RPC call (so a vector of batches with the same schema).
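In other words, something like this hypothetical shape (just an illustration of the terminology, not a struct from the PR):

```rust
use arrow::record_batch::RecordBatch;

// One "sync" = the payload of a single do_put call: a vector of record
// batches sharing one schema, plus the metadata carried by the
// accompanying command.
struct Sync {
    origin: String,            // identifies the data source
    sequence_number: u64,      // e.g. the LSN of the originating transaction
    last: bool,                // is this the final sync for this sequence?
    batches: Vec<RecordBatch>, // all with the same schema
}
```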
src/frontend/flight/sync.rs
Outdated
seqs: HashMap<String, IndexMap<SequenceNumber, DataSyncSequence>>,
// An indexed queue of table URL => pending batches to upsert/delete
// sorted by insertion order
syncs: IndexMap<String, DataSyncCollection>,
What is the difference between these? seqs and syncs?
seqs is a map (of maps) of sequences that are still held in memory. We track the insertion time and whether the sequence has had its last sync call yet (and is thus a potential durable candidate), and also keep them ordered for each origin.
syncs is a map of table locations with pending flushes.
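Putting the two replies together, a sketch of how the fields relate (the `DataSyncSequence` fields are inferred from the description above; `DataSyncCollection` internals are elided):

```rust
use std::collections::HashMap;
use std::time::Instant;

use indexmap::IndexMap;

type SequenceNumber = u64;

// Bookkeeping for a sequence still (partially) held in memory.
struct DataSyncSequence {
    inserted_at: Instant, // insertion time
    last: bool,           // has the final sync for this sequence arrived?
}

// Batches pending flush for one table, plus some additional info.
struct DataSyncCollection {
    /* batches plus metadata, elided */
}

struct SeafowlDataSyncManager {
    // origin => insertion-ordered (sequence number => bookkeeping);
    // a sequence becomes a durable candidate once its last sync arrives.
    seqs: HashMap<String, IndexMap<SequenceNumber, DataSyncSequence>>,
    // table URL => pending batches to upsert/delete, in insertion order;
    // each entry is a pending flush.
    syncs: IndexMap<String, DataSyncCollection>,
}
```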
// An indexed queue of table URL => pending batches to upsert/delete
// sorted by insertion order
syncs: IndexMap<String, DataSyncCollection>,
// Total size of all batches in memory currently
The (what feels like) inconsistent use of the term "batch" (versus "sync" or "seq"?) makes it hard for me to understand some of the comments
The bottom term actually refers to (the size of) all the batches in memory.
The upper one is more descriptive, it's really a map of table URL => syncs (which in turn store the actual batches plus some additional info). Let me update that one.
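As a sketch, the total could be tracked with Arrow's own memory accounting (whether the PR uses exactly this accounting is an assumption):

```rust
use arrow::record_batch::RecordBatch;

// Sum the in-memory footprint of all pending batches.
fn batches_size(batches: &[RecordBatch]) -> usize {
    batches.iter().map(|b| b.get_array_memory_size()).sum()
}
```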
src/frontend/flight/sync.rs
Outdated
// Map of known memory sequence numbers per origin
origin_memory: HashMap<String, SequenceNumber>,
// Map of known durable sequence numbers per origin
origin_durable: HashMap<String, SequenceNumber>,
Why are we treating Origin as a String and not a u64?
That's a good question, my impression was that these would be strings.
clade/proto/sync.proto
Outdated
repeated string pk_column = 3;

// Opaque string identifying data source
string origin = 4;
Same question sorry, why is origin a string?
Fixed now, thanks.
string origin = 4;

// Monotonically-increasing transaction number (e.g. the LSN)
uint64 sequence_number = 5;
Just note that it can and does wrap.
Thanks, that's a very important point. How often does this occur roughly?
LSN does not wrap
(EDIT: oops, I sent this before seeing Petr's comment)
They shouldn't wrap, as per https://www.postgresql.org/docs/16/wal-internals.html and other discussions:

> The numbers do not wrap, but it will take a very, very long time to exhaust the available stock of numbers.
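(For a rough sense of scale: the LSN is a 64-bit byte position in the WAL, so even at a sustained 1 GB/s of WAL generation it would take about 2^64 / 10^9 ≈ 1.8 × 10^10 seconds, i.e. roughly 580 years, to exhaust.)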
My mistake, apologies.
clade/proto/sync.proto
Outdated
uint64 sequence_number = 5;

// True if this is the last command in the transaction denoted
// by this LSN and Seafowl can advance its LSN after flushing
It would help if we consistently used LSN or Seq as a term.
Ok, dropping LSN in favor of sequence throughout.
client: &mut FlightClient,
) -> Result<DataSyncResult> {
    let flight_data = sync_cmd_to_flight_data(cmd.clone(), batch);
    let response = client.do_put(flight_data).await?.next().await.unwrap()?;
Why do we do await().next().await?
Because the do_put call is itself async (first await), but it also returns a stream which we need to await in order to fetch the result.
In our case we only stream one element back (the result) so it's not very useful, but the mechanism is there for more complex stuff.
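A minimal sketch of those two awaits, assuming arrow-flight's high-level `FlightClient` (whose `do_put` resolves to a stream of `PutResult`s):

```rust
use arrow_flight::{FlightClient, FlightData, PutResult};
use futures::{Stream, StreamExt};

async fn put_single(
    client: &mut FlightClient,
    flight_data: impl Stream<Item = FlightData> + Send + 'static,
) -> Result<PutResult, Box<dyn std::error::Error>> {
    // First await: send the request and obtain the response stream.
    let mut results = client.do_put(flight_data).await?;
    // Second await: pull the single PutResult off that stream.
    let result = results.next().await.expect("expected one PutResult")?;
    Ok(result)
}
```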
bool accepted = 1;

// Sequence number up to which the changes are in Seafowl's memory.
optional uint64 memory_sequence_number = 2;
Why do you return this?
If you're referring to the fact that it is optional, this is so because after a restart we have nothing in memory, and it feels inadequate to convey this through some actual number (e.g. 0 or -1).
Otherwise if you mean why return this at all, I think it's useful for when e.g. the client (sending) side restarts and wants to replay something—it can then skip some stuff as per this value.
Will also be useful later on as per the discussion in

> the system in general needs to know what's in memory to be able to provide reasonable snapshots for queries
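A hypothetical sketch of that client-side replay decision (none of these names are from the PR):

```rust
// Decide where to resume replay after a client restart, given the
// memory_sequence_number reported back by Seafowl.
fn resume_from(memory_sequence_number: Option<u64>, log_start: u64) -> u64 {
    match memory_sequence_number {
        // Seafowl already holds everything up to `mem`: skip past it.
        Some(mem) => mem + 1,
        // Seafowl restarted with empty memory: replay from our log start
        // (or from the durable sequence number, if we track one).
        None => log_start,
    }
}
```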
// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ▶
//          │┌─────┐│       │       │┌─────┐│       │
// table_1   │seq:1│                 │  3  │
//          │└─────┘│       │       │└─────┘│       │
// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ▶
//          │       │┌─────┐│       │       │┌─────┐│
// table_2           │  1  │                 │  3  │
//          │       │└─────┘│       │       │└─────┘│
// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ▶
//          │       │       │┌─────┐│       │       │
// table_3                   │  2  │
//          │       │       │└─────┘│       │       │
// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ▶
//          ▼       ▼       ▼       ▼       ▼       ▼
// In the above example, the first flush will target table_1, dumping payloads
// of sync #1 and sync #4 (from sequences 1 and 3). Since this is not the last
// sync of the first sequence it won't be reported as durably stored yet.
//
// Next, table_2 will get flushed (sync #2 and sync #5); this time sequence 1
// is entirely persisted to storage so it will be reported as the new durable
// sequence number. Note that while sequence 3 is also now completely flushed,
// it isn't durable, since there is a preceding sequence (2) that is still in memory.
//
// Finally, once table_3 is flushed `SeafowlDataSyncManager` will advance the
beautiful diagram
// Create a randomly sized vector of random record batches with
// a pre-defined schema
fn random_batches() -> Vec<RecordBatch> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Int32, true),
        Field::new("c2", DataType::Utf8, true),
        Field::new(SEAFOWL_SYNC_DATA_UD_FLAG, DataType::Boolean, false),
    ]));

    // Generate a random length between 1 and 3
    let len: usize = rand::thread_rng().gen_range(1..=3);

    (0..len)
        .map(|_| create_random_batch(schema.clone(), 10, 0.2, 0.8).unwrap())
        .collect()
}
Do the lengths/contents being random matter for this test, i.e. do we expect the sync behavior to change because an occasionally longer batch triggers a flush?
No, the values are way below the default size-bound flush threshold.
        assert!(sync_mgr.syncs.is_empty());
        assert_eq!(sync_mgr.size, 0);
    }
}
I'd add another test for multiple origins here too.
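Something along these lines, say; the constructor and the `enqueue_sync`/`durable_sequence` helpers are hypothetical stand-ins for the manager's real API:

```rust
#[tokio::test]
async fn test_multiple_origins() {
    let mut sync_mgr = test_sync_manager(); // hypothetical test constructor

    // Interleave two origins with independent sequence numbers
    sync_mgr.enqueue_sync("table_1", "origin_a", 1, true, random_batches());
    sync_mgr.enqueue_sync("table_2", "origin_b", 1, true, random_batches());
    sync_mgr.flush().await.unwrap();
    sync_mgr.flush().await.unwrap();

    // Each origin's durable sequence should advance independently
    assert_eq!(sync_mgr.durable_sequence("origin_a"), Some(1));
    assert_eq!(sync_mgr.durable_sequence("origin_b"), Some(1));
}
```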
#[case::long_sequence_mid_flush(
    &[("table_1", 1), ("table_1", 1), ("table_1", 1), FLUSH, ("table_1", 1), ("table_2", 1),
    ("table_2", 2), ("table_2", 2), FLUSH, ("table_2", 2), ("table_3", 2), FLUSH, ("table_3", 3),
    FLUSH, ("table_3", 3), ("table_1", 4), ("table_3", 4), ("table_1", 4), FLUSH, ("table_3", 4),
    FLUSH, FLUSH],
Maybe it's just me being paranoid and not worth testing, but something like this perhaps:
`(table_1, 1), (table_2, 1), (table_3, 1), (table_1, 2), (table_2, 2), (table_3, 2)`
i.e. more than two tables on the same LSN, and they're not consecutive in the change log.
src/frontend/flight/sync.rs
Outdated
FLUSH, FLUSH],
// Reasoning for the observed durable sequences:
// - seq 1 not seen last sync
// - seq 1 seen last sync, but it is in an unflushed table (2)
I got confused as to why this is None and not 1, but it's because after the previous flush, we received data for table_1 and table_2, and the first flush after that only flushes table_1?
That is correct; each FLUSH in the test executes a single flush call, which flushes the single table location with the oldest pending changes.
clade/proto/sync.proto
Outdated
// Primary key columns. Must not be empty: the table must always
// have a replica identity / PK.
repeated string pk_column = 3;
Should this be called pk_columns? (plural)
Good point, that is indeed the style guide recommendation; revising.
What
This PR introduces a Flight-compatible protocol for syncing record batches from a remote source.
How
It facilitates the Arrow Flight `do_put` call to upload record batches representing data changes in a remote system, along with a command containing the metadata about the action. These are in turn stored in a cache, which flushes the batches from memory to object storage once lag- or size-based criteria are met.
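A sketch of that flush decision under assumed names (`max_lag` and `max_bytes` are illustrative, not the PR's actual config keys):

```rust
use std::time::{Duration, Instant};

struct FlushCriteria {
    max_lag: Duration, // flush once the oldest pending sync is this stale
    max_bytes: usize,  // flush once pending batches exceed this size
}

impl FlushCriteria {
    fn should_flush(&self, oldest_insertion: Instant, total_bytes: usize) -> bool {
        oldest_insertion.elapsed() >= self.max_lag || total_bytes >= self.max_bytes
    }
}
```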
TODOs (here or in follow-on PRs)