fix: use least loaded broker to refresh metadata#2645
Conversation
8d06ab9 to
ede5508
Compare
functional_test.go
Outdated
| for _, broker := range brokers { | ||
| err := broker.Open(client.Config()) | ||
| if err != nil { | ||
| if err != nil && err != ErrAlreadyConnected { |
There was a problem hiding this comment.
Updated the functional tests to skip the ErrAlreadyConnected error because this change will use one broker from the broker list to refresh metadata, which is already connected when reaching the following validation.
|
@dnwe Could you please review my change? Thanks |
19b1221 to
f5c9e47
Compare
Seed brokers never change after client initialization. If the first seed broker became stale (still online, but moved to other Kafka cluster), Sarama client may use this stale broker to get the wrong metadata. To avoid using the stale broker to do metadata refresh, we will choose the least loaded broker in the cached broker list which is the similar to how the Java client implementation works: https://github.com/apache/kafka/blob/7483991a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L671-L736 Contributes-to: IBM#2637 Signed-off-by: Hao Sun <haos@uber.com>
f5c9e47 to
98ec384
Compare
dnwe
left a comment
There was a problem hiding this comment.
Changes look good to me. One minor query on the existing deregisterBroker behaviour for the seedBrokers list, but happy to approve and merge as-is
| // deregisterBroker removes a broker from the broker list, and if it's | ||
| // not in the broker list, removes it from seedBrokers. | ||
| func (client *client) deregisterBroker(broker *Broker) { | ||
| client.lock.Lock() | ||
| defer client.lock.Unlock() | ||
|
|
||
| _, ok := client.brokers[broker.ID()] | ||
| if ok { | ||
| Logger.Printf("client/brokers deregistered broker #%d at %s", broker.ID(), broker.Addr()) | ||
| delete(client.brokers, broker.ID()) | ||
| return | ||
| } | ||
| if len(client.seedBrokers) > 0 && broker == client.seedBrokers[0] { | ||
| client.deadSeeds = append(client.deadSeeds, broker) | ||
| client.seedBrokers = client.seedBrokers[1:] |
There was a problem hiding this comment.
Currently we only seem to deregister the broker from the seedBrokers list if it's the first element in the list (after the most recent shuffle) — is that still the desired behaviour?
There was a problem hiding this comment.
Yes, I think it is expected. seed brokers only shuffle during client initialization or hard RefreshBrokers. After that, the cached broker list is empty, and the client will use the first seed broker to fetch metadata. deregisterBroker method deregisters the first seed broker will apply that moment when this first seed broker is unavailable.
There was a problem hiding this comment.
Do you know when we have a new release including this change so that our side can use it? @dnwe
There was a problem hiding this comment.
Seed brokers never change after client initialization. If the first seed
broker became stale (still online, but moved to other Kafka cluster),
Sarama client may use this stale broker to get the wrong metadata. To
avoid using the stale broker to do metadata refresh, we will choose the
least loaded broker in the cached broker list which is the similar to
how the Java client implementation works:
https://github.com/apache/kafka/blob/7483991a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L671-L736
Contributes-to: #2637