Skip to content

KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads#9614

Merged
mjsax merged 8 commits into
apache:trunkfrom
lct45:thread_metrics
Dec 4, 2020
Merged

KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads#9614
mjsax merged 8 commits into
apache:trunkfrom
lct45:thread_metrics

Conversation

@lct45

@lct45 lct45 commented Nov 18, 2020

Copy link
Copy Markdown

Per KIP-663, adding a metric to record the failed streams threads over the life of a client.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@lct45

lct45 commented Nov 18, 2020

Copy link
Copy Markdown
Author

@wcarlson5 @cadonna for initial review

);
checkCacheMetrics(builtInMetricsVersion);

verifyFailedStreamThreadsSensor(0.0);

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also need to verify that the metric works when there is a failed stream thread. Options are (1) to create a custom processor now and (IIRC) run the test suite twice, once with failing stream threads and once without to confirm that the metric works. I'm not sure if the custom processor will let us just fail one stream thread right before closing the app. Or (2) wait until add/remove stream threads is implemented and remove threads and test the metric after removing some threads before closing the app. WDYT?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should try using the custom processor. You can find an example in StreamsUncaughtExceptionHandlerIntegrationTest.java

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would put the test whether the metric is recorded correctly in StreamThreadTest. An example for such a test is shouldLogAndRecordSkippedRecordsForInvalidTimestamps(). I do not think an integration test is needed. The test regarding the existence of the metric, i.e., checkMetricByName(listMetricThread, FAILED_STREAM_THREADS, 1); should stay here.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After looking at both test classes, I think it actually might make the most sense to put the test for this metric in StreamsUncaughtExceptionHandlerIntegrationTest, since the metric is so closely aligned with the exception handler anyways and the setup works nicely with what we're trying to test with the metric. From the size + complexity of the other test classes, I think creating an overloaded processor for one test out of 20+ tests seems tricky.

@wcarlson5 wcarlson5 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure it is necessary but you might want to add an integration test that kills a few threads and check the metrics. You would need to set the old handler to get a single thread to die as of now

Comment thread streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java Outdated

@cadonna cadonna left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lct45 Thank you for the PR!

Here my feedback.

Comment thread streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java Outdated
log.info("State transition from {} to {}", oldState, newState);
if (newState == State.DEAD) {
failedStreamThreadSensor.record();
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not every dead stream thread is a failed stream thread. You should record this metric where the uncaught exception handler is called because there we now that a stream thread died unexpectedly.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would that just be in run() of the GlobalStreamThread then?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, that would be in StreamThread#runLoop().

);
checkCacheMetrics(builtInMetricsVersion);

verifyFailedStreamThreadsSensor(0.0);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would put the test whether the metric is recorded correctly in StreamThreadTest. An example for such a test is shouldLogAndRecordSkippedRecordsForInvalidTimestamps(). I do not think an integration test is needed. The test regarding the existence of the metric, i.e., checkMetricByName(listMetricThread, FAILED_STREAM_THREADS, 1); should stay here.

@cadonna cadonna left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lct45 Thank you for the updates!

I have rather minor comments.

@cadonna cadonna left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

Call for committer review and merge: @ableegoldman @mjsax @vvcephei @guozhangwang @abbccdda

@mjsax mjsax left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just some nits

final Sensor... parents) {
synchronized (clientLevelSensors) {
final String fullSensorName = CLIENT_LEVEL_GROUP + SENSOR_NAME_DELIMITER + sensorName;
final Sensor sensor = metrics.getSensor(fullSensorName);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we rewrite this the same way threadLevelSensor is written (ie, using orElseGet) for consistency?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I requested this. See my comment #9614 (comment)

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm good either way (:

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am fine either way, too, but I prefer consistency... So should we rewrite the other method as a side cleanup?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am fine with consistency and clean-up, but I would like to have the clean-up in a separate PR.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it back for consistency and will open up a fix PR to update both of them to the new syntax

metrics.removeSensor(sensorKeys.getValues().get(0));
metrics.removeSensor(sensorKeys.getValues().get(1));
expect(metrics.removeMetric(metricName1)).andReturn(mock(KafkaMetric.class));
expect(metrics.removeMetric(metricName2)).andReturn(mock(KafkaMetric.class));

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did we change this from andStubReturn(null) to andReturn(mock(KafkaMetric.class))?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Must've been an accidental change when trying to get the test to work. shouldRemoveStateStoreLevelSensors uses andReturn(mock(KafkaMetric.class)) so that's where it came from, but this test works with andStubReturn(null) so I changed it back to that

@mjsax mjsax added the streams label Dec 1, 2020

private void setupRemoveSensorsTest(final Metrics metrics,
final String level,
final RecordingLevel recordingLevel) {

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This wasn't being used so I went ahead and took it out

@mjsax

mjsax commented Dec 3, 2020

Copy link
Copy Markdown
Member

Retest this please

@mjsax mjsax left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Will merge after the build passed.

@mjsax mjsax merged commit 4cc6d20 into apache:trunk Dec 4, 2020
@mjsax mjsax added the kip Requires or implements a KIP label Jan 27, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

kip Requires or implements a KIP streams

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants