Skip to content

Commit 44c5c32

Browse files
committed
libbeat/processors/add_process_metadata: implement a process cache eviction policy
The eviction policy implemented here guarantees a hard limit to the number of entries in the cache and provides a downward force on the number of entries over time with an exponential decay on the fraction of entries that are older than the cache expiration duration.
1 parent 2874b62 commit 44c5c32

4 files changed

Lines changed: 159 additions & 9 deletions

File tree

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
9292
- system/socket: Fix bugs leading to wrong process being attributed to flows. {pull}29166[29166] {issue}17165[17165]
9393
- system/socket: Fix process name and arg truncation for long names, paths and args lists. {issue}24667[24667] {pull}29410[29410]
9494
- system/socket: Fix startup errors on newer 5.x kernels due to missing _do_fork function. {issue}29607[29607] {pull}29744[29744]
95+
- libbeat/processors/add_process_metadata: Fix memory leak in process cache. {issue}24890[24890] {pull}29717[29717]
9596

9697
*Filebeat*
9798

libbeat/processors/add_process_metadata/add_process_metadata.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,11 @@ import (
3636
)
3737

3838
const (
39-
processorName = "add_process_metadata"
40-
cacheExpiration = time.Second * 30
41-
containerIDMapping = "container.id"
39+
processorName = "add_process_metadata"
40+
cacheExpiration = time.Second * 30
41+
cacheCapacity = 32 << 10 // maximum number of process cache entries.
42+
cacheEvictionEffort = 10 // number of entries to sample for expiry eviction.
43+
containerIDMapping = "container.id"
4244
)
4345

