lfs/batcher: support immediate Flush()-ing #1528
Conversation
In preparation for the next commit, where I'll add a Truncate() method to the batcher, the `input` channel needs to guarantee to its callers that all items within it have been processed (specifically, before returning from `Add()`). Since the input channel was buffered, the following situation could easily occur:

1. Several calls to Add() are issued, which get buffered by the `input` channel.
2. Items are processed on a separate goroutine, draining the buffer.
3. While the buffer still has items in it, a call to Truncate() causes an alternate select/case branch to fire, leaving all of the buffered items to be dropped on the floor.
4. A batch is returned containing all of the _processed_ items from `input`.
5. Buffered items are (perhaps) returned in the next batch.

Since the time spent blocking is short, and I could not find instances where this channel buffering is relied upon, this made the most sense of all the synchronization methods (`WaitGroup`s, `Cond`s, etc.)
lfs/batcher.go
Outdated
```diff
- // Add adds an item to the batcher. Add is safe to call from multiple
+ // Add adds an item (or many items) to the batcher. Add is safe to call from
  // goroutines.
  func (b *Batcher) Add(t interface{}) {
```
How about "Add sends one or more items to the batcher"?
Since the tests are fixed to no longer deadlock when an uneven number of items is added, we can remove the sync.WaitGroup and instead use an un-buffered channel to synchronize multiple `Add()`s.
```diff
  b := &Batcher{
  	batchSize: batchSize,
- 	input:     make(chan interface{}, batchSize),
+ 	input:     make(chan interface{}),
```
Making input an unbuffered channel is going to force any caller adding more than one item to block until acceptInput() pulls each item, meaning these goroutines are now forced to work in lock step. This seems a bit restrictive?
I think this is an interesting thought. The reason I un-buffered the channel in 98873d8 and a27d92d was so that I could ensure that after calling Add() a number of times in test, that they would for sure be in the batch so that after Flush()-ing, I'd get them out.
However, I'm not actually sure we ever needed the buffered behavior in the first place. I could be wrong, since I haven't been in this part of the code in a while, but I ran some benchmarks to get a second opinion.
With buffering (on master):
```shell
~/g/g/lfs (master!) $ git rev-parse HEAD
b7b1f2b7985c8f7a18c563c70898f669ffb8c13c
~/g/g/lfs (master!) $ go test -run=BenchmarkAddingToBatcher -bench=./lfs
BenchmarkAddingToBatcher-4   10000000     151 ns/op
BenchmarkLsTreeParser-4        500000    4288 ns/op  # <- ignore
PASS
ok      github.com/github/git-lfs/lfs   3.880s
```
Without buffering (on 49489c0):
```shell
~/g/g/lfs (batcher-truncate!) $ git rev-parse HEAD
49489c0f43011c5f0638eb2fd17b993e7488a0a6
~/g/g/lfs (batcher-truncate!) $ go test -run=BenchmarkAddingToBatcher -bench=./lfs
BenchmarkAddingToBatcher-4    3000000     551 ns/op
BenchmarkLsTreeParser-4        500000    4245 ns/op  # <- ignore
PASS
ok      github.com/github/git-lfs/lfs   4.422s
```
My initial thought after seeing these numbers was to buffer the channel. For future reference, one of the ways we could go about doing this is:
Upon receiving a <-b.flush signal, spin until we have completely drained the input channel:
```go
case <-b.flush:
Acc:
	for {
		select {
		case t, ok := <-b.input:
			// Do the thing that we're doing above :-(
		default:
			// Channel is empty!
			break Acc
		}
	}
```

However, @rubyist made a great point: given that we're not adding to this batcher from multiple goroutines currently in LFS, having this sort of contention isn't a big deal. He also pointed out that since we're using this code to do network transfers, an extra 400ns/op isn't a huge deal either.
I think given our current use-case, and the additional complexity necessary to buffer the input channel, we should leave this as-is until we need to change it.
For reference, the benchmark I ran is in this gist.
Yeah right now, we drive the batcher sequentially and don't try to parallelise the API requests so this probably isn't an issue, just wanted to point out that it is more restrictive than before (it is now more explicitly a single in, single out style). If @rubyist is cool with that then so am I.
Here's what he said in internal chat:

> yeah it looks like it's all done sequentially. if it's cleaner to not buffer, i say do it, 400ns is noise when we're talking network transfers
I agree with the assessment here. Removing buffering lets us keep this to a simple select loop without a noticeable performance impact.
lfs/batcher.go
Outdated
```diff
  // acceptInput runs in its own goroutine and accepts input from external
- // clients. It fills and dispenses batches in a sequential order: for a batch
- // size N, N items will be processed before a new batch is ready.
+ // clients. Without truncation, it fills and dispenses batches in a sequential
```
This comment still mentions truncation. Can you rewrite it so that it refers to "flushing" instead?
Force-pushed from 9f9b532 to 1408c85.
This pull request adds the ability to immediately flush the currently accumulating batch.
The motivation behind this pull request is the ability to support multiple retries per object in the current transfer queue implementation. Currently, we keep track of the items that failed to transfer, and then retry them once more before we exit in Wait(). In the future, we'd like to support retrying objects multiple times by re-submitting them into the Batcher, and then getting batches of retries out. In order to do this, we need to know how many retries are left. In my initial pass, I simply re-added failed items to the batcher and waited for a batch to come out. However, a batch isn't delivered until we either fill the batch size (unlikely for retries), or Exit() and close all of the channels.

To allow us to force a batch to be delivered immediately, we guarantee that an item has been delivered on the channel (more information in 98873d8), and then immediately Flush() the batch, giving us a batch of 1 item to be retried.
In future PRs, I plan to make this retry behavior smarter, by overflowing retry buckets, or batching the retries together. As a first pass, I just want it to work 😄
As a side 📝, the test cases that I introduced in #613 (coincidentally, my first LFS patch) did not fully cover the behavior of the batcher, namely what happens when an uneven number of items is added. This is what was causing the deadlock in 98873d8. With some debugging, the old tests produced:
The remaining two items are gone! No one ever checks for them. I fixed this behavior in a0d0635.
/cc @rubyist since you were the original author
/cc @technoweenie @sinbad for 👀 and 💭 s