KAFKA-7798: Expose embedded clientIds#6107
Conversation
|
|
||
| final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, ""); | ||
| final StreamThread thread = new StreamThread( | ||
| mockTime, |
There was a problem hiding this comment.
I just did a copy-paste of the code and this is intellij auto formatting. Current style seems right to me.
There was a problem hiding this comment.
Thanks @guozhangwang for the patch. Overall looks good to me, but I'm getting some test failures locally on StreamThreadTest.
|
@bbejeck That's due to a recent commit that caused conflicts, pushed another commit to resolve it. |
abbccdda
left a comment
There was a problem hiding this comment.
LGTM overall. Just left some comments
| return threadClientId + "-restore-consumer"; | ||
| } | ||
|
|
||
| public static String getSharedAdminClientId(final String clientId) { |
There was a problem hiding this comment.
Could we have a comment on explaining what shared means in this context?
| return this; | ||
| } | ||
|
|
||
| private void updateThreadMetadata(final Map<TaskId, StreamTask> activeTasks, |
There was a problem hiding this comment.
Could we keep the format of updateThreadMetadata to avoid unnecessary changes on L1221?
There was a problem hiding this comment.
I cannot completely follow since L1221 did not change, could you elaborate?
There was a problem hiding this comment.
I mean, if we keep parameters in one line:
private void updateThreadMetadata(final Map<TaskId, StreamTask> activeTasks, final Map<TaskId, StandbyTask> standbyTasks) then the changes shall be reduced.
There was a problem hiding this comment.
The line is quite long, thus we should reformat it to make the code more readable. I am +1 on this change -- also, I don't think we should optimize for fewer line changes :)
| // package-private for testing only | ||
| StreamThread updateThreadMetadata(final String adminClientId) { | ||
|
|
||
| threadMetadata = new ThreadMetadata( |
There was a problem hiding this comment.
Since this constructor is only called once, could we just reuse the constructor augmented:
public ThreadMetadata(final String threadName,
final String threadState,
final String mainConsumerClientId,
final String restoreConsumerClientId,
final Set<String> producerClientIds,
final String adminClientId,
final Set<TaskMetadata> activeTasks,
final Set<TaskMetadata> standbyTasks)
and set active & standby tasks with empty set? I figure this could reduce code redundancy.
| if (globalStreamThread != null) { | ||
| result.putAll(globalStreamThread.consumerMetrics()); | ||
| } | ||
| // own streams metrics |
There was a problem hiding this comment.
Own could sound like a verb. Maybe self streams metrics will be better?
…bed-client-context
|
@mjsax could you give another look on this? |
dguy
left a comment
There was a problem hiding this comment.
Thanks @guozhangwang left a few minor comments
| @Deprecated | ||
| public Map<String, Object> getConsumerConfigs(final String groupId, | ||
| final String clientId) { | ||
| public Map<String, Object> getConsumerConfigs(final String groupId, final String clientId) { |
There was a problem hiding this comment.
don't we usually split params over multiple lines?
There was a problem hiding this comment.
It's a subjective question. I personally prefer to have one parameter per line. :)
There was a problem hiding this comment.
Yeah I personally prefer only doing that for 3+ parameters and they cannot fit in 150 chars which is what a normal laptop screen would fit.
| @SuppressWarnings("WeakerAccess") | ||
| public Map<String, Object> getMainConsumerConfigs(final String groupId, | ||
| final String clientId) { | ||
| public Map<String, Object> getMainConsumerConfigs(final String groupId, final String clientId) { |
| return threadClientId + "-producer"; | ||
| } | ||
|
|
||
| private static String getThreadConsumerClientId(final String threadClientId) { |
There was a problem hiding this comment.
nit: I guess this and the next one could be getConsumerClientId and getRestoreConsumerId as there is only ever one of each?
| final String clientId = "client"; | ||
| final Map<String, Object> returnedProps = streamsConfig.getProducerConfigs(clientId); | ||
| assertEquals(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), clientId + "-producer"); | ||
| assertEquals(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), clientId); |
There was a problem hiding this comment.
shouldn't the arguments be the other way around? i.e., assertEquals(expected, actual) ?
better still use assertThat() :-P
| final String clientId = "client"; | ||
| final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId); | ||
| assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-consumer"); | ||
| assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId); |
There was a problem hiding this comment.
ditto here. I know it was wrong before, but would be good to correct.
| } | ||
|
|
||
| // add client id with stream client id prefix, and group id | ||
| // this is a hack to work around StreamsConfig constructor inside StreamPartitionAssignor to avoid casting |
There was a problem hiding this comment.
Don't understand this comment? Can you elaborate?
There was a problem hiding this comment.
Inside StreamPartitionAssignor we wrap the passed-in props map into a InternalStreamsConfig in order to read its config values without casting. But InternalStreamsConfig extends StreamsConfig requires application id as a must-have, so we need to pass in this value to consumer config prop values as well.
There was a problem hiding this comment.
Ack. Makes sense.
However, this is not really clear from the comment. Maybe rephrase to avoid confusion?
| logContext, | ||
| assignmentErrorCode); | ||
| assignmentErrorCode) | ||
| .updateThreadMetadata(getSharedAdminClientId(clientId)); |
There was a problem hiding this comment.
Why do we not pass the admin client-id into the constructor?
There was a problem hiding this comment.
I've thought about passing in the client-id, it is a bit overkill in addition to the thread-client-id just to set the shared admin client inside the constructor. I felt the current way is less intrusive but if you feel strong the other way we can discuss.
There was a problem hiding this comment.
Because the shared admit client id does never change, it seems weird to have a setter that allow to update is at any time. It seems cleaner to pass it in the constructor and make it immutable this way.
| return this; | ||
| } | ||
|
|
||
| private void updateThreadMetadata(final Map<TaskId, StreamTask> activeTasks, |
There was a problem hiding this comment.
The line is quite long, thus we should reformat it to make the code more readable. I am +1 on this change -- also, I don't think we should optimize for fewer line changes :)
| @Override | ||
| public int hashCode() { | ||
| return Objects.hash(threadName, threadState, activeTasks, standbyTasks); | ||
| return Objects.hash(threadName, threadState, activeTasks, standbyTasks, mainConsumerClientId, restoreConsumerClientId, producerClientIds, adminClientId); |
There was a problem hiding this comment.
Nit: Can we break this long line into one parameter per line?
| public void before() { | ||
| props = new Properties(); | ||
| props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); | ||
| props.put(StreamsConfig.CLIENT_ID_CONFIG, "appId"); |
There was a problem hiding this comment.
Nit: why reuse appId -- if there is a bug, and we set APPLICATION_ID_CONFIG by mistake, it would not be detected. Maybe better to set a different id.
…bed-client-context
…bed-client-context
| result.putAll(thread.consumerMetrics()); | ||
| // admin client is shared, so we can actually move it | ||
| // to result.putAll(adminClient.metrics()). | ||
| // we did it intentionally just for flexibility. |
There was a problem hiding this comment.
why not go ahead and do this now?
|
@guozhangwang I am merging this to cut the release branch. Please address the left over comment in a follow up PR. Thanks. |
* AK/trunk: fix typo (apache#5150) MINOR: Reduce replica.fetch.backoff.ms in ReassignPartitionsClusterTest (apache#5887) KAFKA-7766: Fail fast PR builds (apache#6059) KAFKA-7798: Expose embedded clientIds (apache#6107) KAFKA-7641; Introduce "group.max.size" config to limit group sizes (apache#6163) KAFKA-7433; Introduce broker options in TopicCommand to use AdminClient (KIP-377) MINOR: Fix some field definitions for ListOffsetReponse (apache#6214) KAFKA-7873; Always seek to beginning in KafkaBasedLog (apache#6203) KAFKA-7719: Improve fairness in SocketServer processors (KIP-402) (apache#6022) MINOR: fix checkstyle suppressions for generated RPC code to work on Windows KAFKA-7859: Use automatic RPC generation in LeaveGroups (apache#6188) KAFKA-7652: Part II; Add single-point query for SessionStore and use for flushing / getter (apache#6161) KAFKA-3522: Add RocksDBTimestampedStore (apache#6149) KAFKA-3522: Replace RecordConverter with TimestampedBytesStore (apache#6204)
Reviewers: Damian Guy <damian@confluent.io>, John Roesler <john@confluent.io>, Boyang Chen <bchen11@outlook.com>, Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>
Add consumer / restoreConsumer / producer(s) / admin client ids via ThreadMetadata; for producerIds, if EOS is turned on add the list of task-producer-ids, otherwise it is a singleton of thread-producer-id.
Consolidate the logic of generating clientIds from thread names and client id into StreamThread.
Committer Checklist (excluded from commit message)