copy: implement atomic retry #102805

Open

cucaroach wants to merge 3 commits into cockroachdb:master from cucaroach:copyio_new

Conversation

@cucaroach
Contributor

@cucaroach cucaroach commented May 5, 2023

copy: enhance copyfrom roachtest

Add a 3-node config for more retry coverage and turn off backups since
it's a perf test.

Release note: none
Epic: CRDB-25321
Informs: #99327

copy: replay atomic copies if we encounter retriable error

Previously, any errors encountered during atomic copies would just be
passed back to the client, which could be very disruptive to large
migrations. Now we buffer up to sql.copy.retry.max_size bytes, and
if an error is encountered we replay the COPY from the beginning with
a new transaction.

Release note (sql change): Atomic COPY commands can now be retried,
increasing the rate of success of DMS operations, especially when atomic
COPY behavior is required.
Fixes: #99327
Epic: CRDB-25321

copy: new copy IO implementation

Refactor COPY so that all the buffer reading takes place in a separate
implementation of the io.Reader interface. This does two things: it
enables the COPY implementation to efficiently handle small CopyData
frames by eliminating extra buffering, and it exposes the COPY bytes as
a pure stream of bytes, which makes retry easier. It also cleans up the
COPY code that handles CopyData segments straddling line boundaries; now
we can just let the text and CSV readers do their thing without any
writeback.

The old implementation would read from a pgwire BufferedReader (copy 1)
into a pgwire "ReadBuffer" (copy 2) and then push those segments into a
bytes.Buffer "buf" (copy 3). The text and binary readers would read
right from buf, but the CSV reader has its own buffer, so we would read
lines from buf and write them into the CSV reader (copy 4).

The new approach does away with all this and the text format
reads directly from a bufio.Reader (copy 1) stacked on the copy.Reader
(no buffering) stacked on the pgwire BufferedReader (copy 2). For CSV
the CSVReader reads directly from the copy.Reader since it has its
own buffer so again only two copies off the wire. Binary reads directly
from the copy.Reader since it requires no ReadLine (but the I/O is still
buffered at the pgwire level).

This doesn't seem to affect performance much but it gives the GC a nice
break and sets up a clean solution for #99327.

Fixes: #93156
Informs: #99327
Release note: none
Epic: CRDB-25321

@cockroach-teamcity
Member

This change is Reviewable

@cucaroach cucaroach force-pushed the copyio_new branch 3 times, most recently from c8e9377 to 6e9979f on May 10, 2023 14:37
@cucaroach cucaroach force-pushed the copyio_new branch 4 times, most recently from 430c67d to b4d0912 on May 23, 2023 23:50
@cucaroach cucaroach changed the title from "wip: new copy IO implementation" to "copy: implement atomic retry" on May 24, 2023
@cucaroach cucaroach force-pushed the copyio_new branch 5 times, most recently from cc4bdee to b8e333e on May 30, 2023 14:19
@cucaroach cucaroach force-pushed the copyio_new branch 3 times, most recently from 7c01f1a to 7fee73c on June 5, 2023 12:04
@cucaroach cucaroach marked this pull request as ready for review June 5, 2023 12:04
@cucaroach cucaroach requested review from a team as code owners June 5, 2023 12:04
@cucaroach cucaroach requested review from a team, msirek, otan and rafiss and removed request for a team June 5, 2023 12:04
@cucaroach cucaroach added the backport-23.1.x PAST MAINTENANCE SUPPORT: 23.1 patch releases via ER request only label Jun 6, 2023
Contributor

@otan otan left a comment


i think the changes look good to me (small q's/nits), but i think you want someone actively working on the db for signoff.

also - are we sure we want this to be backported for v23.1? is this enabled by default then already?

retErr = cleanup(ctx, retErr)
}()
if c.p.SessionData().CopyFromRetriesEnabled && c.implicitTxn {
// If we are doing retries we may leave the outter txn in the
Contributor


nit: outer

}
}
// If we exceeded the limit the buffering writer turns itself off and is
// zero'd, no biggee that we still read all that data, we have to do
Contributor


super nit: i think it's biggy 😂

if bw.Cap() == 0 {
return errors.New("copy retry aborted, sql.copy.retry.max_size exceeded")
}
_ = c.initReader(bw.GetReader())
Contributor


nit: don't think you need _ =

