Skip to content

Commit bac5ced

Browse files
authored
feat: Update Stream to accept a bufSize for tar CopyBuffer window (#26964) (#27077)
1 parent aa1775b commit bac5ced

File tree

5 files changed

+101
-18
lines changed

5 files changed

+101
-18
lines changed

etc/config.sample.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,12 @@
167167
# increase in cache size may lead to an increase in heap usage.
168168
series-id-set-cache-size = 100
169169

170+
# The size, in bytes, of the tar buffer window used while running tar
171+
# streaming operations such as renaming and copying tar files during backups.
172+
# The default value is 1MB. This should only change if backups are having performance issues
173+
# and you understand that this value is a heuristic that may need to be tweaked.
174+
# tar-stream-buffer-size = 1048576
175+
170176
###
171177
### [coordinator]
172178
###

pkg/tar/stream.go

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616
// possibly writing each file to a tar writer stream. By default StreamFile is used, which will result in all files
1717
// being written. A custom writeFunc can be passed so that each file may be written, modified+written, or skipped
1818
// depending on the custom logic.
19-
func Stream(w io.Writer, dir, relativePath string, writeFunc func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error) error {
19+
func Stream(w io.Writer, dir, relativePath string, bufSize uint64, writeFunc func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer, bufSize uint64) error) error {
2020
tw := tar.NewWriter(w)
2121
defer tw.Close()
2222

@@ -41,30 +41,36 @@ func Stream(w io.Writer, dir, relativePath string, writeFunc func(f os.FileInfo,
4141
return err
4242
}
4343

44-
return writeFunc(f, filepath.Join(relativePath, subDir), path, tw)
44+
return writeFunc(f, filepath.Join(relativePath, subDir), path, tw, bufSize)
4545
})
4646
}
4747

4848
// Generates a filtering function for Stream that checks an incoming file, and only writes the file to the stream if
4949
// its mod time is later than since. Example: to tar only files newer than a certain datetime, use
5050
// tar.Stream(w, dir, relativePath, bufSize, SinceFilterTarFile(datetime))
51-
func SinceFilterTarFile(since time.Time) func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error {
52-
return func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error {
51+
func SinceFilterTarFile(since time.Time) func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer, bufSize uint64) error {
52+
return func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer, bufSize uint64) error {
5353
if f.ModTime().After(since) {
54-
return StreamFile(f, shardRelativePath, fullPath, tw)
54+
return StreamFile(f, shardRelativePath, fullPath, tw, bufSize)
5555
}
5656
return nil
5757
}
5858
}
5959

