tq: prioritize transferring retries before new items#1758
tq/adapterbase.go
Maybe I'm missing something, but if you're adding `len(transfers)` to `jobWait`, and `Done` is only ever called as a response to completion of items in `jobChan` on a 1:1 basis, `jobWait.Wait()` should never return before this loop is done, therefore the close in `End()` is safe?
This is correct, if the calling code occurs in a single goroutine. Consider this:
```go
func main() {
	var a *adapterBase
	go a.Add(myBatch)

	a.Wait()
}
```

If we're, say, 50 items into `myBatch` before the execution scheduler takes us to the `a.Wait()` call, it'll close `a.jobChan`, which is what we expect. Once the scheduler takes us back to the goroutine, it'll try to process the 51st item and write into a closed channel, which is a panic. Even if it weren't a panic, we don't want to lose those items.
Excuse me, I hit enter a little too early :-). What I wanted to add is that even though this is a fairly fringe scenario, I still think it's important to consider.
I must be being stupid, because I don't understand how that could be an issue. Since `Add()` increments the wait group by `len(myBatch)`, it doesn't matter how far through `myBatch` you get, the wait group won't be able to proceed past `Wait()` until `Done()` has been called that many times. So I'm not seeing where the race comes from...
Doh! Sorry, I thought you were asking about my use of `a.jobWait.Wait()` in `Done()`, which I added after I added that `// BUG(taylor)` comment while debugging. Looks like I just forgot to remove that comment, which I did in ce08f66. I think we just found out in a round-about way that we're on the same page!
tq/adapterbase.go
I'm unreasonably happy with how job.Done() scans 😄
tq/transfer_queue.go
Since you have to supply the max as an argument, I'm not sure what value this function adds?
Yeah, I was a little hesitant about adding this function. I think it provides minimal value for the reason that you stated, and it expands the Batch interface. I've removed it in ced37c6c.
tq/transfer_queue.go
Sorry, I don't understand what benefit this sort is giving you. Since you're only sorting within the batch, and the batch is the unit that is blocked on, what does it matter what the ordering is inside the batch?
I could imagine that if you sorted all the transfers as a whole so that bigger files made it into the first batch that this would alter the blocking behaviour, but sorting inside the batch seems incidental. Please explain?
No worries, I wasn't quite clear with my reasoning here. The idea is to reduce the likelihood of a single worker getting tied up processing one very large item at the end of a batch while others are sitting idle. Since enqueuing the next batch is contingent on processing the current batch completely, idle workers should be treated as something we need to minimize as much as possible.
Specifically, the scenario I'm thinking about is a batch of 100 items where the first 99 are 5MB and the last item is 2GB. Processed in sequence, the workers will quickly finish transferring the small items. Towards the very end of the sequence, one worker will be sitting transferring a very large object, while the remaining workers are sitting doing nothing.
If the 2GB item is processed first, it ties a worker up for the duration of the transfer as it always would, but it allows the other workers to tear through the small items, minimizing the chance that a small number of large transfers will prolong the duration of the batch.
The sort makes this effect much less noticeable, as does a larger number of workers, or a more evenly sized workload.
I went ahead and clarified this in b9aa0986.
OK I see, so it's saving you from the case where one massive file is the absolute last thing off the queue when everything else is already finished. I'm not sure if in practice it's likely to make much of a difference, but I understand where you're coming from now :)
tq/transfer_queue.go
I like how the original & retries are merged into one batch, makes everything simpler.
tq/transfer_queue.go
This effectively means `outChan` is going to be nil for the most common case, which isn't entirely clear & could be confusing. Probably time to retire `outChan` entirely?
I'd really like to remove the coupling on the
This makes much more sense than the approach I pursued. I was finished with the in-place refactoring on Saturday night, and wanted to keep moving forward. However, I didn't want to create two PRs that would be in-flight at once, since I figured that'd be tough to review. Instead, I opted to create one large PR, which was a mistake.
I think that's a good idea. I reset
@technoweenie I implemented the changes that we discussed this morning in cf409ac...d7bf608. 😄
technoweenie left a comment
Looks great, thanks for taking care of my notes from our IRL review. There's a number of things I want addressed before we can merge to master and make the tq package a real thing, but they should happen in separate PRs.
Rough list (that should probably be expanded into a full issue):
- Try to de-couple the `config` package, using a `config.Environment` interface.
- Change the `Begin(maxConcurrency int, cb ProgressCallback)` interface function to `Begin(m *Manifest, cb ProgressCallback)`, so we can future-proof this entry point.
- Figure out that Transfer/Transferrable/api.ObjectResource stuff. Ideally there's a single `tq.Object` struct or something.
@technoweenie sounds good, and thanks for the 👍. I'm thinking that the follow-up PRs can happen in the order that you suggested. After all of that, this should be 🆗 to merge into master.
Opened up a tracking issue and am tracking progress in #1764.
This pull-request implements priority retries, and moves `TransferQueue`-related code into a new package, `tq`, as discussed in #1651.

Why?

In an effort to better support transferring large numbers of objects through the `*tq.TransferQueue`, we need to buffer less data than we currently do. One way to do this is to process retries up front, instead of keeping data about failed objects around in memory until the end of the transfer. Normally, this isn't a huge problem, but if the transfer queue fails half a million objects, we can't afford to keep that in memory.

What?

This pull request brings a number of benefits to the `*TransferQueue` type:

- When the `*TransferQueue` can't accept more items, there's no sense in wasting a large amount of disk and CPU usage scanning the Git data. Previously, instances of `git rev-list`, `git cat-file`, and `git cat-file --batch` would run ad nauseam even if the `TransferQueue` couldn't accept new items. Now, `Add()` will block after a maximum buffer depth has been reached. In other words, by default, `Add()` will accept 200 (twice the default batch size of 100) items before applying back-pressure up to the `gitscanner`, causing it to wait.
- Once an object has been `Add()`-ed, data about that object can be kept in memory for less time. By treating retries as part of the next batch and prioritizing them, we make fewer API calls.

Inner-workings
This pull-request fundamentally changes the way that we process batches and retries in the transfer queue. Here's a breakdown of what goes on:

- Until the `q.incoming` channel is closed, append items sent from `Add()` via `<-q.incoming` into the batch.
- Once the `<-q.incoming` channel is closed AND the next batch is empty, exit.

This requires us to change how the `*tq.adapterBase` (previously `*transfer.adapterBase`) type is implemented. The changes can be summarized by the signature change.
By synchronizing the results with a batch of `*Transfer` items, we can deterministically pre-fill the next batch with failed items from the last one.

This change comes at a cost, which is that in order to know when to `close()` the `results` channel, we must wait for all of the transfer adapters to finish their in-progress tasks. This effect will be unnoticeable if the items in a transfer set are of uniform size, but is more pronounced when items with wildly different sizes are processed in a random order over a poor network connection.

To combat this, items in a batch are sorted in descending object size to minimize the chance that a worker will get tied up on a large object as the last item in a batch. This change, implemented in 6decfe4, should make this a non-issue.
Since this PR is on the bigger side, here's a convenient break-down of the changes:

- Added a `batchSize` instance variable (and `OptionFn`) to allow configuring the batch size.
- Moved `lfs/transfer_queue.go` and the `transfer` package into a new package, `tq`.
- Renamed types in the `tq` package to avoid stuttering.

/cc @git-lfs/core