Skip to content

Commit 508442d

Browse files
austenLacyhkdsun
andauthored
vttestserver: persist vschema changes in --persistent_mode (vitessio#13065) (#107)
Co-authored-by: Hormoz Kheradmand <hkdsun@users.noreply.github.com>
1 parent 564a634 commit 508442d

7 files changed

Lines changed: 157 additions & 26 deletions

File tree

go/cmd/vtcombo/main.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,13 @@ var (
6161
mysqlPort = flags.Int("mysql_port", 3306, "mysql port")
6262
externalTopoServer = flags.Bool("external_topo_server", false, "Should vtcombo use an external topology server instead of starting its own in-memory topology server. "+
6363
"If true, vtcombo will use the flags defined in topo/server.go to open topo server")
64-
plannerName = flags.String("planner-version", "", "Sets the default planner to use when the session has not changed it. Valid values are: V3, Gen4, Gen4Greedy and Gen4Fallback. Gen4Fallback tries the gen4 planner and falls back to the V3 planner if the gen4 fails.")
64+
plannerName = flags.String("planner-version", "", "Sets the default planner to use when the session has not changed it. Valid values are: V3, V3Insert, Gen4, Gen4Greedy and Gen4Fallback. Gen4Fallback tries the gen4 planner and falls back to the V3 planner if the gen4 fails.")
65+
vschemaPersistenceDir = flags.String("vschema-persistence-dir", "", "If set, per-keyspace vschema will be persisted in this directory "+
66+
"and reloaded into the in-memory topology server across restarts. Bookkeeping is performed using a simple watcher goroutine. "+
67+
"This is useful when running vtcombo as an application development container (e.g. vttestserver) where you want to keep the same "+
68+
"vschema even if developer's machine reboots. This works in tandem with vttestserver's --persistent_mode flag. Needless to say, "+
69+
"this is neither a perfect nor a production solution for vschema persistence. Consider using the --external_topo_server flag if "+
70+
"you require a more complete solution. This flag is ignored if --external_topo_server is set.")
6571

6672
tpb vttestpb.VTTestTopology
6773
ts *topo.Server
@@ -293,6 +299,10 @@ func main() {
293299
exit.Return(1)
294300
}
295301

302+
if *vschemaPersistenceDir != "" && !*externalTopoServer {
303+
startVschemaWatcher(*vschemaPersistenceDir, tpb.Keyspaces, ts)
304+
}
305+
296306
servenv.OnRun(func() {
297307
addStatusParts(vtg)
298308
})

go/cmd/vtcombo/vschema_watcher.go

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
Copyright 2023 The Vitess Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package main
18+
19+
import (
20+
"context"
21+
"encoding/json"
22+
"os"
23+
"path"
24+
25+
"vitess.io/vitess/go/vt/log"
26+
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
27+
vttestpb "vitess.io/vitess/go/vt/proto/vttest"
28+
"vitess.io/vitess/go/vt/topo"
29+
)
30+
31+
func startVschemaWatcher(vschemaPersistenceDir string, keyspaces []*vttestpb.Keyspace, ts *topo.Server) {
32+
// Create the directory if it doesn't exist.
33+
if err := createDirectoryIfNotExists(vschemaPersistenceDir); err != nil {
34+
log.Fatalf("Unable to create vschema persistence directory %v: %v", vschemaPersistenceDir, err)
35+
}
36+
37+
// If there are keyspace files, load them.
38+
loadKeyspacesFromDir(vschemaPersistenceDir, keyspaces, ts)
39+
40+
// Rebuild the SrvVSchema object in case we loaded vschema from file
41+
if err := ts.RebuildSrvVSchema(context.Background(), tpb.Cells); err != nil {
42+
log.Fatalf("RebuildSrvVSchema failed: %v", err)
43+
}
44+
45+
// Now watch for changes in the SrvVSchema object and persist them to disk.
46+
go watchSrvVSchema(context.Background(), ts, tpb.Cells[0])
47+
}
48+
49+
func loadKeyspacesFromDir(dir string, keyspaces []*vttestpb.Keyspace, ts *topo.Server) {
50+
for _, ks := range tpb.Keyspaces {
51+
ksFile := path.Join(dir, ks.Name+".json")
52+
if _, err := os.Stat(ksFile); err == nil {
53+
jsonData, err := os.ReadFile(ksFile)
54+
if err != nil {
55+
log.Fatalf("Unable to read keyspace file %v: %v", ksFile, err)
56+
}
57+
58+
keyspace := &vschemapb.Keyspace{}
59+
err = json.Unmarshal(jsonData, keyspace)
60+
if err != nil {
61+
log.Fatalf("Unable to parse keyspace file %v: %v", ksFile, err)
62+
}
63+
64+
ts.SaveVSchema(context.Background(), ks.Name, keyspace)
65+
log.Infof("Loaded keyspace %v from %v\n", ks.Name, ksFile)
66+
}
67+
}
68+
}
69+
70+
func watchSrvVSchema(ctx context.Context, ts *topo.Server, cell string) {
71+
data, ch, err := ts.WatchSrvVSchema(context.Background(), tpb.Cells[0])
72+
if err != nil {
73+
log.Fatalf("WatchSrvVSchema failed: %v", err)
74+
}
75+
76+
if data.Err != nil {
77+
log.Fatalf("WatchSrvVSchema could not retrieve initial vschema: %v", data.Err)
78+
}
79+
persistNewSrvVSchema(data.Value)
80+
81+
for update := range ch {
82+
if update.Err != nil {
83+
log.Errorf("WatchSrvVSchema returned an error: %v", update.Err)
84+
} else {
85+
persistNewSrvVSchema(update.Value)
86+
}
87+
}
88+
}
89+
90+
func persistNewSrvVSchema(srvVSchema *vschemapb.SrvVSchema) {
91+
for ksName, ks := range srvVSchema.Keyspaces {
92+
jsonBytes, err := json.MarshalIndent(ks, "", " ")
93+
if err != nil {
94+
log.Errorf("Error marshaling keyspace: %v", err)
95+
continue
96+
}
97+
98+
err = os.WriteFile(path.Join(*vschemaPersistenceDir, ksName+".json"), jsonBytes, 0644)
99+
if err != nil {
100+
log.Errorf("Error writing keyspace file: %v", err)
101+
}
102+
log.Infof("Persisted keyspace %v to %v", ksName, *vschemaPersistenceDir)
103+
}
104+
}
105+
106+
func createDirectoryIfNotExists(dir string) error {
107+
if _, err := os.Stat(dir); os.IsNotExist(err) {
108+
return os.Mkdir(dir, 0755)
109+
}
110+
return nil
111+
}

go/cmd/vttestserver/main.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,9 @@ func registerFlags(fs *pflag.FlagSet) {
9292
" vttestserver as a database container in local developer environments. Note"+
9393
" that db migration files (--schema_dir option) and seeding of"+
9494
" random data (--initialize_with_random_data option) will only run during"+
95-
" cluster startup if the data directory does not already exist. vschema"+
96-
" migrations are run every time the cluster starts, since persistence"+
97-
" for the topology server has not been implemented yet")
95+
" cluster startup if the data directory does not already exist. "+
96+
" Changes to VSchema are persisted across cluster restarts using a simple"+
97+
" watcher if the --data_dir argument is specified.")
9898

9999
fs.BoolVar(&doSeed, "initialize_with_random_data", false,
100100
"If this flag is each table-shard will be initialized"+

go/cmd/vttestserver/vttestserver_test.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,14 @@ func TestPersistentMode(t *testing.T) {
8181
cluster, err := startPersistentCluster(dir)
8282
assert.NoError(t, err)
8383

84-
// basic sanity checks similar to TestRunsVschemaMigrations
84+
// Add a new "ad-hoc" vindex via vtgate once the cluster is up, to later make sure it is persisted across teardowns
85+
err = addColumnVindex(cluster, "test_keyspace", "alter vschema on persistence_test add vindex my_vdx(id)")
86+
assert.NoError(t, err)
87+
88+
// Basic sanity checks similar to TestRunsVschemaMigrations
89+
// See go/cmd/vttestserver/data/schema/app_customer/* and go/cmd/vttestserver/data/schema/test_keyspace/*
8590
assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "test_table", vindex: "my_vdx", vindexType: "hash", column: "id"})
91+
assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "persistence_test", vindex: "my_vdx", vindexType: "hash", column: "id"})
8692
assertColumnVindex(t, cluster, columnVindex{keyspace: "app_customer", table: "customers", vindex: "hash", vindexType: "hash", column: "id"})
8793

