WIP: Remote write checkpointing #8918
cstyan wants to merge 14 commits into prometheus:main from cstyan:remote-write-checkpointing
Conversation
…te the segment record to a json file Signed-off-by: njingco <jingco.nicole@gmail.com>
…e, updated the file as well Signed-off-by: njingco <jingco.nicole@gmail.com>
…segment records from multiple endpoints Signed-off-by: njingco <jingco.nicole@gmail.com>
…tly update the json file every set time, only made queue_manager have an update function for the CheckpointRecord Signed-off-by: njingco <jingco.nicole@gmail.com>
…reating the checkpoint file, added the context to the timed checkpointing go thread Signed-off-by: njingco <jingco.nicole@gmail.com>
Signed-off-by: Callum Styan <callumstyan@gmail.com>
rfratto left a comment
(FYI, you probably didn't mean to check in the tsdb/tsdb or documentation/examples/remote_storage/example_write_adapter/server files)
checkpointDir := filepath.Join(t.tsdbDir, "remote", t.client().Name())
err := os.MkdirAll(checkpointDir, 0777)
if err != nil {
	level.Warn(t.logger).Log("msg", "error creating temporary remote write checkpoint directory", "error", err)
Should this return on error? Ditto with WriteFile/Replace failing.
tsdb/wal/wal.go
Outdated
var (
	castagnoliTable = crc32.MakeTable(crc32.Castagnoli)

	MaxSegmentNumber = 99999999
Is math.MaxInt32/math.MaxInt64 more appropriate here?
Yep, but these aren't being used anymore, so I will remove them.
func (w *Watcher) Run() error {
// or an error case is hit. Use -1 for startSegment if the watcher should not
// start tailing samples until after startTime.
func (w *Watcher) Run(startSegment int) error {
Don't segment files start at 1? You might be able to use 0 instead of -1 here.
Nope, they start at 0
ls data/wal
00000000
for _, s := range segments {
	if s == segmentNum {
		return true, nil
	}
}
Since the segments are sorted you may be able to do an early return here if s > segmentNum.
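The early return the comment describes could look like this (standalone sketch; function and variable names are illustrative):

```go
package main

// containsSegment reports whether segmentNum appears in segments. Because
// WAL segment numbers are listed in ascending order, the scan can stop as
// soon as it passes the target instead of walking the whole slice.
func containsSegment(segments []int, segmentNum int) bool {
	for _, s := range segments {
		if s == segmentNum {
			return true
		}
		if s > segmentNum {
			return false // sorted: nothing later can match
		}
	}
	return false
}
```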
}
for _, s := range samples {
	w.samplesRead++
	if s.T > w.startTimestamp {
Hmm, doesn't this have the same problem? Even if we start at a newer segment, we're still not going to queue any new samples until after time.Now() has passed.
It's smelly but at the moment we're only setting the start time if we didn't find/set the starting segment.
if startSegment == -1 {
w.SetStartTime(time.Now())
}
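Put together, the behaviour the two comments settle on can be sketched as follows (a hypothetical free function; the real watcher keeps this state on the struct):

```go
package main

// shouldQueue sketches the interplay discussed above: the startTimestamp
// filter only applies when no checkpoint segment was found (startSegment ==
// -1), in which case the watcher starts "from now". When a checkpointed
// segment exists, startTimestamp is never set, so every replayed sample
// passes. Names and the function shape are illustrative.
func shouldQueue(sampleT int64, startSegment int, startTimestamp int64) bool {
	if startSegment != -1 {
		return true // replaying from a checkpointed segment: send everything
	}
	return sampleT > startTimestamp
}
```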
storage/remote/write.go
Outdated
done chan struct{}
// Soft shutdown context will prevent new enqueues and deadlocks.
softShutdown chan struct{}

// Hard shutdown context is used to terminate outgoing HTTP connections
// after giving them a chance to terminate.
hardShutdown context.CancelFunc
droppedOnHardShutdown atomic.Uint32
Are these being used anywhere? I see (at least) done being closed, but never read. Am I missing something?
No, none of these were being used; I don't think done was being closed either?
test. Signed-off-by: Callum Styan <callumstyan@gmail.com>
I don't love that the checkpoint logic, and things such as the tsdbDir and path, are spread across multiple packages. Would it be possible to keep most of the logic contained within watcher or queue_manager?
My one other thought is to make sure the checkpoint file format is expandable in the future for adding offsets. Do we want more structured data in that file?
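One way to keep the file expandable, per the comment above, is a small versioned JSON record with optional fields that older readers simply ignore. All field and function names here are hypothetical, not the PR's:

```go
package main

import "encoding/json"

// CheckpointRecord sketches a forward-compatible checkpoint file: a version
// field plus an optional byte offset that can be populated later without
// breaking readers of the older, offset-less format.
type CheckpointRecord struct {
	Version       int   `json:"version"`
	SegmentNumber int   `json:"segment_number"`
	Offset        int64 `json:"offset,omitempty"` // reserved for per-segment offsets
}

func encodeCheckpoint(r CheckpointRecord) ([]byte, error) {
	return json.Marshal(r)
}

func decodeCheckpoint(b []byte) (CheckpointRecord, error) {
	var r CheckpointRecord
	err := json.Unmarshal(b, &r)
	return r, err
}
```

Because `encoding/json` ignores unknown fields on decode and omits zero-valued optional fields on encode, new fields can be added in later versions without a migration.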
tsdb/wal/watcher.go
Outdated
metrics *WatcherMetrics
readerMetrics *LiveReaderMetrics

samplesSent int
Am I missing something, or are these unused other than to increment them?
tsdb/wal/watcher.go
Outdated
// For testing, stop when we hit this segment.
MaxSegment int

mutex sync.RWMutex
tsdb/wal/watcher.go
Outdated
startSegment := -1

// look for a checkpoint file
fname := filepath.Join(w.tsdbDir, "remote", w.name, "checkpoint")
Putting remote into a WAL-specific file doesn't seem ideal. Could it just be watcher instead? Or specified when creating a Watcher?
Signed-off-by: Callum Styan <callumstyan@gmail.com>
Agent / Remote Write code currently doesn't send the samples from the WAL on startup at all. We only send the new samples so replaying the WAL doesn't give us much. It does let us load series references from the WAL but we save much more by not reading the WAL at all. See: prometheus/prometheus#9848 and prometheus/prometheus#8918 Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
storage/remote to cache these segment offsets to disk. Deprecates prometheus#7710. Deprecates prometheus#8918. Closes prometheus#8809. Signed-off-by: Robert Fratto <robertfratto@gmail.com>
Still a really rough, but working, implementation of #8809
cc: @rfratto