
Conversation


@gruuya gruuya commented May 17, 2024

What

This PR introduces a flight-compatible protocol for syncing record batches from a remote source.

How

It uses the Arrow Flight do_put call to upload record batches representing data changes in a remote system, along with a command containing the metadata about the action.

The batches are in turn stored in an in-memory cache, which flushes them from memory to object storage once lag- or size-based criteria are met.
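
As a rough illustration of the flush condition (a sketch only; the thresholds and names are made up, not the actual configuration):

use std::time::{Duration, Instant};

// Made-up thresholds standing in for the real configuration
const MAX_IN_MEMORY_BYTES: usize = 512 * 1024 * 1024;
const MAX_REPLICATION_LAG: Duration = Duration::from_secs(300);

// Flush once either the size-based or the lag-based criterion is met
fn should_flush(total_bytes: usize, oldest_insertion: Instant) -> bool {
    total_bytes >= MAX_IN_MEMORY_BYTES
        || oldest_insertion.elapsed() >= MAX_REPLICATION_LAG
}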

TODOs (here or in follow-on PRs)

  • DELETE actions are not wired up at the moment, meaning only INSERT (or the append part of an UPDATE) actions are correctly replicated
  • even the append support is not idempotent (it will duplicate entries for the same PKs if replayed)
  • no concurrency/background writes
  • lag-based replication is triggered in a push-like manner, meaning that if there's no activity on the interface at all, batches will remain dormant in memory indefinitely (see the sketch after this list)
  • a bunch of unit and integration tests, particularly focusing on read/write partition pruning (though mostly critical for point 1 above)
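
A minimal sketch of one way to address the last two TODOs, assuming a hypothetical `flush_oldest` method on `SeafowlDataSyncManager` (the method and task names are illustrative only):

use std::{sync::Arc, time::Duration};
use tokio::sync::RwLock;
use tracing::warn;

// Hypothetical background task: periodically trigger a flush so that
// lag-based flushing doesn't depend on incoming do_put traffic.
async fn flush_ticker(manager: Arc<RwLock<SeafowlDataSyncManager>>) {
    let mut interval = tokio::time::interval(Duration::from_secs(60));
    loop {
        interval.tick().await;
        // `flush_oldest` is a made-up stand-in for the real flush entry point
        if let Err(err) = manager.write().await.flush_oldest().await {
            warn!("Background flush failed: {err}");
        }
    }
}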

@gruuya gruuya force-pushed the do-put-poc branch 2 times, most recently from 21847e6 to 032c065 on May 20, 2024 08:12
@gruuya gruuya changed the title from "Seafowl data replication initial implementation" to "Data replication interface" May 20, 2024
@gruuya gruuya marked this pull request as ready for review May 20, 2024 09:51
@gruuya gruuya requested a review from mildbyte May 20, 2024 09:51

debug!("Processing data change with {num_rows} rows for url {url}");
// TODO: make timeout configurable
let accepted =
Contributor

Come to think of it, we might also need to reject the write if it'll put us above some "hard" memory limit. Maybe we should have a "low watermark" (flush when we're using more RAM than this) and a "high watermark" (reject writes so we never go above this limit). We'll find out after load testing, I guess.
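
For illustration, a minimal sketch of the two-watermark idea (the limits and names here are made up):

// Made-up watermark values, for illustration only
const LOW_WATERMARK: usize = 256 * 1024 * 1024; // start flushing above this
const HIGH_WATERMARK: usize = 1024 * 1024 * 1024; // never admit writes beyond this

// Reject a write that would push memory use above the hard limit
fn admit_write(current_bytes: usize, incoming_bytes: usize) -> bool {
    current_bytes + incoming_bytes <= HIGH_WATERMARK
}

// Trigger a flush whenever we're above the soft limit
fn needs_flush(current_bytes: usize) -> bool {
    current_bytes > LOW_WATERMARK
}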

Contributor Author

Maybe? I guess if we're over the low watermark we're going to be flushing syncs into storage and reducing the memory usage anyway; it's not like we're going to continue accumulating stuff in memory.

Contributor

True, if the flushing happens in the same process as the Flight handler. If it happens in the background, we might keep accumulating data even though we started the flush? Guess we'll find out with load testing.

use arrow_flight::sql::{Any, ProstMessageExt};
use prost::Message;

tonic::include_proto!("clade.sync");
Collaborator

Why are we doing this and not just letting the proto be in plain Rust?

Contributor Author

Fair question; I think part of the answer is that we're using Flight as the actual interface here, and it is protobuf-based, so this is too by extension.

On the more practical side, we may want to enable clients in different languages to use this, and protobuf would facilitate that.
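
For reference, hooking the generated message into Flight's Any envelope takes roughly this much code (a sketch; DataSyncCommand is a stand-in name for the generated message type):

use arrow_flight::sql::{Any, ProstMessageExt};
use prost::Message;

// Wire the generated proto message into Flight's `Any` envelope so it
// can ride along with the do_put call.
impl ProstMessageExt for DataSyncCommand {
    fn type_url() -> &'static str {
        "type.googleapis.com/clade.sync.DataSyncCommand"
    }

    fn as_any(&self) -> Any {
        Any {
            type_url: DataSyncCommand::type_url().to_string(),
            value: self.encode_to_vec().into(),
        }
    }
}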

// for that table. Special care is taken to deduce the correct durable
// sequence to report back to the caller.
//
// │sync #1│sync #2│sync #3│sync #4│sync #5│
Collaborator

sync here refers to the result of an RPC? a batch basically?

Contributor Author

Yes, sync corresponds to a single RPC call (so a vector of batches with the same schema).
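
Schematically, something like this (an illustrative shape with a hypothetical name, not the actual definition):

use arrow::record_batch::RecordBatch;

// A "sync" as described above: the payload of a single do_put call
struct DataSyncItem {
    // The sequence (e.g. transaction) this sync belongs to
    sequence_number: u64,
    // All batches from the one RPC share the same schema
    batches: Vec<RecordBatch>,
}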

Comment on lines 73 to 76
seqs: HashMap<String, IndexMap<SequenceNumber, DataSyncSequence>>,
// An indexed queue of table URL => pending batches to upsert/delete
// sorted by insertion order
syncs: IndexMap<String, DataSyncCollection>,
Collaborator

What is the difference between these? seqs and syncs?

Contributor Author

seqs is a map (of maps) of sequences that are still held in memory. We track the insertion time and whether the sequence has had its last sync call yet (and is thus a potential durable candidate), and also keep them ordered for each origin.

syncs is a map of table locations with pending flushes.
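
So, roughly speaking (the field names are illustrative, inferred from the description above; DataSyncItem is the hypothetical shape sketched earlier):

use std::time::Instant;

struct DataSyncSequence {
    // Whether the final sync call for this sequence has arrived,
    // making it a potential durable candidate
    last: bool,
    // When the sequence was first seen, for lag-based flushing
    inserted_at: Instant,
}

struct DataSyncCollection {
    // Total size of all pending batches for this table
    size: usize,
    // Pending syncs in insertion order; the outer IndexMap keeps the
    // tables themselves ordered, so the oldest entry is the next flush target
    syncs: Vec<DataSyncItem>,
}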

// An indexed queue of table URL => pending batches to upsert/delete
// sorted by insertion order
syncs: IndexMap<String, DataSyncCollection>,
// Total size of all batches in memory currently
Collaborator

The (what feels like) inconsistent use of the term "batch" (versus "sync" or "seq"?) makes it hard for me to understand some of the comments

Contributor Author

The bottom term actually refers to (the size of) all the batches in memory.

The upper one is more descriptive; it's really a map of table URL => syncs (which in turn store the actual batches plus some additional info). Let me update that one.

// Map of known memory sequence numbers per origin
origin_memory: HashMap<String, SequenceNumber>,
// Map of known durable sequence numbers per origin
origin_durable: HashMap<String, SequenceNumber>,
Collaborator

Why are we treating Origin as a String and not a u64?

Contributor Author

That's a good question; my impression was that these would be strings.

repeated string pk_column = 3;

// Opaque string identifying data source
string origin = 4;
Collaborator

Same question sorry, why is origin a string?

Contributor Author

Fixed now, thanks.

string origin = 4;

// Monotonically-increasing transaction number (e.g. the LSN)
uint64 sequence_number = 5;
Collaborator

Just note that it can and does wrap.

Contributor Author

Thanks, that's a very important point. How often does this occur roughly?

Petr

LSN does not wrap

Contributor

@mildbyte mildbyte Jun 3, 2024

(EDIT: oops, I sent this before seeing Petr's comment)

They shouldn't wrap as per https://www.postgresql.org/docs/16/wal-internals.html and other discussions:

The numbers do not wrap, but it will take a very, very long time to exhaust the available stock of numbers.
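
(For a sense of scale: the LSN is a 64-bit byte position in the WAL, so even at a sustained 1 GB/s of WAL generation it would take 2^64 / 10^9 ≈ 1.8 × 10^10 seconds, i.e. roughly 585 years, to exhaust.)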

Collaborator

My mistake, apologies.

uint64 sequence_number = 5;

// True if this is the last command in the transaction denoted
// by this LSN and Seafowl can advance its LSN after flushing
Collaborator

It would help if we consistently used LSN or Seq as a term.

Contributor Author

Ok, dropping LSN in favor of sequence throughout.

client: &mut FlightClient,
) -> Result<DataSyncResult> {
let flight_data = sync_cmd_to_flight_data(cmd.clone(), batch);
let response = client.do_put(flight_data).await?.next().await.unwrap()?;
Collaborator

Why do we do `.await?.next().await`?

Contributor Author

Because the do_put call is itself async (hence the first await), but it also returns a stream, which we need to await in order to fetch the result.

In our case we only stream one element (the result) back, so it's not very useful, but the mechanism is there for more complex stuff.
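
Spelled out, with the intermediate types named (a sketch; DataSyncResponse is a stand-in for the actual result message, assuming it rides in PutResult's app_metadata):

use futures::StreamExt;
use prost::Message;

// The first await opens the do_put call; it returns a stream of
// PutResult messages, so the second await pulls out the single
// element we expect back.
let mut results = client.do_put(flight_data).await?;
let put_result = results.next().await.unwrap()?;
let response = DataSyncResponse::decode(put_result.app_metadata)?;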

bool accepted = 1;

// Sequence number up to which the changes are in Seafowl's memory.
optional uint64 memory_sequence_number = 2;
Collaborator

Why do you return this?

Contributor Author

If you're referring to the fact that it is optional, this is because after a restart we have nothing in memory, and it feels inadequate to convey that through some actual number (e.g. 0 or -1).

Otherwise, if you mean why return this at all: I think it's useful for when e.g. the client (sending) side restarts and wants to replay something; it can then skip some stuff as per this value.
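
E.g., a hypothetical sketch of the client-side replay decision:

// Anything at or below the reported memory sequence number is already
// held by Seafowl and can be skipped on replay
let resume_from = match response.memory_sequence_number {
    // The server has everything up to mem_seq in memory
    Some(mem_seq) => mem_seq + 1,
    // The server restarted with nothing in memory; replay from the
    // last known durable sequence instead
    None => last_durable_sequence + 1,
};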

Contributor

Will also be useful later on as per the discussion in

https://docs.google.com/document/d/1v1HCaw-__fx8JM5DwRe23lc4pt_w8XAC-wRkFoegrbc/edit?disco=AAABKFZw9wA

the system in general needs to know what's in memory to be able to provide reasonable snapshots for queries

Comment on lines 45 to 68
//          │sync #1│sync #2│sync #3│sync #4│sync #5│
// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ▶
//          │┌─────┐│       │       │┌─────┐│       │
// table_1   │seq:1│                 │  3  │
//          │└─────┘│       │       │└─────┘│       │
// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ▶
//          │       │┌─────┐│       │       │┌─────┐│
// table_2           │  1  │                 │  3  │
//          │       │└─────┘│       │       │└─────┘│
// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ▶
//          │       │       │┌─────┐│       │       │
// table_3                   │  2  │
//          │       │       │└─────┘│       │       │
// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ▶
//              ▼       ▼       ▼       ▼       ▼
// In the above example, the first flush will target table_1, dumping payloads
// of sync #1 and sync #4 (from sequences 1 and 3). Since this is not the last
// sync of the first sequence it won't be reported as durably stored yet.
//
// Next, table_2 will get flushed (sync #2 and sync #5); this time sequence 1
// is entirely persisted to storage so it will be reported as the new durable
// sequence number. Note that while sequence 3 is also now completely flushed,
// it isn't durable, since there is a preceding sequence (2) that is still in memory.
//
// Finally, once table_3 is flushed `SeafowlDataSyncManager` will advance the
Contributor

beautiful diagram

Comment on lines +498 to +513
// Create a randomly sized vector of random record batches with
// a pre-defined schema
fn random_batches() -> Vec<RecordBatch> {
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, true),
Field::new("c2", DataType::Utf8, true),
Field::new(SEAFOWL_SYNC_DATA_UD_FLAG, DataType::Boolean, false),
]));

// Generate a random length between 1 and 3
let len: usize = rand::thread_rng().gen_range(1..=3);

(0..len)
.map(|_| create_random_batch(schema.clone(), 10, 0.2, 0.8).unwrap())
.collect()
}
Contributor

Does the lengths / contents being random matter for this test, i.e. do we expect the sync behavior to change because an occasionally longer batch triggers a flush?

Contributor Author

No, the values are way below the default size-bound flush threshold.

assert!(sync_mgr.syncs.is_empty());
assert_eq!(sync_mgr.size, 0);
}
}
Contributor

I'd add another test for multiple origins here too.

Comment on lines 547 to 551
#[case::long_sequence_mid_flush(
&[("table_1", 1), ("table_1", 1), ("table_1", 1), FLUSH, ("table_1", 1), ("table_2", 1),
("table_2", 2), ("table_2", 2), FLUSH, ("table_2", 2), ("table_3", 2), FLUSH, ("table_3", 3),
FLUSH, ("table_3", 3), ("table_1", 4), ("table_3", 4), ("table_1", 4), FLUSH, ("table_3", 4),
FLUSH, FLUSH],
Contributor

Maybe it's just me being paranoid and not worth testing, but something like this perhaps:

(table_1, 1), (table_2, 1), (table_3, 1), (table_1, 2), (table_2, 2), (table_3, 2)

i.e. more than two tables on the same LSN and they're not consecutive in the change log

FLUSH, FLUSH],
// Reasoning for the observed durable sequences:
// - seq 1 not seen last sync
// - seq 1 seen last sync, but it is in an unflushed table (2)
Contributor

I got confused as to why this is None and not 1, but it's because after the previous flush, we received data for table_1 and table_2, and the first flush after that only flushes table_1?

Contributor Author

That is correct; each FLUSH in the test executes a single flush call, which flushes a single table location (the one with the oldest pending changes).


// Primary key columns. Must not be empty: the table must always
// have a replica identity / PK.
repeated string pk_column = 3;
Collaborator

Should this be called pk_columns? (plural)

Contributor Author

Good point, pluralized names for repeated fields are indeed the style-guide recommendation; revising.

@gruuya gruuya merged commit b7e346e into main Jun 4, 2024
@gruuya gruuya deleted the do-put-poc branch June 4, 2024 11:12