8894
// insert some data to ensure persistence across teardowns
@@ -111,8 +117,9 @@ func TestPersistentMode(t *testing.T) {
111117
defer cluster.TearDown()
112118
assert.NoError(t, err)
113119

114-
// rerun our sanity checks to make sure vschema migrations are run during every startup
120+
// rerun our sanity checks to make sure vschema is persisted correctly
115121
assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "test_table", vindex: "my_vdx", vindexType: "hash", column: "id"})
122+
assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "persistence_test", vindex: "my_vdx", vindexType: "hash", column: "id"})
116123
assertColumnVindex(t, cluster, columnVindex{keyspace: "app_customer", table: "customers", vindex: "hash", vindexType: "hash", column: "id"})
117124

118125
// ensure previous data was successfully persisted
@@ -316,7 +323,8 @@ func startCluster(flags ...string) (vttest.LocalCluster, error) {
316323
keyspaceArg := "--keyspaces=" + strings.Join(clusterKeyspaces, ",")
317324
numShardsArg := "--num_shards=2,2"
318325
vschemaDDLAuthorizedUsers := "--vschema_ddl_authorized_users=%"
319-
os.Args = append(os.Args, []string{schemaDirArg, keyspaceArg, numShardsArg, tabletHostname, vschemaDDLAuthorizedUsers}...)
326+
alsoLogToStderr := "--alsologtostderr" // better debugging
327+
os.Args = append(os.Args, []string{schemaDirArg, keyspaceArg, numShardsArg, tabletHostname, vschemaDDLAuthorizedUsers, alsoLogToStderr}...)
320328
os.Args = append(os.Args, flags...)
321329
return runCluster()
322330
}

