copy: implement atomic retry #102805
Conversation
otan
left a comment
i think the changes look good to me (small q's/nits), but i'd think you want someone actively working on the db for signoff.
also - are we sure we want this to be backported for v23.1? is this enabled by default then already?
pkg/sql/copy_from.go
Outdated
	retErr = cleanup(ctx, retErr)
}()
if c.p.SessionData().CopyFromRetriesEnabled && c.implicitTxn {
	// If we are doing retries we may leave the outer txn in the
pkg/sql/copy_from.go
Outdated
	}
}
// If we exceeded the limit the buffering writer turns itself off and is
// zero'd, no biggee that we still read all that data, we have to do
super nit: i think it's biggy 😂
pkg/sql/copy_from.go
Outdated
if bw.Cap() == 0 {
	return errors.New("copy retry aborted, sql.copy.retry.max_size exceeded")
}
_ = c.initReader(bw.GetReader())
nit: don't think you need _ =
if err2 := c.resetForReplay(ctx, r, bw); err2 != nil {
	return errors.CombineErrors(err, err2)
} else {
	// We can only replay once.
any reason we do it this way? enforcing something we did earlier?
cucaroach
left a comment
Thanks, will wait for @rafiss's review for sure. The thinking was it would be enabled by default on 23.2 and disabled if we backported to 23.1. But it's a good question; it's possible this is too risky to backport.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @msirek, @otan, and @rafiss)
pkg/sql/copy_from.go line 630 at r2 (raw file):
Previously, otan (Oliver Tan) wrote…
any reason we do it this way? enforcing something we did earlier?
Not really, I just thought retrying a massive copy more than once could be disruptive I guess? I could make it do N retries if we think that might be necessary.
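For illustration, the replay-once behavior being discussed could be sketched like this. This is a toy model, not the actual copy_from.go code; `runCopyOnce` and `runCopyWithReplay` are hypothetical names standing in for the real machinery:

```go
package main

import (
	"errors"
	"fmt"
)

// errRetriable stands in for a retriable transaction error.
var errRetriable = errors.New("restart transaction")

// runCopyOnce simulates one attempt at an atomic COPY; the first
// `failures` attempts fail with a retriable error.
func runCopyOnce(attempt, failures int) error {
	if attempt < failures {
		return errRetriable
	}
	return nil
}

// runCopyWithReplay retries at most once: a second retriable failure
// is surfaced to the client rather than retried again.
func runCopyWithReplay(failures int) error {
	err := runCopyOnce(0, failures)
	if err == nil || !errors.Is(err, errRetriable) {
		return err
	}
	// We can only replay once.
	return runCopyOnce(1, failures)
}

func main() {
	fmt.Println(runCopyWithReplay(0)) // nil: first attempt succeeds
	fmt.Println(runCopyWithReplay(1)) // nil: one failure, replay succeeds
	fmt.Println(runCopyWithReplay(2)) // error: second failure reaches the client
}
```

Generalizing this to N retries would just turn the single replay into a bounded loop, which is what the "do N retries" option above would amount to.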
pkg/sql/copy_from.go line 676 at r2 (raw file):
Previously, otan (Oliver Tan) wrote…
super nit: i think it's biggy 😂
Done.
I haven't reviewed this in detail, but it seems like a relatively large, invasive change. I'd advocate for not backporting unless we have a strong reason to do so.
I agree. FYI @awoods187 @ajstorm
I'm okay not backporting if the risk is sufficiently large. Would we be okay backporting and grouping it with the other AWS DMS-specific settings? Or should we just rely on 23.2+?
Given the recent problems we've had with 23.1 releases, I'm quite negative on backporting anything that's not critical.
Previously, cucaroach (Tommy Reilly) wrote…
i think we already "retry" in existing code somewhere around here: cockroach/pkg/sql/conn_executor.go, lines 3267 to 3283 in 13addf5.
do you know how often it retries? iirc it retries indefinitely.
I don't. COPY as things are written doesn't go through this code because we don't call setTxnRewindPos. I never considered using the existing machinery for COPY retry; not sure how suitable it would be, or if that's even what you're suggesting. My gut tells me that if COPY fails twice something's wrong and it's better to alert the user by failing back to the client than to retry indefinitely, but I could see the argument that small copies should retry harder, although small copies are probably much less likely to fail with retriable errors. My hope is that KV will eventually do something about #98553 and this will be less of an issue. I've been running the copyfrom atomic roachtest in loops and have yet to get it to fail.
rafiss
left a comment
thanks for the work here!
- for commit 1: most of my comments are about making sure the refactor is safe
- for commit 2: my main comment is a more fundamental question about where retries were falling short before
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @cucaroach, @msirek, @otan, @smg260, and @srosenberg)
-- commits line 50 at r4:
nit: this is not a valid release note category. the categories are here: https://cockroachlabs.atlassian.net/wiki/spaces/CRDB/pages/186548364/Release+notes#Categories
for this, i'd choose sql change
pkg/sql/copy_from.go line 630 at r2 (raw file):
Previously, otan (Oliver Tan) wrote…
i think we already "retry" in existing code somewhere around here:
cockroach/pkg/sql/conn_executor.go
Lines 3267 to 3283 in 13addf5
do you know how often it retries? iirc it retries indefinitely.
this should be discussed further - ever since #97808 it should be the case that atomic copies already get retried automatically, thanks to this bit that oliver linked to, and the state machine defined in conn_fsm.go
but i take it that wasn't working, since you made this PR. one thing that would help me understand these changes better is if you could provide a repro for a situation when an atomic COPY was not being retried when it should have been.
i'm a bit anxious about this change since it seems to reimplement a lot of existing stuff from the conn_executor, and the goal of #97808 was to move towards having less reimplementation of conn_executor logic
pkg/sql/copy_from.go line 525 at r3 (raw file):
// protocol messages into a stream of just the data bytes.
var cr copy.Reader
cr.Init(c.conn.Rd(), c.p.execCfg.SV())
nit: instead of initializing a zero value then immediately calling Init, can we instead make a function like copy.NewReader(...) that returns a fully initialized reader right away?
pkg/sql/copy_from.go line 561 at r3 (raw file):
}
default:
	panic("unknown copy format")
nit: can we just return an AssertionFailed instead of a panic that crashes the node?
pkg/sql/copy_from.go line 728 at r3 (raw file):
	)
}
if c.binaryState == binaryStateNeedSignature {
nit: let's move the binaryStateNeedSignature case back into the switch statement.
pkg/sql/copy_from.go line 798 at r3 (raw file):
}
_, err = c.rows.AddRow(ctx, datums)
return err
nit: can you go back to the old code?
if err != nil {
return nil, err
}
return nil, nil
it's a more explicit way to do error handling, and it's all too common for us to get bugs when code gets copy-pasted: if the original code did not have an explicit error check, it's easy to forget to include one in the new place
pkg/sql/copy_from.go line 72 at r4 (raw file):
// copies that fail due to retriable errors. If an atomic copy under implicit
// transaction control encounters a retriable error and the amount of the copy
// data seen so far is less than copyRetryBufferSize, we will start a new
nit: the sentence in the comment is not complete.
pkg/sql/copy_from.go line 76 at r4 (raw file):
settings.TenantWritable,
"sql.copy.retry.max_size",
"set to non-zero to enable automatic copy retry",
nit: the description of the setting is not complete. it should say what the non-zero value means. the info you put in the comment would make sense for the setting description too.
pkg/sql/copy/reader.go line 30 at r3 (raw file):
type Reader struct {
	// We don't use this buffer but we use some of its helper methods.
	pgwirebase.ReadBuffer
nit: i would vote for not embedding this struct. i think it would make the initialization of this readbuffer less cumbersome. (no need for the separate Init function)
it also makes it so the copy.Reader does not expose a bunch of methods on it that would actually be pretty bogus to call. (for example, we don't want users of copy.Reader to call ReadUntypedMsg on it)
if we make it a separate field then this should be a *pgwirebase.ReadBuffer
pkg/sql/copy/reader.go line 55 at r3 (raw file):
	return 0, io.EOF
}
// If we had a short read, finish it.
nit: there seems to be duplicated code here. how about this:
if c.remainder == 0 {
// Go to pgwire to get next segment.
size, err := c.readTypedMessage()
if err != nil {
return 0, err
}
c.remainder = size
}
// We never want to overread from the wire, we might read past COPY data
// segments so limit p to remainder bytes.
if c.remainder < len(p) {
p = p[:c.remainder]
}
n, err := c.pgr.Read(p)
if err != nil {
return 0, err
}
c.remainder -= n
return n, nil
pkg/sql/copy/reader.go line 89 at r3 (raw file):
}

func (c *Reader) readTypedMessage() (size int, err error) {
nit: it's fairly confusing to have a readTypedMessage function here, when copy.Reader is embedding a pgwirebase.ReadBuffer, which has its own ReadTypedMsg function.
pkg/sql/copy/reader.go line 95 at r3 (raw file):
}
typ := pgwirebase.ClientMessageType(b)
_, size, err = c.ReadUntypedMsgSize(c.pgr)
hmmm this handling of "message too big" errors seems wrong to me. the old code was doing this:
typ, _, err := readBuf.ReadTypedMsg(c.conn.Rd())
followed by the handling of the MessageTooBigError
that is quite different from what is happening now. previously, we'd try to read the whole message; but now we only try to read the message size. where is the handling for MessageTooBig when reading the message itself? i'm thinking that this code should still be using (*pgwirebase.ReadBuffer).ReadTypedMsg, which correctly wraps the error with MessageTooBig when reading the message.
pkg/util/encoding/csv/reader.go line 164 at r3 (raw file):
// Report parse error on empty lines.
DontSkipEmptyLines bool
what is the purpose of this? it seems like it is always set to true
cucaroach
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @msirek, @otan, @smg260, and @srosenberg)
pkg/sql/copy_from.go line 630 at r2 (raw file):
Previously, rafiss (Rafi Shamim) wrote…
this should be discussed further - ever since #97808 it should be the case that atomic copies already get retried automatically, thanks to this bit that oliver linked to, and the state machine defined in conn_fsm.go
but i take it that wasn't working, since you made this PR. one thing that would help me understand these changes better is if you could provide a repro for a situation when an atomic COPY was not being retried when it should have been.
i'm a bit anxious about this change since it seems to reimplement a lot of existing stuff from the conn_executor, and the goal of #97808 was to move towards having less reimplementation of conn_exeuctor logic
It's never been the case that atomic copies get retried automatically; this is because the logic in updateTxnRewindPosMaybe says CopyIn and CopyOut can't advance, so setTxnRewindPos is never called. I didn't really think about changing that; maybe I should have?
copy: enhance copyfrom roachtest
Add 3 node config for more retry coverage and turn off backups since it's a perf test.
Release note: none
Epic: CRDB-25321
Informs: cockroachdb#99327

copy: new copy IO implementation
Refactor COPY so that all the buffer reading takes place in a separate implementation of the io.Reader interface. This does two things: it enables the COPY implementation to efficiently handle small CopyData frames by eliminating extra buffering, and it exposes the COPY bytes as a pure stream of bytes, which makes retry easier. It also cleans up the COPY code that handles CopyData segments straddling line boundaries; now we can just let the text/CSV readers do their thing and not have to do any writeback.
The old implementation would read from a pgwire BufferedReader (copy 1) into a pgwire "ReadBuffer" (copy 2) and then push those segments into a bytes.Buffer "buf" (copy 3). The text and binary readers would read right from buf, but the CSV reader has its own buffer, so we would read lines from buf and write them into the CSV reader's buffer (copy 4). The new approach does away with all this: the text format reads directly from a bufio.Reader (copy 1) stacked on the copy.Reader (no buffering) stacked on the pgwire BufferedReader (copy 2). For CSV, the CSVReader reads directly from the copy.Reader since it has its own buffer, so again only two copies off the wire. Binary reads directly from the copy.Reader since it requires no ReadLine (but the I/O is still buffered at the pgwire level). This doesn't seem to affect performance much, but it gives the GC a nice break and sets up a clean solution for cockroachdb#99327.
When encountering a memory usage error we used to try to let the encoder finish the row, but with the more efficient buffering this started succeeding where it always failed before. Now we just don't do the hail mary: if we hit the limit we bail and return immediately, which is more OOM-safe and simpler.
Fixes: cockroachdb#93156
Informs: cockroachdb#99327
Release note: none
Epic: CRDB-25321

copy: replay atomic copies if we encounter retriable error
Previously, any errors encountered during atomic copies would just be passed back to the client, which could be very disruptive to large migrations. Now we buffer up to sql.copy.retry.max_size bytes, and if an error is encountered we replay the COPY from the beginning with a new transaction.
Release note (sql change): Atomic COPY commands can now be retried, increasing the rate of success of DMS operations, especially when atomic COPY behavior is required.
Fixes: cockroachdb#99327
Epic: CRDB-25321
cucaroach
left a comment
Thanks for the initial review! I wonder if I should push the refactor first and let it bake for a week and then push retry stuff. I'll plan on doing that.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @msirek, @otan, @rafiss, @smg260, and @srosenberg)
pkg/sql/copy/reader.go line 55 at r3 (raw file):
Previously, rafiss (Rafi Shamim) wrote…
nit: there seems to be duplicated code here. how about this:
if c.remainder == 0 {
	// Go to pgwire to get next segment.
	size, err := c.readTypedMessage()
	if err != nil {
		return 0, err
	}
	c.remainder = size
}
// We never want to overread from the wire, we might read past COPY data
// segments so limit p to remainder bytes.
if c.remainder < len(p) {
	p = p[:c.remainder]
}
n, err := c.pgr.Read(p)
if err != nil {
	return 0, err
}
c.remainder -= n
return n, nil
Nice!
pkg/sql/copy/reader.go line 95 at r3 (raw file):
Previously, rafiss (Rafi Shamim) wrote…
hmmm this handling of "message too big" errors seems wrong to me. the old code was doing this:
typ, _, err := readBuf.ReadTypedMsg(c.conn.Rd())
followed by the handling of the MessageTooBigError.
that is quite different from what is happening now. previously, we'd try to read the whole message; but now we only try to read the message size. where is the handling for MessageTooBig when reading the message itself? i'm thinking that this code should still be using (*pgwirebase.ReadBuffer).ReadTypedMsg, which correctly wraps the error with MessageTooBig when reading the message.
All the message size checking is in ReadUntypedMsgSize before we actually read the message so I think we're good. There are tests for this to make sure it works.
pkg/util/encoding/csv/reader.go line 164 at r3 (raw file):
Previously, rafiss (Rafi Shamim) wrote…
what is the purpose of this? it seems like it is always set to
true
The purpose is to not break other consumers of csv.Reader like import.
cucaroach
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @msirek, @otan, @rafiss, @smg260, and @srosenberg)
pkg/sql/copy_from.go line 561 at r3 (raw file):
Previously, rafiss (Rafi Shamim) wrote…
nit: can we just return an AssertionFailed instead of a panic that crashes the node?
Hmm, reviewable lost the context here and I'm not sure what this refers to.
cucaroach
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @msirek, @otan, @rafiss, @smg260, and @srosenberg)
pkg/sql/copy/reader.go line 89 at r3 (raw file):
Previously, rafiss (Rafi Shamim) wrote…
nit: it's fairly confusing to have a readTypedMessage function here, when copy.Reader is embedding a pgwirebase.ReadBuffer, which has its own ReadTypedMsg function.
I wanted to lean on ReadUntypedMsgSize to get the message-too-big handling but otherwise skip the ReadBuffer's buffering; open to suggestions!
rafiss
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @cucaroach, @msirek, @otan, @smg260, and @srosenberg)
pkg/sql/copy_from.go line 630 at r2 (raw file):
I didn't really think about changing that, maybe I should have?
to me, it feels like doing the retries there would be a much less invasive refactor, and it's less risky since we won't have to manage transaction state in new bespoke ways. introducing new handles/callbacks on the txn object is the sort of thing that makes me worried about regressions.
so i do think it is worth trying to make the change in updateTxnRewindPosMaybe. if it isn't possible to do there, i'd like us to understand the reason so we can add comments and explanation
A little late in the game to go back to the drawing board, but I can look into it. The I/O refactoring was important for fixing #93156 and stands apart from the retry stuff. Can we agree to move forward with the copy I/O refactoring as a separate commit while I revisit the retry approach?