Conversation
refactor wal truncation restart workers even if wal truncation fails
f0481d0 to
15e914f
Compare
| // Swap to new sender - new operations will go to the new channel | ||
| let _old_sender = self.update_sender.swap(Arc::new(update_sender)); | ||
| // Signal all workers to stop | ||
| update_handler.stop_flush_worker(); | ||
| update_handler.stop_update_worker(); | ||
|
|
||
| // Wait for workers to finish and get pending operations from the old channel | ||
| let pending_receiver = update_handler.wait_workers_stops().await?; | ||
|
|
||
| // Forward pending operations from old receiver to new channel | ||
| if let Some(mut old_receiver) = pending_receiver { | ||
| let sender = self.update_sender.load(); | ||
| while let Ok(signal) = old_receiver.try_recv() { | ||
| // Forward pending operations to new channel | ||
| // Use try_send to avoid blocking - if channel is full, operations are dropped | ||
| let _ = sender.try_send(signal); | ||
| } | ||
| } |
There was a problem hiding this comment.
Why do we create a new channel here? Can't we repurpose the existing receiver since we now get it back?
There was a problem hiding this comment.
Not sure how to behave properly.
If we repurpose the existing one - we cannot change the update queue size, only after restart.
If we change the update queue size, we may lost operations if a new size is smaller.
What I can propose. Create a new channel only when there is a new update queue size and old update queue fits the new size
There was a problem hiding this comment.
Because update queue size is a node-level config and cannot be changed in runtime, I changed this logic and reuse old receiver.
In case of a non-existing receiver, I propose to create a new channel instead of returning an error.
There was a problem hiding this comment.
This cannot be changed at runtime. We agreed to handle the case of 'lower setting than current required queue size' in a separate PR.
|
Caution Review failedThe pull request is closed. 📝 WalkthroughWalkthroughThe PR refactors LocalShard's worker management and shutdown flow by replacing channel-based Stop signals with direct method invocations. It introduces a CancellationToken mechanism for the update worker instead of atomic skip-updates guards. The UpdateSignal enum is simplified by removing the Stop variant and adjusting Plunger. The wait_update_workers_stop method signature changes to return pending operation receiver information. WAL truncation is reimplemented to acquire locks, stop workers, collect pending operations, perform truncation, and restart workers with fresh channels. Test expectations for WAL recovery are tightened accordingly. Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested reviewers
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
* remove UpdateSignal Stop refactor wal truncation restart workers even if wal truncation fails * dont duplicate notification code in update worker * send manually cancellation token to update worker * revert stop_update_workers and keep as is * refactor wal truncation * are you happy clippy * Inline breaks into single block * Remove closure and call directly * dont recreate channels in config update * more comments * debug assert * fmt --------- Co-authored-by: timvisee <tim@visee.me> Co-authored-by: Andrey Vasnetsov <andrey@vasnetsov.com>
Summary
This PR removes
UpdateSignal::Stopand changes WAL truncation mechanism.Description
When implementing the update queue feature, we discovered a problem with the current worker stop mechanism. If the update worker has many pending operations in the channel, sending UpdateSignal::Stop means we have to wait until all operations are processed before the worker actually stops - the stop signal sits at the end of the queue behind all pending operations.
As the solution, update worker does not have a stop signal. Instead, channel and stop signal are checked using
tokio::select. Also, update worker returns the channel with all pending operations.Wal truncation
Because new stopping mechanism allows to stop update worker immediately and get all pending operations, this PR also refactors WAL truncation, where it's not necessary anymore to make overcomplicated update worker.
Open problems
Can we change the update channel size via config update? If yes, there is a problem in the config optimizer update, where we cannot fill the new update worker if the config shrinks the update queue size