Skip to content

Commit b01d45e

Browse files
committed
sql: rewrite sql stats compaction job to avoid scanning mvcc garbage
Related to #79548. Previously, the SQL Stats cleanup job's performance could severely degrade, leaving it unable to keep up with the write load, since its `DELETE` statements often needed to scan over large ranges of MVCC garbage. This commit addresses the issue by further constraining the scans of the `DELETE` statements to reduce how much MVCC garbage they scan over. Release note: None
1 parent dbbe083 commit b01d45e

4 files changed

Lines changed: 310 additions & 103 deletions

File tree

pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,15 @@ go_test(
7070
"//pkg/jobs",
7171
"//pkg/jobs/jobspb",
7272
"//pkg/jobs/jobstest",
73+
"//pkg/kv/kvserver",
7374
"//pkg/roachpb",
7475
"//pkg/scheduledjobs",
7576
"//pkg/security",
7677
"//pkg/security/securitytest",
7778
"//pkg/server",
7879
"//pkg/sql",
7980
"//pkg/sql/catalog",
81+
"//pkg/sql/catalog/systemschema",
8082
"//pkg/sql/sem/tree",
8183
"//pkg/sql/sessiondata",
8284
"//pkg/sql/sqlstats",

pkg/sql/sqlstats/persistedsqlstats/compaction_exec.go

Lines changed: 171 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ type StatsCompactor struct {
3636
rowsRemovedCounter *metric.Counter
3737

3838
knobs *sqlstats.TestingKnobs
39+
40+
scratch struct {
41+
qargs []interface{}
42+
}
3943
}
4044

4145
// NewStatsCompactor returns a new instance of StatsCompactor.
@@ -59,44 +63,43 @@ func NewStatsCompactor(
5963
// that exceeded the limit defined by `sql.stats.persisted_rows.max`
6064
// (persistedsqlstats.SQLStatsMaxPersistedRows).
6165
func (c *StatsCompactor) DeleteOldestEntries(ctx context.Context) error {
62-
if err := c.removeStaleRowsPerShard(ctx,
63-
"system.statement_statistics",
64-
systemschema.StmtStatsHashColumnName,
65-
"aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, node_id",
66+
if err := c.removeStaleRowsPerShard(
67+
ctx,
68+
stmtStatsCleanupOps,
6669
); err != nil {
6770
return err
6871
}
6972

70-
return c.removeStaleRowsPerShard(ctx,
71-
"system.transaction_statistics",
72-
systemschema.TxnStatsHashColumnName,
73-
"aggregated_ts, fingerprint_id, app_name, node_id",
73+
return c.removeStaleRowsPerShard(
74+
ctx,
75+
txnStatsCleanupOps,
7476
)
7577
}
7678

7779
func (c *StatsCompactor) removeStaleRowsPerShard(
78-
ctx context.Context, tableName, hashColumnName, pkColumnNames string,
80+
ctx context.Context, ops *cleanupOperations,
7981
) error {
8082
rowLimitPerShard := c.getRowLimitPerShard()
81-
82-
existingRowCountQuery := c.getQueryForCheckingTableRowCounts(tableName, hashColumnName)
83-
deleteOldStatsStmt := c.getStatementForDeletingStaleRows(tableName, pkColumnNames, hashColumnName)
84-
8583
for shardIdx, rowLimit := range rowLimitPerShard {
8684
var existingRowCount int64
85+
8786
if err := c.getRowCountForShard(
8887
ctx,
89-
existingRowCountQuery,
88+
ops.getScanStmt(c.knobs),
9089
shardIdx,
9190
&existingRowCount,
9291
); err != nil {
9392
return err
9493
}
9594

95+
if c.knobs != nil && c.knobs.OnCleanupStartForShard != nil {
96+
c.knobs.OnCleanupStartForShard(shardIdx, existingRowCount, rowLimit)
97+
}
98+
9699
if err := c.removeStaleRowsForShard(
97100
ctx,
98-
deleteOldStatsStmt,
99-
shardIdx,
101+
ops,
102+
int64(shardIdx),
100103
existingRowCount,
101104
rowLimit,
102105
); err != nil {
@@ -154,73 +157,179 @@ func (c *StatsCompactor) getRowLimitPerShard() []int64 {
154157
// avoid having one large transaction.
155158
func (c *StatsCompactor) removeStaleRowsForShard(
156159
ctx context.Context,
157-
stmt string,
158-
shardIdx int,
160+
ops *cleanupOperations,
161+
shardIdx int64,
159162
existingRowCountPerShard, maxRowLimitPerShard int64,
160163
) error {
164+
var err error
165+
var lastDeletedRow tree.Datums
161166
maxDeleteRowsPerTxn := CompactionJobRowsToDeletePerTxn.Get(&c.st.SV)
167+
162168
if rowsToRemove := existingRowCountPerShard - maxRowLimitPerShard; rowsToRemove > 0 {
163169
for remainToBeRemoved := rowsToRemove; remainToBeRemoved > 0; {
164170
rowsToRemovePerTxn := remainToBeRemoved
165171
if remainToBeRemoved > maxDeleteRowsPerTxn {
166172
rowsToRemovePerTxn = maxDeleteRowsPerTxn
167173
}
168-
if _, err := c.ie.ExecEx(ctx,
169-
"delete-old-sql-stats",
170-
nil, /* txn */
171-
sessiondata.InternalExecutorOverride{User: security.NodeUserName()},
174+
175+
stmt := ops.getDeleteStmt(lastDeletedRow)
176+
qargs := c.getQargs(shardIdx, rowsToRemovePerTxn, lastDeletedRow)
177+
var rowsRemoved int64
178+
179+
lastDeletedRow, rowsRemoved, err = c.executeDeleteStmt(
180+
ctx,
172181
stmt,
173-
shardIdx,
174-
rowsToRemovePerTxn,
175-
); err != nil {
182+
qargs,
183+
)
184+
if err != nil {
176185
return err
177186
}
178-
179187
c.rowsRemovedCounter.Inc(rowsToRemovePerTxn)
188+
189+
// If we removed fewer rows than we intended, it means something
190+
// else is interfering with the cleanup job, likely a human operator.
191+
// This can happen when the operator forgot to cancel the job when manual
192+
// intervention is happening.
193+
if rowsRemoved < rowsToRemovePerTxn {
194+
break
195+
}
196+
180197
remainToBeRemoved -= rowsToRemovePerTxn
198+
181199
}
182200
}
183201

184202
return nil
185203
}
186204

187-
func (c *StatsCompactor) getQueryForCheckingTableRowCounts(
188-
tableName, hashColumnName string,
189-
) string {
190-
// [1]: table name
191-
// [2]: follower read clause
192-
// [3]: hash column name
193-
existingRowCountQuery := `
194-
SELECT count(*)
195-
FROM %[1]s
196-
%[2]s
197-
WHERE %[3]s = $1
198-
`
199-
200-
followerReadClause := c.knobs.GetAOSTClause()
201-
202-
return fmt.Sprintf(existingRowCountQuery, tableName, followerReadClause, hashColumnName)
205+
func (c *StatsCompactor) executeDeleteStmt(
206+
ctx context.Context, delStmt string, qargs []interface{},
207+
) (lastRow tree.Datums, rowsDeleted int64, err error) {
208+
it, err := c.ie.QueryIteratorEx(ctx,
209+
"delete-old-sql-stats",
210+
nil, /* txn */
211+
sessiondata.InternalExecutorOverride{User: security.NodeUserName()},
212+
delStmt,
213+
qargs...,
214+
)
215+
if err != nil {
216+
return nil, 0, err
217+
}
218+
defer func() {
219+
err = errors.CombineErrors(err, it.Close())
220+
}()
221+
222+
var ok bool
223+
for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) {
224+
lastRow = it.Cur()
225+
rowsDeleted++
226+
}
227+
228+
return lastRow, rowsDeleted, err
229+
}
230+
231+
func (c *StatsCompactor) getQargs(shardIdx, limit int64, lastDeletedRow tree.Datums) []interface{} {
232+
size := len(lastDeletedRow) + 2
233+
if cap(c.scratch.qargs) < size {
234+
c.scratch.qargs = make([]interface{}, 0, size)
235+
}
236+
c.scratch.qargs = c.scratch.qargs[:0]
237+
238+
c.scratch.qargs = append(c.scratch.qargs, tree.NewDInt(tree.DInt(shardIdx)))
239+
c.scratch.qargs = append(c.scratch.qargs, tree.NewDInt(tree.DInt(limit)))
240+
241+
for _, value := range lastDeletedRow {
242+
c.scratch.qargs = append(c.scratch.qargs, value)
243+
}
244+
245+
return c.scratch.qargs
246+
}
247+
248+
type cleanupOperations struct {
249+
initialScanStmtTemplate string
250+
unconstrainedDeleteStmt string
251+
constrainedDeleteStmt string
203252
}
204253

205-
func (c *StatsCompactor) getStatementForDeletingStaleRows(
206-
tableName, pkColumnNames, hashColumnName string,
207-
) string {
208-
// [1]: table name
209-
// [2]: primary key
210-
// [3]: hash column name
211-
const stmt = `
212-
DELETE FROM %[1]s
213-
WHERE (%[2]s) IN (
214-
SELECT %[2]s
215-
FROM %[1]s
216-
WHERE %[3]s = $1
217-
ORDER BY aggregated_ts ASC
218-
LIMIT $2
254+
var (
255+
stmtStatsCleanupOps = &cleanupOperations{
256+
initialScanStmtTemplate: `
257+
SELECT count(*)
258+
FROM system.statement_statistics
259+
%s
260+
WHERE crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_transaction_fingerprint_id_shard_8 = $1`,
261+
unconstrainedDeleteStmt: `
262+
DELETE FROM system.statement_statistics
263+
WHERE (aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, node_id) IN (
264+
SELECT aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, node_id
265+
FROM system.statement_statistics
266+
WHERE crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_transaction_fingerprint_id_shard_8 = $1
267+
ORDER BY aggregated_ts ASC
268+
LIMIT $2
269+
) RETURNING aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, node_id`,
270+
constrainedDeleteStmt: `
271+
DELETE FROM system.statement_statistics
272+
WHERE (aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, node_id) IN (
273+
SELECT aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, node_id
274+
FROM system.statement_statistics
275+
WHERE crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_transaction_fingerprint_id_shard_8 = $1
276+
AND (
277+
(
278+
aggregated_ts,
279+
fingerprint_id,
280+
transaction_fingerprint_id,
281+
plan_hash,
282+
app_name,
283+
node_id
284+
) >= ($3, $4, $5, $6, $7, $8)
285+
)
286+
ORDER BY aggregated_ts ASC
287+
LIMIT $2
288+
) RETURNING aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, node_id`,
289+
}
290+
txnStatsCleanupOps = &cleanupOperations{
291+
initialScanStmtTemplate: `
292+
SELECT count(*)
293+
FROM system.transaction_statistics
294+
%s
295+
WHERE crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8 = $1`,
296+
unconstrainedDeleteStmt: `
297+
DELETE FROM system.transaction_statistics
298+
WHERE (aggregated_ts, fingerprint_id, app_name, node_id) IN (
299+
SELECT aggregated_ts, fingerprint_id, app_name, node_id
300+
FROM system.transaction_statistics
301+
WHERE crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8 = $1
302+
ORDER BY aggregated_ts ASC
303+
LIMIT $2
304+
) RETURNING aggregated_ts, fingerprint_id, app_name, node_id`,
305+
constrainedDeleteStmt: `
306+
DELETE FROM system.transaction_statistics
307+
WHERE (aggregated_ts, fingerprint_id, app_name, node_id) IN (
308+
SELECT aggregated_ts, fingerprint_id, app_name, node_id
309+
FROM system.transaction_statistics
310+
WHERE crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8 = $1
311+
AND (
312+
(
313+
aggregated_ts,
314+
fingerprint_id,
315+
app_name,
316+
node_id
317+
) >= ($3, $4, $5, $6)
318+
)
319+
ORDER BY aggregated_ts ASC
320+
LIMIT $2
321+
) RETURNING aggregated_ts, fingerprint_id, app_name, node_id`,
322+
}
219323
)
220-
`
221-
return fmt.Sprintf(stmt,
222-
tableName,
223-
pkColumnNames,
224-
hashColumnName,
225-
)
324+
325+
func (c *cleanupOperations) getScanStmt(knobs *sqlstats.TestingKnobs) string {
326+
return fmt.Sprintf(c.initialScanStmtTemplate, knobs.GetAOSTClause())
327+
}
328+
329+
func (c *cleanupOperations) getDeleteStmt(lastDeletedRow tree.Datums) string {
330+
if len(lastDeletedRow) == 0 {
331+
return c.unconstrainedDeleteStmt
332+
}
333+
334+
return c.constrainedDeleteStmt
226335
}

0 commit comments

Comments
 (0)