rangefeed: add release valve to logical op consumption#29076
rangefeed: add release valve to logical op consumption#29076craig[bot] merged 1 commit intocockroachdb:masterfrom
Conversation
|
Please don't review this yet. I'm changing up the interface a bit to be more useful. |
3c7f7e6 to
3135149
Compare
|
Ok, this is good to review now. |
tbg
left a comment
There was a problem hiding this comment.
Reviewed 5 of 5 files at r1.
Reviewable status:complete! 0 of 0 LGTMs obtained (and 1 stale)
pkg/storage/rangefeed/processor.go, line 53 at r1 (raw file):
// Stream.SendAsync and give each stream its own individual buffer. If // an individual stream is unable to keep up, to should fail on its own. var errBufferCapacityExceeded = roachpb.NewErrorf("rangefeed: buffer capacity exceeded")
I don't know that there is a problem in this code, but we do usually mutate *Error on the way out, so there's a chance this singleton will be polluted if not cloned appropriately at any place where it's used (and so you might as well make this a factory).
3135149 to
84b6de8
Compare
nvb
left a comment
There was a problem hiding this comment.
Reviewable status:
complete! 0 of 0 LGTMs obtained (and 1 stale)
pkg/storage/rangefeed/processor.go, line 53 at r1 (raw file):
Previously, tschottdorf (Tobias Schottdorf) wrote…
I don't know that there is a problem in this code, but we do usually mutate
*Erroron the way out, so there's a chance this singleton will be polluted if not cloned appropriately at any place where it's used (and so you might as well make this a factory).
Good point, done!
This change adds a new `EventChanTimeout` configuration to `rangefeed.Processor`. The config specifies the maximum duration that `ConsumeLogicalOps` and `ForwardClosedTS` will block on the Processor's input channel. Once the duration is exceeded, these methods will stops the processor with an error and tear it down. This prevents slow consumers from making callers of the two methods block. The first method is used downstream of Raft, where blocking due to slow rangefeed consumers for an unbounded amount of time is unacceptable. It may require that we resize the processor's input buffer. A TODO is left about reducing the impact of a single slow consumer on an entire rangefeed.Processor. This can be left as a future improvement. Release note: None
84b6de8 to
412bb9b
Compare
|
bors r+ |
29076: rangefeed: add release valve to logical op consumption r=nvanbenschoten a=nvanbenschoten This change adds a new `EventChanTimeout` configuration to `rangefeed.Processor`. The config specifies the maximum duration that `ConsumeLogicalOps` and `ForwardClosedTS` will block on the Processor's input channel. Once the duration is exceeded, these methods will stops the processor with an error and tear it down. This prevents slow consumers from making callers of the two methods block. The first method is used downstream of Raft, where blocking due to slow rangefeed consumers for an unbounded amount of time is unacceptable. It may require that we resize the processor's input buffer. A TODO is left about reducing the impact of a single slow consumer on an entire rangefeed.Processor. This can be left as a future improvement. Release note: None Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
Build succeeded |
29500: release-2.1: backport 7 rangefeed PRs r=nvanbenschoten a=nvanbenschoten Backport: * 5/5 commits from "kv: give Transport a haircut" (#28855) * 1/1 commits from "engine: use Txn.Timestamp instead of OrigTimestamp for LogLogicalOp" (#28970) * 1/1 commits from "rangefeed: add release valve to logical op consumption" (#29076) * 3/3 commits from "kv: teach DistSender about RangeFeeds, use for changefeeds" (#28912) * 1/1 commits from "rangefeed: small perf-related changes" (#29134) * 2/2 commits from "kv: truncate RangeFeed span to range descriptor " (#29219) * 2/2 commits from "storage: hook closed timestamps into rangefeed" (#28974) Please see individual PRs for details. All cherry-picks were clean. /cc @cockroachdb/release Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
This change adds a new
EventChanTimeoutconfiguration torangefeed.Processor. The config specifies the maximum durationthat
ConsumeLogicalOpsandForwardClosedTSwill block on theProcessor's input channel. Once the duration is exceeded, these
methods will stops the processor with an error and tear it down.
This prevents slow consumers from making callers of the two methods
block.
The first method is used downstream of Raft, where blocking due to
slow rangefeed consumers for an unbounded amount of time is unacceptable.
It may require that we resize the processor's input buffer.
A TODO is left about reducing the impact of a single slow consumer
on an entire rangefeed.Processor. This can be left as a future
improvement.
Release note: None