@@ -4,6 +4,8 @@ package retention // import "github.com/influxdata/influxdb/services/retention"
44import (
55 "errors"
66 "fmt"
7+ "math"
8+ "slices"
79 "sync"
810 "time"
911
@@ -13,13 +15,37 @@ import (
1315 "go.uber.org/zap"
1416)
1517
16- type MetaClient interface {
18+ type OSSMetaClient interface {
1719 Databases () []meta.DatabaseInfo
1820 DeleteShardGroup (database , policy string , id uint64 ) error
1921 DropShard (id uint64 ) error
2022 PruneShardGroups () error
2123}
2224
25+ type MetaClient interface {
26+ OSSMetaClient
27+ NodeID () uint64
28+ }
29+
30+ const (
31+ // ossNodeID is a special node ID for OSS nodes. No enterprise node will ever have this node ID.
32+ // 0 can not be used because there is a brief period on startup before meta-client initialization
33+ // and before joining a cluster that NodeID() == 0.
34+ ossNodeID uint64 = math .MaxUint64
35+ )
36+
37+ // ossMetaClientAdapter adds methods the retention service needs to the OSS meta.Client implementation.
38+ // OSSMetaClient is decorated with methods needed for the Enterprise retention service instead of adding
39+ // them to the OSS MetaClient to avoid polluting the OSS MetaClient namespace.
40+ type ossMetaClientAdapter struct {
41+ OSSMetaClient
42+ }
43+
44+ // NodeID returns the magic ossNodeID identifier.
45+ func (c * ossMetaClientAdapter ) NodeID () uint64 {
46+ return ossNodeID
47+ }
48+
2349// Service represents the retention policy enforcement service.
2450type Service struct {
2551 MetaClient
@@ -58,12 +84,20 @@ func NewService(c Config) *Service {
5884}
5985
6086// OSSDropShardMetaRef creates a closure appropriate for OSS to use as DropShardMetaRef.
61- func OSSDropShardMetaRef (mc MetaClient ) func (uint64 , []uint64 ) error {
87+ func OSSDropShardMetaRef (mc OSSMetaClient ) func (uint64 , []uint64 ) error {
6288 return func (shardID uint64 , owners []uint64 ) error {
6389 return mc .DropShard (shardID )
6490 }
6591}
6692
93+ func (s * Service ) SetMetaClient (c MetaClient ) {
94+ s .MetaClient = c
95+ }
96+
97+ func (s * Service ) SetOSSMetaClient (c OSSMetaClient ) {
98+ s .SetMetaClient (& ossMetaClientAdapter {OSSMetaClient : c })
99+ }
100+
67101// Open starts retention policy enforcement.
68102func (s * Service ) Open () error {
69103 if ! s .config .Enabled || s .done != nil {
@@ -116,6 +150,10 @@ func (s *Service) run() {
116150 }
117151}
118152
153+ func (s * Service ) isOSS () bool {
154+ return s .NodeID () == ossNodeID
155+ }
156+
119157func (s * Service ) DeletionCheck () {
120158 log , logEnd := logger .NewOperation (s .logger , "Retention policy deletion check" , "retention_delete_check" )
121159 defer logEnd ()
@@ -244,16 +282,21 @@ func (s *Service) DeletionCheck() {
244282
245283 // Check for expired phantom shards that exist in the metadata but not in the store.
246284 for id , info := range deletedShardIDs {
247- func () {
248- log , logEnd := logger .NewOperation (log , "Drop phantom shard references" , "retention_drop_phantom_refs" ,
249- logger .Database (info .db ), logger .Shard (id ), logger .RetentionPolicy (info .rp ), zap .Uint64s ("owners" , info .owners ))
250- defer logEnd ()
251- log .Warn ("Expired phantom shard detected during retention check, removing from metadata" )
252- if err := s .DropShardMetaRef (id , info .owners ); err != nil {
253- log .Error ("Error dropping shard meta reference for phantom shard" , zap .Error (err ))
254- retryNeeded = true
255- }
256- }()
285+ // Enterprise tracks shard ownership while OSS does not because it is single node. A shard not in the
286+ // TSDB but in the metadata is always a phantom shard for OSS. For enterprise, it is only a phantom shard
287+ // if this node is supposed to own the shard according to the metadata.
288+ if s .isOSS () || slices .Contains (info .owners , s .NodeID ()) {
289+ func () {
290+ log , logEnd := logger .NewOperation (log , "Drop phantom shard references" , "retention_drop_phantom_refs" ,
291+ logger .Database (info .db ), logger .Shard (id ), logger .RetentionPolicy (info .rp ), zap .Uint64s ("owners" , info .owners ))
292+ defer logEnd ()
293+ log .Warn ("Expired phantom shard detected during retention check, removing from metadata" )
294+ if err := s .DropShardMetaRef (id , info .owners ); err != nil {
295+ log .Error ("Error dropping shard meta reference for phantom shard" , zap .Error (err ))
296+ retryNeeded = true
297+ }
298+ }()
299+ }
257300 }
258301
259302 func () {
0 commit comments