Conversation
| timeout := time.After(5 * time.Second) | ||
| for { | ||
| select { | ||
| case <-time.After(time.Second): | ||
| case <-timeout: | ||
| t.Fatalf("timed out waiting for %v message length %d (last value: %d)", msgType, want, lastValue) | ||
| default: |
There was a problem hiding this comment.
There was a bug here that caused the test to hang indefinitely; moving the timeout up solves it
| tick, done := writers.NewTicker(w.batchTimeout) | ||
| defer done() | ||
| ticker := writers.NewTicker(w.batchTimeout) | ||
| defer ticker.Stop() |
There was a problem hiding this comment.
ticker.Chan() should also be closed in the defer. Could maybe add it in a Stop() wrapper since we only stop on defer and there would be no erroneous tick read in the select:
Stop turns off a ticker. After Stop, no more ticks will be sent.
Stop does not close the channel, to prevent a concurrent goroutine
reading from the channel from seeing an erroneous "tick".
There was a problem hiding this comment.
it doesn't really matter IMO (as the open chan still gets GC'ed)
| } | ||
| } | ||
|
|
||
| type mockTicker struct { |
There was a problem hiding this comment.
Maybe this can be moved to writers package next to ticker.go?
There was a problem hiding this comment.
Maybe, but right now it's only used here and it would then need to be exposed as a public struct in writers
🤖 I have created a release *beep* *boop* --- ## [4.8.0-rc1](v4.7.1-rc1...v4.8.0-rc1) (2023-07-05) ### Features * **transformers:** Add `Apply` to apply extra transformations ([#1069](#1069)) ([a40598e](a40598e)) ### Bug Fixes * Deterministic ordering for records returned by readAll in tests ([#1072](#1072)) ([cf7510f](cf7510f)) * Handle null-related test options ([#1074](#1074)) ([88f08ee](88f08ee)) * **naming:** Rename `SyncMessages.InsertMessage()` to `SyncMessages.GetInserts()` ([#1070](#1070)) ([ab9e768](ab9e768)) * Reset timers on flush ([#1076](#1076)) ([767327f](767327f)) * Reverse order of records in memdb ([#1075](#1075)) ([8356590](8356590)) * **scalar:** Test `AppendTime` on TimestampBuilder ([#1068](#1068)) ([888c9ee](888c9ee)) * **testdata:** Exclude only the correct type ([#1067](#1067)) ([1c72fb2](1c72fb2)) --- This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
This updates the ticker logic in the batch writers to reset the ticker when a flush happens. This is better, as it still guarantees that a message won't be delayed by more than batch_timeout, but we don't risk flushing a very small batch because we must flush at regular intervals either.
The choice of resetting after the flush is deliberate: it means that the maximum amount of time between flushes is:
otherwise we could do:
but if a flush were to then longer than the batch timeout, we can end up in a cycle of flushing again immediately after the previous flush finishes.