lfs/batcher: support immediate Flush()-ing#1528

Merged
ttaylorr merged 11 commits into master from batcher-truncate on Sep 20, 2016
Conversation


@ttaylorr ttaylorr commented Sep 19, 2016

This pull request adds the ability to immediately flush the currently accumulating batch.

The motivation behind this pull request is to support multiple retries per object in the current transfer queue implementation. Currently, we keep track of the items that failed to transfer and retry them once more before exiting in Wait().

In the future, we'd like to support retrying objects multiple times by re-submitting them into the Batcher and then getting batches of retries out. In order to do this, we need to know how many retries are left. In my initial pass, I simply re-added failed items to the batcher and waited for a batch to come out. However, a batch isn't delivered until we either fill the batch size (unlikely for retries) or Exit() and close all of the channels.

To allow us to force a batch to be delivered immediately, we guarantee that an item has been delivered on the channel (more information in 98873d8), and then immediately Flush() the batch, giving us a batch of one item to be retried.

In future PRs, I plan to make this retry behavior smarter, by overflowing retry buckets, or batching the retries together. As a first pass, I just want it to work 😄
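For reference, the shape of the idea can be sketched in miniature. This is a hypothetical, simplified batcher (invented names like `NewBatcher`, `Next`, and `batchReady`), not the actual git-lfs implementation:

```go
package main

import "fmt"

// Batcher accumulates items on a goroutine and delivers a full batch (or,
// after Flush, a partial one) on batchReady. A minimal sketch of the idea
// described above, not the real lfs/batcher code.
type Batcher struct {
	batchSize  int
	input      chan interface{}
	flush      chan struct{}
	batchReady chan []interface{}
}

func NewBatcher(batchSize int) *Batcher {
	b := &Batcher{
		batchSize:  batchSize,
		input:      make(chan interface{}), // unbuffered: Add returns only once the item is accepted
		flush:      make(chan struct{}),
		batchReady: make(chan []interface{}),
	}
	go b.acceptInput()
	return b
}

func (b *Batcher) Add(t interface{}) { b.input <- t }

// Flush forces the current (possibly partial) batch to be delivered now.
func (b *Batcher) Flush() { b.flush <- struct{}{} }

func (b *Batcher) Next() []interface{} { return <-b.batchReady }

func (b *Batcher) acceptInput() {
	var batch []interface{}
	for {
		select {
		case t := <-b.input:
			batch = append(batch, t)
			if len(batch) == b.batchSize {
				b.batchReady <- batch
				batch = nil
			}
		case <-b.flush:
			b.batchReady <- batch
			batch = nil
		}
	}
}

func main() {
	b := NewBatcher(3)
	go func() {
		b.Add("retry-me")
		b.Flush()
	}()
	fmt.Println(len(b.Next())) // prints 1: a batch of one item, delivered immediately
}
```

The key detail is the unbuffered `input` channel: once `Add` returns, the item is already in the accumulating batch, so a subsequent `Flush()` is guaranteed to include it.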

As a side 📝, the test cases that I introduced in #613 (coincidentally, my first LFS patch) did not fully cover the behavior of the batcher, namely what happens when an uneven number of items is added. This is what was causing the deadlock in 98873d8. With some debugging, the old tests produced:

```
Creating batcher with 5 items and size 3
Added item
Added item
Added item
Added item # <- gone
Added item # <- gone
Grabbing batch #1
Got batch size of 3
<end>
```

The remaining two items are gone! No one ever checks for them. I fixed this behavior in a0d0635.
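The invariant the repaired test needs to assert can be stated independently of the channel machinery: with N items and batch size S, every item must land in some batch, and only the last batch may be short. A sketch of that invariant (the `chunk` helper is invented for illustration, not the actual test code):

```go
package main

import "fmt"

// chunk splits items into batches of at most size elements, mirroring what a
// complete test must check: nothing is dropped, and only the final batch may
// be shorter than the batch size.
func chunk(items []int, size int) [][]int {
	var out [][]int
	for len(items) > 0 {
		n := size
		if len(items) < n {
			n = len(items)
		}
		out = append(out, items[:n])
		items = items[n:]
	}
	return out
}

func main() {
	got := chunk([]int{1, 2, 3, 4, 5}, 3)
	fmt.Println(len(got), len(got[0]), len(got[1])) // prints: 2 3 2
}
```

With 5 items and size 3 the old tests stopped after the first batch of 3; the fixed tests also grab the trailing batch of 2.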

/cc @rubyist since you were the original author
/cc @technoweenie @sinbad for 👀 and 💭 s

In preparation for the next commit, where I'll add a Truncate() method to the
batcher, the `input` channel needs to guarantee to its callers that all items
within it have been processed (specifically, before returning from `Add()`).

Since the input channel was buffered, the following situation could easily occur:
1. Several calls to Add() are issued, which get buffered by the "input" channel.
2. Items are processed on a separate goroutine, draining the buffer.
3. While the buffer still has items in it, a call to Truncate() causes an
   alternate select/case branch to fire (leaving all of the buffered items to be
   dropped on the floor).
4. A batch is returned containing all of the _processed_ items from `input`.
5. Buffered items are (perhaps) returned in the next batch.

Since the time spent blocking is short and I could not find instances where
this channel buffering is relied upon, this made the most sense of all the
synchronization methods (`WaitGroup`s, `Cond`s, etc.).
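As a standalone illustration of the property this commit message relies on (the `sendAll` helper is invented, not batcher code): a send on an unbuffered channel completes only once the receiver has taken the value, so items can never be stranded in a buffer when an alternate select branch fires.

```go
package main

import "fmt"

// sendAll pushes items through an unbuffered channel to a consumer goroutine
// and returns what the consumer saw. Because each unbuffered send completes
// only when the receiver takes the value, every item is accepted, in order,
// before sendAll returns -- nothing sits unprocessed in a buffer.
func sendAll(items []int) []int {
	input := make(chan int) // unbuffered, like the patched Batcher's input
	done := make(chan []int)

	go func() {
		var received []int
		for v := range input {
			received = append(received, v)
		}
		done <- received
	}()

	for _, v := range items {
		input <- v // blocks until the consumer goroutine accepts v
	}
	close(input)
	return <-done
}

func main() {
	fmt.Println(sendAll([]int{1, 2, 3, 4, 5})) // prints: [1 2 3 4 5]
}
```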
lfs/batcher.go Outdated

```diff
-// Add adds an item to the batcher. Add is safe to call from multiple
+// Add adds an item (or many items) to the batcher. Add is safe to call from
 // goroutines.
 func (b *Batcher) Add(t interface{}) {
```
Contributor

How about "Add sends one or more items to the batcher"?

Contributor Author

Way clearer! Changed in 49489c0.

@ttaylorr ttaylorr changed the title from "lfs/batcher: support immediate Truncation()" to "lfs/batcher: support immediate Flush()-ing" on Sep 19, 2016
Since the tests are fixed to no longer deadlock when an uneven number of items
is added, we can remove the sync.WaitGroup and instead use an un-buffered
channel to synchronize multiple `Add()`s.
```diff
 b := &Batcher{
 	batchSize: batchSize,
-	input:     make(chan interface{}, batchSize),
+	input:     make(chan interface{}),
```
Contributor

Making input an unbuffered channel is going to force adding of more than one item to block until acceptInput() pulls an item, meaning these goroutines are now forced to work in lock step. This seems a bit restrictive?

Contributor Author
@ttaylorr ttaylorr commented Sep 20, 2016

I think this is an interesting thought. The reason I un-buffered the channel in 98873d8 and a27d92d was to ensure that, after calling Add() a number of times in a test, those items would definitely be in the batch, so that after Flush()-ing I'd get them out.

However, I'm not actually sure we ever needed the buffered behavior in the first place. I could be wrong, since I haven't been in this part of the code in a while, but I ran some benchmarks to get a second opinion.

With buffering (on master):

```
~/g/g/lfs (master!) $ git rev-parse HEAD
b7b1f2b7985c8f7a18c563c70898f669ffb8c13c

~/g/g/lfs (master!) $ go test -run=BenchmarkAddingToBatcher -bench=./lfs
BenchmarkAddingToBatcher-4      10000000               151 ns/op
BenchmarkLsTreeParser-4           500000              4288 ns/op   # <- ignore
PASS
ok      github.com/github/git-lfs/lfs   3.880s
```

Without buffering (on 49489c0):

```
~/g/g/lfs (batcher-truncate!) $ git rev-parse HEAD
49489c0f43011c5f0638eb2fd17b993e7488a0a6

~/g/g/lfs (batcher-truncate!) $ go test -run=BenchmarkAddingToBatcher -bench=./lfs
BenchmarkAddingToBatcher-4       3000000               551 ns/op
BenchmarkLsTreeParser-4           500000              4245 ns/op   # <- ignore
PASS
ok      github.com/github/git-lfs/lfs   4.422s
```

My initial thought after seeing these numbers was to buffer the channel. For future reference, one of the ways we could go about doing this is:

Upon receiving a <-b.flush signal, spin until we have completely drained the input channel:

```go
case <-b.input:
	for {
		select {
		case t, ok := <-b.input:
			// Do the thing that we're doing above :-(
		default:
			// Channel is empty!
			break Acc
		}
	}
```

However, @rubyist made a great point: given that we're not currently adding to this batcher from multiple goroutines in LFS, having this sort of contention isn't a big deal. He also pointed out that since we're using this code to do network transfers, an extra 400ns/op isn't a huge deal either.

I think given our current use-case, and the additional complexity necessary to buffer the input channel, we should leave this as-is until we need to change it.

For reference, the benchmark I ran is in this gist.

Contributor

Yeah, right now we drive the batcher sequentially and don't try to parallelise the API requests, so this probably isn't an issue; I just wanted to point out that it is more restrictive than before (it is now more explicitly a single-in, single-out style). If @rubyist is cool with that then so am I.

Contributor

Here's what he said in internal chat:

> yeah it looks like it's all done sequentially. if it's cleaner to not buffer, i say do it, 400ns is noise when we're talking network transfers

I agree with the assessment here. Removing buffering lets us keep this to a simple select loop without a noticeable performance impact.

lfs/batcher.go Outdated

```diff
 // acceptInput runs in its own goroutine and accepts input from external
-// clients. It fills and dispenses batches in a sequential order: for a batch
-// size N, N items will be processed before a new batch is ready.
+// clients. Without truncation, it fills and dispenses batches in a sequential
```
Contributor

This message still mentions truncation. Can you rewrite it in a way so it calls it "flushing" somehow?


@ttaylorr ttaylorr merged commit 878165d into master Sep 20, 2016
@ttaylorr ttaylorr deleted the batcher-truncate branch September 20, 2016 19:58