go/flags/endtoend/vttestserver.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ Usage of vttestserver:
7777
--num_shards strings Comma separated shard count (one per keyspace) (default [2])
7878
--onclose_timeout duration wait no more than this for OnClose handlers before stopping (default 10s)
7979
--onterm_timeout duration wait no more than this for OnTermSync handlers before stopping (default 10s)
80-
--persistent_mode If this flag is set, the MySQL data directory is not cleaned up when LocalCluster.TearDown() is called. This is useful for running vttestserver as a database container in local developer environments. Note that db migration files (--schema_dir option) and seeding of random data (--initialize_with_random_data option) will only run during cluster startup if the data directory does not already exist. vschema migrations are run every time the cluster starts, since persistence for the topology server has not been implemented yet
80+
--persistent_mode If this flag is set, the MySQL data directory is not cleaned up when LocalCluster.TearDown() is called. This is useful for running vttestserver as a database container in local developer environments. Note that db migration files (--schema_dir option) and seeding of random data (--initialize_with_random_data option) will only run during cluster startup if the data directory does not already exist. Changes to VSchema are persisted across cluster restarts using a simple watcher if the --data_dir argument is specified.
8181
--pid_file string If set, the process will write its pid to the named file, and delete it on graceful shutdown.
8282
--planner-version string Sets the default planner to use when the session has not changed it. Valid values are: V3, Gen4, Gen4Greedy and Gen4Fallback. Gen4Fallback tries the new gen4 planner and falls back to the V3 planner if the gen4 fails.
8383
--pool_hostname_resolve_interval duration if set force an update to all hostnames and reconnect if changed, defaults to 0 (disabled)

go/vt/vttest/local_cluster.go

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -458,28 +458,26 @@ func (db *LocalCluster) loadSchema(shouldRunDatabaseMigrations bool) error {
458458
}
459459
}
460460

461-
glob, _ := filepath.Glob(path.Join(schemaDir, "*.sql"))
462-
for _, filepath := range glob {
463-
cmds, err := LoadSQLFile(filepath, schemaDir)
464-
if err != nil {
465-
return err
466-
}
467-
468-
// One single vschema migration per file
469-
if !db.OnlyMySQL && len(cmds) == 1 && strings.HasPrefix(strings.ToUpper(cmds[0]), "ALTER VSCHEMA") {
470-
if err = db.applyVschema(keyspace, cmds[0]); err != nil {
461+
if shouldRunDatabaseMigrations {
462+
glob, _ := filepath.Glob(path.Join(schemaDir, "*.sql"))
463+
for _, filepath := range glob {
464+
cmds, err := LoadSQLFile(filepath, schemaDir)
465+
if err != nil {
471466
return err
472467
}
473-
continue
474-
}
475468

476-
if !shouldRunDatabaseMigrations {
477-
continue
478-
}
469+
// One single vschema migration per file
470+
if !db.OnlyMySQL && len(cmds) == 1 && strings.HasPrefix(strings.ToUpper(cmds[0]), "ALTER VSCHEMA") {
471+
if err = db.applyVschema(keyspace, cmds[0]); err != nil {
472+
return err
473+
}
474+
continue
475+
}
479476

480-
for _, dbname := range db.shardNames(kpb) {
481-
if err := db.Execute(cmds, dbname); err != nil {
482-
return err
477+
for _, dbname := range db.shardNames(kpb) {
478+
if err := db.Execute(cmds, dbname); err != nil {
479+
return err
480+
}
483481
}
484482
}
485483
}

go/vt/vttest/vtprocess.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"net/http"
2424
"os"
2525
"os/exec"
26+
"path"
2627
"strings"
2728
"syscall"
2829
"time"
@@ -234,6 +235,9 @@ func VtcomboProcess(environment Environment, args *Config, mysql MySQLManager) (
234235
if args.SchemaDir != "" {
235236
vt.ExtraArgs = append(vt.ExtraArgs, []string{"--schema_dir", args.SchemaDir}...)
236237
}
238+
if args.PersistentMode && args.DataDir != "" {
239+
vt.ExtraArgs = append(vt.ExtraArgs, []string{"--vschema-persistence-dir", path.Join(args.DataDir, "vschema_data")}...)
240+
}
237241
if args.TransactionMode != "" {
238242
vt.ExtraArgs = append(vt.ExtraArgs, []string{"--transaction_mode", args.TransactionMode}...)
239243
}

0 commit comments

Comments
 (0)