if err2 := c.resetForReplay(ctx, r, bw); err2 != nil {
return errors.CombineErrors(err, err2)
} else {
// We can only replay once.
Contributor


any reason we do it this way? enforcing something we did earlier?

@blathers-crl blathers-crl bot requested a review from otan June 13, 2023 11:01
Contributor Author

@cucaroach cucaroach left a comment


Thanks, will wait for @rafiss's review for sure. The thinking was it would be enabled by default on 23.2 and disabled if we backported to 23.1. But it's a good question, it's possible this is too risky to backport.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @msirek, @otan, and @rafiss)


pkg/sql/copy_from.go line 630 at r2 (raw file):

Previously, otan (Oliver Tan) wrote…

any reason we do it this way? enforcing something we did earlier?

Not really, I just thought retrying a massive copy more than once could be disruptive I guess? I could make it do N retries if we think that might be necessary.


pkg/sql/copy_from.go line 676 at r2 (raw file):

Previously, otan (Oliver Tan) wrote…

super nit: i think it's biggy 😂

Done.

@rytaft
Collaborator

rytaft commented Jun 13, 2023

But it's a good question, it's possible this is too risky to backport.

I haven't reviewed this in detail, but it seems like a relatively large, invasive change. I'd advocate for not backporting unless we have a strong reason to do so.

@cucaroach
Contributor Author

I haven't reviewed this in detail, but it seems like a relatively large, invasive change. I'd advocate for not backporting unless we have a strong reason to do so.

I agree. FYI @awoods187 @ajstorm

@awoods187

I'm okay not backporting if the risk is sufficiently large. Would we be okay backporting and grouping it with the other AWS DMS specific settings? Or should we just rely on 23.2+?

@rytaft
Collaborator

rytaft commented Jun 13, 2023

Given the recent problems we've had with 23.1 releases, I'm quite negative on backporting anything that's not critical.

@otan
Contributor

otan commented Jun 14, 2023

pkg/sql/copy_from.go line 630 at r2 (raw file):

Previously, cucaroach (Tommy Reilly) wrote…

Not really, I just thought retrying a massive copy more than once could be disruptive I guess? I could make it do N retries if we think that might be necessary.

i think we already "retry" in existing code somewhere around here:

if retriable {
	var rc rewindCapability
	var canAutoRetry bool
	if ex.implicitTxn() || !ex.sessionData().InjectRetryErrorsEnabled {
		rc, canAutoRetry = ex.getRewindTxnCapability()
	}
	ev := eventRetriableErr{
		IsCommit:     fsm.FromBool(isCommit(stmt)),
		CanAutoRetry: fsm.FromBool(canAutoRetry),
	}
	payload := eventRetriableErrPayload{
		err:    err,
		rewCap: rc,
	}
	return ev, payload
}

do you know how often it retries? iirc it retries indefinitely.

@cucaroach
Contributor Author

i think we already "retry" in existing code somewhere around here:

if retriable {
	var rc rewindCapability
	var canAutoRetry bool
	if ex.implicitTxn() || !ex.sessionData().InjectRetryErrorsEnabled {
		rc, canAutoRetry = ex.getRewindTxnCapability()
	}
	ev := eventRetriableErr{
		IsCommit:     fsm.FromBool(isCommit(stmt)),
		CanAutoRetry: fsm.FromBool(canAutoRetry),
	}
	payload := eventRetriableErrPayload{
		err:    err,
		rewCap: rc,
	}
	return ev, payload
}

do you know how often it retries? iirc it retries indefinitely.

I don't. COPY as things are written doesn't go through this code because we don't call setTxnRewindPos. I never considered using the existing machinery for COPY retry, so I'm not sure how suitable it would be or if that's even what you're suggesting. My gut tells me that if COPY fails twice something's wrong, and it's better to alert the user by failing back to the client than to retry indefinitely, but I could see the argument that for small copies we should retry harder, although small copies are probably much less likely to fail with retriable errors. My hope is that KV will eventually do something about #98553 and this will be less of an issue.

I've been running the copyfrom atomic test in loops on roachtest and have yet to get it to fail.

@cucaroach cucaroach requested a review from a team as a code owner June 14, 2023 12:46
@cucaroach cucaroach requested review from smg260 and srosenberg and removed request for a team June 14, 2023 12:46
@cucaroach cucaroach removed the backport-23.1.x PAST MAINTENANCE SUPPORT: 23.1 patch releases via ER request only label Jun 20, 2023
Collaborator

@rafiss rafiss left a comment


thanks for the work here!

  • for commit 1: most of my comments are about making sure the refactor is safe
  • for commit 2: my main comment is a more fundamental question about where retries were falling short before

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @cucaroach, @msirek, @otan, @smg260, and @srosenberg)


