Issue #1409: Added server side backpressure (@bug W-3651831@) #1410
Conversation
eolivelli
left a comment
Good idea.
I think that the changes on SortedLedgerStorage are important. Personally I am okay with those changes, but I would like to hear the community's opinion.
logId = logId + 1;
final JournalChannel.BufferedChannelBuilder bcBuilder;
if (getDelay > 0 || addDelay > 0 || flushDelay > 0) {
It would be better to use PowerMock rather than this kind of "explicit" instrumentation.
Recently we added a lot of test cases which tweak the journal.
Will do.
PowerMock in tests that extend BookKeeperClusterTestCase is a bit problematic, likely due to
powermock/powermock#822
Figured out a way to use just Mockito there; will update the PR soon.
 */
public class SortedLedgerStorage extends InterleavedLedgerStorage
        implements LedgerStorage, CacheCallback, SkipListFlusher {
public class SortedLedgerStorage
this is an important change: SortedLedgerStorage will not be an InterleavedLedgerStorage anymore
What are the implications of this change, @eolivelli?
@dlg99 can you please describe the motivation for this change?
This simplifies testability, i.e. the SlowSortedLedgerStorage implementation, reduces code duplication, etc. I first tried dealing with this without this change and it turned out ugly.
I am not worried about this, as there is no reason to code directly against InterleavedLedgerStorage instead of the CompactableLedgerStorage interface, which SortedLedgerStorage still implements. Tests still pass, though I had to change a couple of tests that cast SortedLedgerStorage to InterleavedLedgerStorage.
If there is some specific reason that I have missed why we should not change this, or a reason why we should ignore the interface and code against the concrete implementation, please let me know.
@dlg99 thank you for trying.
Go ahead with your change.
We already agreed at the community meeting that this is not a problem.
protected static final String PCBC_TIMEOUT_TIMER_NUM_TICKS = "pcbcTimeoutTimerNumTicks";
protected static final String TIMEOUT_TIMER_TICK_DURATION_MS = "timeoutTimerTickDurationMs";
protected static final String TIMEOUT_TIMER_NUM_TICKS = "timeoutTimerNumTicks";
protected static final String WAIT_TIMEOUT_ON_BACKPRESSURE = "waitTimeoutOnBackpressureMs";
I think it would be better to have two separate properties, one for the server and another for the client.
Having two properties with the same name will be confusing, and/or we will have to write docs to explain this fact.
will do
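For illustration only, the split being asked for would keep distinct property names per configuration; the renamed server-side key below is an assumption about how it could look, not necessarily what the final change uses.

```java
// Hypothetical sketch: separate keys so client and server timeouts cannot be confused.
// In ClientConfiguration (existing name from the diff above):
protected static final String WAIT_TIMEOUT_ON_BACKPRESSURE = "waitTimeoutOnBackpressureMs";
// In ServerConfiguration (assumed, illustrative name):
protected static final String WAIT_TIMEOUT_ON_RESPONSE_BACKPRESSURE = "waitTimeoutOnResponseBackpressureMs";
```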
retest this please
 * 0 to allow request to fail immediately
 * Default is -1 (disabled)
 */
public long getWaitTimeoutOnBackpressureMillis() {
If it is from the client, how does the client know whether the bookie is slow/dead/hung or whether it is something backpressure related? Also, how does this interact with the rest of the timeouts, say, the read/write timeout?
All the client sees is that the connection to the bookie still exists but data cannot be sent over it at the moment (or cannot be sent fast enough to clear the existing backlog), and Netty reports channel.isWritable() == false.
The rest is handled by client-side backpressure.
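As a minimal sketch of what this looks like from the client side (this is not the PR's client code; the handler name is made up), a Netty handler can observe the writability flips and let the client-side backpressure logic pause or resume sending:

```java
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

// Hypothetical client-side handler: the connection stays open, but when the
// outbound buffer fills past Netty's high water mark, isWritable() becomes false.
public class WritabilityAwareHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        Channel ch = ctx.channel();
        if (!ch.isWritable()) {
            // Server is applying backpressure (or is just slow to drain):
            // stop submitting new requests until the channel becomes writable again.
        } else {
            // Buffer drained below the low water mark: resume sending.
        }
        super.channelWritabilityChanged(ctx);
    }
}
```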
 *
 * @return Max number of adds in progress.
 */
public int getMaxAddsInProgress() {
This sounds like a stat rather than config. Maybe change it to getMaxAddsInProgressLimit.
will do
 *
 * @return Max number of reads in progress.
 */
public int getMaxReadsInProgress() {
Same comment on the name as above. Adding "limit" or something like that will describe it better.
will do
if (size > (Integer.MAX_VALUE - 1) / 2) {
    // gives max of 2*1023MB for mem table (one being checkpointed and still writable).
    throw new IllegalArgumentException("skiplist size over " + ((Integer.MAX_VALUE - 1) / 2));
}
Another option is to cap the value at (Integer.MAX_VALUE - 1) / 2 with a warning.
I'll keep the explicit error (fail on start, fast detection) rather than silently changing the value.
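For reference, the alternative suggested above (clamp to the maximum and warn instead of failing) might look roughly like this; class, field and logger names are illustrative, not the actual EntryMemTable code:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical "clamp with a warning" variant; the PR keeps the explicit error instead.
class SkipListSizeLimit {
    private static final Logger LOG = LoggerFactory.getLogger(SkipListSizeLimit.class);
    // (Integer.MAX_VALUE - 1) / 2 gives max of 2 * ~1023MB for the mem table
    // (one being checkpointed and one still writable).
    static final long MAX_SKIP_LIST_SIZE = (Integer.MAX_VALUE - 1) / 2;

    static long clamp(long requestedSize) {
        if (requestedSize > MAX_SKIP_LIST_SIZE) {
            LOG.warn("skipListSizeLimit {} exceeds maximum {}; clamping", requestedSize, MAX_SKIP_LIST_SIZE);
            return MAX_SKIP_LIST_SIZE;
        }
        return requestedSize;
    }
}
```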
this.addEntryBlockedStats = statsLogger.getOpStatsLogger(ADD_ENTRY_BLOCKED_WAIT);
this.readEntryBlockedStats = statsLogger.getOpStatsLogger(READ_ENTRY_BLOCKED_WAIT);
int maxAdds = serverCfg.getMaxAddsInProgress();
int maxAddsInProgressLimit?
will do
int maxAdds = serverCfg.getMaxAddsInProgress();
addsSemaphore = maxAdds > 0 ? new Semaphore(maxAdds, true) : null;
int maxReads = serverCfg.getMaxReadsInProgress();
Same comment
will do
final long throttlingStartTimeNanos = MathUtils.nowInNano();
channel.config().setAutoRead(false);
readsBlocked.incrementAndGet();
readsSemaphore.acquireUninterruptibly();
An info/debug message around this block would be helpful.
will add
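For context, here is a self-contained sketch of the pattern the block above follows, with the kind of debug message being asked for added. Names are simplified/hypothetical, and the real code also records blocked-time stats using throttlingStartTimeNanos.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical, simplified version of the server-side read throttling path.
class ReadThrottle {
    private static final Logger LOG = LoggerFactory.getLogger(ReadThrottle.class);
    private final Semaphore readsSemaphore;
    private final AtomicLong readsBlocked = new AtomicLong();

    ReadThrottle(int maxReadsInProgressLimit) {
        this.readsSemaphore = new Semaphore(maxReadsInProgressLimit, true);
    }

    void onReadRequestStart(Channel channel) {
        if (!readsSemaphore.tryAcquire()) {
            channel.config().setAutoRead(false);      // stop Netty from pulling more requests
            readsBlocked.incrementAndGet();           // exposed as a "reads blocked" stat
            if (LOG.isDebugEnabled()) {
                LOG.debug("Too many reads in progress; blocking channel {} until a permit frees up", channel);
            }
            readsSemaphore.acquireUninterruptibly();  // wait for an in-flight read to complete
            channel.config().setAutoRead(true);       // resume pulling requests from the socket
        }
    }

    void onReadRequestFinish() {
        readsSemaphore.release();
    }
}
```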
final long throttlingStartTimeNanos = MathUtils.nowInNano();
channel.config().setAutoRead(false);
addsBlocked.incrementAndGet();
addsSemaphore.acquireUninterruptibly();
An info/debug message would be helpful here too.
will add
…ing of non-responsive client
…ime when test started. Trying to fix + add more logging
retest this please
sijie
left a comment
Great work! @dlg99
protected void onAddRequestStart(Channel channel) {
    if (addsSemaphore != null) {
        if (!addsSemaphore.tryAcquire()) {
nit:
Currently all the throttling is request-based. It might be good to do throttling based on bytes, because that is much easier to translate to bandwidth usage than request counts. #1367 is one ask for that.
That can be a future improvement.
@sijie This is partially addressed in the memtable flush scenario, but I wanted to keep the configuration consistent across read and write requests (as in: a requests-in-progress limit), and in the case of reads one does not know the size of the entry until the index is read.
We can think about byte-based configuration later, but I feel it is not going to be a high priority if backpressure is dynamic (as with this change) and reacts to actual throughput, rather than the rate-based throttling the client used to have.
@jvrao @eolivelli I made the changes to address your comments. Can you please take another look?
eolivelli
left a comment
+1
Great work
#shipit
jvrao
left a comment
LGTM
…pache#1410) (apache#167) (@bug W-3651831@) backpressure: server-side backpressure @Rev vjujjuri@ @Rev sijie@ @Rev eolivelli@
Descriptions of the changes in this PR: *Motivation* An unexpected behavior was observed when using sorted ledger storage with the single entry log manager in production: "the entry log files are rotated very frequently and small entry log files are produced". The problem was introduced by #1410. In the current entry logger, when a new entry log file is created, EntryLogger notifies its listeners that a new entry log file has been rotated via [`onRotateEntryLog`](https://github.com/apache/bookkeeper/blob/master/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogManagerBase.java#L152). Before the change in #1410, `SortedLedgerStorage` inherited from `InterleavedLedgerStorage`, so when a new entry log file was rotated, `SortedLedgerStorage` was notified. However, after the change in #1410, `SortedLedgerStorage` doesn't inherit from `InterleavedLedgerStorage` anymore; the relationship was changed to composition, and `SortedLedgerStorage` is composed using an interleaved ledger storage. So the entry log listener contract was broken, and `SortedLedgerStorage` no longer receives any `onRotateEntryLog` notification. *Changes* When `SortedLedgerStorage` initializes, it passes its own entry log listener down to the interleaved ledger storage, so the entry logger can notify the right listener of entry log rotations. *Tests* Existing tests should cover most of the cases. Looking into how to add new test cases. Reviewers: Enrico Olivelli <eolivelli@gmail.com>, Charan Reddy Guttapalem <reddycharan18@gmail.com>, Andrey Yegorov <None> This closes #1807 from sijie/fix_rotation_behavior
--- *Motivation* If one bookie is slow (not down, just slow), the BK client will ack to the user that the entries are written after the first 2 acks. In the meantime, it will keep waiting for the 3rd bookie to respond. If the bookie responds within the timeout, the entries can then be dropped from memory; otherwise the write will time out internally and be replayed to a new bookie. In the V3 request, we have [server-side backpressure](#1410) that influences the client-side behavior. We should apply the same changes to the V2 request. That would help this [issue](apache/pulsar#14861) to be resolved. *Modification* - Apply the change #1410 to the V2 protocol Descriptions of the changes in this PR:
Added server side backpressure handling and related unit tests.
Background:
BK’s writes happen in this order on the server side:
First, the ledger storage (Interleaved|Sorted), presumably non-blocking to some degree.
Second, the journal.
A request is finished when the journal's write is fsynced.
Three major moving parts on the server side need to be taken into account: the request-processing threads (requests in progress), the asynchronous ledger storage write path, and the response path back to the client.
Implementation:
Requests in progress (RIPs) are requests being processed by threads in the threadpool plus requests waiting for the next available thread. A RIP's lifetime runs from the moment the request is received/read by Netty to the moment the response for the request is sent.
The target limit of RIPs (a heuristic) is ((number of processing threads) * 2 + (expected max batch size on journal)), so each thread can have one request to process and the next one waiting.
It is assumed that we have enough memory to keep data for that many requests. It is impossible to estimate the size of a read request at the moment it is received anyway.
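As a purely illustrative worked example: with 8 request-processing threads and an expected maximum journal batch of about 500 adds, the heuristic gives 8 * 2 + 500 = 516 requests in progress as the limit.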
The limit is configured by setting the number of RIPs explicitly in the config, for the following reasons:
It is easier to experiment with different numbers, e.g. ((number of processing threads) + 2 * (expected max batch size on journal)) or simply (2 * (expected max batch size on journal)).
There is an option to run requests directly on the Netty thread, so there is no config parameter to base an initial value on, and Netty's defaults can change between versions.
It removes the need for an explicit enable/disable backpressure flag; setting the RIP limit to zero disables it instead.
Netty's autoread is paused when the limit is exceeded, to prevent Netty from pulling more data before we track it as a RIP.
Limit the number of requests in the asynchronous write path (LedgerStorage):
InterleavedLedgerStorage will naturally block if the write is slowed down by, e.g., fsync.
SortedLedgerStorage had a naive throttling implementation that blocked a request for 1ms if a checkpoint (memtable flush) was in progress. This is replaced with blocking until space in the memtable is available, as sketched below. The limit is set to 2 * (skipListSize), where skipListSize is the limit that triggers a memtable flush.
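A minimal sketch of the "block until space is available" behavior, assuming a simple byte counter; the actual EntryMemTable/SortedLedgerStorage implementation differs in detail:

```java
// Hypothetical helper: admits writes only while the two memtables
// (one flushing, one writable) together stay under 2 * skipListSizeLimit bytes.
class MemTableSpaceGate {
    private final long maxBytes;
    private long bytesInFlight = 0;

    MemTableSpaceGate(long skipListSizeLimit) {
        this.maxBytes = 2 * skipListSizeLimit;
    }

    // Add path: wait while both memtables are full.
    synchronized void reserve(long entrySize) throws InterruptedException {
        while (bytesInFlight + entrySize > maxBytes) {
            wait();                  // notified when a checkpoint (memtable flush) frees space
        }
        bytesInFlight += entrySize;
    }

    // Checkpoint path: a flushed snapshot frees its bytes.
    synchronized void release(long flushedBytes) {
        bytesInFlight -= flushedBytes;
        notifyAll();
    }
}
```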
Drop the response after a timeout (the client will not hear about that request) or close the channel (the disconnect notifies the client that responses to requests from that connection will never arrive), as sketched below.
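Roughly, the decision on the response path could be sketched as follows; the timeout semantics (-1 disabled, 0 act immediately, positive wait) come from the config javadoc above, but the method and the close-channel-on-timeout flag are illustrative, not the actual code:

```java
import java.util.concurrent.TimeUnit;
import io.netty.channel.Channel;

// Hypothetical sketch of what happens when a response cannot be flushed
// because the client's channel is not writable.
final class BackpressureResponsePolicy {
    static void onResponseBlocked(Channel channel, long blockedSinceNanos,
                                  long waitTimeoutOnBackpressureMillis, boolean closeChannelOnTimeout) {
        if (waitTimeoutOnBackpressureMillis < 0) {
            return; // -1: timeout handling disabled, keep waiting for the channel to drain
        }
        long blockedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - blockedSinceNanos);
        if (blockedMillis >= waitTimeoutOnBackpressureMillis) {
            if (closeChannelOnTimeout) {
                channel.close(); // disconnect: the client learns all pending requests on this connection failed
            }
            // otherwise simply drop the response; the client never hears about this request
        }
    }
}
```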
Master Issue: #1409