@@ -26,52 +26,20 @@ import (
2626// cache provides a consistent snapshot when available, but the snapshot
2727// may be stale.
2828type Cache struct {
29- w * rangefeedcache.Watcher [* kvpb.RangeFeedValue ]
30- defaultZoneConfig * zonepb.ZoneConfig
31- additionalKVsSource config.SystemConfigProvider
32- mu struct {
29+ w * rangefeedcache.Watcher [* kvpb.RangeFeedValue ]
30+ defaultZoneConfig * zonepb.ZoneConfig
31+ mu struct {
3332 syncutil.RWMutex
3433
3534 cfg * config.SystemConfig
3635
3736 registry notificationRegistry
38-
39- // additionalKVs provides a mechanism for the creator of the
40- // cache to provide additional values.
41- //
42- // This is used to support injecting some key-value pairs from the
43- // system tenant into the system config.
44- additionalKVs []roachpb.KeyValue
4537 }
4638}
4739
4840// New constructs a new Cache.
4941func New (
5042 codec keys.SQLCodec , clock * hlc.Clock , f * rangefeed.Factory , defaultZoneConfig * zonepb.ZoneConfig ,
51- ) * Cache {
52- return NewWithAdditionalProvider (
53- codec , clock , f , defaultZoneConfig , nil , /* additionalProvider */
54- )
55- }
56-
57- // NewWithAdditionalProvider constructs a new Cache with the addition of
58- // another provider of a SystemConfig. This additional provider is used only
59- // for the KVs in its system config. The key-value pairs it provides should
60- // not overlap with those of this provider, if they do, the latest values
61- // will be preferred.
62- //
63- // This functionality exists to provide access to the system tenant's view
64- // of its zone configuration for RANGE DEFAULT and RANGE TENANTS. This is
65- // needed primarily in the mixed-version state before the tenant is in control
66- // of its own zone configurations.
67- //
68- // TODO(ajwerner): Remove this functionality once it's no longer needed in 22.2.
69- func NewWithAdditionalProvider (
70- codec keys.SQLCodec ,
71- clock * hlc.Clock ,
72- f * rangefeed.Factory ,
73- defaultZoneConfig * zonepb.ZoneConfig ,
74- additional config.SystemConfigProvider ,
7543) * Cache {
7644 // TODO(ajwerner): Deal with what happens if the system config has more than this
7745 // many rows.
@@ -82,7 +50,6 @@ func NewWithAdditionalProvider(
8250 defaultZoneConfig : defaultZoneConfig ,
8351 }
8452 c .mu .registry = notificationRegistry {}
85- c .additionalKVsSource = additional
8653
8754 spans := []roachpb.Span {
8855 codec .TableSpan (keys .DescriptorTableID ),
@@ -105,36 +72,6 @@ func (c *Cache) Start(ctx context.Context, stopper *stop.Stopper) error {
10572 if err := rangefeedcache .Start (ctx , stopper , c .w , nil /* onError */ ); err != nil {
10673 return err
10774 }
108- if c .additionalKVsSource != nil {
109- setAdditionalKeys := func () {
110- if cfg := c .additionalKVsSource .GetSystemConfig (); cfg != nil {
111- c .setAdditionalKeys (cfg .Values )
112- }
113- }
114- ch , unregister := c .additionalKVsSource .RegisterSystemConfigChannel ()
115- // Check if there are any additional keys to set before returning from
116- // start. This is mostly to make tests deterministic.
117- select {
118- case <- ch :
119- setAdditionalKeys ()
120- default :
121- }
122- if err := stopper .RunAsyncTask (ctx , "systemconfigwatcher-additional" , func (ctx context.Context ) {
123- for {
124- select {
125- case <- ctx .Done ():
126- return
127- case <- stopper .ShouldQuiesce ():
128- return
129- case <- ch :
130- setAdditionalKeys ()
131- }
132- }
133- }); err != nil {
134- unregister ()
135- return err
136- }
137- }
13875 return nil
13976}
14077
@@ -163,46 +100,6 @@ func (c *Cache) RegisterSystemConfigChannel() (_ <-chan struct{}, unregister fun
163100 }
164101}
165102
166- func (c * Cache ) setAdditionalKeys (kvs []roachpb.KeyValue ) {
167- c .mu .Lock ()
168- defer c .mu .Unlock ()
169-
170- sort .Sort (keyValues (kvs ))
171- if c .mu .cfg == nil {
172- c .mu .additionalKVs = kvs
173- return
174- }
175-
176- cloned := append ([]roachpb.KeyValue (nil ), c .mu .cfg .Values ... )
177- trimmed := append (trimOldKVs (cloned , c .mu .additionalKVs ), kvs ... )
178- sort .Sort (keyValues (trimmed ))
179- c .mu .cfg = config .NewSystemConfig (c .defaultZoneConfig )
180- c .mu .cfg .Values = trimmed
181- c .mu .additionalKVs = kvs
182- c .mu .registry .notify ()
183- }
184-
185- // trimOldKVs removes KVs from cloned where for all keys in prev.
186- // This function assumes that both cloned and prev are sorted.
187- func trimOldKVs (cloned , prev []roachpb.KeyValue ) []roachpb.KeyValue {
188- trimmed := cloned [:0 ]
189- shouldSkip := func (clonedOrd int ) (shouldSkip bool ) {
190- for len (prev ) > 0 {
191- if cmp := prev [0 ].Key .Compare (cloned [clonedOrd ].Key ); cmp >= 0 {
192- return cmp == 0
193- }
194- prev = prev [1 :]
195- }
196- return false
197- }
198- for i := range cloned {
199- if ! shouldSkip (i ) {
200- trimmed = append (trimmed , cloned [i ])
201- }
202- }
203- return trimmed
204- }
205-
206103type keyValues []roachpb.KeyValue
207104
208105func (k keyValues ) Len () int { return len (k ) }
@@ -221,7 +118,8 @@ func (c *Cache) handleUpdate(
221118 var updatedData []roachpb.KeyValue
222119 switch update .Type {
223120 case rangefeedcache .CompleteUpdate :
224- updatedData = rangefeedbuffer .MergeKVs (c .mu .additionalKVs , updateKVs )
121+ sort .Sort (keyValues (updateKVs ))
122+ updatedData = updateKVs
225123 case rangefeedcache .IncrementalUpdate :
226124 if len (updateKVs ) == 0 {
227125 // Simply return since there is nothing interesting.
0 commit comments