-- commits line 50 at r4:
nit: this is not a valid release note category. the categories are here: https://cockroachlabs.atlassian.net/wiki/spaces/CRDB/pages/186548364/Release+notes#Categories

for this, i'd choose sql change


pkg/sql/copy_from.go line 630 at r2 (raw file):

Previously, otan (Oliver Tan) wrote…

i think we already "retry" in existing code somewhere around here:

if retriable {
	var rc rewindCapability
	var canAutoRetry bool
	if ex.implicitTxn() || !ex.sessionData().InjectRetryErrorsEnabled {
		rc, canAutoRetry = ex.getRewindTxnCapability()
	}
	ev := eventRetriableErr{
		IsCommit:     fsm.FromBool(isCommit(stmt)),
		CanAutoRetry: fsm.FromBool(canAutoRetry),
	}
	payload := eventRetriableErrPayload{
		err:    err,
		rewCap: rc,
	}
	return ev, payload
}

do you know how often it retries? iirc it retries indefinitely.

this should be discussed further - ever since #97808 it should be the case that atomic copies already get retried automatically, thanks to this bit that oliver linked to, and the state machine defined in conn_fsm.go

but i take it that wasn't working, since you made this PR. one thing that would help me understand these changes better is if you could provide a repro for a situation when an atomic COPY was not being retried when it should have been.

i'm a bit anxious about this change since it seems to reimplement a lot of existing stuff from the conn_executor, and the goal of #97808 was to move towards having less reimplementation of conn_executor logic


pkg/sql/copy_from.go line 525 at r3 (raw file):

	// protocol messages into a stream of just the data bytes.
	var cr copy.Reader
	cr.Init(c.conn.Rd(), c.p.execCfg.SV())

nit: instead of initializing a zero value then immediately calling Init, can we instead make a function like copy.NewReader(...) that returns a fully initialized reader right away?


pkg/sql/copy_from.go line 561 at r3 (raw file):

		}
	default:
		panic("unknown copy format")

nit: can we just return an AssertionFailed instead of a panic that crashes the node?


pkg/sql/copy_from.go line 728 at r3 (raw file):

		)
	}
	if c.binaryState == binaryStateNeedSignature {

nit: let's move the binaryStateNeedSignature case back into the switch statement.


pkg/sql/copy_from.go line 798 at r3 (raw file):

	}
	_, err = c.rows.AddRow(ctx, datums)
	return err

nit: can you go back to the old code?

	if err != nil {
		return nil, err
	}
	return nil, nil

it's a more explicit way to do error handling. it's all too common for us to get bugs when code gets copy-pasted: if the original code did not have an explicit error check, it's easy to forget to include one in the new place


pkg/sql/copy_from.go line 72 at r4 (raw file):

// copies that fail due to retriable errors. If an atomic copy under implicit
// transaction control encounters a retriable error and the amount of the copy
// data seen so far is less than copyRetryBufferSize, we will start a new

nit: the sentence in the comment is not complete.


pkg/sql/copy_from.go line 76 at r4 (raw file):

	settings.TenantWritable,
	"sql.copy.retry.max_size",
	"set to non-zero to enable automatic copy retry",

nit: the description of the setting is not complete. it should say what the non-zero value means. the info you put in the comment would make sense for the setting description too.


pkg/sql/copy/reader.go line 30 at r3 (raw file):

type Reader struct {
	// We don't use this buffer but we use some of its helper methods.
	pgwirebase.ReadBuffer

nit: i would vote for not embedding this struct. i think it would make the initialization of this readbuffer less cumbersome. (no need for the separate Init function)

it also makes it so the copy.Reader does not expose a bunch of methods on it that would actually be pretty bogus to call. (for example, we don't want users of copy.Reader to call ReadUntypedMsg on it)

if we make it a separate field then this should be a *pgwirebase.ReadBuffer


pkg/sql/copy/reader.go line 55 at r3 (raw file):

		return 0, io.EOF
	}
	// If we had a short read, finish it.

nit: there seems to be duplicated code here. how about this:

		if c.remainder == 0 {
			// Go to pgwire to get next segment.
			size, err := c.readTypedMessage()
			if err != nil {
				return 0, err
			}
			c.remainder = size
		}
		// We never want to overread from the wire, we might read past COPY data
		// segments so limit p to remainder bytes.
		if c.remainder < len(p) {
			p = p[:c.remainder]
		}
		n, err := c.pgr.Read(p)
		if err != nil {
			return 0, err
		}
		c.remainder -= n
		return n, nil

pkg/sql/copy/reader.go line 89 at r3 (raw file):

}

