changefeedccl: Propagate pushback throughout changefeed pipeline.#68288
changefeedccl: Propagate pushback throughout changefeed pipeline. #68288 — craig[bot] merged 1 commit into cockroachdb:master from
Conversation
ajwerner
left a comment
There was a problem hiding this comment.
Now we're really getting somewhere!
| if r.(*bufferEntry).wasBlocked { | ||
| metrics.BufferPushbackNanos.Inc(timeutil.Since(start).Nanoseconds()) | ||
| } | ||
| })) |
There was a problem hiding this comment.
Consider this change and using OnWait, the quotapool is already doing all the bookkeeping you need, shame to duplicate it here.
diff --git a/pkg/kv/kvserver/tenantrate/limiter.go b/pkg/kv/kvserver/tenantrate/limiter.go
index 1e117acc54..7b07a7aff7 100644
--- a/pkg/kv/kvserver/tenantrate/limiter.go
+++ b/pkg/kv/kvserver/tenantrate/limiter.go
@@ -91,7 +91,7 @@ func (rl *limiter) init(
func(ctx context.Context, poolName string, r quotapool.Request) {
rl.metrics.currentBlocked.Inc(1)
},
- func(ctx context.Context, poolName string, r quotapool.Request) {
+ func(ctx context.Context, poolName string, r quotapool.Request, _ time.Time) {
rl.metrics.currentBlocked.Dec(1)
},
))
diff --git a/pkg/util/quotapool/config.go b/pkg/util/quotapool/config.go
index 128d70160d..21f58c022b 100644
--- a/pkg/util/quotapool/config.go
+++ b/pkg/util/quotapool/config.go
@@ -37,16 +37,16 @@ func OnAcquisition(f AcquisitionFunc) Option {
})
}
-// OnWaitFunc is the prototype for functions called to notify the start or
-// finish of a waiting period when a request is blocked.
-type OnWaitFunc func(
+// OnWaitStartFunc is the prototype for functions called to notify the start
+// of a waiting period when a request is blocked.
+type OnWaitStartFunc func(
ctx context.Context, poolName string, r Request,
)
// OnWait creates an Option to configure two callbacks which are called when a
// request blocks and has to wait for quota (at the start and end of the
// wait).
-func OnWait(onStart, onFinish OnWaitFunc) Option {
+func OnWait(onStart OnWaitStartFunc, onFinish AcquisitionFunc) Option {
return optionFunc(func(cfg *config) {
cfg.onWaitStart = onStart
cfg.onWaitFinish = onFinish
@@ -112,13 +112,14 @@ func WithMinimumWait(duration time.Duration) Option {
}
type config struct {
- onAcquisition AcquisitionFunc
- onSlowAcquisition SlowAcquisitionFunc
- onWaitStart, onWaitFinish OnWaitFunc
- slowAcquisitionThreshold time.Duration
- timeSource timeutil.TimeSource
- closer <-chan struct{}
- minimumWait time.Duration
+ onAcquisition AcquisitionFunc
+ onSlowAcquisition SlowAcquisitionFunc
+ onWaitStart OnWaitStartFunc
+ onWaitFinish AcquisitionFunc
+ slowAcquisitionThreshold time.Duration
+ timeSource timeutil.TimeSource
+ closer <-chan struct{}
+ minimumWait time.Duration
}
var defaultConfig = config{
diff --git a/pkg/util/quotapool/quotapool.go b/pkg/util/quotapool/quotapool.go
index 7be0319eb7..ec9b24bc5e 100644
--- a/pkg/util/quotapool/quotapool.go
+++ b/pkg/util/quotapool/quotapool.go
@@ -220,7 +220,7 @@ func (qp *AbstractPool) Acquire(ctx context.Context, r Request) (err error) {
qp.config.onWaitStart(ctx, qp.name, r)
}
if qp.config.onWaitFinish != nil {
- defer qp.config.onWaitFinish(ctx, qp.name, r)
+ defer qp.config.onWaitFinish(ctx, qp.name, r, start)
}
// Set up the infrastructure to report slow requests.
| // amount of resources used to process such event varies. So, instead of coming up | ||
| // with complex schemes to accurate measure and adjust current memory usage, we'll request | ||
| // the amount of memory multiplied by this fudge factor. | ||
| const eventMemoryUsedMultiplier = 2 |
There was a problem hiding this comment.
Make this a cluster setting. You'll thank yourself later.
| resource := &callbackResource{ | ||
| cb: func() { | ||
| b.qp.Update(func(r quotapool.Resource) (shouldNotify bool) { | ||
| res := r.(*blockingBufferQuotaPool) | ||
| res.release(got) | ||
| return true | ||
| }) | ||
| }, |
There was a problem hiding this comment.
I'd prefer if you just built yourself a data type like quotapool.IntAlloc. It'd be literally exactly like that. This callback thing doesn't feel like it's giving you all that much. If you make a data type you can also stick a Merge method on it to accumulate inside the sink which should also allow for way less synchronization.
14ca238 to
2a61a68
Compare
miretskiy
left a comment
There was a problem hiding this comment.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @HonoreDB, and @stevendanna)
pkg/ccl/changefeedccl/kvevent/blocking_buffer.go, line 55 at r2 (raw file):
Previously, ajwerner wrote…
Consider this change and using
OnWait, thequotapoolis already doing all the bookkeeping you need, shame to duplicate it here.diff --git a/pkg/kv/kvserver/tenantrate/limiter.go b/pkg/kv/kvserver/tenantrate/limiter.go index 1e117acc54..7b07a7aff7 100644 --- a/pkg/kv/kvserver/tenantrate/limiter.go +++ b/pkg/kv/kvserver/tenantrate/limiter.go @@ -91,7 +91,7 @@ func (rl *limiter) init( func(ctx context.Context, poolName string, r quotapool.Request) { rl.metrics.currentBlocked.Inc(1) }, - func(ctx context.Context, poolName string, r quotapool.Request) { + func(ctx context.Context, poolName string, r quotapool.Request, _ time.Time) { rl.metrics.currentBlocked.Dec(1) }, )) diff --git a/pkg/util/quotapool/config.go b/pkg/util/quotapool/config.go index 128d70160d..21f58c022b 100644 --- a/pkg/util/quotapool/config.go +++ b/pkg/util/quotapool/config.go @@ -37,16 +37,16 @@ func OnAcquisition(f AcquisitionFunc) Option { }) } -// OnWaitFunc is the prototype for functions called to notify the start or -// finish of a waiting period when a request is blocked. -type OnWaitFunc func( +// OnWaitStartFunc is the prototype for functions called to notify the start +// of a waiting period when a request is blocked. +type OnWaitStartFunc func( ctx context.Context, poolName string, r Request, ) // OnWait creates an Option to configure two callbacks which are called when a // request blocks and has to wait for quota (at the start and end of the // wait). 
-func OnWait(onStart, onFinish OnWaitFunc) Option { +func OnWait(onStart OnWaitStartFunc, onFinish AcquisitionFunc) Option { return optionFunc(func(cfg *config) { cfg.onWaitStart = onStart cfg.onWaitFinish = onFinish @@ -112,13 +112,14 @@ func WithMinimumWait(duration time.Duration) Option { } type config struct { - onAcquisition AcquisitionFunc - onSlowAcquisition SlowAcquisitionFunc - onWaitStart, onWaitFinish OnWaitFunc - slowAcquisitionThreshold time.Duration - timeSource timeutil.TimeSource - closer <-chan struct{} - minimumWait time.Duration + onAcquisition AcquisitionFunc + onSlowAcquisition SlowAcquisitionFunc + onWaitStart OnWaitStartFunc + onWaitFinish AcquisitionFunc + slowAcquisitionThreshold time.Duration + timeSource timeutil.TimeSource + closer <-chan struct{} + minimumWait time.Duration } var defaultConfig = config{ diff --git a/pkg/util/quotapool/quotapool.go b/pkg/util/quotapool/quotapool.go index 7be0319eb7..ec9b24bc5e 100644 --- a/pkg/util/quotapool/quotapool.go +++ b/pkg/util/quotapool/quotapool.go @@ -220,7 +220,7 @@ func (qp *AbstractPool) Acquire(ctx context.Context, r Request) (err error) { qp.config.onWaitStart(ctx, qp.name, r) } if qp.config.onWaitFinish != nil { - defer qp.config.onWaitFinish(ctx, qp.name, r) + defer qp.config.onWaitFinish(ctx, qp.name, r, start) } // Set up the infrastructure to report slow requests.
You're right of course. This is part of another PR that's already merging (just bors-). I'll make the change there (#68256) and rebase this PR.
pkg/ccl/changefeedccl/kvevent/blocking_buffer.go, line 90 at r2 (raw file):
Previously, ajwerner wrote…
I'd prefer if you just built yourself a data type like
quotapool.IntAlloc. It'd be literally exactly like that. This callback thing doesn't feel like it's giving you all that much. If you make a data type you can also stick a Merge method on it to accumulate inside the sink which should also allow for way less synchronization.
Good idea -- done. See bufferEntryAlloc.
pkg/ccl/changefeedccl/kvevent/blocking_buffer.go, line 127 at r2 (raw file):
Previously, ajwerner wrote…
Make this a cluster setting. You'll thank yourself later.
Done.
miretskiy
left a comment
There was a problem hiding this comment.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @HonoreDB, and @stevendanna)
pkg/ccl/changefeedccl/kvevent/blocking_buffer.go, line 55 at r2 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
You're right of course. This is part of another PR that's already merging (just bors-). I'll make the change there (#68256) and rebase this PR.
Done.
2a61a68 to
2fe9a34
Compare
ajwerner
left a comment
There was a problem hiding this comment.
I think with another pass this can be cleaner.
Reviewed 1 of 1 files at r4.
Reviewable status:complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @HonoreDB, @miretskiy, and @stevendanna)
pkg/ccl/changefeedccl/kvevent/blocking_buffer.go, line 196 at r4 (raw file):
// bufferEntryAlloc is a Resource associated with buffer entry allocation. type bufferEntryAlloc struct { alloc *bufferEntry
I think you can invert this to be the following. I don't think the bufferEntry needs to know about the pool in any other way. Then you can change Acquire around to just give you back an alloc. Then Move() doesn't need to do anything fancy like allocate or anything like that.
type alloc struct {
bytes int64
qp *quotapool.AbstractPool
}pkg/ccl/changefeedccl/kvevent/event.go, line 39 at r4 (raw file):
type Writer interface { // Add adds event, along with resources allocated so far for this event. Add(ctx context.Context, event Event, resource Resource) error
My sense is that making the Writer in terms of Event is a good change, however, I don't love the way Resource is getting shoved into it. I think the interface would feel better if Resource were a part of the Event structure. Consider a DetachResource() method on event itself and an unexported attachResource(r Resource) which the blocking buffer calls.
2fe9a34 to
9fd4ae3
Compare
miretskiy
left a comment
There was a problem hiding this comment.
@ajwerner Ready for another good look.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @HonoreDB, and @stevendanna)
pkg/ccl/changefeedccl/kvevent/blocking_buffer.go, line 196 at r4 (raw file):
Previously, ajwerner wrote…
I think you can invert this to be the following. I don't think the bufferEntry needs to know about the pool in any other way. Then you can change
Acquire around to just give you back an alloc. Then Move() doesn't need to do anything fancy like allocate or anything like that. type alloc struct { bytes int64; qp *quotapool.AbstractPool }
Alright, I think I got it working correctly. Quite a bit of refactoring around this class. Eliminated blockingBufferQuotaPool altogether; introduced simple alloc struct (like you have here); also simply allocation request.
Most importantly, I implemented Close which closes underlying bound account correct -- I hope.
pkg/ccl/changefeedccl/kvevent/blocking_buffer.go, line 251 at r5 (raw file):
// single request, but we may succeed if we try again later since some other // process may release it into the pool. // TODO(yevgeniy): Consider making retry configurable; possibly with backoff.
@ajwerner Just want to make sure you don't miss this change vs old behavior.
I think it makes sense.
pkg/ccl/changefeedccl/kvevent/event.go, line 39 at r4 (raw file):
Previously, ajwerner wrote…
My sense is that making the
Writer in terms of Event is a good change, however, I don't love the way Resource is getting shoved into it. I think the interface would feel better if Resource were a part of the Event structure. Consider a DetachResource() method on event itself and an unexported attachResource(r Resource) which the blocking buffer calls.
Yeah; I've been going back and forth w/ this and I have seriously considered it before I sent out this
version.
Add no longer takes the resource.
pkg/ccl/changefeedccl/kvevent/resource.go, line 47 at r5 (raw file):
// // Resource can be Move()d only once. Calling Move multiple times panics. //
@ajwerner Also, want to point out significant changes in this (new) file. I split resource interface into 2 parts (Resource and ScopedResource).
I've been running into issues around making sure that the resources are released correctly. It was very difficult to get right.
ScopedResource, that's never heap allocated, is the mechanism I came up with that hopefully makes resource leaks less likely. At least that's my hope.
Let me know if you have better ideas.
miretskiy
left a comment
There was a problem hiding this comment.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @HonoreDB, and @stevendanna)
pkg/ccl/changefeedccl/kvevent/blocking_buffer.go, line 90 at r2 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Good idea -- done. See
bufferEntryAlloc.
Done.
7314cc0 to
8165653
Compare
|
@stevendanna @HonoreDB @ajwerner We finally have green CI. Definitely ready for a good look. |
stevendanna
left a comment
There was a problem hiding this comment.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @HonoreDB, and @miretskiy)
a discussion (no related file):
Overall, I really like the idea of acquiring this resource once things arrive into the changefeed code from rangefeeds and then passing it around to the end.
I've left a couple minor comments, but will try to review again in more detail this afternoon.
pkg/ccl/changefeedccl/kvevent/blocking_buffer.go, line 101 at r7 (raw file):
// Add implements Writer interface. func (b *blockingBuffer) Add(ctx context.Context, e Event) error { ensureOpened := func(acquireLock bool) error {
In some other packages, rather than using a boolean, they use the Locked suffix to indicate that the function should be called with the lock already held. I wonder if it is worth following that pattern here (and in other changefeedccl code):
func (b *blockingBuffer) ensureOpened(ctx context.Context, reason string) error {
b.mu.Lock()
defer b.mu.Unlock()
return ensureOpenedLocked(ctx, reason)
}
func (b *blockingBuffer) ensureOpenedLocked(ctx context.Context, reason string) error {
if b.mu.closed {
...
}
return nil
}pkg/ccl/changefeedccl/kvevent/resource.go, line 11 at r7 (raw file):
package kvevent // Resource describes the resource allocated by on behalf of an event.
allocated by on behalf of -> allocated on behalf of
stevendanna
left a comment
There was a problem hiding this comment.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @HonoreDB, and @miretskiy)
pkg/ccl/changefeedccl/kvevent/blocking_buffer.go, line 122 at r7 (raw file):
// Acquire the quota first. alloc := int64(changefeedbase.EventMemoryMultiplier.Get(b.sv) * float64(e.approxSize))
One small oddness in the current shape of the API is that some of our Buffer type's respect the resource from the passed in Event and other's don't. We don't here because we know this is the entry point of the changefeed system since this is the buffer we push into from the channel that we've given the rangefeed. However, it does mean that we currently need to take care if we were to move around the type of buffer used in different places.
To prevent misuse in the future, perhaps we should return an error if the passed in event already has a resource attached or add to the documentation comment.
Relatedly, it seems Get returning an event (which may have a resource attached) and a resource is a potential for future confusion.
pkg/ccl/changefeedccl/kvevent/throttling_buffer.go, line 39 at r7 (raw file):
if err := b.throttle.AcquireMessageQuota(ctx, evt.ApproximateSize()); err != nil { sr.Release()
I think the defer above takes care of this?
pkg/ccl/changefeedccl/kvfeed/kv_feed.go, line 456 at r7 (raw file):
return err } return sink.Add(ctx, e)
Do we need to attach the resource to the event here as well?
miretskiy
left a comment
There was a problem hiding this comment.
Dismissed @ajwerner from a discussion.
Reviewable status:complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @HonoreDB, and @stevendanna)
a discussion (no related file):
Previously, stevendanna (Steven Danna) wrote…
Overall, I really like the idea of acquiring this resource once things arrive into the changefeed code from rangefeeds and then passing it around to the end.
I've left a couple minor comments, but will try to review again in more detail this afternoon.
Ack.
pkg/ccl/changefeedccl/kvevent/blocking_buffer.go, line 90 at r2 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Done.
Done.
pkg/ccl/changefeedccl/kvevent/blocking_buffer.go, line 101 at r7 (raw file):
Previously, stevendanna (Steven Danna) wrote…
In some other packages, rather than using a boolean, they use the
Lockedsuffix to indicate that the function should be called with the lock already held. I wonder if it is worth following that pattern here (and in other changefeedccl code):func (b *blockingBuffer) ensureOpened(ctx context.Context, reason string) error { b.mu.Lock() defer b.mu.Unlock() return ensureOpenedLocked(ctx, reason) } func (b *blockingBuffer) ensureOpenedLocked(ctx context.Context, reason string) error { if b.mu.closed { ... } return nil }
Yeah; I had that before and was going back and forth on that. Moved to dedicated methods.
pkg/ccl/changefeedccl/kvevent/blocking_buffer.go, line 122 at r7 (raw file):
Previously, stevendanna (Steven Danna) wrote…
One small oddness in the current shape of the API is that some of our Buffer type's respect the resource from the passed in Event and other's don't. We don't here because we know this is the entry point of the changefeed system since this is the buffer we push into from the channel that we've given the rangefeed. However, it does mean that we currently need to take care if we were to move around the type of buffer used in different places.
To prevent misuse in the future, perhaps we should return an error if the passed in event already has a resource attached or add to the documentation comment.
Relatedly, it seems
Getreturning an event (which may have a resource attached) and a resource is a potential for future confusion.
I agree there is some confusion here. Earlier versions of my PR had Add also take in an explicit resource. Andrew wasn't too thrilled about it particularly because it's literally only 1 place that needs to transfer resources (kv_feed, copy go routine). I still think we may have to do that (if not for other reason than to be consistent with Get); but before that we'd need to figure out how to merge such resources (maybe I'll do it as part of this PR)
While the resource is not accessible outside this package, I like the defensive approach re returning an error if an event has an associated resource.
pkg/ccl/changefeedccl/kvevent/resource.go, line 11 at r7 (raw file):
Previously, stevendanna (Steven Danna) wrote…
"allocated by on behalf of" -> "allocated on behalf of"
Done.
pkg/ccl/changefeedccl/kvevent/throttling_buffer.go, line 39 at r7 (raw file):
Previously, stevendanna (Steven Danna) wrote…
I think the defer above takes care of this?
Yup. Good catch
pkg/ccl/changefeedccl/kvfeed/kv_feed.go, line 456 at r7 (raw file):
Previously, stevendanna (Steven Danna) wrote…
Do we need to attach the resource to the event here as well?
Probably -- the entirety of resolved events is a bit "under developed" wrt resources... Probably needs to be cleaned up.
7da6540 to
2617805
Compare
|
@ajwerner New updates pushed. No more scoped resource -- the need for that disappeared (since buffer close takes care of releasing resources correctly). |
HonoreDB
left a comment
There was a problem hiding this comment.
Reviewed 1 of 14 files at r3, 4 of 25 files at r5, 2 of 5 files at r7, 21 of 21 files at r8.
Reviewable status:complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @miretskiy, and @stevendanna)
pkg/ccl/changefeedccl/sink_cloudstorage_test.go, line 175 at r8 (raw file):
opts[changefeedbase.OptCompression] = before }() // for _, compression := range []string{"", "gzip"} {
Marker that a test is commented out here
pkg/ccl/changefeedccl/sink_webhook.go, line 446 at r8 (raw file):
err = s.sendMessageWithRetries(ctx, payload) s.inflight.maybeSetError(err) s.inflight.FinishRequest(ctx, kvevent.Alloc{})
No alloc here? You are giving resolved timestamps approximate sizes elsewhere.
2617805 to
aa4171a
Compare
miretskiy
left a comment
There was a problem hiding this comment.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @HonoreDB, and @stevendanna)
pkg/ccl/changefeedccl/sink_cloudstorage_test.go, line 175 at r8 (raw file):
Previously, HonoreDB (Aaron Zinger) wrote…
Marker that a test is commented out here
Doh... Thanks.
pkg/ccl/changefeedccl/sink_webhook.go, line 446 at r8 (raw file):
Previously, HonoreDB (Aaron Zinger) wrote…
No alloc here? You are giving resolved timestamps approximate sizes elsewhere.
Very true -- and a good catch -- I forgot to release it in change aggregator.
In general, this is something that we probably need to resolve one way or another.
This method is only called by change frontier -- when it advances this frontier. There is no resource
associated with it since that file/message is written out immediately.
But perhaps it would be better and more consistent if all sink methods received an alloc.
| alloc.bytes += be.alloc | ||
| alloc.entries++ |
| // TODO(yevgeniy): Consider making retry configurable; possibly with backoff. | ||
| return true, time.Second |
There was a problem hiding this comment.
This seems scary, we may never succeed. This would be a very hard thing to debug.
| e.alloc = Alloc{ | ||
| bytes: got.alloc, | ||
| entries: 1, | ||
| ap: b.qp, |
There was a problem hiding this comment.
please take the address of b.qp here. Otherwise this is going to allocate a new one every time.
| err error // error populated from under the quotapool | ||
| e Event | ||
| next *bufferEntry // linked-list element | ||
| alloc int64 // bytes allocated from the memQuota |
There was a problem hiding this comment.
do we still need this given we have an Alloc in e?
| // memRequest is a quotapool memory request. | ||
| type memRequest int64 |
There was a problem hiding this comment.
The downside of this change is now you're going to allocate one of these every time. The nice thing about the bufferEntry was that they were pooled.
| var EventMemoryMultiplier = settings.RegisterFloatSetting( | ||
| "changefeed.event_memory_multiplier", | ||
| "the amount of memory required to process an event is multiplied by this factor", | ||
| 2, |
There was a problem hiding this comment.
would it be crazy to start this at 3?
aa4171a to
bcc9b97
Compare
miretskiy
left a comment
There was a problem hiding this comment.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @HonoreDB, and @stevendanna)
pkg/ccl/changefeedccl/changefeedbase/settings.go, line 153 at r9 (raw file):
Previously, ajwerner wrote…
would it be crazy to start this at 3?
Nope. Done.
pkg/ccl/changefeedccl/kvevent/blocking_buffer.go, line 92 at r9 (raw file):
Previously, ajwerner wrote…
please take the address of
b.qp here. Otherwise this is going to allocate a new one every time.
Done.
pkg/ccl/changefeedccl/kvevent/blocking_buffer.go, line 176 at r9 (raw file):
Previously, ajwerner wrote…
just use merge?
Done.
pkg/ccl/changefeedccl/kvevent/blocking_buffer.go, line 231 at r9 (raw file):
Previously, ajwerner wrote…
do we still need this given we have an
Alloc in e?
Nope.
pkg/ccl/changefeedccl/kvevent/blocking_buffer.go, line 249 at r9 (raw file):
Previously, ajwerner wrote…
The downside of this change is now you're going to allocate one of these every time. The nice thing about the
bufferEntry was that they were pooled.
I'm not sure there an allocation -- i'm casting a stack allocated number to this type.
Where is the allocation in e.g.
if err := b.qp.Acquire(ctx, memRequest(alloc)); err != nil {
return err
}
pkg/ccl/changefeedccl/kvevent/blocking_buffer.go, line 273 at r9 (raw file):
Previously, ajwerner wrote…
This seems scary, we may never succeed. This would be a very hard thing to debug.
I don't know if it's harder to debug than e.g. entering pushback in the first place.
I mean, we're just retrying, and waiting for memory -- just like we would if we simply were waiting for memory in the first place.
Perhaps I could add e.g. on slow acquisition option to produce log messages?
Would that alleviate your concerns?
ajwerner
left a comment
There was a problem hiding this comment.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @HonoreDB, @miretskiy, and @stevendanna)
pkg/ccl/changefeedccl/kvevent/blocking_buffer.go, line 249 at r9 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
I'm not sure there an allocation -- i'm casting a stack allocated number to this type.
Where is the allocation in e.g. if err := b.qp.Acquire(ctx, memRequest(alloc)); err != nil { return err }
In general, when you put a value into an interface box, there needs to be a pointer and thus on the heap. I bet if you looked at it, it would say alloc is moved to the heap. At the end of the day, something has got to give in terms of a heap allocation. At this point, you're passing almost everything around by value. The only pointer is the bufferEntry. FWIW, that's why I made it implement Request.
bcc9b97 to
7096ed3
Compare
miretskiy
left a comment
There was a problem hiding this comment.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @HonoreDB, and @stevendanna)
pkg/ccl/changefeedccl/kvevent/blocking_buffer.go, line 249 at r9 (raw file):
Previously, ajwerner wrote…
In general, when you put a value into an interface box, there needs to be a pointer and thus on the heap. I bet if you looked at it, it would say
alloc is moved to the heap. At the end of the day, something has got to give in terms of a heap allocation. At this point, you're passing almost everything around by value. The only pointer is the bufferEntry. FWIW, that's why I made it implement Request.
Sadness.
go build -gcflags=-m ....
pkg/ccl/changefeedccl/kvevent/blocking_buffer.go:128:40: memRequest(alloc) escapes to heap
Come BACK!
I actually liked memRequest; how it was separate and clean. I made the change to use buffer entry; alas
that means that there is 1 extra pool.Put (if we fail to acquire quota).
Also, kinda wonder if it's the case of premature optimization.
Anyways, tis done.
ajwerner
left a comment
There was a problem hiding this comment.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @HonoreDB, @miretskiy, and @stevendanna)
pkg/ccl/changefeedccl/kvevent/blocking_buffer.go, line 249 at r9 (raw file):
Also, kinda wonder if it's the case of premature optimization.
oh, yeah, definitely, 100%. We allocate all over the darn place in this whole package. I have a problem and I'm sorry that I made you worry about it.
pkg/ccl/changefeedccl/kvevent/blocking_buffer.go, line 273 at r9 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
I don't know if it's harder to debug than e.g. entering pushback in the first place.
I mean, we're just retrying, and waiting for memory -- just like we would if we simply were waiting for memory in the first place.Perhaps I could add e.g.
on slow acquisitionoption to produce log messages?
Would that alleviate your concerns?
This situation is uniquely bad in that it literally may never terminate. I'd want that flashing to me if it happened, which a failed changefeed would be. My fear here is that either 1) we do acquire something under that grow call (I don't think we do) and then we're slowly starving the rest of the system of RAM or 2) we don't and there's no reason to expect the system in steady state is going to have all that memory available. I could see some finite retries here but infinite seems very bad.
miretskiy
left a comment
There was a problem hiding this comment.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @HonoreDB, and @stevendanna)
pkg/ccl/changefeedccl/kvevent/blocking_buffer.go, line 273 at r9 (raw file):
Previously, ajwerner wrote…
This situation is uniquely bad in that it literally may never terminate. I'd want that flashing to me if it happened, which a failed changefeed would be. My fear here is that either 1) we do acquire something under that grow call (I don't think we do) and then we're slowly starving the rest of the system of RAM or 2) we don't and there's no reason to expect the system in steady state is going to have all that memory available. I could see some finite retries here but infinite seems very bad.
It's definitely not 1; It is 2). I worry that the message doesn't even need to be that large; it could be arbitrary small. We could receive an event right when there is significant pressure from e.g. bulk stuff (index build or whatnot). Most/all bulk memory is used up. We try to acquire 10KB from kvFeedPool; that goes to bulk pool, and now we fail a perfectly normal changefeed over a single, tiny message.
The system is clearly under bulk monitor pressure; but we're effectively pushing back; we're letting memory to be released and we try again. As I said above: this doesn't feel significantly different from normal pushback.
In addition, any errors returned here will be retried -- and if the system is under pressure, we might be making matters worse by restarting the whole thing instead of slowing things down -- the way pushback supposed to work.
Perhaps we can discuss this further tomorrow -- but at least these are my reservations about failing here.
miretskiy
left a comment
There was a problem hiding this comment.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @HonoreDB, and @stevendanna)
pkg/ccl/changefeedccl/kvevent/blocking_buffer.go, line 273 at r9 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
It's definitely not 1; It is 2). I worry that the message doesn't even need to be that large; it could be arbitrary small. We could receive an event right when there is significant pressure from e.g. bulk stuff (index build or whatnot). Most/all bulk memory is used up. We try to acquire 10KB from kvFeedPool; that goes to bulk pool, and now we fail a perfectly normal changefeed over a single, tiny message.
The system is clearly under bulk monitor pressure; but we're effectively pushing back; we're letting memory to be released and we try again. As I said above: this doesn't feel significantly different from normal pushback.
In addition, any errors returned here will be retried -- and if the system is under pressure, we might be making matters worse by restarting the whole thing instead of slowing things down -- the way pushback supposed to work.Perhaps we can discuss this further tomorrow -- but at least these are my reservations about failing here.
I pushed another change that adds a check if request size exceeds per changefeed limit.
7096ed3 to
712e937
Compare
ajwerner
left a comment
There was a problem hiding this comment.
One thing that this does not cover is the memory used during the backfills. These have not really been as much of a problem because of the lack of a buffer. Still probably worth doing something about eventually.
Reviewed 1 of 3 files at r9, 1 of 1 files at r12.
Reviewable status:complete! 1 of 0 LGTMs obtained (waiting on @ajwerner, @HonoreDB, @miretskiy, and @stevendanna)
pkg/ccl/changefeedccl/kvevent/blocking_buffer.go, line 273 at r9 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
I pushed another change that adds a check if request size exceeds per changefeed limit.
Cool, add a slow acquisition log message and I'll be happy.
pkg/ccl/changefeedccl/kvevent/blocking_buffer.go, line 50 at r12 (raw file):
) Buffer { opts = append(opts, quotapool.OnWaitFinish(
might be cool to have some metrics on how many messages are currently blocked. Can happen in follow-up.
miretskiy
left a comment
There was a problem hiding this comment.
Ack; We need to figure out backfill. No buffer, that's true, but it uses channel buffer, and that feeds into sinks, and sinks are tied to memory monitor; albeit "bulk IO one" and not per-changefeed one.
Reviewable status:
complete! 1 of 0 LGTMs obtained (waiting on @ajwerner, @HonoreDB, and @stevendanna)
pkg/ccl/changefeedccl/kvevent/blocking_buffer.go, line 273 at r9 (raw file):
Previously, ajwerner wrote…
Cool, add a slow acquisition log message and I'll be happy.
Done. Every 5 seconds.
pkg/ccl/changefeedccl/kvevent/blocking_buffer.go, line 50 at r12 (raw file):
Previously, ajwerner wrote…
might be cool to have some metrics on how many messages are currently blocked. Can happen in follow-up.
Do you mean how many messages are buffered? Afaik, there could only be one message that's blocked right now.
We don't have number of messages in the buffer (as a gauge), but we do have buffer in/out counts -- so you could probably get a decent sense of how many messages we have in the queue.
Also, with the newly added released counts, you can see how things are flowing through the system.
Propagate pushback information throughout changefeed pipeline. The pushback propagation is accomplished by associating a `Resource` with each event that is processed by changefeed system. The resource is propagated throughout changefeed, and is released when the event has been written to the sink. This change also updates and simplifies event memory accounting. Prior to this PR, memory accounting was incomplete and was error prone. This PR simplifies memory accounting by allocating resources once when the event arrives (blocking buffer), and releasing resources when event is written to the sink. Dynamic modifications to the amount of allocated memory are, in general, not safe (without additional complexity). To accommodate the fact that during event processing we use more memory (e.g. to parse and convert this event), we over-allocate resources to the event. Release Notes: Enterprise change; changefeed will slow down correctly whenever there is a slow down in the system (i.e. downstream sink is slow).
712e937 to
1b20162
Compare
|
tftr |
|
Build succeeded: |
Propagate pushback information throughout changefeed pipeline.
The pushback propagation is accomplished by associating a
Resource with each event that is processed by changefeed system. The resource is
propagated throughout changefeed, and is released when the event
has been written to the sink.
This change also updates and simplifies event memory accounting.
Prior to this PR, memory accounting was incomplete and was error prone.
This PR simplifies memory accounting by allocating resources once when
the event arrives (blocking buffer), and releasing resources
when event is written to the sink. Dynamic modifications to the amount
of allocated memory are, in general, not safe (without additional
complexity). To accommodate the fact that during event processing we
use more memory (e.g. to parse and convert this event), we over-allocate
resources to the event.
Release Notes: Enterprise change; changefeed will slow down correctly
whenever there is a slow down in the system (i.e. downstream sink is
slow).