4446
var (
@@ -49,7 +51,7 @@ var (
4951
// ErrNoProcess is returned when metadata for a process can't be collected.
5052
ErrNoProcess = errors.New("process not found")
5153

52-
procCache = newProcessCache(cacheExpiration, gosysinfoProvider{})
54+
procCache = newProcessCache(cacheExpiration, cacheCapacity, cacheEvictionEffort, gosysinfoProvider{})
5355

5456
processCgroupPaths = cgroup.ProcessCgroupPaths
5557

@@ -112,7 +114,6 @@ func newProcessMetadataProcessorWithProvider(cfg *common.Config, provider proces
112114
}
113115

114116
mappings, err := config.getMappings()
115-
116117
if err != nil {
117118
return nil, errors.Wrapf(err, "error unpacking %v.target_fields", processorName)
118119
}
@@ -139,7 +140,6 @@ func newProcessMetadataProcessorWithProvider(cfg *common.Config, provider proces
139140
} else {
140141
p.cidProvider = newCidProvider(resolve.NewTestResolver(config.HostPath), config.CgroupPrefixes, config.CgroupRegex, processCgroupPaths, nil)
141142
}
142-
143143
}
144144

145145
if withCache {

libbeat/processors/add_process_metadata/cache.go

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,22 @@ type processCacheEntry struct {
2929
}
3030

3131
type processCache struct {
32-
cache map[int]processCacheEntry
3332
provider processMetadataProvider
3433
expiration time.Duration
35-
rwMutex sync.RWMutex
34+
35+
cap int // cap is the maximum number of elements the cache will hold.
36+
effort int // effort is the number of entries to examine during expired element eviction.
37+
38+
rwMutex sync.RWMutex // rwMutex protects the cache map.
39+
cache map[int]processCacheEntry
3640
}
3741

38-
func newProcessCache(expiration time.Duration, provider processMetadataProvider) processCache {
42+
func newProcessCache(expiration time.Duration, cap, effort int, provider processMetadataProvider) processCache {
3943
return processCache{
4044
cache: make(map[int]processCacheEntry),
4145
expiration: expiration,
46+
cap: cap,
47+
effort: effort,
4248
provider: provider,
4349
}
4450
}
@@ -58,6 +64,12 @@ func (pc *processCache) GetProcessMetadata(pid int) (*processMetadata, error) {
5864
if !valid {
5965
pc.rwMutex.Lock()
6066
defer pc.rwMutex.Unlock()
67+
68+
pc.tryEvictExpired()
69+
if len(pc.cache) >= pc.cap {
70+
pc.evictRandomEntry()
71+
}
72+
6173
// Make sure someone else didn't generate this entry while we were
6274
// waiting for the write lock
6375
if entry, valid = pc.getEntryUnlocked(pid); !valid {
@@ -68,3 +80,27 @@ func (pc *processCache) GetProcessMetadata(pid int) (*processMetadata, error) {
6880
}
6981
return entry.metadata, entry.err
7082
}
83+
84+
// tryEvictExpired implements a random sampling expired element cache
85+
// eviction policy.
86+
func (pc *processCache) tryEvictExpired() {
87+
now := time.Now()
88+
n := 0
89+
for pid, entry := range pc.cache {
90+
if n >= pc.effort {
91+
return
92+
}
93+
if now.After(entry.expiration) {
94+
delete(pc.cache, pid)
95+
}
96+
n++
97+
}
98+
}
99+
100+
// evictRandomEntry implements a random cache eviction policy.
101+
func (pc *processCache) evictRandomEntry() {
102+
for pid := range pc.cache {
103+
delete(pc.cache, pid)
104+
return
105+
}
106+
}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package add_process_metadata
19+
20+
import (
21+
"math/rand"
22+
"testing"
23+
"time"
24+
)
25+
26+
// cacheEvictionTests is the table of workloads used by TestCacheEviction.
// Every case uses the same cache configuration and varies how many lookups
// are made, how wide the PID range is and how long the test waits between
// lookups relative to the entry expiration.
var cacheEvictionTests = []struct {
	name string // name identifies the case in failure messages.

	// Arguments handed to newProcessCache.
	expire      time.Duration
	cap, effort int

	// Workload applied to the cache.
	iters  int           // number of lookups to perform.
	maxPID int           // exclusive upper bound of the random PIDs looked up.
	delay  time.Duration // pause between consecutive lookups.
}{
	{name: "small sparse", expire: time.Millisecond, cap: 100, effort: 5, iters: 1000, maxPID: 100000, delay: 2 * time.Millisecond},
	{name: "small dense", expire: time.Millisecond, cap: 100, effort: 5, iters: 1000, maxPID: 10, delay: 2 * time.Millisecond},
	{name: "large sparse", expire: time.Millisecond, cap: 100, effort: 5, iters: 10000, maxPID: 100000, delay: time.Millisecond / 10},
	{name: "large dense", expire: time.Millisecond, cap: 100, effort: 5, iters: 10000, maxPID: 10, delay: time.Millisecond / 10},
	{name: "huge sparse", expire: time.Millisecond, cap: 100, effort: 5, iters: 1000, maxPID: 100000, delay: time.Millisecond / 100},
	{name: "huge dense", expire: time.Millisecond, cap: 100, effort: 5, iters: 1000, maxPID: 10, delay: time.Millisecond / 100},
}
91+
92+
func TestCacheEviction(t *testing.T) {
93+
for _, test := range cacheEvictionTests {
94+
rnd := rand.New(rand.NewSource(1))
95+
c := newProcessCache(test.expire, test.cap, test.effort, emptyProvider{})
96+
97+
for i := 0; i < test.iters; i++ {
98+
pid := rnd.Intn(test.maxPID)
99+
c.GetProcessMetadata(pid)
100+
if len(c.cache) > test.cap {
101+
t.Errorf("cache overflow for %s after %d iterations", test.name, i)
102+
break
103+
}
104+
time.Sleep(test.delay)
105+
}
106+
}
107+
}
108+
109+
type emptyProvider struct{}
110+
111+
func (emptyProvider) GetProcessMetadata(pid int) (*processMetadata, error) {
112+
return &processMetadata{pid: pid}, nil
113+
}

0 commit comments

Comments
 (0)