Skip to content

Commit 516ee6e

Browse files
efd6 authored and mergify-bot committed
libbeat/processors/add_process_metadata: implement a process cache eviction policy (#29717)
The eviction policy implemented here guarantees a hard limit to the number of entries in the cache and provides a downward force on the number of entries over time with an exponential decay on the fraction of entries that are older than the cache expiration duration. (cherry picked from commit 9bd989b)
1 parent 9a5ee41 commit 516ee6e

4 files changed

Lines changed: 159 additions & 9 deletions

File tree

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
9090
- system/socket: Fix bugs leading to wrong process being attributed to flows. {pull}29166[29166] {issue}17165[17165]
9191
- system/socket: Fix process name and arg truncation for long names, paths and args lists. {issue}24667[24667] {pull}29410[29410]
9292
- system/socket: Fix startup errors on newer 5.x kernels due to missing _do_fork function. {issue}29607[29607] {pull}29744[29744]
93+
- libbeat/processors/add_process_metadata: Fix memory leak in process cache. {issue}24890[24890] {pull}29717[29717]
9394

9495
*Filebeat*
9596

libbeat/processors/add_process_metadata/add_process_metadata.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,11 @@ import (
3636
)
3737

3838
const (
39-
processorName = "add_process_metadata"
40-
cacheExpiration = time.Second * 30
41-
containerIDMapping = "container.id"
39+
processorName = "add_process_metadata"
40+
cacheExpiration = time.Second * 30
41+
cacheCapacity = 32 << 10 // maximum number of process cache entries.
42+
cacheEvictionEffort = 10 // number of entries to sample for expiry eviction.
43+
containerIDMapping = "container.id"
4244
)
4345

4446
var (
@@ -49,7 +51,7 @@ var (
4951
// ErrNoProcess is returned when metadata for a process can't be collected.
5052
ErrNoProcess = errors.New("process not found")
5153

52-
procCache = newProcessCache(cacheExpiration, gosysinfoProvider{})
54+
procCache = newProcessCache(cacheExpiration, cacheCapacity, cacheEvictionEffort, gosysinfoProvider{})
5355

5456
processCgroupPaths = cgroup.ProcessCgroupPaths
5557

@@ -112,7 +114,6 @@ func newProcessMetadataProcessorWithProvider(cfg *common.Config, provider proces
112114
}
113115

114116
mappings, err := config.getMappings()
115-
116117
if err != nil {
117118
return nil, errors.Wrapf(err, "error unpacking %v.target_fields", processorName)
118119
}
@@ -139,7 +140,6 @@ func newProcessMetadataProcessorWithProvider(cfg *common.Config, provider proces
139140
} else {
140141
p.cidProvider = newCidProvider(resolve.NewTestResolver(config.HostPath), config.CgroupPrefixes, config.CgroupRegex, processCgroupPaths, nil)
141142
}
142-
143143
}
144144

145145
if withCache {

libbeat/processors/add_process_metadata/cache.go

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,22 @@ type processCacheEntry struct {
2929
}
3030

3131
type processCache struct {
32-
cache map[int]processCacheEntry
3332
provider processMetadataProvider
3433
expiration time.Duration
35-
rwMutex sync.RWMutex
34+
35+
cap int // cap is the maximum number of elements the cache will hold.
36+
effort int // effort is the number of entries to examine during expired element eviction.
37+
38+
rwMutex sync.RWMutex // rwMutex protects the cache map.
39+
cache map[int]processCacheEntry
3640
}
3741

38-
func newProcessCache(expiration time.Duration, provider processMetadataProvider) processCache {
42+
func newProcessCache(expiration time.Duration, cap, effort int, provider processMetadataProvider) processCache {
3943
return processCache{
4044
cache: make(map[int]processCacheEntry),
4145
expiration: expiration,
46+
cap: cap,
47+
effort: effort,
4248
provider: provider,
4349
}
4450
}
@@ -58,6 +64,12 @@ func (pc *processCache) GetProcessMetadata(pid int) (*processMetadata, error) {
5864
if !valid {
5965
pc.rwMutex.Lock()
6066
defer pc.rwMutex.Unlock()
67+
68+
pc.tryEvictExpired()
69+
if len(pc.cache) >= pc.cap {
70+
pc.evictRandomEntry()
71+
}
72+
6173
// Make sure someone else didn't generate this entry while we were
6274
// waiting for the write lock
6375
if entry, valid = pc.getEntryUnlocked(pid); !valid {
@@ -68,3 +80,27 @@ func (pc *processCache) GetProcessMetadata(pid int) (*processMetadata, error) {
6880
}
6981
return entry.metadata, entry.err
7082
}
83+
84+
// tryEvictExpired implements a random sampling expired element cache
85+
// eviction policy.
86+
func (pc *processCache) tryEvictExpired() {
87+
now := time.Now()
88+
n := 0
89+
for pid, entry := range pc.cache {
90+
if n >= pc.effort {
91+
return
92+
}
93+
if now.After(entry.expiration) {
94+
delete(pc.cache, pid)
95+
}
96+
n++
97+
}
98+
}
99+
100+
// evictRandomEntry implements a random cache eviction policy.
101+
func (pc *processCache) evictRandomEntry() {
102+
for pid := range pc.cache {
103+
delete(pc.cache, pid)
104+
return
105+
}
106+
}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package add_process_metadata
19+
20+
import (
21+
"math/rand"
22+
"testing"
23+
"time"
24+
)
25+
26+
var cacheEvictionTests = []struct {
27+
name string
28+
29+
expire time.Duration
30+
cap, effort int
31+
32+
iters int
33+
maxPID int
34+
delay time.Duration
35+
}{
36+
{
37+
name: "small sparse",
38+
expire: time.Millisecond,
39+
cap: 100,
40+
effort: 5,
41+
iters: 1000,
42+
maxPID: 100000,
43+
delay: 2 * time.Millisecond,
44+
},
45+
{
46+
name: "small dense",
47+
expire: time.Millisecond,
48+
cap: 100,
49+
effort: 5,
50+
iters: 1000,
51+
maxPID: 10,
52+
delay: 2 * time.Millisecond,
53+
},
54+
{
55+
name: "large sparse",
56+
expire: time.Millisecond,
57+
cap: 100,
58+
effort: 5,
59+
iters: 10000,
60+
maxPID: 100000,
61+
delay: time.Millisecond / 10,
62+
},
63+
{
64+
name: "large dense",
65+
expire: time.Millisecond,
66+
cap: 100,
67+
effort: 5,
68+
iters: 10000,
69+
maxPID: 10,
70+
delay: time.Millisecond / 10,
71+
},
72+
{
73+
name: "huge sparse",
74+
expire: time.Millisecond,
75+
cap: 100,
76+
effort: 5,
77+
iters: 1000,
78+
maxPID: 100000,
79+
delay: time.Millisecond / 100,
80+
},
81+
{
82+
name: "huge dense",
83+
expire: time.Millisecond,
84+
cap: 100,
85+
effort: 5,
86+
iters: 1000,
87+
maxPID: 10,
88+
delay: time.Millisecond / 100,
89+
},
90+
}
91+
92+
func TestCacheEviction(t *testing.T) {
93+
for _, test := range cacheEvictionTests {
94+
rnd := rand.New(rand.NewSource(1))
95+
c := newProcessCache(test.expire, test.cap, test.effort, emptyProvider{})
96+
97+
for i := 0; i < test.iters; i++ {
98+
pid := rnd.Intn(test.maxPID)
99+
c.GetProcessMetadata(pid)
100+
if len(c.cache) > test.cap {
101+
t.Errorf("cache overflow for %s after %d iterations", test.name, i)
102+
break
103+
}
104+
time.Sleep(test.delay)
105+
}
106+
}
107+
}
108+
109+
type emptyProvider struct{}
110+
111+
func (emptyProvider) GetProcessMetadata(pid int) (*processMetadata, error) {
112+
return &processMetadata{pid: pid}, nil
113+
}

0 commit comments

Comments
 (0)