ccl/sqlproxyccl: add rebalancer queue for connection rebalancing #79346
craig[bot] merged 2 commits into cockroachdb:master
Conversation
Force-pushed from 200259f to 0deab39
Force-pushed from 0deab39 to 5b279ab
andy-kimball
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @andy-kimball, @jaylim-crl, and @jeffswenson)
pkg/ccl/sqlproxyccl/proxy_handler.go, line 180 at r4 (raw file):
ctx, _ = stopper.WithCancelOnQuiesce(ctx)
Should WithCancelOnQuiesce be called before we use ctx in the NewCertManager call? It's strange that we use different ctx instances in different places.
pkg/ccl/sqlproxyccl/balancer/balancer.go, line 117 at r4 (raw file):
if err := b.stopper.RunAsyncTask(ctx, "processQueue-closer", func(ctx context.Context) {
How come we need this separate async task? I thought that ctx.Done would be closed when the stopper is quiesced, and therefore we could close the queue in processQueue at that point.
jaylim-crl
left a comment
TFTR!
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @andy-kimball and @jeffswenson)
pkg/ccl/sqlproxyccl/proxy_handler.go, line 180 at r4 (raw file):
Previously, andy-kimball (Andy Kimball) wrote…
Should WithCancelOnQuiesce be called before we use ctx in the NewCertManager call? It's strange that we use different ctx instances in different places.
Yes, we could. We would need to thread ctx to setupIncomingCert. Also, that's already an existing issue today. I also don't see why we'd need a separate context for the cert manager. I can do that here.
pkg/ccl/sqlproxyccl/balancer/balancer.go, line 117 at r4 (raw file):
and therefore we could close the queue in processQueue at that point.
The first part is correct, but the second isn't. The queue has no notion of context.Context, and there's nothing to wake the callers up whenever ctx.Done has been closed. The ctx object in processQueue is only used to indicate whether we want to continue reading from the queue. When we're blocked reading from the queue, someone needs to invoke queue.close() explicitly to wake those callers up.
pkg/ccl/sqlproxyccl/balancer/balancer.go, line 152 at r4 (raw file):
Previously, JeffSwenson (Jeff Swenson) wrote…
nit: the DB uses https://github.com/marusama/semaphore as its semaphore implementation. Conveniently its Acquire method accepts a ctx.
Good point. I can make this change.
pkg/ccl/sqlproxyccl/balancer/balancer_test.go, line 163 at r4 (raw file):
Previously, JeffSwenson (Jeff Swenson) wrote…
nit: Instead of adding these two hooks you can increment count before <-waitCh in the onTransferConnection and decrement it after <-waitCh.
Hm, let me look into this again. Maybe there's a simpler approach.
Force-pushed from a4224d2 to 2c268bd
jaylim-crl
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @andy-kimball and @jeffswenson)
pkg/ccl/sqlproxyccl/proxy_handler.go, line 180 at r4 (raw file):
Previously, jaylim-crl (Jay Lim) wrote…
Yes, we could. We would need to thread ctx to setupIncomingCert. Also, that's already an existing issue today. I also don't see why we'd need a separate context for the cert manager. I can do that here.
Done.
pkg/ccl/sqlproxyccl/balancer/balancer.go, line 152 at r4 (raw file):
Previously, jaylim-crl (Jay Lim) wrote…
Good point. I can make this change.
Done. Actually, I see various approaches:
1. chan struct{}, e.g. cockroach/pkg/kv/kvserver/store.go, lines 765 to 766 in 63ea913
2. marusama/semaphore
3. quotapool.IntPool
Regardless, I've updated this to use (2) since it's cleaner.
pkg/ccl/sqlproxyccl/balancer/balancer_test.go, line 163 at r4 (raw file):
Previously, jaylim-crl (Jay Lim) wrote…
Hm, let me look into this again. Maybe there's a simpler approach.
I left it as-is. I still need the afterProcessQueueItem hook for other sub-tests.
jeffswenson
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @andy-kimball and @jaylim-crl)
pkg/ccl/sqlproxyccl/balancer/balancer.go, line 117 at r4 (raw file):
Previously, jaylim-crl (Jay Lim) wrote…
and therefore we could close the queue in processQueue at that point.
The first part is correct, but the second isn't the case. The queue has no notion of context.Context, and there's nothing to wake the callers up whenever ctx.Done has been closed. The ctx object in processQueue is only used to indicate whether we want to continue reading from the queue. When we get blocked when reading from the queue, someone would need to invoke queue.close() explicitly to wake those callers up.
One idea: we could use a semaphore for tracking the size of the queue instead of the condition variable. That allows us to drop the close state and the goroutine. The implementation would look like:
func (q *Queue) Push(item interface{}) {
	q.Lock()
	defer q.Unlock()
	// add the item to the queue
	q.semaphore.Release(1)
}
func (q *Queue) Dequeue(ctx context.Context) (item interface{}, err error) {
	if err := q.semaphore.Acquire(ctx, 1); err != nil {
		return nil, err
	}
	q.Lock()
	defer q.Unlock()
	// remove element from the queue and return it
	return item, nil
}
pkg/ccl/sqlproxyccl/balancer/balancer_test.go, line 163 at r4 (raw file):
Previously, jaylim-crl (Jay Lim) wrote…
I left it as-is. I still need the afterProcessQueueItem hook for other sub-tests.
It is possible to avoid the need for the afterProcessQueueItem hook. We are using eventCh to determine when processing is complete. The test can wait for processing to complete by limiting the concurrency to 1 and publishing a second event that closes eventCh.
jaylim-crl
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @andy-kimball and @jaylim-crl)
pkg/ccl/sqlproxyccl/balancer/balancer.go, line 117 at r4 (raw file):
Previously, JeffSwenson (Jeff Swenson) wrote…
One idea: we could use a semaphore for tracking the size of the queue instead of the condition variable. That allows us to drop the close state and the goroutine. The implementation would look like:
func (q *Queue) Push(item interface{}) {
	q.Lock()
	defer q.Unlock()
	// add the item to the queue
	q.semaphore.Release(1)
}
func (q *Queue) Dequeue(ctx context.Context) (item interface{}, err error) {
	if err := q.semaphore.Acquire(ctx, 1); err != nil {
		return nil, err
	}
	q.Lock()
	defer q.Unlock()
	// remove element from the queue and return it
	return item, nil
}
I've done something like that before, but if we'd like to stick to github.com/marusama/semaphore, the above won't work for two reasons:
- We need a size limit for that to work.
- Release panics without Acquire: https://github.com/marusama/semaphore/blob/2d3c1eaa054b6e36c7c0dfde398f2b47e4bc5094/semaphore.go#L169-L171.
Does that align with what you think as well?
jeffswenson
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @andy-kimball and @jaylim-crl)
pkg/ccl/sqlproxyccl/balancer/balancer.go, line 117 at r4 (raw file):
Previously, jaylim-crl (Jay Lim) wrote…
I've done something like that before, but if we'd like to stick to github.com/marusama/semaphore, the above won't work for two reasons:
- We need a size limit for that to work.
- Release panics without Acquire: https://github.com/marusama/semaphore/blob/2d3c1eaa054b6e36c7c0dfde398f2b47e4bc5094/semaphore.go#L169-L171.
Does that align with what you think as well?
During initialization you can set the capacity to something really large and then acquire it all. It looks like the easiest way to do that with the marusama semaphore is:
semaphore := semaphore.New(0)
semaphore.SetLimit(MaxUint32)
Force-pushed from cdaf10f to 0ed578d
jaylim-crl
left a comment
TFTR! I'll come back again with an updated queue implementation.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @andy-kimball)
pkg/ccl/sqlproxyccl/balancer/balancer.go, line 117 at r4 (raw file):
Previously, JeffSwenson (Jeff Swenson) wrote…
During initialization you can set the capacity to something really large and then acquire it all. It looks like the easiest way to do that with the marusama semaphore is:
semaphore := semaphore.New(0)
semaphore.SetLimit(MaxUint32)
😄 I like that idea. I'll rework the queue as I think it's much more ergonomic being able to unblock when ctx is cancelled automatically.
pkg/ccl/sqlproxyccl/balancer/balancer_test.go, line 163 at r4 (raw file):
Previously, JeffSwenson (Jeff Swenson) wrote…
It is possible to avoid the need for the afterProcessQueueItem hook. When setting eventCh<-, we are using it to determine when processing is complete. The test can wait for processing to complete by limiting the concurrency to 1 and publishing a second event that closes eventCh.
Done.
This commit adds a rebalancer queue implementation to the balancer component. The queue will be used for rebalance requests for the connection migration work. This is done to ensure a centralized location that invokes the TransferConnection method on the connection handles. Doing this also enables us to limit the number of concurrent transfers within the proxy. Release note: None
Force-pushed from 0ed578d to 7aca4e3
jaylim-crl
left a comment
Done. Everything has been addressed :) dequeue now reacts to context cancellations, and will be unblocked automatically when that happens.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @andy-kimball)
pkg/ccl/sqlproxyccl/balancer/balancer.go, line 117 at r4 (raw file):
Previously, jaylim-crl (Jay Lim) wrote…
😄 I like that idea. I'll rework the queue as I think it's much more ergonomic being able to unblock when ctx is cancelled automatically.
Done.
LGTM
The previous commit added a rebalancer queue. This commit connects the queue to the balancer, and runs the queue processor in the background. By default, we limit up to 100 concurrent transfers at any point in time, and each transfer will be retried up to 3 times. Release note: None
Force-pushed from 7aca4e3 to 02b5be6
TFTR! bors r=JeffSwenson
Build succeeded.
79725: ccl/sqlproxyccl: add support for moving connections away from DRAINING pods r=JeffSwenson a=jaylim-crl

In #79346, we added a rebalancer queue for connection rebalancing. This commit adds support for transferring connections away from DRAINING pods. The rebalance loop runs once every 30 seconds for now, and connections will only be moved away from DRAINING pods if the pod has been draining for at least 1 minute. At the same time, we also fix an enqueue bug on the rebalancer queue where we're releasing the semaphore in the case of an update, which is incorrect. Release note: None

Co-authored-by: Jay <jay@cockroachlabs.com>
ccl/sqlproxyccl: add rebalancer queue for rebalance requests
This commit adds a rebalancer queue implementation to the balancer component.
The queue will be used for rebalance requests for the connection migration
work. This is done to ensure a centralized location that invokes the
TransferConnection method on the connection handles. Doing this also enables
us to limit the number of concurrent transfers within the proxy.
Release note: None
ccl/sqlproxyccl: run rebalancer queue processor in the background
The previous commit added a rebalancer queue. This commit connects the queue to
the balancer, and runs the queue processor in the background. By default,
we limit up to 100 concurrent transfers at any point in time, and each transfer
will be retried up to 3 times.
Release note: None
Jira issue: CRDB-14727