Skip to content

Commit c2937d7

Browse files
authored
Merge pull request #13200 from serathius/downgrade-storage
Implement schema migration and panic when trying to downgrade storage
2 parents 58fb625 + 79f6faa commit c2937d7

20 files changed

Lines changed: 1361 additions & 169 deletions

CHANGELOG-3.6.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ Previous change logs can be found at [CHANGELOG-3.5](https://github.com/etcd-io/
88

99
See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0).
1010

11+
### Breaking Changes
12+
13+
- `etcd` will no longer start on data dir created by newer versions (for example etcd v3.6 will not run on v3.7+ data dir). To downgrade data dir please check out `etcdutl migrate` command.
14+
1115
### etcdctl v3
1216

1317
- Add command to generate [shell completion](https://github.com/etcd-io/etcd/pull/13133).

etcdutl/etcdutl/migrate_command.go

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -103,29 +103,30 @@ func migrateCommandFunc(c *migrateConfig) error {
103103
defer c.be.Close()
104104
lg := GetLogger()
105105
tx := c.be.BatchTx()
106-
tx.Lock()
107106
current, err := schema.DetectSchemaVersion(lg, tx)
108107
if err != nil {
109-
tx.Unlock()
110108
lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older")
111109
return err
112110
}
113-
if *current == *c.targetVersion {
114-
tx.Unlock()
115-
lg.Info("storage version up-to-date", zap.String("storage-version", storageVersionToString(current)))
111+
if current == *c.targetVersion {
112+
lg.Info("storage version up-to-date", zap.String("storage-version", storageVersionToString(&current)))
116113
return nil
117114
}
118-
if c.force {
119-
unsafeMigrateForce(lg, tx, c.targetVersion)
120-
tx.Unlock()
121-
c.be.ForceCommit()
122-
return nil
115+
err = schema.Migrate(lg, tx, *c.targetVersion)
116+
if err != nil {
117+
if !c.force {
118+
return err
119+
}
120+
lg.Info("normal migrate failed, trying with force", zap.Error(err))
121+
migrateForce(lg, tx, c.targetVersion)
123122
}
124-
tx.Unlock()
125-
return fmt.Errorf("storage version migration is not yet supported")
123+
c.be.ForceCommit()
124+
return nil
126125
}
127126

128-
func unsafeMigrateForce(lg *zap.Logger, tx backend.BatchTx, target *semver.Version) {
127+
func migrateForce(lg *zap.Logger, tx backend.BatchTx, target *semver.Version) {
128+
tx.Lock()
129+
defer tx.Unlock()
129130
// Storage version is only supported since v3.6
130131
if target.LessThan(schema.V3_6) {
131132
schema.UnsafeClearStorageVersion(tx)

pkg/notify/notify.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Copyright 2021 The etcd Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package notify
16+
17+
import (
18+
"sync"
19+
)
20+
21+
// Notifier is a thread safe struct that can be used to send notification about
22+
// some event to multiple consumers.
23+
type Notifier struct {
24+
mu sync.RWMutex
25+
channel chan struct{}
26+
}
27+
28+
// NewNotifier returns new notifier
29+
func NewNotifier() *Notifier {
30+
return &Notifier{
31+
channel: make(chan struct{}),
32+
}
33+
}
34+
35+
// Receive returns channel that can be used to wait for notification.
36+
// Consumers will be informed by closing the channel.
37+
func (n *Notifier) Receive() <-chan struct{} {
38+
n.mu.RLock()
39+
defer n.mu.RUnlock()
40+
return n.channel
41+
}
42+
43+
// Notify closes the channel passed to consumers and creates new channel to used
44+
// for next notification.
45+
func (n *Notifier) Notify() {
46+
newChannel := make(chan struct{})
47+
n.mu.Lock()
48+
channelToClose := n.channel
49+
n.channel = newChannel
50+
n.mu.Unlock()
51+
close(channelToClose)
52+
}

server/etcdserver/adapters.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ import (
1818
"context"
1919

2020
"github.com/coreos/go-semver/semver"
21+
"go.etcd.io/etcd/server/v3/storage/backend"
22+
"go.etcd.io/etcd/server/v3/storage/schema"
2123
"go.uber.org/zap"
2224

2325
"go.etcd.io/etcd/api/v3/version"
@@ -28,6 +30,14 @@ import (
2830
// serverVersionAdapter implements Server interface needed by serverversion.Monitor
2931
type serverVersionAdapter struct {
3032
*EtcdServer
33+
tx backend.BatchTx
34+
}
35+
36+
func newServerVersionAdapter(s *EtcdServer) *serverVersionAdapter {
37+
return &serverVersionAdapter{
38+
EtcdServer: s,
39+
tx: nil,
40+
}
3141
}
3242

3343
var _ serverversion.Server = (*serverVersionAdapter)(nil)
@@ -56,3 +66,36 @@ func (s *serverVersionAdapter) GetDowngradeInfo() *membership.DowngradeInfo {
5666
func (s *serverVersionAdapter) GetVersions() map[string]*version.Versions {
5767
return getVersions(s.lg, s.cluster, s.id, s.peerRt)
5868
}
69+
70+
func (s *serverVersionAdapter) GetStorageVersion() *semver.Version {
71+
if s.tx == nil {
72+
s.Lock()
73+
defer s.Unlock()
74+
}
75+
v, err := schema.UnsafeDetectSchemaVersion(s.lg, s.tx)
76+
if err != nil {
77+
return nil
78+
}
79+
return &v
80+
}
81+
82+
func (s *serverVersionAdapter) UpdateStorageVersion(target semver.Version) {
83+
if s.tx == nil {
84+
s.Lock()
85+
defer s.Unlock()
86+
}
87+
err := schema.UnsafeMigrate(s.lg, s.tx, target)
88+
if err != nil {
89+
s.lg.Error("failed migrating storage schema", zap.String("storage-version", target.String()), zap.Error(err))
90+
}
91+
}
92+
93+
func (s *serverVersionAdapter) Lock() {
94+
s.tx = s.be.BatchTx()
95+
s.tx.Lock()
96+
}
97+
98+
func (s *serverVersionAdapter) Unlock() {
99+
s.tx.Unlock()
100+
s.tx = nil
101+
}

server/etcdserver/api/membership/cluster.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"go.etcd.io/etcd/api/v3/version"
3030
"go.etcd.io/etcd/client/pkg/v3/types"
3131
"go.etcd.io/etcd/pkg/v3/netutil"
32+
"go.etcd.io/etcd/pkg/v3/notify"
3233
"go.etcd.io/etcd/raft/v3"
3334
"go.etcd.io/etcd/raft/v3/raftpb"
3435
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
@@ -57,7 +58,8 @@ type RaftCluster struct {
5758
// removed id cannot be reused.
5859
removed map[types.ID]bool
5960

60-
downgradeInfo *DowngradeInfo
61+
downgradeInfo *DowngradeInfo
62+
versionChanged *notify.Notifier
6163
}
6264

6365
// ConfigChangeContext represents a context for confChange.
@@ -247,6 +249,10 @@ func (c *RaftCluster) SetBackend(be MembershipBackend) {
247249
c.be.MustCreateBackendBuckets()
248250
}
249251

252+
func (c *RaftCluster) SetVersionChangedNotifier(n *notify.Notifier) {
253+
c.versionChanged = n
254+
}
255+
250256
func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
251257
c.Lock()
252258
defer c.Unlock()
@@ -545,6 +551,9 @@ func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *s
545551
ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": version.Cluster(oldVer.String())}).Set(0)
546552
}
547553
ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": version.Cluster(ver.String())}).Set(1)
554+
if c.versionChanged != nil {
555+
c.versionChanged.Notify()
556+
}
548557
onSet(c.lg, ver)
549558
}
550559

server/etcdserver/bootstrap.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,16 +147,29 @@ func bootstrapBackend(cfg config.ServerConfig) (be backend.Backend, ci cindex.Co
147147
ci = cindex.NewConsistentIndex(nil)
148148
beHooks = serverstorage.NewBackendHooks(cfg.Logger, ci)
149149
be = serverstorage.OpenBackend(cfg, beHooks)
150+
defer func() {
151+
if err != nil && be != nil {
152+
be.Close()
153+
}
154+
}()
150155
ci.SetBackend(be)
151156
schema.CreateMetaBucket(be.BatchTx())
152157
if cfg.ExperimentalBootstrapDefragThresholdMegabytes != 0 {
153-
err := maybeDefragBackend(cfg, be)
158+
err = maybeDefragBackend(cfg, be)
154159
if err != nil {
155-
be.Close()
156160
return nil, nil, false, nil, err
157161
}
158162
}
159163
cfg.Logger.Debug("restore consistentIndex", zap.Uint64("index", ci.ConsistentIndex()))
164+
165+
// TODO(serathius): Implement schema setup in fresh storage
166+
if beExist {
167+
err = schema.Validate(cfg.Logger, be.BatchTx())
168+
if err != nil {
169+
cfg.Logger.Error("Failed to validate schema", zap.Error(err))
170+
return nil, nil, false, nil, err
171+
}
172+
}
160173
return be, ci, beExist, beHooks, nil
161174
}
162175

0 commit comments

Comments
 (0)