KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads#9614
@wcarlson5 @cadonna for initial review
    );
    checkCacheMetrics(builtInMetricsVersion);

    verifyFailedStreamThreadsSensor(0.0);
We also need to verify that the metric works when there is a failed stream thread. Options are (1) to create a custom processor now and (IIRC) run the test suite twice, once with failing stream threads and once without to confirm that the metric works. I'm not sure if the custom processor will let us just fail one stream thread right before closing the app. Or (2) wait until add/remove stream threads is implemented and remove threads and test the metric after removing some threads before closing the app. WDYT?
I think you should try using the custom processor. You can find an example in StreamsUncaughtExceptionHandlerIntegrationTest.java
I would put the test that verifies the metric is recorded correctly in StreamThreadTest. An example of such a test is shouldLogAndRecordSkippedRecordsForInvalidTimestamps(). I do not think an integration test is needed. The test for the existence of the metric, i.e., checkMetricByName(listMetricThread, FAILED_STREAM_THREADS, 1); should stay here.
After looking at both test classes, I think it actually might make the most sense to put the test for this metric in StreamsUncaughtExceptionHandlerIntegrationTest, since the metric is so closely aligned with the exception handler anyways and the setup works nicely with what we're trying to test with the metric. From the size + complexity of the other test classes, I think creating an overloaded processor for one test out of 20+ tests seems tricky.
    log.info("State transition from {} to {}", oldState, newState);
    if (newState == State.DEAD) {
        failedStreamThreadSensor.record();
    }
Not every dead stream thread is a failed stream thread. You should record this metric where the uncaught exception handler is called, because there we know that a stream thread died unexpectedly.
Would that just be in run() of the GlobalStreamThread then?
No, that would be in StreamThread#runLoop().
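To illustrate the point above that not every dead thread is a failed thread, here is a minimal sketch in plain Java. It does not use the Kafka Streams classes: `FailedThreadSketch`, `runThread`, and the `AtomicLong` counter are hypothetical stand-ins for the stream thread, its run loop, and the failed-stream-threads sensor. The sketch also swallows the exception for brevity, which real code would likely not do.

```java
import java.util.concurrent.atomic.AtomicLong;

public class FailedThreadSketch {
    enum State { RUNNING, DEAD }

    // Hypothetical stand-in for the failed-stream-threads sensor.
    private final AtomicLong failedThreads = new AtomicLong();

    // Hypothetical stand-in for a thread's run loop.
    State runThread(final Runnable work) {
        try {
            work.run();
        } catch (final RuntimeException e) {
            // Unexpected death: this is the point where an uncaught
            // exception handler would be invoked, so count the failure here.
            failedThreads.incrementAndGet();
        }
        // Both a clean shutdown and a failure end in DEAD, which is why
        // recording on the DEAD transition would over-count.
        return State.DEAD;
    }

    long failedCount() {
        return failedThreads.get();
    }

    public static void main(final String[] args) {
        final FailedThreadSketch sketch = new FailedThreadSketch();
        sketch.runThread(() -> { });                                        // clean shutdown
        sketch.runThread(() -> { throw new IllegalStateException("boom"); }); // failure
        System.out.println("failed threads: " + sketch.failedCount());
    }
}
```

Both calls reach the DEAD state, but only the second increments the counter.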
65aa29a to f6db8cd
cadonna left a comment
LGTM!
Call for committer review and merge: @ableegoldman @mjsax @vvcephei @guozhangwang @abbccdda
            final Sensor... parents) {
        synchronized (clientLevelSensors) {
            final String fullSensorName = CLIENT_LEVEL_GROUP + SENSOR_NAME_DELIMITER + sensorName;
            final Sensor sensor = metrics.getSensor(fullSensorName);
Should we rewrite this the same way threadLevelSensor is written (i.e., using orElseGet) for consistency?
I am fine either way, too, but I prefer consistency... So should we rewrite the other method as a side cleanup?
I am fine with consistency and clean-up, but I would like to have the clean-up in a separate PR.
I changed it back for consistency and will open up a fix PR to update both of them to the new syntax
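For readers unfamiliar with the two styles discussed in this thread, the following is a rough sketch of a get-or-create lookup written both ways. It is not the actual StreamsMetricsImpl code: `SensorLookupSketch`, the `HashMap` registry, and the `String` "sensors" are hypothetical placeholders chosen so the example runs without Kafka on the classpath.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class SensorLookupSketch {
    // Hypothetical stand-in for the metrics registry; values stand in for sensors.
    private final Map<String, String> registry = new HashMap<>();

    // Style 1: explicit null check.
    String getOrCreateWithNullCheck(final String name) {
        String sensor = registry.get(name);
        if (sensor == null) {
            sensor = "sensor:" + name;
            registry.put(name, sensor);
        }
        return sensor;
    }

    // Style 2: Optional.orElseGet; the supplier runs only when the lookup misses.
    String getOrCreateWithOrElseGet(final String name) {
        return Optional.ofNullable(registry.get(name)).orElseGet(() -> {
            final String created = "sensor:" + name;
            registry.put(name, created);
            return created;
        });
    }

    int size() {
        return registry.size();
    }

    public static void main(final String[] args) {
        final SensorLookupSketch sketch = new SensorLookupSketch();
        System.out.println(sketch.getOrCreateWithNullCheck("client.failed-stream-threads"));
        System.out.println(sketch.getOrCreateWithOrElseGet("client.failed-stream-threads"));
    }
}
```

The two methods behave the same, so the choice is about consistency across the class, as the reviewers note.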
    metrics.removeSensor(sensorKeys.getValues().get(0));
    metrics.removeSensor(sensorKeys.getValues().get(1));
    expect(metrics.removeMetric(metricName1)).andReturn(mock(KafkaMetric.class));
    expect(metrics.removeMetric(metricName2)).andReturn(mock(KafkaMetric.class));
Why did we change this from andStubReturn(null) to andReturn(mock(KafkaMetric.class))?
Must've been an accidental change when trying to get the test to work. shouldRemoveStateStoreLevelSensors uses andReturn(mock(KafkaMetric.class)), so that's where it came from, but this test works with andStubReturn(null), so I changed it back to that.
    private void setupRemoveSensorsTest(final Metrics metrics,
                                        final String level,
                                        final RecordingLevel recordingLevel) {
This wasn't being used so I went ahead and took it out
Retest this please
mjsax left a comment
LGTM. Will merge after the build passes.
Per KIP-663, this adds a metric that records the number of failed stream threads over the life of a client.