func (c *Reader) readTypedMessage() (size int, err error) {

nit: it's fairly confusing to have a readTypedMessage function here, when copy.Reader is embedding a pgwirebase.ReadBuffer, which has its own ReadTypedMsg function.


pkg/sql/copy/reader.go line 95 at r3 (raw file):

	}
	typ := pgwirebase.ClientMessageType(b)
	_, size, err = c.ReadUntypedMsgSize(c.pgr)

hmmm this handling of "message too big" errors seems wrong to me. the old code was doing this:

	typ, _, err := readBuf.ReadTypedMsg(c.conn.Rd())

followed by the handling of the MessageTooBigError

that is quite different from what is happening now. previously, we'd try to read the whole message; but now we only try to read the message size. where is the handling for MessageTooBig when reading the message itself? i'm thinking that this code should still be using (*pgwirebase.ReadBuffer).ReadTypedMsg, which correctly wraps the error with MessageTooBig when reading the message.


pkg/util/encoding/csv/reader.go line 164 at r3 (raw file):

	// Report parse error on empty lines.
	DontSkipEmptyLines bool

what is the purpose of this? it seems like it is always set to true

Contributor Author

@cucaroach cucaroach left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @msirek, @otan, @smg260, and @srosenberg)


pkg/sql/copy_from.go line 630 at r2 (raw file):

Previously, rafiss (Rafi Shamim) wrote…

this should be discussed further - ever since #97808 it should be the case that atomic copies already get retried automatically, thanks to this bit that oliver linked to, and the state machine defined in conn_fsm.go

but i take it that wasn't working, since you made this PR. one thing that would help me understand these changes better is if you could provide a repro for a situation when an atomic COPY was not being retried when it should have been.

i'm a bit anxious about this change since it seems to reimplement a lot of existing stuff from the conn_executor, and the goal of #97808 was to move towards having less reimplementation of conn_exeuctor logic

It's never been the case that atomic copies get retried automatically; this is because the logic in updateTxnRewindPosMaybe says CopyIn and CopyOut can't advance, and setTxnRewindPos is never called. I didn't really think about changing that, maybe I should have?

@cucaroach cucaroach force-pushed the copyio_new branch 2 times, most recently from 2799055 to 8a2a11d on June 22, 2023 14:11
Add a 3-node config for more retry coverage and turn off backups since
it's a perf test.

Release note: none
Epic: CRDB-25321
Informs: cockroachdb#99327
Refactor COPY so that all the buffer reading takes place in a separate
implementation of the io.Reader interface. This does two things: it
enables the COPY implementation to efficiently handle small CopyData
frames by eliminating extra buffering, and it exposes the COPY bytes as
a pure stream of bytes, which makes retry easier. It also cleans up the
COPY code that handles CopyData segments straddling line boundaries; now
we can just let the text and CSV readers do their thing without any
writeback.

The old implementation would read from a pgwire BufferedReader (copy 1)
into a pgwire "ReadBuffer" (copy 2) and then push those segments into a
bytes.Buffer "buf" (copy 3). The text and binary readers would read
right from buf, but the CSV reader has its own buffer, so we would read
lines from buf and write them into the CSV reader's buffer (copy 4).

The new approach does away with all this and the text format
reads directly from a bufio.Reader (copy 1) stacked on the copy.Reader
(no buffering) stacked on the pgwire BufferedReader (copy 2). For CSV
the CSVReader reads directly from the copy.Reader since it has its
own buffer so again only two copies off the wire. Binary reads directly
from the copy.Reader since it requires no ReadLine (but the I/O is still
buffered at the pgwire level).

This doesn't seem to affect performance much but it gives the GC a nice
break and sets up a clean solution for cockroachdb#99327.

When encountering a memory usage error we used to try to let the encoder
finish the row, but with the more efficient buffering this started
succeeding where it always failed before. Now we just don't attempt the
hail mary: if we hit the limit we bail and return immediately, which is
more OOM-safe and simpler.

Fixes: cockroachdb#93156
Informs: cockroachdb#99327
Release note: none
Epic: CRDB-25321
Previously, any errors encountered during atomic copies would just be
passed back to the client, which could be very disruptive to large
migrations. Now we buffer up to sql.copy.retry.max_size bytes, and
if an error is encountered we replay the COPY from the beginning with
a new transaction.

