KAFKA-7798: Expose embedded clientIds #6107

Merged
mjsax merged 10 commits into apache:trunk from guozhangwang:K7798-embed-client-context on Feb 2, 2019

@guozhangwang

Contributor
  1. Add consumer / restoreConsumer / producer(s) / admin client IDs via ThreadMetadata. For producerIds: if EOS is turned on, add the list of task-producer-ids; otherwise it is a singleton containing the thread-producer-id.

  2. Consolidate the logic of generating clientIds from thread names and client id into StreamThread.
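
As a rough illustration of the naming scheme described above, the sketch below derives the embedded client IDs from a thread client ID. The `-restore-consumer` and `-producer` suffixes appear in the diff excerpts in this conversation, and `-consumer` appears in a test excerpt. The class name `ClientIdSketch`, the `-admin` suffix, and the per-task producer format are assumptions made for illustration and may not match the merged implementation.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper; not the StreamThread code from this PR.
public final class ClientIdSketch {
    private ClientIdSketch() { }

    public static String consumerClientId(final String threadClientId) {
        return threadClientId + "-consumer";
    }

    public static String restoreConsumerClientId(final String threadClientId) {
        return threadClientId + "-restore-consumer";
    }

    public static String threadProducerClientId(final String threadClientId) {
        return threadClientId + "-producer";
    }

    // Assumption: one producer per task under EOS, named after the task ID.
    public static String taskProducerClientId(final String threadClientId, final String taskId) {
        return threadClientId + "-" + taskId + "-producer";
    }

    // Assumption: the admin client is shared per instance, so it derives from the client ID, not the thread.
    public static String sharedAdminClientId(final String clientId) {
        return clientId + "-admin";
    }

    // With EOS: one ID per task. Without EOS: a singleton with the thread producer ID.
    public static Set<String> producerClientIds(final String threadClientId,
                                                final boolean eosEnabled,
                                                final List<String> taskIds) {
        if (!eosEnabled) {
            return Collections.singleton(threadProducerClientId(threadClientId));
        }
        final Set<String> ids = new LinkedHashSet<>();
        for (final String taskId : taskIds) {
            ids.add(taskProducerClientId(threadClientId, taskId));
        }
        return ids;
    }
}
```

Calling `producerClientIds` with `eosEnabled` set to `false` ignores the task list entirely, which mirrors the "singleton of thread-producer-id" wording in item 1.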

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@guozhangwang changed the title from "KAFKA-7798: expose embedded client context" to "KAFKA-7798: Expose embedded clientIds" on Jan 9, 2019
@guozhangwang

Contributor Author

@abbccdda @vvcephei @bbejeck


final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
final StreamThread thread = new StreamThread(
mockTime,

Why did we change the style here?

Contributor Author

I just did a copy-paste of the code, and this is IntelliJ auto-formatting. The current style seems right to me.

@bbejeck bbejeck left a comment

Member

Thanks @guozhangwang for the patch. Overall looks good to me, but I'm getting some test failures locally on StreamThreadTest.

@bbejeck bbejeck left a comment

Member

Ignore this comment, I posted twice by accident.

@guozhangwang

Contributor Author

@bbejeck That's due to a recent commit that caused conflicts; I pushed another commit to resolve it.

@abbccdda abbccdda left a comment

LGTM overall. Just left some comments

return threadClientId + "-restore-consumer";
}

public static String getSharedAdminClientId(final String clientId) {

Could we add a comment explaining what "shared" means in this context?

return this;
}

private void updateThreadMetadata(final Map<TaskId, StreamTask> activeTasks,

Could we keep the format of updateThreadMetadata to avoid unnecessary changes on L1221?

Contributor Author

I cannot completely follow, since L1221 did not change. Could you elaborate?

@abbccdda abbccdda Jan 16, 2019

I mean, if we keep the parameters on one line:
private void updateThreadMetadata(final Map<TaskId, StreamTask> activeTasks, final Map<TaskId, StandbyTask> standbyTasks)
then the changes would be reduced.

Member

The line is quite long, thus we should reformat it to make the code more readable. I am +1 on this change -- also, I don't think we should optimize for fewer line changes :)

// package-private for testing only
StreamThread updateThreadMetadata(final String adminClientId) {

threadMetadata = new ThreadMetadata(

Since this constructor is only called once, could we just reuse the augmented constructor:

public ThreadMetadata(final String threadName,
                      final String threadState,
                      final String mainConsumerClientId,
                      final String restoreConsumerClientId,
                      final Set<String> producerClientIds,
                      final String adminClientId,
                      final Set<TaskMetadata> activeTasks,
                      final Set<TaskMetadata> standbyTasks)

and set the active and standby tasks to empty sets? I figure this could reduce code redundancy.
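
One way to read this suggestion is sketched below: keep a single full constructor and let the caller that has no tasks yet pass empty sets. `ThreadMetadataSketch` and its `withoutTasks` factory are hypothetical names, and tasks are modeled as plain strings instead of `TaskMetadata` to keep the example self-contained.

```java
import java.util.Collections;
import java.util.Set;

// Simplified stand-in for ThreadMetadata; not the class from this PR.
public final class ThreadMetadataSketch {
    private final String threadName;
    private final String threadState;
    private final String mainConsumerClientId;
    private final String restoreConsumerClientId;
    private final Set<String> producerClientIds;
    private final String adminClientId;
    private final Set<String> activeTasks;   // assumption: task IDs as strings
    private final Set<String> standbyTasks;  // assumption: task IDs as strings

    // The single "augmented" constructor that every caller goes through.
    public ThreadMetadataSketch(final String threadName,
                                final String threadState,
                                final String mainConsumerClientId,
                                final String restoreConsumerClientId,
                                final Set<String> producerClientIds,
                                final String adminClientId,
                                final Set<String> activeTasks,
                                final Set<String> standbyTasks) {
        this.threadName = threadName;
        this.threadState = threadState;
        this.mainConsumerClientId = mainConsumerClientId;
        this.restoreConsumerClientId = restoreConsumerClientId;
        this.producerClientIds = producerClientIds;
        this.adminClientId = adminClientId;
        this.activeTasks = activeTasks;
        this.standbyTasks = standbyTasks;
    }

    // Hypothetical convenience factory: no second constructor, just empty task sets.
    public static ThreadMetadataSketch withoutTasks(final String threadName,
                                                    final String threadState,
                                                    final String mainConsumerClientId,
                                                    final String restoreConsumerClientId,
                                                    final Set<String> producerClientIds,
                                                    final String adminClientId) {
        return new ThreadMetadataSketch(threadName, threadState, mainConsumerClientId,
            restoreConsumerClientId, producerClientIds, adminClientId,
            Collections.<String>emptySet(), Collections.<String>emptySet());
    }

    public String adminClientId() { return adminClientId; }
    public Set<String> activeTasks() { return activeTasks; }
    public Set<String> standbyTasks() { return standbyTasks; }
}
```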

if (globalStreamThread != null) {
result.putAll(globalStreamThread.consumerMetrics());
}
// own streams metrics

"Own" could read as a verb. Maybe "self streams metrics" would be better?

@mjsax mjsax added the streams label Jan 14, 2019
@guozhangwang

Contributor Author

@mjsax could you take another look at this?

@dguy dguy left a comment

Contributor

Thanks @guozhangwang, I left a few minor comments.

  @Deprecated
- public Map<String, Object> getConsumerConfigs(final String groupId,
-                                               final String clientId) {
+ public Map<String, Object> getConsumerConfigs(final String groupId, final String clientId) {

Contributor

don't we usually split params over multiple lines?

Member

It's a subjective question. I personally prefer to have one parameter per line. :)

Contributor Author

Yeah, I personally prefer doing that only when there are 3+ parameters and they cannot fit in 150 chars, which is what a normal laptop screen would fit.

  @SuppressWarnings("WeakerAccess")
- public Map<String, Object> getMainConsumerConfigs(final String groupId,
-                                                   final String clientId) {
+ public Map<String, Object> getMainConsumerConfigs(final String groupId, final String clientId) {

Contributor

ditto

return threadClientId + "-producer";
}

private static String getThreadConsumerClientId(final String threadClientId) {

Contributor

nit: I guess this and the next one could be getConsumerClientId and getRestoreConsumerId as there is only ever one of each?

Contributor Author

ack.

  final String clientId = "client";
  final Map<String, Object> returnedProps = streamsConfig.getProducerConfigs(clientId);
- assertEquals(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), clientId + "-producer");
+ assertEquals(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), clientId);

Contributor

Shouldn't the arguments be the other way around, i.e., assertEquals(expected, actual)?
Better still, use assertThat() :-P

Contributor Author

ack.
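
To show why the argument order raised in this thread matters, here is a minimal sketch that loosely mimics the shape of a JUnit-style failure message. `AssertOrderSketch` and `failureMessage` are hypothetical stand-ins, not JUnit APIs; the real `assertEquals(expected, actual)` formats its message in its own way.

```java
public final class AssertOrderSketch {
    private AssertOrderSketch() { }

    // Hypothetical stand-in: builds a message in the "expected ... but was ..." shape.
    public static String failureMessage(final Object expected, final Object actual) {
        return "expected:<" + expected + "> but was:<" + actual + ">";
    }

    public static void main(final String[] args) {
        final String expected = "client";
        final String actual = "client-producer"; // pretend value returned by the code under test

        // Correct order: the message points at the value the code actually produced.
        System.out.println(failureMessage(expected, actual));

        // Swapped order: the message blames the wrong side of the comparison.
        System.out.println(failureMessage(actual, expected));
    }
}
```

With swapped arguments the comparison still passes or fails identically; only the diagnostic on failure becomes misleading.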

  final String clientId = "client";
  final Map<String, Object> returnedProps = streamsConfig.getMainConsumerConfigs(groupId, clientId);
- assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-consumer");
+ assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId);

Contributor

ditto here. I know it was wrong before, but would be good to correct.

@mjsax mjsax left a comment

Member

Couple of nits.

}

// add client id with stream client id prefix, and group id
// this is a hack to work around StreamsConfig constructor inside StreamPartitionAssignor to avoid casting

Member

I don't understand this comment. Can you elaborate?

Contributor Author

Inside StreamPartitionAssignor we wrap the passed-in props map into an InternalStreamsConfig in order to read its config values without casting. But InternalStreamsConfig extends StreamsConfig, which requires the application id as a must-have, so we need to pass this value into the consumer config prop values as well.

Member

Ack. Makes sense.

However, this is not really clear from the comment. Maybe rephrase to avoid confusion?

  logContext,
- assignmentErrorCode);
+ assignmentErrorCode)
+ .updateThreadMetadata(getSharedAdminClientId(clientId));

Member

Why do we not pass the admin client-id into the constructor?

Contributor Author

I've thought about passing in the client-id, but it seems a bit overkill to pass it in addition to the thread-client-id just to set the shared admin client inside the constructor. I felt the current way is less intrusive, but if you feel strongly the other way we can discuss.

Member

Because the shared admin client id never changes, it seems weird to have a setter that allows updating it at any time. It seems cleaner to pass it into the constructor and make it immutable this way.
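
The constructor-based alternative argued for here can be sketched as follows. `AdminIdHolderSketch` is a hypothetical class used only to show the pattern (a `final` field assigned once, no setter); it is not how StreamThread is structured.

```java
import java.util.Objects;

// Hypothetical holder illustrating "set once in the constructor, never updated".
public final class AdminIdHolderSketch {
    // final: the compiler rejects any later reassignment, so no setter can exist.
    private final String adminClientId;

    public AdminIdHolderSketch(final String adminClientId) {
        this.adminClientId = Objects.requireNonNull(adminClientId, "adminClientId");
    }

    public String adminClientId() {
        return adminClientId;
    }
}
```

The trade-off discussed in the thread is that this adds one more constructor parameter, in exchange for ruling out updates after construction.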

  @Override
  public int hashCode() {
- return Objects.hash(threadName, threadState, activeTasks, standbyTasks);
+ return Objects.hash(threadName, threadState, activeTasks, standbyTasks, mainConsumerClientId, restoreConsumerClientId, producerClientIds, adminClientId);

Member

Nit: Can we break this long line into one parameter per line?

Contributor Author

ack.
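
For reference, the one-argument-per-line layout requested above might look like the sketch below. `HashCodeSketch` is a hypothetical class with a reduced set of fields; it only shows the formatting together with an `equals` that compares the same fields, which `hashCode` needs in order to stay consistent.

```java
import java.util.Objects;
import java.util.Set;

// Hypothetical reduced version of a metadata class; field set shortened for brevity.
public final class HashCodeSketch {
    private final String threadName;
    private final String threadState;
    private final Set<String> producerClientIds;
    private final String adminClientId;

    public HashCodeSketch(final String threadName,
                          final String threadState,
                          final Set<String> producerClientIds,
                          final String adminClientId) {
        this.threadName = threadName;
        this.threadState = threadState;
        this.producerClientIds = producerClientIds;
        this.adminClientId = adminClientId;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        final HashCodeSketch that = (HashCodeSketch) o;
        return Objects.equals(threadName, that.threadName)
            && Objects.equals(threadState, that.threadState)
            && Objects.equals(producerClientIds, that.producerClientIds)
            && Objects.equals(adminClientId, that.adminClientId);
    }

    @Override
    public int hashCode() {
        // One argument per line keeps diffs small when a field is added later.
        return Objects.hash(
            threadName,
            threadState,
            producerClientIds,
            adminClientId);
    }
}
```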

public void before() {
props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
props.put(StreamsConfig.CLIENT_ID_CONFIG, "appId");

Member

Nit: why reuse appId -- if there is a bug, and we set APPLICATION_ID_CONFIG by mistake, it would not be detected. Maybe better to set a different id.

Contributor Author

good call.
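
The point about distinct IDs can be illustrated with a small sketch. `DistinctIdsSketch` and its deliberately buggy lookup are hypothetical; the only facts taken from Kafka are the config key strings `application.id` and `client.id`, which are the values of `StreamsConfig.APPLICATION_ID_CONFIG` and `StreamsConfig.CLIENT_ID_CONFIG`.

```java
import java.util.Properties;

public final class DistinctIdsSketch {
    private DistinctIdsSketch() { }

    // Hypothetical, deliberately buggy lookup: reads application.id where client.id was intended.
    public static String buggyClientId(final Properties props) {
        return props.getProperty("application.id");
    }

    public static Properties props(final String applicationId, final String clientId) {
        final Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("client.id", clientId);
        return props;
    }

    public static void main(final String[] args) {
        // Same value for both keys: the bug is invisible to an equality check.
        System.out.println("appId".equals(buggyClientId(props("appId", "appId"))));

        // Distinct values: the same check now exposes the wrong key being read.
        System.out.println("clientId".equals(buggyClientId(props("appId", "clientId"))));
    }
}
```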

@bbejeck bbejeck left a comment

Member

LGTM modulo comments from @dguy

@guozhangwang

Contributor Author

Updated per your comments @mjsax @dguy

result.putAll(thread.consumerMetrics());
// admin client is shared, so we can actually move it
// to result.putAll(adminClient.metrics()).
// we did it intentionally just for flexibility.

Contributor

why not go ahead and do this now?

@mjsax

mjsax commented Feb 2, 2019

Member

@guozhangwang I am merging this to cut the release branch. Please address the leftover comment in a follow-up PR. Thanks.

@mjsax mjsax merged commit 9dc76f8 into apache:trunk Feb 2, 2019
jarekr pushed a commit to confluentinc/kafka that referenced this pull request Apr 18, 2019
* AK/trunk:
  fix typo (apache#5150)
  MINOR: Reduce replica.fetch.backoff.ms in ReassignPartitionsClusterTest (apache#5887)
  KAFKA-7766: Fail fast PR builds (apache#6059)
  KAFKA-7798: Expose embedded clientIds (apache#6107)
  KAFKA-7641; Introduce "group.max.size" config to limit group sizes (apache#6163)
  KAFKA-7433; Introduce broker options in TopicCommand to use AdminClient (KIP-377)
  MINOR: Fix some field definitions for ListOffsetReponse (apache#6214)
  KAFKA-7873; Always seek to beginning in KafkaBasedLog (apache#6203)
  KAFKA-7719: Improve fairness in SocketServer processors (KIP-402) (apache#6022)
  MINOR: fix checkstyle suppressions for generated RPC code to work on Windows
  KAFKA-7859: Use automatic RPC generation in LeaveGroups (apache#6188)
  KAFKA-7652: Part II; Add single-point query for SessionStore and use for flushing / getter (apache#6161)
  KAFKA-3522: Add RocksDBTimestampedStore (apache#6149)
  KAFKA-3522: Replace RecordConverter with TimestampedBytesStore (apache#6204)
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
Reviewers: Damian Guy <damian@confluent.io>, John Roesler <john@confluent.io>, Boyang Chen <bchen11@outlook.com>, Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>
@guozhangwang guozhangwang deleted the K7798-embed-client-context branch April 25, 2020 00:05