SDK batching/revamp 1: impl DataTableBatcher #1980
Conversation
jleibs left a comment:

Some small comments and maybe a deadlock, but the overall structure looks good. Nice standalone clean PR!
    pub flush_tick: Duration,

    /// Flush if the accumulated payload has a size in bytes equal or greater than this.
    pub flush_num_bytes: u64,
There's a subtle distinction between "batch size" and "flush threshold" -- I suspect they are related but it's not entirely explicit here. A bit more explanation could be helpful.
I'm not sure what you're referring to? What's "batch size"? There's no config with that name 🤔
No, but this thing is a "Batcher", so it produces batches, and those batches have a size. In particular, I was curious whether a batch could have a size > flush_num_bytes (I believe the answer is yes), and I wanted it to be explicit that when we flush, all of the outstanding bytes (including those above the flush threshold) end up in the batch.
Ah yes, that's what the "equal or greater than this" alludes to. I'll make it extra clear.
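A minimal, hypothetical sketch of that semantics (the `Accumulator` type and `FLUSH_NUM_BYTES` constant are illustrative stand-ins, not the actual API): once the pending byte count reaches the threshold, everything outstanding goes into the batch, so a batch can end up larger than `flush_num_bytes`.

```rust
/// Illustrative stand-in for the batcher's internal accumulator.
struct Accumulator {
    pending: Vec<u64>, // per-row sizes, in bytes
    pending_num_bytes: u64,
}

impl Accumulator {
    /// Pushes a row; returns all outstanding rows if the flush threshold was crossed.
    fn push_row(&mut self, num_bytes: u64) -> Option<Vec<u64>> {
        self.pending.push(num_bytes);
        self.pending_num_bytes += num_bytes;
        const FLUSH_NUM_BYTES: u64 = 100; // hypothetical threshold
        if self.pending_num_bytes >= FLUSH_NUM_BYTES {
            self.pending_num_bytes = 0;
            // Flush *everything* outstanding, including the bytes above the threshold.
            Some(std::mem::take(&mut self.pending))
        } else {
            None
        }
    }
}

fn main() {
    let mut acc = Accumulator { pending: Vec::new(), pending_num_bytes: 0 };
    assert!(acc.push_row(60).is_none()); // 60 < 100: keep accumulating
    let batch = acc.push_row(90).expect("150 >= 100: flush");
    assert_eq!(batch.iter().sum::<u64>(), 150); // the batch exceeds flush_num_bytes
}
```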
    let cmds_to_tables_handle = {
        const NAME: &str = "DataTableBatcher::cmds_to_tables";
        std::thread::Builder::new()
Out of scope for this PR, but I'm a bit unsure myself how we should decide when to use tokio vs. when to just spawn threads.
My general motto is: if you can avoid async, avoid async.
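For what it's worth, the named-thread approach needs nothing beyond the standard library; a minimal sketch of the pattern from the snippet above (the closure body is a placeholder for the actual batching loop):

```rust
use std::thread;

fn main() {
    // Spawn a named worker thread, as in the DataTableBatcher snippet above.
    const NAME: &str = "DataTableBatcher::cmds_to_tables";
    let handle = thread::Builder::new()
        .name(NAME.into())
        // Placeholder body: just report the thread's own name back.
        .spawn(|| thread::current().name().map(str::to_owned))
        .expect("failed to spawn thread");

    assert_eq!(handle.join().unwrap().as_deref(), Some(NAME));
}
```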
    fn drop(&mut self) {
        // NOTE: The command channel is private, if we're here, nothing is currently capable of
        // sending data down the pipeline.
        self.tx_cmds.send(Command::Shutdown).ok();
I think we want to drop self.rx_tables before we call this send, to avoid a deadlock?
Basically, if we are using a bounded channel and rx_tables isn't being drained by any listeners, then tx_table.send(table) could block the batching thread, preventing it from ever processing the Shutdown. Dropping rx_tables first should at least cause tx_table to error; or, if some other thread is still holding a receiver, then at least it isn't our problem.
Yeah, the docs mention this:

    /// Shutting down cannot ever block, unless the output channel is bounded and happens to be full
    /// (see [DataTableBatcherConfig::max_tables_in_flight]).
My thought process was that if the user deliberately configures the channel sizes to be bounded, then they should expect that the system can and will block at any (inconvenient) time if they don't consume as needed.
Eh, I guess we can be extra polite...
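The potential deadlock and the "drop the receiver first" fix can be sketched with a bounded `std::sync::mpsc::sync_channel`, used here as a std-only stand-in for whatever channel type the batcher actually uses:

```rust
use std::sync::mpsc::sync_channel;

fn main() {
    // Bounded channel with capacity 1: once it's full, another send() would
    // block for as long as a receiver is alive -- the deadlock scenario
    // described above (the batching thread stuck in send, never seeing Shutdown).
    let (tx_table, rx_tables) = sync_channel::<&str>(1);
    tx_table.send("table").unwrap(); // fills the channel

    // Dropping the receiver first makes the next send fail fast instead of
    // blocking, so shutdown can proceed.
    drop(rx_tables);
    assert!(tx_table.send("another table").is_err());
}
```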
    // --- Subscribe to tables ---

    /// Returns a _shared_ channel in which are sent the batched [`DataTable`]s.
Any particular reason to make this shared? Is there value in a new receiver jumping in mid-stream? I worry about the usefulness of the data in that case.
The channel is already mpmc by nature, so I didn't see the point of jumping through extra hoops to turn it back into an mpsc one. Also, parallel consumers might come in handy at some point.
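A std-only illustration of the "parallel consumers" point: std's mpsc receiver isn't cloneable, so this wraps it in `Arc<Mutex<...>>` purely as a stand-in for a natively mpmc channel.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel::<u32>();
    let rx = Arc::new(Mutex::new(rx)); // shared receiver, stand-in for mpmc

    for i in 0..4 {
        tx.send(i).unwrap();
    }
    drop(tx); // close the channel so consumers eventually stop

    // Two parallel consumers drain the channel; each item is seen exactly once.
    let handles: Vec<_> = (0..2)
        .map(|_| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || {
                let mut sum = 0;
                loop {
                    let msg = rx.lock().unwrap().recv();
                    match msg {
                        Ok(v) => sum += v,
                        Err(_) => break, // channel closed and drained
                    }
                }
                sum
            })
        })
        .collect();

    let total: u32 = handles.into_iter().map(|h| h.join().unwrap()).sum();
    assert_eq!(total, 6); // 0 + 1 + 2 + 3, consumed exactly once overall
}
```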
    ) -> bool {
        // TODO(#1760): now that we're re-doing this here, it really is a massive waste not to send
        // it over the wire...
        row.compute_all_size_bytes();
How expensive is this? Wasn't this a bottleneck before?
It was and still is, and that's what's so nice about it: it's now done in a background thread on the clients...
But we still need to send that information to the server to make things really fantastic.
> it's now done in a background thread on the clients
We should be careful about that though... we don't want to make this a bottleneck or a CPU drain on the clients either.
It really isn't: it's extremely costly compared to the rest of the operations we do on the very fast paths in the store, but it's still orders of magnitude faster than most of what goes on on the client... especially if it's a Python client 😒
    acc.pending_num_rows >= config.flush_num_rows
        || acc.pending_num_bytes >= config.flush_num_bytes
Rather than returning a bool it seems like it would be clearer to call do_flush_all here.
I find it clearer to be able to see all flush triggers in the main loop
In that case why not move the check into the main loop as well?
    do_push_row(&mut acc, row);

    if acc.pending_num_rows >= config.flush_num_rows
        || acc.pending_num_bytes >= config.flush_num_bytes
    {
        do_flush_all(&mut acc, &tx_table, "bytes|rows");
        acc.reset();
    }
But, at the very least, add a comment to do_push_row indicating that it returns a bool, with the assumption that the caller will flush the data if it returns true?
Moving the check sounds good to me 👍
This PR implements DataTableBatcher, which... batches DataTables. Not used anywhere yet, just the type itself.

- DataTableBatcher #1980
- Session with RecordingContext #1983
- clock example for Rust #2000
- Python Session #1985

Part of #1619

Related:
- DataCell's size (& other metadata) over the wire #1760

Future work:
- DataTable::sort shared with DataStore #1981