Skip to content

Commit f6a7f39

Browse files
github-actions[bot]Copilotbelimawr
authored
filebeat: stabilize kafka input integration topic readiness (#49228)
Use a shared helper that creates and waits for a test topic to become writable before producing events. Apply it across kafka integration tests, including TestTest, to reduce leader-election race flakes. GenAI-Assisted: Yes Human-Reviewed: Yes Tool: GitHub Copilot CLI, Model: GPT-5.3 Codex --------- Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Co-authored-by: Tiago Queiroz <tiago.queiroz@elastic.co>
1 parent cac2dda commit f6a7f39

1 file changed

Lines changed: 12 additions & 11 deletions

File tree

filebeat/input/kafka/kafka_integration_test.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func recordHeader(key, value string) sarama.RecordHeader {
6767
}
6868

6969
func TestInput(t *testing.T) {
70-
testTopic := createTestTopicName()
70+
testTopic := createReadyTestTopic(t)
7171
groupID := "filebeat"
7272

7373
// Send test messages to the topic for the input to read.
@@ -155,7 +155,7 @@ func TestInput(t *testing.T) {
155155
}
156156

157157
func TestInputWithMultipleEvents(t *testing.T) {
158-
testTopic := createTestTopicName()
158+
testTopic := createReadyTestTopic(t)
159159

160160
// Send test messages to the topic for the input to read.
161161
message := testMessage{
@@ -211,7 +211,7 @@ func TestInputWithMultipleEvents(t *testing.T) {
211211
}
212212

213213
func TestInputWithJsonPayload(t *testing.T) {
214-
testTopic := createTestTopicName()
214+
testTopic := createReadyTestTopic(t)
215215

216216
// Send test message to the topic for the input to read.
217217
message := testMessage{
@@ -273,7 +273,7 @@ func TestInputWithJsonPayload(t *testing.T) {
273273
}
274274

275275
func TestInputWithJsonPayloadAndMultipleEvents(t *testing.T) {
276-
testTopic := createTestTopicName()
276+
testTopic := createReadyTestTopic(t)
277277

278278
// Send test messages to the topic for the input to read.
279279
message := testMessage{
@@ -360,13 +360,9 @@ func TestSASLAuthentication(t *testing.T) {
360360

361361
for _, tc := range testCases {
362362
t.Run(tc.name, func(t *testing.T) {
363-
testTopic := createTestTopicName()
363+
testTopic := createReadyTestTopic(t)
364364
groupID := "filebeat"
365365

366-
// Topic auto-creation is asynchronous; wait for leaders to avoid transient
367-
// "no leader for this partition" write failures in CI.
368-
ensureKafkaTopicReadyForWrites(t, testTopic)
369-
370366
// Send test messages to the topic for the input to read.
371367
messages := []testMessage{
372368
{message: "testing"},
@@ -452,7 +448,7 @@ func TestSASLAuthentication(t *testing.T) {
452448
}
453449

454450
func TestTest(t *testing.T) {
455-
testTopic := createTestTopicName()
451+
testTopic := createReadyTestTopic(t)
456452

457453
// Send test messages to the topic for the input to read.
458454
message := testMessage{
@@ -483,8 +479,13 @@ func TestTest(t *testing.T) {
483479
}
484480
}
485481

486-
func createTestTopicName() string {
482+
func createReadyTestTopic(t *testing.T) string {
483+
t.Helper()
484+
487485
testTopic := fmt.Sprintf("Filebeat-TestInput-%d", rand.Int())
486+
// Topic auto-creation is asynchronous; explicitly wait for leaders to avoid
487+
// transient "no leader for this partition" write failures in CI.
488+
ensureKafkaTopicReadyForWrites(t, testTopic)
488489
return testTopic
489490
}
490491

0 commit comments

Comments
 (0)