6060
// StreamFile streams a single file to tw, extending the header name using the shardRelativePath
61-
func StreamFile(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error {
62-
return StreamRenameFile(f, f.Name(), shardRelativePath, fullPath, tw)
61+
func StreamFile(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer, bufSize uint64) error {
62+
return StreamRenameFile(f, f.Name(), shardRelativePath, fullPath, tw, bufSize)
6363
}
6464

6565
// StreamRenameFile streams a single file to tw, using tarHeaderFileName instead of the actual filename
6666
// e.g., when we want to write a *.tmp file using the original file's non-tmp name.
67-
func StreamRenameFile(f os.FileInfo, tarHeaderFileName, relativePath, fullPath string, tw *tar.Writer) error {
67+
func StreamRenameFile(f os.FileInfo, tarHeaderFileName, relativePath, fullPath string, tw *tar.Writer, bufSize uint64) error {
68+
// We NEVER want this to be 0 (bufSize is unsigned, so it cannot be negative). This is just a safety harness.
69+
// influxd will panic if we pass a 0 size buffer into CopyBuffer.
70+
if bufSize <= 0 {
71+
bufSize = 1024 * 1024 // 1MB
72+
}
73+
buf := make([]byte, bufSize)
6874
h, err := tar.FileInfoHeader(f, f.Name())
6975
if err != nil {
7076
return err
@@ -86,9 +92,14 @@ func StreamRenameFile(f os.FileInfo, tarHeaderFileName, relativePath, fullPath s
8692

8793
defer fr.Close()
8894

89-
_, err = io.CopyN(tw, fr, h.Size)
95+
bytesWritten, err := io.CopyBuffer(tw, fr, buf)
96+
if err != nil {
97+
return err
98+
} else if bytesWritten != h.Size {
99+
return fmt.Errorf("StreamRenameFile: Error while copying buffer, expected %d bytes but wrote %d.", h.Size, bytesWritten)
100+
}
90101

91-
return err
102+
return nil
92103
}
93104

94105
// Restore reads a tar archive from r and extracts all of its files into dir,

pkg/tar/stream_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package tar_test
2+
3+
import (
4+
"archive/tar"
5+
"bytes"
6+
"io"
7+
"os"
8+
"path/filepath"
9+
"testing"
10+
11+
"github.com/stretchr/testify/require"
12+
13+
pkgtar "github.com/influxdata/influxdb/pkg/tar"
14+
)
15+
16+
func TestStreamRenameWithBufSize(t *testing.T) {
17+
dir := t.TempDir()
18+
testFile := filepath.Join(dir, "test2.txt.tar")
19+
testData := []byte("test data for buffer size")
20+
21+
testFileRename := "test.txt.tar"
22+
23+
require.NoError(t, os.WriteFile(testFile, testData, 0644))
24+
25+
var buf bytes.Buffer
26+
bufSize := uint64(1024 * 1024)
27+
28+
tw := tar.NewWriter(&buf)
29+
30+
f, err := os.Open(testFile)
31+
require.NoError(t, err, "error opening testFile")
32+
info, err := f.Stat()
33+
require.NoError(t, err, "error stat testFile")
34+
35+
require.NoError(t, pkgtar.StreamRenameFile(info, testFileRename, "", testFile, tw, bufSize))
36+
require.NoError(t, tw.Close())
37+
38+
tr := tar.NewReader(&buf)
39+
hdr, err := tr.Next()
40+
require.NoError(t, err)
41+
require.Equal(t, testFileRename, hdr.Name)
42+
43+
content, err := io.ReadAll(tr)
44+
require.NoError(t, err)
45+
require.Equal(t, testData, content)
46+
}

tsdb/config.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,10 @@ const (
8484

8585
// MaxTSMFileSize is the maximum size of TSM files.
8686
MaxTSMFileSize = uint32(2048 * 1024 * 1024) // 2GB
87+
88+
// DefaultTarStreamBufferSize is the default window size to use during tar file streaming.
89+
// This impacts backups and should only be modified if backups are having performance issues.
90+
DefaultTarStreamBufferSize = uint64(1024 * 1024) // 1MB
8791
)
8892

8993
var SingleGenerationReasonText string = SingleGenerationReason()
@@ -186,6 +190,11 @@ type Config struct {
186190
// been found to be problematic in some cases. It may help users who have
187191
// slow disks.
188192
TSMWillNeed bool `toml:"tsm-use-madv-willneed"`
193+
194+
// TarStreamBufferSize is the size, in bytes, of the tar buffer window used while running tar
195+
// streaming operations such as renaming and copying tar files during backups.
196+
// The default value is 1MB. This should only change if backups are having performance issues.
197+
TarStreamBufferSize uint64 `toml:"tar-stream-buffer-size"`
189198
}
190199

191200
// NewConfig returns the default configuration for tsdb.
@@ -217,6 +226,8 @@ func NewConfig() Config {
217226

218227
TraceLoggingEnabled: false,
219228
TSMWillNeed: false,
229+
230+
TarStreamBufferSize: DefaultTarStreamBufferSize,
220231
}
221232
}
222233

@@ -266,6 +277,10 @@ func (c *Config) Validate() error {
266277
return fmt.Errorf("unrecognized index %s", c.Index)
267278
}
268279

280+
if c.TarStreamBufferSize <= 0 {
281+
return fmt.Errorf("tar-stream-buffer-size cannot be %d, value must be greater than zero and non-negative", c.TarStreamBufferSize)
282+
}
283+
269284
return nil
270285
}
271286

tsdb/engine/tsm1/engine.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,9 @@ type Engine struct {
226226

227227
// muDigest ensures only one goroutine can generate a digest at a time.
228228
muDigest sync.RWMutex
229+
230+
// TarStreamBufferSize is the size used for our buffer windows within the tar package.
231+
TarStreamBufferSize uint64
229232
}
230233

231234
// NewEngine returns a new instance of Engine.
@@ -285,6 +288,8 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *ts
285288
optimizedCompactionLimiter: opt.OptimizedCompactionLimiter,
286289
Scheduler: newScheduler(stats, opt.CompactionLimiter.Capacity()),
287290
seriesIDSets: opt.SeriesIDSets,
291+
292+
TarStreamBufferSize: opt.Config.TarStreamBufferSize,
288293
}
289294

290295
// Feature flag to enable per-series type checking, by default this is off and
@@ -993,13 +998,13 @@ func (e *Engine) Backup(w io.Writer, basePath string, since time.Time) error {
993998
}
994999
}()
9951000

996-
return intar.Stream(w, path, basePath, intar.SinceFilterTarFile(since))
1001+
return intar.Stream(w, path, basePath, e.TarStreamBufferSize, intar.SinceFilterTarFile(since))
9971002
}
9981003

