@@ -36,6 +36,10 @@ type StatsCompactor struct {
3636 rowsRemovedCounter * metric.Counter
3737
3838 knobs * sqlstats.TestingKnobs
39+
40+ scratch struct {
41+ qargs []interface {}
42+ }
3943}
4044
4145// NewStatsCompactor returns a new instance of StatsCompactor.
@@ -59,44 +63,43 @@ func NewStatsCompactor(
5963// that exceeded the limit defined by `sql.stats.persisted_rows.max`
6064// (persistedsqlstats.SQLStatsMaxPersistedRows).
6165func (c * StatsCompactor ) DeleteOldestEntries (ctx context.Context ) error {
62- if err := c .removeStaleRowsPerShard (ctx ,
63- "system.statement_statistics" ,
64- systemschema .StmtStatsHashColumnName ,
65- "aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, node_id" ,
66+ if err := c .removeStaleRowsPerShard (
67+ ctx ,
68+ stmtStatsCleanupOps ,
6669 ); err != nil {
6770 return err
6871 }
6972
70- return c .removeStaleRowsPerShard (ctx ,
71- "system.transaction_statistics" ,
72- systemschema .TxnStatsHashColumnName ,
73- "aggregated_ts, fingerprint_id, app_name, node_id" ,
73+ return c .removeStaleRowsPerShard (
74+ ctx ,
75+ txnStatsCleanupOps ,
7476 )
7577}
7678
7779func (c * StatsCompactor ) removeStaleRowsPerShard (
78- ctx context.Context , tableName , hashColumnName , pkColumnNames string ,
80+ ctx context.Context , ops * cleanupOperations ,
7981) error {
8082 rowLimitPerShard := c .getRowLimitPerShard ()
81-
82- existingRowCountQuery := c .getQueryForCheckingTableRowCounts (tableName , hashColumnName )
83- deleteOldStatsStmt := c .getStatementForDeletingStaleRows (tableName , pkColumnNames , hashColumnName )
84-
8583 for shardIdx , rowLimit := range rowLimitPerShard {
8684 var existingRowCount int64
85+
8786 if err := c .getRowCountForShard (
8887 ctx ,
89- existingRowCountQuery ,
88+ ops . getScanStmt ( c . knobs ) ,
9089 shardIdx ,
9190 & existingRowCount ,
9291 ); err != nil {
9392 return err
9493 }
9594
95+ if c .knobs != nil && c .knobs .OnCleanupStartForShard != nil {
96+ c .knobs .OnCleanupStartForShard (shardIdx , existingRowCount , rowLimit )
97+ }
98+
9699 if err := c .removeStaleRowsForShard (
97100 ctx ,
98- deleteOldStatsStmt ,
99- shardIdx ,
101+ ops ,
102+ int64 ( shardIdx ) ,
100103 existingRowCount ,
101104 rowLimit ,
102105 ); err != nil {
@@ -154,73 +157,179 @@ func (c *StatsCompactor) getRowLimitPerShard() []int64 {
154157// avoid having one large transaction.
155158func (c * StatsCompactor ) removeStaleRowsForShard (
156159 ctx context.Context ,
157- stmt string ,
158- shardIdx int ,
160+ ops * cleanupOperations ,
161+ shardIdx int64 ,
159162 existingRowCountPerShard , maxRowLimitPerShard int64 ,
160163) error {
164+ var err error
165+ var lastDeletedRow tree.Datums
161166 maxDeleteRowsPerTxn := CompactionJobRowsToDeletePerTxn .Get (& c .st .SV )
167+
162168 if rowsToRemove := existingRowCountPerShard - maxRowLimitPerShard ; rowsToRemove > 0 {
163169 for remainToBeRemoved := rowsToRemove ; remainToBeRemoved > 0 ; {
164170 rowsToRemovePerTxn := remainToBeRemoved
165171 if remainToBeRemoved > maxDeleteRowsPerTxn {
166172 rowsToRemovePerTxn = maxDeleteRowsPerTxn
167173 }
168- if _ , err := c .ie .ExecEx (ctx ,
169- "delete-old-sql-stats" ,
170- nil , /* txn */
171- sessiondata.InternalExecutorOverride {User : security .NodeUserName ()},
174+
175+ stmt := ops .getDeleteStmt (lastDeletedRow )
176+ qargs := c .getQargs (shardIdx , rowsToRemovePerTxn , lastDeletedRow )
177+ var rowsRemoved int64
178+
179+ lastDeletedRow , rowsRemoved , err = c .executeDeleteStmt (
180+ ctx ,
172181 stmt ,
173- shardIdx ,
174- rowsToRemovePerTxn ,
175- ); err != nil {
182+ qargs ,
183+ )
184+ if err != nil {
176185 return err
177186 }
178-
179187 c .rowsRemovedCounter .Inc (rowsToRemovePerTxn )
188+
189+ // If we removed less rows compared to what we intended, it means something
190+ // else is interfering with the cleanup job, likely a human operator.
191+ // This can happen when the operator forgot to cancel the job when manual
192+ // intervention is happening.
193+ if rowsRemoved < rowsToRemovePerTxn {
194+ break
195+ }
196+
180197 remainToBeRemoved -= rowsToRemovePerTxn
198+
181199 }
182200 }
183201
184202 return nil
185203}
186204
187- func (c * StatsCompactor ) getQueryForCheckingTableRowCounts (
188- tableName , hashColumnName string ,
189- ) string {
190- // [1]: table name
191- // [2]: follower read clause
192- // [3]: hash column name
193- existingRowCountQuery := `
194- SELECT count(*)
195- FROM %[1]s
196- %[2]s
197- WHERE %[3]s = $1
198- `
199-
200- followerReadClause := c .knobs .GetAOSTClause ()
201-
202- return fmt .Sprintf (existingRowCountQuery , tableName , followerReadClause , hashColumnName )
205+ func (c * StatsCompactor ) executeDeleteStmt (
206+ ctx context.Context , delStmt string , qargs []interface {},
207+ ) (lastRow tree.Datums , rowsDeleted int64 , err error ) {
208+ it , err := c .ie .QueryIteratorEx (ctx ,
209+ "delete-old-sql-stats" ,
210+ nil , /* txn */
211+ sessiondata.InternalExecutorOverride {User : security .NodeUserName ()},
212+ delStmt ,
213+ qargs ... ,
214+ )
215+ if err != nil {
216+ return nil , 0 , err
217+ }
218+ defer func () {
219+ err = errors .CombineErrors (err , it .Close ())
220+ }()
221+
222+ var ok bool
223+ for ok , err = it .Next (ctx ); ok ; ok , err = it .Next (ctx ) {
224+ lastRow = it .Cur ()
225+ rowsDeleted ++
226+ }
227+
228+ return lastRow , rowsDeleted , err
229+ }
230+
231+ func (c * StatsCompactor ) getQargs (shardIdx , limit int64 , lastDeletedRow tree.Datums ) []interface {} {
232+ size := len (lastDeletedRow ) + 2
233+ if cap (c .scratch .qargs ) < size {
234+ c .scratch .qargs = make ([]interface {}, 0 , size )
235+ }
236+ c .scratch .qargs = c .scratch .qargs [:0 ]
237+
238+ c .scratch .qargs = append (c .scratch .qargs , tree .NewDInt (tree .DInt (shardIdx )))
239+ c .scratch .qargs = append (c .scratch .qargs , tree .NewDInt (tree .DInt (limit )))
240+
241+ for _ , value := range lastDeletedRow {
242+ c .scratch .qargs = append (c .scratch .qargs , value )
243+ }
244+
245+ return c .scratch .qargs
246+ }
247+
// cleanupOperations bundles the SQL statements used to compact one persisted
// SQL stats table.
type cleanupOperations struct {
	// initialScanStmtTemplate is a format string with a single %s verb;
	// getScanStmt fills it with the testing knobs' AOST clause.
	initialScanStmtTemplate string
	// unconstrainedDeleteStmt is returned by getDeleteStmt when no previously
	// deleted row is available.
	unconstrainedDeleteStmt string
	// constrainedDeleteStmt is returned by getDeleteStmt when a previously
	// deleted row is available.
	constrainedDeleteStmt string
}
204253
205- func (c * StatsCompactor ) getStatementForDeletingStaleRows (
206- tableName , pkColumnNames , hashColumnName string ,
207- ) string {
208- // [1]: table name
209- // [2]: primary key
210- // [3]: hash column name
211- const stmt = `
212- DELETE FROM %[1]s
213- WHERE (%[2]s) IN (
214- SELECT %[2]s
215- FROM %[1]s
216- WHERE %[3]s = $1
217- ORDER BY aggregated_ts ASC
218- LIMIT $2
// Cleanup statements for the two persisted SQL stats tables.
//
// Placeholders, in the order getQargs supplies them:
//
//	$1      shard index, compared against the table's hash-shard column
//	$2      maximum number of rows to delete in this statement
//	$3...   (constrained statements only) the columns of the previously
//	        deleted row, used as an inclusive lower bound on the key tuple
//
// Every DELETE returns the key columns of the rows it removed.
//
// NOTE(review): the subqueries order by aggregated_ts only, while the
// constrained statements bound the full key tuple by the last row that
// RETURNING produced. Confirm that this row is always a valid lower bound for
// the remaining rows when several rows share the same aggregated_ts.
var (
	// stmtStatsCleanupOps operates on system.statement_statistics.
	stmtStatsCleanupOps = &cleanupOperations{
		// The %s verb is replaced with the AOST clause by getScanStmt.
		initialScanStmtTemplate: `
SELECT count(*)
FROM system.statement_statistics
%s
WHERE crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_transaction_fingerprint_id_shard_8 = $1`,
		// First batch for a shard: no lower bound on the key.
		unconstrainedDeleteStmt: `
DELETE FROM system.statement_statistics
WHERE (aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, node_id) IN (
SELECT aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, node_id
FROM system.statement_statistics
WHERE crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_transaction_fingerprint_id_shard_8 = $1
ORDER BY aggregated_ts ASC
LIMIT $2
) RETURNING aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, node_id`,
		// Subsequent batches: $3-$8 carry the six key columns of the previously
		// deleted row.
		constrainedDeleteStmt: `
DELETE FROM system.statement_statistics
WHERE (aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, node_id) IN (
SELECT aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, node_id
FROM system.statement_statistics
WHERE crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_plan_hash_transaction_fingerprint_id_shard_8 = $1
AND (
(
aggregated_ts,
fingerprint_id,
transaction_fingerprint_id,
plan_hash,
app_name,
node_id
) >= ($3, $4, $5, $6, $7, $8)
)
ORDER BY aggregated_ts ASC
LIMIT $2
) RETURNING aggregated_ts, fingerprint_id, transaction_fingerprint_id, plan_hash, app_name, node_id`,
	}
	// txnStatsCleanupOps operates on system.transaction_statistics.
	txnStatsCleanupOps = &cleanupOperations{
		// The %s verb is replaced with the AOST clause by getScanStmt.
		initialScanStmtTemplate: `
SELECT count(*)
FROM system.transaction_statistics
%s
WHERE crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8 = $1`,
		// First batch for a shard: no lower bound on the key.
		unconstrainedDeleteStmt: `
DELETE FROM system.transaction_statistics
WHERE (aggregated_ts, fingerprint_id, app_name, node_id) IN (
SELECT aggregated_ts, fingerprint_id, app_name, node_id
FROM system.transaction_statistics
WHERE crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8 = $1
ORDER BY aggregated_ts ASC
LIMIT $2
) RETURNING aggregated_ts, fingerprint_id, app_name, node_id`,
		// Subsequent batches: $3-$6 carry the four key columns of the previously
		// deleted row.
		constrainedDeleteStmt: `
DELETE FROM system.transaction_statistics
WHERE (aggregated_ts, fingerprint_id, app_name, node_id) IN (
SELECT aggregated_ts, fingerprint_id, app_name, node_id
FROM system.transaction_statistics
WHERE crdb_internal_aggregated_ts_app_name_fingerprint_id_node_id_shard_8 = $1
AND (
(
aggregated_ts,
fingerprint_id,
app_name,
node_id
) >= ($3, $4, $5, $6)
)
ORDER BY aggregated_ts ASC
LIMIT $2
) RETURNING aggregated_ts, fingerprint_id, app_name, node_id`,
	}
)
220- `
221- return fmt .Sprintf (stmt ,
222- tableName ,
223- pkColumnNames ,
224- hashColumnName ,
225- )
324+
325+ func (c * cleanupOperations ) getScanStmt (knobs * sqlstats.TestingKnobs ) string {
326+ return fmt .Sprintf (c .initialScanStmtTemplate , knobs .GetAOSTClause ())
327+ }
328+
329+ func (c * cleanupOperations ) getDeleteStmt (lastDeletedRow tree.Datums ) string {
330+ if len (lastDeletedRow ) == 0 {
331+ return c .unconstrainedDeleteStmt
332+ }
333+
334+ return c .constrainedDeleteStmt
226335}
0 commit comments