Release note (sql change): Atomic COPY commands can now be retried,
increasing the rate of success of DMS operations, especially when atomic
COPY behavior is required.
Fixes: cockroachdb#99327
Epic: CRDB-25321
Contributor Author

@cucaroach cucaroach left a comment


Thanks for the initial review! I wonder if I should push the refactor first and let it bake for a week and then push retry stuff. I'll plan on doing that.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @msirek, @otan, @rafiss, @smg260, and @srosenberg)


pkg/sql/copy/reader.go line 55 at r3 (raw file):

Previously, rafiss (Rafi Shamim) wrote…

nit: there seems to be duplicated code here. how about this:

		if c.remainder == 0 {
			// Go to pgwire to get next segment.
			size, err := c.readTypedMessage()
			if err != nil {
				return 0, err
			}
			c.remainder = size
		}
		// We never want to overread from the wire, we might read past COPY data
		// segments so limit p to remainder bytes.
		if c.remainder < len(p) {
			p = p[:c.remainder]
		}
		n, err := c.pgr.Read(p)
		if err != nil {
			return 0, err
		}
		c.remainder -= n
		return n, nil

Nice!


pkg/sql/copy/reader.go line 95 at r3 (raw file):

Previously, rafiss (Rafi Shamim) wrote…

hmmm this handling of "message too big" errors seems wrong to me. the old code was doing this:

	typ, _, err := readBuf.ReadTypedMsg(c.conn.Rd())

followed by the handling of the MessageTooBigError

that is quite different from what is happening now. previously, we'd try to read the whole message; but now we only try to read the message size. where is the handling for MessageTooBig when reading the message itself? i'm thinking that this code should still be using (*pgwirebase.ReadBuffer).ReadTypedMsg, which correctly wraps the error with MessageTooBig when reading the message.

All the message size checking is in ReadUntypedMsgSize before we actually read the message so I think we're good. There are tests for this to make sure it works.


pkg/util/encoding/csv/reader.go line 164 at r3 (raw file):

Previously, rafiss (Rafi Shamim) wrote…

what is the purpose of this? it seems like it is always set to true

The purpose is to not break other consumers of csv.Reader like import.

Contributor Author

@cucaroach cucaroach left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @msirek, @otan, @rafiss, @smg260, and @srosenberg)


pkg/sql/copy_from.go line 561 at r3 (raw file):

Previously, rafiss (Rafi Shamim) wrote…

nit: can we just return an AssertionFailed instead of a panic that crashes the node?

Hmm, reviewable lost the context here and I'm not sure what this refers to.

Contributor Author

@cucaroach cucaroach left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @msirek, @otan, @rafiss, @smg260, and @srosenberg)


pkg/sql/copy/reader.go line 89 at r3 (raw file):

Previously, rafiss (Rafi Shamim) wrote…

nit: it's fairly confusing to have a readTypedMessage function here, when copy.Reader is embedding a pgwirebase.ReadBuffer, which has its own ReadTypedMsg function.

I wanted to lean on ReadUntypedMsgSize to get the message-too-big handling but otherwise skip the ReadBuffer's buffering; open to suggestions!

Collaborator

@rafiss rafiss left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @cucaroach, @msirek, @otan, @smg260, and @srosenberg)


pkg/sql/copy_from.go line 630 at r2 (raw file):

I didn't really think about changing that, maybe I should have?

to me, it feels like doing the retries there would be a much less invasive refactor and is less risky since we won't have to manage transaction state in new bespoke ways. introducing new handles/callbacks on the txn object are the sorts of things that make me worried about regressions.

so i do think it is worth trying to make the change in updateTxnRewindPosMaybe. if it isn't possible to do there, i'd like us to understand the reason so we can add comments and explanation

@cucaroach
Contributor Author

I didn't really think about changing that, maybe I should have?

to me, it feels like doing the retries there would be a much less invasive refactor and is less risky since we won't have to manage transaction state in new bespoke ways. introducing new handles/callbacks on the txn object are the sorts of things that make me worried about regressions.

so i do think it is worth trying to make the change in updateTxnRewindPosMaybe. if it isn't possible to do there, i'd like us to understand the reason so we can add comments and explanation

A little late in the game to go back to the drawing board but I can look into it. The I/O refactoring was important for fixing #93156 and stands apart from the retry stuff. Can we agree to move forward with the copy I/O refactoring as a separate commit while I revisit the retry approach?

@otan otan removed their request for review August 8, 2023 04:25


Development

Successfully merging this pull request may close these issues.

copy: implement buffered copy that can retry
copy: optimize I/O for small data segments

6 participants