[improve][broker] PIP-307 Added assignedBrokerUrl to CloseProducerCmd to skip lookups upon producer reconnections during unloading#21408
Conversation
| /** | ||
| * Close this topic - close all producers and subscriptions associated with this topic. | ||
| * | ||
| * @param closeWithoutDisconnectingClients don't disconnect clients |
There was a problem hiding this comment.
nit: I think the parameter should be disconnectingClients, it will be clearer to other developers. But it is not this PR's responsibility.
There was a problem hiding this comment.
yes, I was following the closeWithoutWaitingClientDisconnect's convention. Probably we need a minor PR to fix this naming, if we want to refactor it.
There was a problem hiding this comment.
Agree, the naming is confusing at first.
There was a problem hiding this comment.
+1, disconnectingClients is better.
025aac7 to
2d42d6e
Compare
| final URI uri = new URI(producer.client.conf.isUseTls() | ||
| ? closeProducer.getAssignedBrokerServiceUrlTls() | ||
| : closeProducer.getAssignedBrokerServiceUrl()); | ||
| log.info("[{}] Broker notification of Closed producer: {}. Redirecting to {}.", | ||
| remoteAddress, closeProducer.getProducerId(), uri); | ||
| producer.getConnectionHandler().connectionClosed(this, 0L, Optional.of(uri)); | ||
| } catch (URISyntaxException e) { | ||
| log.error("[{}] Invalid redirect url {}/{} for {}", remoteAddress, | ||
| closeProducer.getAssignedBrokerServiceUrl(), | ||
| closeProducer.getAssignedBrokerServiceUrlTls(), |
There was a problem hiding this comment.
GetAssignedBrokerServiceUrl or GetAssignedBrokerServiceUrlTls could throw an InvalidStateException if the respective value is not present.
There was a problem hiding this comment.
Good point. I will update this part.
dragosvictor
left a comment
There was a problem hiding this comment.
Looks good overall, left a couple of cosmetic suggestions and questions.
| } | ||
| ); | ||
| } catch (Throwable e) { | ||
| log.error("Failed to DestinationBrokerLookupData for topic:{}", topic, e); |
There was a problem hiding this comment.
Nit: can we improve the error message here?
| log.error("Failed to DestinationBrokerLookupData for topic:{}", topic, e); | |
| log.error("Failed to lookup destination broker for topic:{}", topic, e); |
There was a problem hiding this comment.
sure. I will update this.
| if (data.dstBroker() != null) { | ||
| return Optional.of(data.dstBroker()); | ||
| } | ||
| return Optional.empty(); |
There was a problem hiding this comment.
Nit: we can simplify this to:
| if (data.dstBroker() != null) { | |
| return Optional.of(data.dstBroker()); | |
| } | |
| return Optional.empty(); | |
| return Optional.ofNullable(data.dstBroker()); |
There was a problem hiding this comment.
sure. I will update this.
| getOwnerRequest.complete(data.dstBroker()); | ||
| } | ||
| stateChangeListeners.notify(serviceUnit, data, null); | ||
| CompletableFuture<Integer> ownFuture = null; |
There was a problem hiding this comment.
The null assignment doesn't seem necessary here.
There was a problem hiding this comment.
sure. I will update this.
| stateChangeListeners.notifyOnCompletion(ownFuture, serviceUnit, data) | ||
| .whenComplete((__, e) -> log(e, serviceUnit, data, null)); |
There was a problem hiding this comment.
Question: why this is needed now?
There was a problem hiding this comment.
Good point.
I think we need to log the first and second if cases only.
Let me update this.
| /** | ||
| * Close this topic - close all producers and subscriptions associated with this topic. | ||
| * | ||
| * @param closeWithoutDisconnectingClients don't disconnect clients |
There was a problem hiding this comment.
Agree, the naming is confusing at first.
| connectionClosed(cnx, null, Optional.empty()); | ||
| } | ||
|
|
||
| public void connectionClosed(ClientCnx cnx, Long initialConnectionDelayMs, Optional<URI> hostUrl) { |
There was a problem hiding this comment.
Just an observation: it's not clear to me when we're using a raw reference and when an Optional. Seems like we could've used Optional<Long> initialConnectionDelayMs just as well.
There was a problem hiding this comment.
I agree. I will make this Optional<Long> initialConnectionDelayMs
… to skip lookups upon producer reconnections during unloading
gaoran10
left a comment
There was a problem hiding this comment.
Great work! Left some trival comments.
| /** | ||
| * Close this topic - close all producers and subscriptions associated with this topic. | ||
| * | ||
| * @param closeWithoutDisconnectingClients don't disconnect clients |
There was a problem hiding this comment.
+1, disconnectingClients is better.
| final URI uri = new URI(producer.client.conf.isUseTls() | ||
| ? closeProducer.getAssignedBrokerServiceUrlTls() | ||
| : closeProducer.getAssignedBrokerServiceUrl()); |
There was a problem hiding this comment.
Thank you for fixing the similar issue below, but this one is still present.
There was a problem hiding this comment.
Thank you for checking this.
I changed the catch clause to catch IllegalStateException as well in case the broker does not return both urls.
|
Nice, looks good! |
PIP: #20748
Motivation
Please refer to the pip #20748
There will be a separate PR for the similar changes on
CloseConsumerCmd.Modifications
Please refer to the pip #20748
topic.close(..)will be called at the source broker two times:Releasing, without disconnecting producersOwned, to fully close the producers with assigned broker informationVerifying this change
Added new tests In
ExtensibleLoadManagerTestto cover this logic.Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository: heesung-sohn#53