sql,jobs: create SQL Stats Compaction Job and resumer #68434
craig[bot] merged 2 commits into cockroachdb:master
Conversation
Force-pushed from 4b7ec28 to 8ebff66
Force-pushed from ac53e65 to 2b8342d
matthewtodd
left a comment
🎉
Reviewed 3 of 3 files at r1, 16 of 16 files at r2, all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @Azhng)
-- commits, line 10 at r2 ([raw file](https://github.com/cockroachdb/cockroach/blob/2b8342d0df54f4634affec19a708aaa5fa425e53/-- commits#L10)):
nit: "barebones"
pkg/sql/compact_sql_stats.go, line 54 at r2 (raw file):
return db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
	if err != nil {
		if jobErr := r.Job.FractionProgressed(ctx, txn, func(ctx context.Context, _ jobspb.ProgressDetails) float32 {
Maybe a comment here, because it's a little unclear to me what this line is doing. Does the literal 0 in the callback mean anything, like are we checking that the job hasn't made any progress? Or are we just using FractionProgressed for its error-checking capabilities, but we don't actually care about progress?
pkg/sql/sqlstats/persistedsqlstats/cluster_settings.go, line 59 at r2 (raw file):
"sql.stats.persisted_rows.max",
"maximum number of rows of statement and transaction"+
	" statistics will be persisted in the system tables",
nit: "statistics that will be"
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 41 at r2 (raw file):
// StatsCompactorKnobs can be used to tune the behavior of
// StatsCompactor.
type StatsCompactorKnobs struct {
If these are just for testing, maybe call them StatsCompactorTestingKnobs?
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 69 at r2 (raw file):
}
stmtStatsEntryCnt, txnStatsEntryCnt, err := c.getExistingStmtAndTxnStatsEntries(ctx)
Is "cnt" a common abbreviation for "count" around here? Maybe spell it out if not?
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 87 at r2 (raw file):
stmt := "SELECT count(*) FROM system.statement_statistics"
if c.TestingKnobs != nil && c.TestingKnobs.DisableFollowerRead {
	stmt = "SELECT count(*) FROM system.statement_statistics"
This is the same stmt as above, did you mean to make something different happen here?
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 103 at r2 (raw file):
stmtStatsEntryCnt = int64(tree.MustBeDInt(row[0]))
// stmt = "SELECT count(*) FROM system.transaction_statistics AS OF SYSTEM TIME follower_read_timestamp()"
Kill this comment, yeah?
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 106 at r2 (raw file):
stmt = "SELECT count(*) FROM system.transaction_statistics"
if c.TestingKnobs != nil && c.TestingKnobs.DisableFollowerRead {
	stmt = "SELECT count(*) FROM system.transaction_statistics"
Ditto, same stmt as above.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 138 at r2 (raw file):
SELECT %[2]s FROM %[1]s ORDER BY aggregated_ts
Maybe an explicit ASC here? I get nervous deleting things, lol.
pkg/sql/sqlstats/persistedsqlstats/compaction_scheduling.go, line 53 at r2 (raw file):
// CheckExistingCompactionJob checks for existing SQL Stats Compaction job
// that are either PAUCED, CANCELED, or RUNNING. If so, it returns a
PAUSED
pkg/sql/sqlstats/persistedsqlstats/compaction_test.go, line 54 at r2 (raw file):
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
Just curious, what does this stopper stop? I don't see it used below. Does it look for magic stuff in the ctx?
pkg/sql/sqlstats/persistedsqlstats/compaction_test.go, line 100 at r2 (raw file):
require.Equal(t, maxPersistedRowLimit, len(actualStmtFingerprints))
require.Equal(t, maxPersistedRowLimit, len(actualTxnFingerprints))
for fingerprintID := range actualTxnFingerprints {
super-nit: This code usually talks about statements, then transactions, so maybe move this for loop down?
pkg/util/jobutil/job_utils.go, line 28 at r1 (raw file):
// started earlier than this one. If job is nil, CheckRunningJobsHelper just
// checks if there are any pending, running, or paused jobs.
func CheckRunningJobsHelper(
All makes sense, since this is a pure extraction. I'm thinking about names and wondering if this might be called RunningJobsExist?
pkg/util/jobutil/job_utils.go, line 30 at r1 (raw file):
func CheckRunningJobsHelper(
	ctx context.Context,
	job *jobs.Job,
Would it make sense to just pass the JobID here? Or could it be integrated into the filter somehow?
pkg/util/jobutil/job_utils.go, line 33 at r1 (raw file):
	ie sqlutil.InternalExecutor,
	txn *kv.Txn,
	payloadFilter func(payload *jobspb.Payload) bool,
One more naming thought, I think this is a "payloadPredicate"?
Force-pushed from 7801f9e to 201d810
Azhng
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @matthewtodd)
pkg/sql/compact_sql_stats.go, line 54 at r2 (raw file):
Previously, matthewtodd (Matthew Todd) wrote…
Maybe a comment here, because it's a little unclear to me what this line is doing. Does the literal 0 in the callback mean anything, like are we checking that the job hasn't made any progress? Or are we just using FractionProgressed for its error-checking capabilities, but we don't actually care about progress?
Hmm, I put it here because I saw that the automatic create stats job updates the job progress in its resumer. On second thought, since the compaction job is currently relatively simple and runs pretty fast, I don't think we need to update the job progress just for the sake of doing so.
I removed it for now and we can reintroduce it once we start to actually implement the compaction logic.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 69 at r2 (raw file):
Previously, matthewtodd (Matthew Todd) wrote…
Is "cnt" a common abbreviation for "count" around here? Maybe spell it out if not?
Done.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 87 at r2 (raw file):
Previously, matthewtodd (Matthew Todd) wrote…
This is the same stmt as above, did you mean to make something different happen here?
Ah, the original stmt should have the AS OF SYSTEM TIME clause. Fixed.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 103 at r2 (raw file):
Previously, matthewtodd (Matthew Todd) wrote…
Kill this comment, yeah?
Done.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 106 at r2 (raw file):
Previously, matthewtodd (Matthew Todd) wrote…
Ditto, same stmt as above.
Done.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 138 at r2 (raw file):
Previously, matthewtodd (Matthew Todd) wrote…
Maybe an explicit ASC here? I get nervous deleting things, lol.
Ah sounds good 👍
pkg/sql/sqlstats/persistedsqlstats/compaction_test.go, line 54 at r2 (raw file):
Previously, matthewtodd (Matthew Todd) wrote…
Just curious, what does this stopper stop? I don't see it used below. Does it look for magic stuff in the ctx?
Ah this is definitely redundant. Removed.
pkg/util/jobutil/job_utils.go, line 28 at r1 (raw file):
Previously, matthewtodd (Matthew Todd) wrote…
All makes sense, since this is a pure extraction. I'm thinking about names and wondering if this might be called RunningJobsExist?
Done.
pkg/util/jobutil/job_utils.go, line 30 at r1 (raw file):
Previously, matthewtodd (Matthew Todd) wrote…
Would it make sense to just pass the JobID here? Or could it be integrated into the filter somehow?
The behavior of the function differs depending on whether job is nil. If job is nil, the function just checks whether there are any other running jobs that match the predicate. If job is not nil, the function checks whether the provided job's jobID is the earliest-created running job.
Updated the comment to highlight that.
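The two cases described above can be sketched in a standalone way. Everything here is a hypothetical stand-in for illustration: jobRecord mimics a row in system.jobs, and the in-memory scan stands in for the internal-executor query the real helper runs.

```go
package main

import "fmt"

// jobRecord is a hypothetical, simplified stand-in for a row in system.jobs.
type jobRecord struct {
	id      int64
	status  string
	created int64 // logical creation order
	matches bool  // stands in for payloadPredicate(payload)
}

const invalidJobID = int64(0)

// createdOf returns the creation order of the given job, or a large
// sentinel when the job is unknown.
func createdOf(jobID int64, jobs []jobRecord) int64 {
	for _, j := range jobs {
		if j.id == jobID {
			return j.created
		}
	}
	return int64(1) << 62
}

// runningJobExists mirrors the two cases discussed above: with invalidJobID
// it reports whether ANY matching non-terminal job exists; with a real jobID
// it reports whether a matching non-terminal job was created earlier.
func runningJobExists(jobID int64, jobs []jobRecord) bool {
	for _, j := range jobs {
		if j.status != "pending" && j.status != "running" && j.status != "paused" {
			continue
		}
		if !j.matches {
			continue
		}
		if jobID == invalidJobID {
			return true
		}
		if j.id != jobID && j.created < createdOf(jobID, jobs) {
			return true
		}
	}
	return false
}

func main() {
	jobs := []jobRecord{
		{id: 1, status: "running", created: 1, matches: true},
		{id: 2, status: "pending", created: 2, matches: true},
	}
	fmt.Println(runningJobExists(invalidJobID, jobs)) // true: some matching job exists
	fmt.Println(runningJobExists(1, jobs))            // false: nothing started before job 1
}
```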
matthewtodd
left a comment
Reviewed 18 of 18 files at r3, 10 of 16 files at r4, 6 of 6 files at r5, all commit messages.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @Azhng)
ajwerner
left a comment
Nice job navigating the jobs boilerplate.
pkg/sql/compact_sql_stats.go
Outdated
type sqlStatsCompactionResumer struct {
	*jobs.Job
Is this embedding buying you anything?
if c.TestingKnobs == nil {
	c.TestingKnobs = &StatsCompactorTestingKnobs{}
}
c.TestingKnobs.DisableFollowerRead = disallowFollowerRead
It's not often that you see non-testing code setting a TestingKnob value. Is the idea that your test is going to verify that the boolean made it through to here? If so, use a callback.
After reading more, I think it's that you're trying to tell the compactor to disallow follower reads. But, like, that knob is not going to change throughout the life of the cluster so this seems a little funky. Instead just inject the testing knobs in the appropriate places.
// Init initializes the StatsCompactor. This method needs to be called before
// the DeleteOldestEntries method.
func (c *StatsCompactor) Init(
Why Init and not just NewStatsCompactor so you don't need to stress about whether or not it has been initialized?
// [1]: table name
// [2]: primary key
// [3]: number of entries to remove
stmt := `
)
`
err := c.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
I think this can all be return c.db.Txn(ctx, ...
// on system tables that are yet to be created. This is not a scenario that is
// possible in real life.
I like the "real life" commentary but perhaps "outside of testing" would be better.
pkg/jobs/jobspb/jobs.proto
Outdated
message SQLStatsCompactionDetails {
  bool disable_follower_read = 1;
Do we really need this in the job details? From my reading it only comes from a testing knob which should be shared by all of the instances of the job.
pkg/sql/compact_sql_stats.go
Outdated
if !ok {
	return errors.AssertionFailedf("invalid job detail payload for sql stats compaction")
}
r.compactionExecutor.Init(ie, db, jobDetails.DisableFollowerRead)
The code uses both DisableFollowerReads and disallowFollerReads. Can we pick one?
pkg/util/jobutil/job_utils.go
Outdated
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package jobutil
I don't know that this warrants its own package. jobs already depends on all of this stuff. Seems fine to put this file in that package.
pkg/util/jobutil/job_utils.go
Outdated
// running, or paused jobs that matches payloadPredicate.
func RunningJobExists(
	ctx context.Context,
	job *jobs.Job,
Any reason not to just pass the job ID as opposed to the job struct?
Azhng
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner and @matthewtodd)
pkg/jobs/jobspb/jobs.proto, line 667 at r5 (raw file):
Previously, ajwerner wrote…
Do we really need this in the job details? From my reading it only comes from a testing knob which should be shared by all of the instances of the job.
Removed. Added package-level variable disableFollowerReadForTest instead of passing it through protobuf.
pkg/sql/compact_sql_stats.go, line 25 at r5 (raw file):
Previously, ajwerner wrote…
Is this embedding buying you anything?
Not really. Removed the embedding.
pkg/sql/compact_sql_stats.go, line 52 at r5 (raw file):
Previously, ajwerner wrote…
The code uses both DisableFollowerReads and disallowFollerReads. Can we pick one?
Done.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 46 at r5 (raw file):
Previously, ajwerner wrote…
I like the "real life" commentary but perhaps "outside of testing" would be better.
Done.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 55 at r5 (raw file):
Previously, ajwerner wrote…
Why Init and not just NewStatsCompactor so you don't need to stress about whether or not it has been initialized?
I was under the wrong impression that all the fields in the Resumer need to be initialized before Resume() is called.
Removed Init and replaced it with NewStatsCompactor.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 64 at r5 (raw file):
Previously, ajwerner wrote…
After reading more, I think it's that you're trying to tell the compactor to disallow follower reads. But, like, that knob is not going to change throughout the life of the cluster so this seems a little funky. Instead just inject the testing knobs in the appropriate places.
Replaced it with a package-level private variable. Since other parts of persistedsqlstats will eventually start using follower reads, having one package-level private variable can disable all the follower reads in the package.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 148 at r5 (raw file):
Previously, ajwerner wrote…
I think this can all be return c.db.Txn(ctx, ...
Done.
pkg/sql/sqlstats/persistedsqlstats/compaction_scheduling.go, line 79 at r5 (raw file):
Previously, ajwerner wrote…
total nit: I might write this
if err == nil && exists {
	err = ErrConcurrentSQLStatsCompaction
}
return err

Done.
pkg/sql/sqlstats/persistedsqlstats/compaction_test.go, line 68 at r5 (raw file):
Previously, ajwerner wrote…
nit: you can pass the value to Exec like:
firstSQLConn.Exec(t, "SET CLUSTER SETTING sql.stats.persisted_rows.max = $1", maxPersistedRowLimit)
TIL
pkg/util/jobutil/job_utils.go, line 11 at r5 (raw file):
Previously, ajwerner wrote…
I don't know that this warrants its own package. jobs already depends on all of this stuff. Seems fine to put this file in that package.
Done.
pkg/util/jobutil/job_utils.go, line 32 at r5 (raw file):
Previously, ajwerner wrote…
Any reason not to just pass the job ID as opposed to the job struct?
Replaced with job ID.
pkg/util/jobutil/job_utils.go, line 57 at r5 (raw file):
Previously, ajwerner wrote…
I think you want more than just this: Reverting, PauseRequested, CancelRequested. See jobs.NonTerminalStatusTupleString.
Done.
ajwerner
left a comment
Reviewed 6 of 19 files at r6, 2 of 16 files at r7.
Reviewable status: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @Azhng, and @matthewtodd)
pkg/jobs/jobspb/jobs.proto, line 667 at r5 (raw file):
Previously, Azhng (Archer Zhang) wrote…
Removed. Added package-level variable disableFollowerReadForTest instead of passing it through protobuf.
huh, I don't think that's better. Don't you have access to testing knobs everywhere that you'd need it?
pkg/sql/create_stats.go, line 651 at r6 (raw file):
}
return nil /* retErr */
you've removed retErr
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 34 at r7 (raw file):
ie sqlutil.InternalExecutor
TestingKnobs *StatsCompactorTestingKnobs
this thing should get passed to NewStatsCompactor and then should exist in the top-level TestingKnobs
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 107 at r7 (raw file):
if row.Len() != 1 {
	return 0 /* stmtStatsEntryCount */, 0 /* txnStatsEntryCount */, errors.AssertionFailedf("unexpected number of column returned")
not that it matters but, if you're going to use AssertionFailedf maybe report the number of columns?
pkg/jobs/utils.go, line 39 at r6 (raw file):
system.jobs WHERE status IN ($1, $2, $3, $4, $5, $6)

you can literally just do the following and lose the args
status IN ` + NonTerminalStatusTupleString + `
Azhng
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @Azhng, and @matthewtodd)
pkg/jobs/jobspb/jobs.proto, line 667 at r5 (raw file):
Previously, ajwerner wrote…
huh, I don't think that's better. Don't you have access to testing knobs everywhere that you'd need it?
Since the resumer creates the StatsCompactor, and the resumer's constructor is registered in func init(), I'm not entirely sure how I can inject testing knobs into the resumer? (Since it's privately scoped into the sql package because it depends on JobExecContext). Any ideas here?
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 107 at r7 (raw file):
Previously, ajwerner wrote…
not that it matters but, if you're going to use AssertionFailedf maybe report the number of columns?
Done.
pkg/jobs/utils.go, line 39 at r6 (raw file):
Previously, ajwerner wrote…
you can literally just do the following and lose the args
status IN ` + NonTerminalStatusTupleString + `
For some reason I thought NonTerminalStatusTupleString doesn't have StatusRunning and StatusPaused. 🤦♂️
arulajmani
left a comment
Reviewed 1 of 19 files at r6, 18 of 18 files at r8.
Reviewable status: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @arulajmani, and @Azhng)
pkg/sql/create_stats.go, line 635 at r8 (raw file):
// just checks if there are any pending, running, or paused CreateStats jobs.
func checkRunningJobs(ctx context.Context, job *jobs.Job, p JobExecContext) error {
	var jobID jobspb.JobID
nit: s/var jobID jobspb.ID/jobID := jobspb.InvalidJobID
pkg/jobs/utils.go, line 25 at r8 (raw file):
// RunningJobExists checks that whether there are any other jobs (matched by
// payloadPredicate callback) in the pending, running, or paused status that
// started earlier than the job with provided jobID.
Drive by comment -- briefly looking at this code, it seems like this is intended to work even when the jobID does not refer to a real jobID. Am I reading this right? Could you expand on your comment to talk about the case where jobID is InvalidJobID as well?
Azhng
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @arulajmani, and @Azhng)
pkg/sql/create_stats.go, line 635 at r8 (raw file):
Previously, arulajmani (Arul Ajmani) wrote…
nit: s/var jobID jobspb.ID/jobID := jobspb.InvalidJobID
Done
pkg/jobs/utils.go, line 25 at r8 (raw file):
Previously, arulajmani (Arul Ajmani) wrote…
Drive by comment -- briefly looking at this code, it seems like this is intended to work even when the jobID does not refer to a real jobID. Am I reading this right? Could you expand on your comment to talk about the case where jobID is InvalidJobID as well?
Added additional explanation for the InvalidJobID case
Azhng
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner and @arulajmani)
pkg/jobs/jobspb/jobs.proto, line 667 at r5 (raw file):
Previously, Azhng (Archer Zhang) wrote…
Since the resumer creates the StatsCompactor, and the resumer's constructor is registered in func init(), I'm not entirely sure how I can inject testing knobs into the resumer? (Since it's privately scoped into the sql package because it depends on JobExecContext.) Any ideas here?
Never mind, I can just use the testing knobs that are already in the ExecCfg(). Updated.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 34 at r7 (raw file):
Previously, ajwerner wrote…
this thing should get passed to NewStatsCompactor and then should exist in the top-level TestingKnobs
Done.
@ajwerner RFAL
ajwerner
left a comment
Reviewed 1 of 18 files at r16, 3 of 19 files at r17, all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @arulajmani, @Azhng, and @maryliag)
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 79 at r17 (raw file):
func (c *StatsCompactor) getExistingStmtAndTxnStatsCount(
	ctx context.Context,
) (stmtStatsEntryCount, txnStatsEntryCount int64, err error) {
nit: don't name this error, it can get you in trouble.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 80 at r17 (raw file):
	ctx context.Context,
) (stmtStatsEntryCount, txnStatsEntryCount int64, err error) {
	getRowCount := func(ctx context.Context, opName string, stmt string) (int64, error) {
nit: you could pass in the pointer and simplify things a tad.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 106 at r17 (raw file):
Quoted 13 lines of code…
stmt := "SELECT count(*) FROM system.statement_statistics AS OF SYSTEM TIME follower_read_timestamp()"
if c.knobs.DisableFollowerRead {
	stmt = "SELECT count(*) FROM system.statement_statistics"
}
stmtStatsEntryCount, err = getRowCount(ctx, "scan-sql-stmt-stats-entries", stmt)
if err != nil {
	return 0 /* stmtStatsEntryCount */, 0 /* txnStatsEntryCount */, err
}
stmt = "SELECT count(*) FROM system.transaction_statistics AS OF SYSTEM TIME follower_read_timestamp()"
if c.knobs.DisableFollowerRead {
	stmt = "SELECT count(*) FROM system.transaction_statistics"
}
txnStatsEntryCount, err = getRowCount(ctx, "scan-sql-txn-stats-entries", stmt)
nit: put these in separate blocks:
{
stmt := "SELECT count(*) FROM system.statement_statistics AS OF SYSTEM TIME follower_read_timestamp()"
if c.knobs.DisableFollowerRead {
stmt = "SELECT count(*) FROM system.statement_statistics"
}
stmtStatsEntryCount, err = getRowCount(ctx, "scan-sql-stmt-stats-entries", stmt)
if err != nil {
return 0 /* stmtStatsEntryCount */, 0 /* txnStatsEntryCount */, err
}
}
{
stmt = "SELECT count(*) FROM system.transaction_statistics AS OF SYSTEM TIME follower_read_timestamp()"
if c.knobs.DisableFollowerRead {
stmt = "SELECT count(*) FROM system.transaction_statistics"
}
txnStatsEntryCount, err = getRowCount(ctx, "scan-sql-txn-stats-entries", stmt)
if err != nil
}
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 113 at r17 (raw file):
}
txnStatsEntryCount, err = getRowCount(ctx, "scan-sql-txn-stats-entries", stmt)
you weren't returning this error.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 118 at r17 (raw file):
}
func (c *StatsCompactor) deleteOldestEntries(
I feel like the intention behind this pattern does not match reality. Namely, you're providing a number of rows to delete with the idea that you're going to not contend with foreground writes. The logic there would be sound if it weren't for the hash sharded index. The problem is that we're going to end up scanning the number of entries from each shard and we'll have to do a merge join to take the top k. I think what you want is to break this down and find the number of entries to delete on a per-shard basis. Somewhere you should export the shard count and then do this query for each shard specifying the shard column explicitly.
I encourage you to take a look at the query plan and see what the limit pushdown looks like.
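The per-shard approach suggested above could look roughly like this when building the statement. This is a standalone sketch: buildPerShardCountStmt is a made-up helper, and "shardCol" is an illustrative placeholder for the generated crdb_internal shard column, not its real name.

```go
package main

import "fmt"

// buildPerShardCountStmt builds a count query constrained to a single shard
// of the hash-sharded index, so the scan stays within one shard instead of
// merging across all of them. The shard value is bound as $1, one query per
// shard.
func buildPerShardCountStmt(tableName, shardColumnName string, followerRead bool) string {
	stmt := fmt.Sprintf("SELECT count(*) FROM %s", tableName)
	if followerRead {
		stmt += " AS OF SYSTEM TIME follower_read_timestamp()"
	}
	return stmt + fmt.Sprintf(" WHERE %s = $1", shardColumnName)
}

func main() {
	fmt.Println(buildPerShardCountStmt("system.statement_statistics", "shardCol", true))
	fmt.Println(buildPerShardCountStmt("system.transaction_statistics", "shardCol", false))
}
```

Running the query once per shard with an explicit shard value is what lets the LIMIT push down cleanly in the plan.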
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 140 at r17 (raw file):
	txn *kv.Txn,
	stmt string,
	curRowsCount int64) error {
nit: wrap this like:
removeRows := func(
ctx context.Context,
opName string,
txn *kv.Txn,
stmt string,
curRowsCount int64,
) error {
Azhng
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @arulajmani, and @maryliag)
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 79 at r17 (raw file):
Previously, ajwerner wrote…
nit: don't name this error, it can get you in trouble.
Done. Is it because of the concern for shadowing variables?
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 80 at r17 (raw file):
Previously, ajwerner wrote…
nit: you could pass in the pointer and simplify things a tad.
Hmm, I'm not exactly sure what the "pointer" is referring to here. Do you mind clarifying?
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 106 at r17 (raw file):
Previously, ajwerner wrote…
Quoted 13 lines of code…
nit: put these in separate blocks.
Done.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 113 at r17 (raw file):
Previously, ajwerner wrote…
you weren't returning this error.
Done.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 118 at r17 (raw file):
Previously, ajwerner wrote…
I feel like the intention behind this pattern does not match reality. Namely, you're providing a number of rows to delete with the idea that you're going to not contend with foreground writes. The logic there would be sound if it weren't for the hash sharded index. The problem is that we're going to end up scanning the number of entries from each shard and we'll have to do a merge join to take the top k. I think what you want is to break this down and find the number of entries to delete on a per-shard basis. Somewhere you should export the shard count and then do this query for each shard specifying the shard column explicitly. I encourage you to take a look at the query plan and see what the limit pushdown looks like.
I see. That makes sense. Changed the code to clean up on a per-shard basis.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 140 at r17 (raw file):
Previously, ajwerner wrote…
nit: wrap this like:
removeRows := func(
	ctx context.Context,
	opName string,
	txn *kv.Txn,
	stmt string,
	curRowsCount int64,
) error {
Done.
ajwerner
left a comment
Here's more feedback. Some of it would eliminate the need for other parts of it (like if you broke out the logic per shard, then you definitely don't need the array).
Reviewable status:
complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @arulajmani, @Azhng, and @maryliag)
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 79 at r17 (raw file):
Previously, Azhng (Archer Zhang) wrote…
Done. Is it because of the concern for shadowing variables?
Partially, also so that you can get unused variable checking. Like consider the case below where you assigned to err but then returned nil. The linter didn't catch that.
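A minimal standalone illustration of that pitfall (countRows is a made-up function, not the actual compactor code):

```go
package main

import (
	"errors"
	"fmt"
)

// With a named error return, assigning to err counts as "using" it, so the
// compiler stays quiet even when a later `return ..., nil` silently discards
// the assigned error. With an unnamed return and a local `err :=`, the same
// mistake fails to compile with "declared and not used".
func countRows() (count int64, err error) {
	err = errors.New("row scan failed") // assigned…
	return 0, nil                       // …and silently dropped
}

func main() {
	n, err := countRows()
	fmt.Println(n, err) // the assigned error never surfaces
}
```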
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 80 at r17 (raw file):
Previously, Azhng (Archer Zhang) wrote…
Hmm I'm not exactly sure what the "pointer" is referring to here. Do you mind clarify?
like, you could pass in a *int64 and only return an error from this function.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 99 at r18 (raw file):
	ctx context.Context, opName, tableName, hashColumnName string,
) ([]int64, error) {
	rowCountPerShard := make([]int64, systemschema.SQLStatsHashShardBucketCount)
you could use arrays everywhere given this is a constant. Consider:
const shards = systemschema.SQLStatsHashShardBucketCount

func (c *StatsCompactor) getRowCountPerShard(
	ctx context.Context, opName, tableName, hashColumnName string,
) ([shards]int64, error) {

and then pass these arrays everywhere.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 116 at r18 (raw file):
}
stmt = fmt.Sprintf(stmt, tableName, hashColumnName)
nit: if you're going to be formatting this anyway, you could just add the follower read clause as another parameter.
stmt = `SELECT count(*) FROM %[1]s WHERE %[2]s = $1%[3]s`
followerReadClause := ` AS OF SYSTEM TIME follower_read_timestamp()`
if c.knobs.DisableFollowerRead {
	followerReadClause = ""
}
stmt = fmt.Sprintf(stmt, tableName, hashColumnName, followerReadClause)
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 136 at r18 (raw file):
func (c *StatsCompactor) getExistingStmtAndTxnStatsCount(
	ctx context.Context,
) ([]int64, []int64, error) {
I'd name the counts, just _ for the error.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 188 at r18 (raw file):
} else {
	limitPerShard[shardIdx] = maxStatsEntryPerShard
}
there's a nice trick here I learned once. You can just keep subtracting and dividing and it will all work out nicely:
limitPerShard := make([]int64, systemschema.SQLStatsHashShardBucketCount)
maxPersistedRows := SQLStatsMaxPersistedRows.Get(&c.st.SV)
for i := 0; i < shards; i++ {
	limitPerShard[i] = maxPersistedRows / (shards - i)
	maxPersistedRows -= limitPerShard[i]
}
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 195 at r18 (raw file):
func (c *StatsCompactor) deleteOldestEntries(
	ctx context.Context,
	stmtStatsRowCountPerShard, txnStatsRowCountPerShard []int64,
how crazy would it be to pick a timestamp below which you'll be deleting stuff as opposed to picking what amounts to an offset?
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 242 at r18 (raw file):
}
return c.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
I'm anxious about the size of this transaction. Large transactions in cockroach are known for having a bad time. Particularly large deletes which might overlap with foreground traffic. I would:
- break this transaction out per shard. I think that'll also simplify some of the logic in this file.
- Put a limit on the number of rows we're willing to delete in a single transaction and then add a loop.
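The second bullet can be sketched standalone. deleteInBatches and its callback are hypothetical; in the real code, each callback invocation would be one bounded DELETE ... LIMIT transaction.

```go
package main

import "fmt"

// deleteInBatches caps the rows removed per transaction and loops until the
// target is reached. deleteBatch stands in for one bounded
// "DELETE ... LIMIT $1" transaction and returns how many rows it removed.
func deleteInBatches(toDelete, batchSize int64, deleteBatch func(limit int64) int64) int64 {
	var total int64
	for toDelete > 0 {
		limit := batchSize
		if toDelete < limit {
			limit = toDelete
		}
		n := deleteBatch(limit)
		if n == 0 {
			break // nothing matched; avoid spinning forever
		}
		total += n
		toDelete -= n
	}
	return total
}

func main() {
	deleted := deleteInBatches(1050, 128, func(limit int64) int64 {
		// Pretend each bounded transaction removes exactly `limit` rows.
		return limit
	})
	fmt.Println(deleted) // 1050, spread over 9 small transactions
}
```

Keeping each transaction small bounds its intent resolution and contention footprint against foreground traffic.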
Azhng
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @arulajmani, and @maryliag)
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 99 at r18 (raw file):
Previously, ajwerner wrote…
you could use arrays everywhere given this is a constant. Consider:
const shards = systemschema.SQLStatsHashShardBucketCount

func (c *StatsCompactor) getRowCountPerShard(
	ctx context.Context, opName, tableName, hashColumnName string,
) ([shards]int64, error) {

and then pass these arrays everywhere.
Opted to refactor to implement the logic per shard.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 116 at r18 (raw file):
Previously, ajwerner wrote…
nit: if you're going to be formatting this anyway, you could just add the follower read clause as another parameter.
stmt = `SELECT count(*) FROM %[1]s WHERE %[2]s = $1%[3]s`
followerReadClause := ` AS OF SYSTEM TIME follower_read_timestamp()`
if c.knobs.DisableFollowerRead {
	followerReadClause = ""
}
stmt = fmt.Sprintf(stmt, tableName, hashColumnName, followerReadClause)
Done.
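A runnable version of that suggestion, with placeholder table and column names (not the real schema identifiers): the indexed verbs %[1]s, %[2]s, %[3]s let the optional follower-read clause be dropped by substituting an empty string.

```go
package main

import "fmt"

// buildCountQuery assembles the per-shard count statement. The indexed
// format verbs make the follower-read clause a plain third argument that
// can be an empty string when follower reads are disabled.
func buildCountQuery(tableName, hashColumnName string, disableFollowerRead bool) string {
	const stmt = `SELECT count(*) FROM %[1]s WHERE %[2]s = $1%[3]s`
	followerReadClause := ` AS OF SYSTEM TIME follower_read_timestamp()`
	if disableFollowerRead { // stand-in for c.knobs.DisableFollowerRead
		followerReadClause = ""
	}
	return fmt.Sprintf(stmt, tableName, hashColumnName, followerReadClause)
}

func main() {
	fmt.Println(buildCountQuery("system.statement_statistics", "shard_col", false))
	fmt.Println(buildCountQuery("system.statement_statistics", "shard_col", true))
}
```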
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 136 at r18 (raw file):
Previously, ajwerner wrote…
I'd name the counts, just _ for the error.
Opted to implement the logic per shard.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 188 at r18 (raw file):
Previously, ajwerner wrote…
there's a nice trick here I learned once. You can just keep subtracting and dividing and it will all work out nicely:
limitPerShard := make([]int64, systemschema.SQLStatsHashShardBucketCount)
maxPersistedRows := SQLStatsMaxPersistedRows.Get(&c.st.SV)
for i := 0; i < shards; i++ {
	limitPerShard[i] = maxPersistedRows / (shards - i)
	maxPersistedRows -= limitPerShard[i]
}
Done.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 195 at r18 (raw file):
Previously, ajwerner wrote…
how crazy would it be to pick a timestamp below which you'll be deleting stuff as opposed to picking what amounts to an offset?
I don't quite follow here. Are we talking about the timestamp of the transaction?
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 242 at r18 (raw file):
Previously, ajwerner wrote…
I'm anxious about the size of this transaction. Large transactions in cockroach are known for having a bad time. Particularly large deletes which might overlap with foreground traffic. I would:
- break this transaction out per shard. I think that'll also simplify some of the logic in this file.
- Put a limit on the number of rows we're willing to delete in a single transaction and then add a loop.
Done.
ajwerner
left a comment
Reviewable status: complete! 1 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @arulajmani, @Azhng, and @maryliag)
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 195 at r18 (raw file):
Previously, Azhng (Archer Zhang) wrote…
I don't quite follow here. Are we talking about the timestamp of the transaction ?
It's not important; what you have here is fine.
What I was thinking is that right now you go and find the number of rows in the shard, then you find the number you think should be in the shard, and you delete until you think you've deleted enough. Another approach would be to find the aggregated_ts of the newest row you want to delete and use that to drive the deletions here.
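A toy illustration of that alternative, in pure Go with made-up int64 timestamps standing in for aggregated_ts values: compute the cutoff timestamp for the rows beyond the keep limit, then a single DELETE with aggregated_ts <= cutoff would remove them.

```go
package main

import (
	"fmt"
	"sort"
)

// cutoffForLimit returns the newest timestamp that should be deleted when
// only the `keep` newest rows may remain, or false if nothing needs
// deleting. The DELETE would then target aggregated_ts <= cutoff.
func cutoffForLimit(ts []int64, keep int) (int64, bool) {
	if len(ts) <= keep {
		return 0, false
	}
	sorted := append([]int64(nil), ts...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] > sorted[j] }) // newest first
	return sorted[keep], true // newest row outside the keep window
}

func main() {
	rows := []int64{100, 200, 300, 400, 500}
	cutoff, ok := cutoffForLimit(rows, 2) // keep the 2 newest rows
	fmt.Println(cutoff, ok)
}
```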
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 33 at r19 (raw file):
const maxDeleteRowsPerTxn = 128

// StatsCompactor is responsible for compacting older SQL Stats. It is
my last nit is that this thing doesn't really compact, it just deletes. Is compaction future work?
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 89 at r19 (raw file):
deleteOldStatsStmt := c.getStatementForDeletingStaleRows(tableName, pkColumnNames, hashColumnName)
for shardIdx := int64(0); shardIdx < systemschema.SQLStatsHashShardBucketCount; shardIdx++ {
nit: for shard, limit := range limitPerShard, I don't think that the int64 nature of the shard is important for your function signatures.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 169 at r19 (raw file):
}
if err := c.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
	_, err := c.ie.ExecEx(ctx,
fun fact, if you pass a nil for the txn the internal executor will deal with the transaction management for you. I encourage you to do that here.
Azhng
left a comment
Reviewable status: complete! 0 of 0 LGTMs obtained (and 2 stale) (waiting on @ajwerner, @arulajmani, and @maryliag)
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 195 at r18 (raw file):
Previously, ajwerner wrote…
It's not important; what you have here is fine.
What I was thinking is that right now you go and find the number of rows in the shard, then you find the number you think should be in the shard, and you delete until you think you've deleted enough. Another approach would be to find the aggregated_ts of the newest row you want to delete and use that to drive the deletions here.
I see. Interesting 🤔 I suppose we can try out that approach if the current approach becomes problematic.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 33 at r19 (raw file):
Previously, ajwerner wrote…
my last nit is that this thing doesn't really compact, it just deletes. Is compaction future work?
Yes, compaction is future work.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 89 at r19 (raw file):
Previously, ajwerner wrote…
nit: for shard, limit := range limitPerShard — I don't think that the int64 nature of the shard is important for your function signatures.
Done.
pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go, line 169 at r19 (raw file):
Previously, ajwerner wrote…
fun fact, if you pass a nil for the txn, the internal executor will deal with the transaction management for you. I encourage you to do that here.
Done.
c2b6973 to
9702ece
Compare
maryliag
left a comment
Reviewed 20 of 20 files at r23, all commit messages.
Reviewable status: complete! 1 of 0 LGTMs obtained (and 2 stale) (waiting on @ajwerner and @arulajmani)
TFTR! bors r=ajwerner,maryliag

Merge conflict.
Release note: None
This commit introduces the SQL Stats Compaction job type and a barebones implementation of the SQL Stats compaction.

Release justification: category 4

Release note: None
bors r=ajwerner,maryliag

Build succeeded:
68401: sql: create sql stats compaction scheduled job r=miretskiy,maryliag a=Azhng

Depends on #68434, #69346

sql: create sql stats compaction scheduled job

This commit introduces the SQL Stats Compaction Scheduled job, which runs on an hourly schedule and invokes the SQL Stats Compaction Job created in the previous commit. This commit also introduces the `crdb_internal.schedule_sql_stats_compaction()` builtin as a way to manually create a SQL Stats compaction schedule. The `SHOW SCHEDULES` command's syntax is also extended to support `SHOW SCHEDULES FOR SQL STATISTICS`.

Release justification: category 4

Release note (sql change): introduce crdb_internal.schedule_sql_stats_compaction() to manually create SQL Stats compaction schedule. Extend SHOW SCHEDULES command to support SHOW SCHEDULES FOR SQL STATISTICS.

Co-authored-by: Azhng <archer.xn@gmail.com>
69046: kv: return error from checkNegotiateAndSendPreconditions r=nvanbenschoten a=nvanbenschoten

Instead of panicking. Discussed in #68969.

Release justification: change to new functionality

69273: sql: crdb_internal.reset_sql_stats() now resets persisted SQL Stats r=maryliag a=Azhng

Depends on #68401 and #68434

Previously, the crdb_internal.reset_sql_stats() builtin only reset cluster-wide in-memory sql stats. This patch updates the builtin to reset persisted sql stats as well.

Release justification: category 4

Release note (sql change): crdb_internal.reset_sql_stats() now resets persisted SQL Stats.

Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
Co-authored-by: Azhng <archer.xn@gmail.com>
Previous PR #67090
Followed by #68401
First Commit
sql,util: refactor checking for running jobs to its own util package
Release note: None
Second Commit
sql,jobs: create SQL Stats Compaction Job and resumer
This commit introduces the SQL Stats Compaction job type
and a barebones implementation of the SQL Stats compaction.
Release justification: category 4
Release note: None