999-
func (e *Engine) timeStampFilterTarFile(start, end time.Time) func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error {
1000-
return func(fi os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer) error {
1004+
func (e *Engine) timeStampFilterTarFile(start, end time.Time) func(f os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer, bufSize uint64) error {
1005+
return func(fi os.FileInfo, shardRelativePath, fullPath string, tw *tar.Writer, bufSize uint64) error {
10011006
if !strings.HasSuffix(fi.Name(), ".tsm") {
1002-
return intar.StreamFile(fi, shardRelativePath, fullPath, tw)
1007+
return intar.StreamFile(fi, shardRelativePath, fullPath, tw, e.TarStreamBufferSize)
10031008
}
10041009

10051010
f, err := os.Open(fullPath)
@@ -1013,7 +1018,7 @@ func (e *Engine) timeStampFilterTarFile(start, end time.Time) func(f os.FileInfo
10131018

10141019
// Grab the tombstone file if one exists.
10151020
if ts := r.TombstoneStats(); ts.TombstoneExists {
1016-
return intar.StreamFile(fi, shardRelativePath, filepath.Base(ts.Path), tw)
1021+
return intar.StreamFile(fi, shardRelativePath, filepath.Base(ts.Path), tw, e.TarStreamBufferSize)
10171022
}
10181023

10191024
min, max := r.TimeRange()
@@ -1041,7 +1046,7 @@ func (e *Engine) timeStampFilterTarFile(start, end time.Time) func(f os.FileInfo
10411046

10421047
// the TSM file is 100% inside the range, so we can just write it without scanning each block
10431048
if min >= start.UnixNano() && max <= end.UnixNano() {
1044-
if err := intar.StreamFile(fi, shardRelativePath, fullPath, tw); err != nil {
1049+
if err := intar.StreamFile(fi, shardRelativePath, fullPath, tw, bufSize); err != nil {
10451050
return err
10461051
}
10471052
}
@@ -1061,7 +1066,7 @@ func (e *Engine) Export(w io.Writer, basePath string, start time.Time, end time.
10611066
}
10621067
}()
10631068

1064-
return intar.Stream(w, path, basePath, e.timeStampFilterTarFile(start, end))
1069+
return intar.Stream(w, path, basePath, e.TarStreamBufferSize, e.timeStampFilterTarFile(start, end))
10651070
}
10661071

10671072
func (e *Engine) filterFileToBackup(r *TSMReader, fi os.FileInfo, shardRelativePath, fullPath string, start, end int64, tw *tar.Writer) error {
@@ -1116,7 +1121,7 @@ func (e *Engine) filterFileToBackup(r *TSMReader, fi os.FileInfo, shardRelativeP
11161121
return err
11171122
}
11181123

1119-
return intar.StreamRenameFile(tmpFi, fi.Name(), shardRelativePath, path, tw)
1124+
return intar.StreamRenameFile(tmpFi, fi.Name(), shardRelativePath, path, tw, e.TarStreamBufferSize)
11201125
}
11211126

11221127
// Restore reads a tar archive generated by Backup().

0 commit comments

Comments
 (0)