Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 9 additions & 16 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -908,11 +908,9 @@ func createImportingDescriptors(

// Allocate no schedule to the row-level TTL.
// This will be re-written when the descriptor is published.
if details.DescriptorCoverage != tree.AllDescriptors {
for _, table := range mutableTables {
if table.HasRowLevelTTL() {
table.RowLevelTTL.ScheduleID = 0
}
for _, table := range mutableTables {
if table.HasRowLevelTTL() {
table.RowLevelTTL.ScheduleID = 0
}
}

Expand Down Expand Up @@ -1769,15 +1767,10 @@ func (r *restoreResumer) publishDescriptors(

// Go through the descriptors and find any declarative schema change jobs
// affecting them.
//
// If we're restoring all the descriptors, it means we're also restoring the
// jobs.
if details.DescriptorCoverage != tree.AllDescriptors {
if err := scbackup.CreateDeclarativeSchemaChangeJobs(
ctx, r.execCfg.JobRegistry, txn, all,
); err != nil {
return err
}
if err := scbackup.CreateDeclarativeSchemaChangeJobs(
ctx, r.execCfg.JobRegistry, txn, all,
); err != nil {
return err
}

// Write the new TableDescriptors and flip state over to public so they can be
Expand Down Expand Up @@ -1810,7 +1803,7 @@ func (r *restoreResumer) publishDescriptors(
return err
}
// Assign a TTL schedule before publishing.
if details.DescriptorCoverage != tree.AllDescriptors && mutTable.HasRowLevelTTL() {
if mutTable.HasRowLevelTTL() {
j, err := sql.CreateRowLevelTTLScheduledJob(
ctx,
execCfg,
Expand Down Expand Up @@ -2479,7 +2472,7 @@ func (r *restoreResumer) restoreSystemTables(

if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := systemTable.config.migrationFunc(ctx, r.execCfg, txn,
systemTable.stagingTableName); err != nil {
systemTable.stagingTableName, details.DescriptorRewrites); err != nil {
return err
}

Expand Down
84 changes: 71 additions & 13 deletions pkg/ccl/backupccl/system_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (
"context"
fmt "fmt"
"math"
"sort"

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
Expand Down Expand Up @@ -69,7 +71,7 @@ type systemBackupConfiguration struct {
// migrationFunc performs the necessary migrations on the system table data in
// the crdb_temp staging table before it is loaded into the actual system
// table.
migrationFunc func(ctx context.Context, execCtx *sql.ExecutorConfig, txn *kv.Txn, tempTableName string) error
migrationFunc func(ctx context.Context, execCtx *sql.ExecutorConfig, txn *kv.Txn, tempTableName string, rekeys jobspb.DescRewriteMap) error
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Passing thought, take it or leave it. It's definitely more code for the sake of making the convention harder to break. I think the below suggestion would be better than your comments.

  1. what about making the migrationFunc field required and then having an explicit value that indicates that it's a no-op and fail on nil
  2. make this function an interface (can have a function-based implementation) and have an implementation of that interface which is used to explicitly mark that there are no fields. In this form, you reject nil at init time and you'll call the method in the interface no matter what unless it's the sentinel saying there is nothing to migrate

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I started on merging optIn/optOut and migrationFunc into one field but backed it out for the sake of a more contained pr. I was thinking: a) rename to restoreStrategy, have a const optOutOfRestore migrationFunc = nil and then have a noop func rawRowRestore = func (....) { return nil } for the ones that have no func right now. But then I was looking at the fact that for each one that has a func, after we call it, we update the job (details, ugh) to reflect that it has been called for resumption, so then switched return nil to return errNoopMigration and checked that in the caller, but it was starting to get to be a lot of code that wasn't related to ID fixes contained here so I backed it out for a future diff.

// customRestoreFunc is responsible for restoring the data from a table that
// holds the restore system table data into the given system table. If none
// is provided then `defaultRestoreFunc` is used.
Expand Down Expand Up @@ -185,42 +187,74 @@ func settingsRestoreFunc(
// by TestAllSystemTablesHaveBackupConfig.
var systemTableBackupConfiguration = map[string]systemBackupConfiguration{
systemschema.UsersTable.GetName(): {
shouldIncludeInClusterBackup: optInToClusterBackup,
shouldIncludeInClusterBackup: optInToClusterBackup, // No desc ID columns.
},
systemschema.ZonesTable.GetName(): {
shouldIncludeInClusterBackup: optInToClusterBackup,
shouldIncludeInClusterBackup: optInToClusterBackup, // ID in "id".
// The zones table should be restored before the user data so that the range
// allocator properly distributes ranges during the restore.
migrationFunc: rekeySystemTable("id"),
restoreBeforeData: true,
},
systemschema.SettingsTable.GetName(): {
// The settings table should be restored after all other system tables have
// been restored. This is because we do not want to overwrite the clusters'
// settings before all other user and system data has been restored.
restoreInOrder: math.MaxInt32,
shouldIncludeInClusterBackup: optInToClusterBackup,
shouldIncludeInClusterBackup: optInToClusterBackup, // No desc ID columns.
customRestoreFunc: settingsRestoreFunc,
},
systemschema.LocationsTable.GetName(): {
shouldIncludeInClusterBackup: optInToClusterBackup,
shouldIncludeInClusterBackup: optInToClusterBackup, // No desc ID columns.
},
systemschema.RoleMembersTable.GetName(): {
shouldIncludeInClusterBackup: optInToClusterBackup,
shouldIncludeInClusterBackup: optInToClusterBackup, // No desc ID columns.
},
systemschema.RoleOptionsTable.GetName(): {
shouldIncludeInClusterBackup: optInToClusterBackup,
shouldIncludeInClusterBackup: optInToClusterBackup, // No desc ID columns.
},
systemschema.UITable.GetName(): {
shouldIncludeInClusterBackup: optInToClusterBackup,
shouldIncludeInClusterBackup: optInToClusterBackup, // No desc ID columns.
},
systemschema.CommentsTable.GetName(): {
shouldIncludeInClusterBackup: optInToClusterBackup,
shouldIncludeInClusterBackup: optInToClusterBackup, // ID in "object_id".
migrationFunc: rekeySystemTable("object_id"),
},
systemschema.JobsTable.GetName(): {
shouldIncludeInClusterBackup: optOutOfClusterBackup,
},
systemschema.ScheduledJobsTable.GetName(): {
shouldIncludeInClusterBackup: optInToClusterBackup,
shouldIncludeInClusterBackup: optInToClusterBackup, // Desc IDs in some rows.
// Some rows, specifically those which are schedules for row-ttl, have IDs
// baked into their values, making the restored rows invalid. Rewriting them
// would be tricky since the ID is in a binary proto field, but we already
// have code to synthesize new schedules from the table being restored that
// runs during descriptor creation. We can leverage these by leaving the
// synthesized schedule rows in the real schedule table when we otherwise
// clean it out, and skipping TTL rows when we copy from the restored
// schedule table.
customRestoreFunc: func(ctx context.Context, execCfg *sql.ExecutorConfig, txn *kv.Txn, _, tempTableName string) error {
execType := tree.ScheduledRowLevelTTLExecutor.InternalName()

const deleteQuery = "DELETE FROM system.scheduled_jobs WHERE executor_type <> $1"
if _, err := execCfg.InternalExecutor.Exec(
ctx, "restore-scheduled_jobs-delete", txn, deleteQuery, execType,
); err != nil {
return errors.Wrapf(err, "deleting existing scheduled_jobs")
}

restoreQuery := fmt.Sprintf(
"INSERT INTO system.scheduled_jobs (SELECT * FROM %s WHERE executor_type <> $1);",
tempTableName,
)

if _, err := execCfg.InternalExecutor.Exec(
ctx, "restore-scheduled_jobs-insert", txn, restoreQuery, execType,
); err != nil {
return err
}
return nil
},
},
systemschema.TableStatisticsTable.GetName(): {
// Table statistics are backed up in the backup descriptor for now.
Expand Down Expand Up @@ -290,7 +324,8 @@ var systemTableBackupConfiguration = map[string]systemBackupConfiguration{
shouldIncludeInClusterBackup: optOutOfClusterBackup,
},
systemschema.DatabaseRoleSettingsTable.GetName(): {
shouldIncludeInClusterBackup: optInToClusterBackup,
shouldIncludeInClusterBackup: optInToClusterBackup, // ID in "database_id".
migrationFunc: rekeySystemTable("database_id"),
},
systemschema.TenantUsageTable.GetName(): {
shouldIncludeInClusterBackup: optOutOfClusterBackup,
Expand All @@ -302,7 +337,7 @@ var systemTableBackupConfiguration = map[string]systemBackupConfiguration{
shouldIncludeInClusterBackup: optOutOfClusterBackup,
},
systemschema.TenantSettingsTable.GetName(): {
shouldIncludeInClusterBackup: optInToClusterBackup,
shouldIncludeInClusterBackup: optInToClusterBackup, // No desc ID columns.
customRestoreFunc: tenantSettingsTableRestoreFunc,
},
systemschema.SpanCountTable.GetName(): {
Expand All @@ -313,10 +348,33 @@ var systemTableBackupConfiguration = map[string]systemBackupConfiguration{
shouldIncludeInClusterBackup: optOutOfClusterBackup,
},
systemschema.SystemExternalConnectionsTable.GetName(): {
shouldIncludeInClusterBackup: optInToClusterBackup,
shouldIncludeInClusterBackup: optInToClusterBackup, // No desc ID columns.
},
}

// rekeySystemTable returns a migration function that rewrites descriptor IDs
// stored in column colName of the crdb_temp staging copy of a system table,
// using the restore's descriptor rewrite map, before the staged rows are
// loaded into the real system table.
//
// The returned closure issues one parameterized UPDATE per remapped ID inside
// the restore transaction. Old IDs are visited in ascending order so the
// sequence of statements is deterministic across runs.
func rekeySystemTable(
	colName string,
) func(context.Context, *sql.ExecutorConfig, *kv.Txn, string, jobspb.DescRewriteMap) error {
	return func(ctx context.Context, execCtx *sql.ExecutorConfig, txn *kv.Txn, tempTableName string, rekeys jobspb.DescRewriteMap) error {
		// Collect and sort the old IDs for a deterministic update order.
		toRekey := make(descpb.IDs, 0, len(rekeys))
		for id := range rekeys {
			toRekey = append(toRekey, id)
		}
		sort.Sort(toRekey)

		// The statement text and op name do not depend on the ID being
		// remapped, so build them once rather than per iteration.
		// TODO(dt): batch 10+ updates at once if >10 in the map.
		q := fmt.Sprintf("UPDATE %s SET %s = $1 WHERE %s = $2", tempTableName, colName, colName)
		opName := fmt.Sprintf("remap-%s", tempTableName)

		// NOTE(review): sequential UPDATEs can double-remap if some new ID
		// equals a not-yet-processed old ID (e.g. {1->2, 2->3} rewrites the
		// rows just set to 2 again). Confirm the rewrite map's new IDs never
		// overlap its old IDs, or remap via a collision-free scheme.
		executor := execCtx.InternalExecutor
		for _, old := range toRekey {
			if _, err := executor.Exec(ctx, opName, txn, q, rekeys[old].ID, old); err != nil {
				return errors.Wrapf(err, "remapping %s", tempTableName)
			}
		}
		return nil
	}
}

// GetSystemTablesToIncludeInClusterBackup returns a set of system table names that
// should be included in a cluster backup.
func GetSystemTablesToIncludeInClusterBackup() map[string]struct{} {
Expand Down