[improve][client]:Perform health checks on the endpoints that passed in by serviceUrl of PulsarClient #22935
Conversation
@AuroraTwinkle Please add the following content to your PR description and select a checkbox:
@liangyepianzhou I have fixed all your comments, PTAL, thanks
Pull Request Overview
This PR implements a health check mechanism for endpoints passed through the serviceUrl in PulsarClient. Key changes include:
- Adding periodic health checks and removal of unhealthy endpoints in PulsarServiceNameResolver.
- Updating tests and client modules (HttpClient, BinaryProtoLookupService, AutoClusterFailover) to integrate and validate the new health check behavior.
- Introducing a Caffeine eviction listener to properly close resolvers when they expire.
Reviewed Changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarServiceNameResolverTest.java | Added a local server socket in tests to simulate endpoint health and verify removal of unreachable hosts. |
| pulsar-client/src/main/java/org/apache/pulsar/client/impl/ServiceNameResolver.java | Extended ServiceNameResolver with the Closeable interface and a default close method. |
| pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java | Introduced health check logic with scheduled periodic checks and updated endpoint lists. |
| pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java | Ensured the resolver is closed when the HttpClient is shut down. |
| pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java | Closed the resolver during cleanup. |
| pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java | Closed the resolver during failover cleanup. |
| pulsar-broker/src/test/java/org/apache/pulsar/client/api/CreateConsumerProducerTest.java | Added a new test to validate client creation in scenarios with unavailable broker nodes. |
| pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java | Integrated a Caffeine eviction listener to close the resolver on cache eviction. |
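The overview above mentions a Caffeine eviction listener that closes resolvers when their cache entries expire. As a rough illustration only (the cache key, expiry, and wiring in PulsarWebResource are assumptions here, not taken from the PR, and `evictionListener` requires Caffeine 3.x), such a listener could look like this:

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;

import java.time.Duration;

// Hypothetical cache of resolvers keyed by service URL; the real cache in
// PulsarWebResource may use a different key, expiry, and value type.
Cache<String, PulsarServiceNameResolver> resolverCache = Caffeine.newBuilder()
        .expireAfterAccess(Duration.ofMinutes(5))
        .evictionListener((String serviceUrl, PulsarServiceNameResolver resolver, RemovalCause cause) -> {
            // Close the evicted resolver so its scheduled health-check task stops.
            if (resolver != null) {
                try {
                    resolver.close();
                } catch (Exception e) {
                    // Log and ignore: the cache entry is already gone.
                }
            }
        })
        .build();
```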
Comments suppressed due to low confidence (1)
pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarServiceNameResolverTest.java:59
- The accept loop in the anonymously spawned thread does not check whether the serverSocket has been closed, which can lead to continuous exception logging. Consider adding a condition to exit the loop when the serverSocket is closed.
```java
new Thread(() -> {
    while (true) {
        try {
            serverSocket.accept();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}).start();
```
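One possible shape of the suggested fix (a sketch only, assuming `serverSocket` is a `java.net.ServerSocket` owned by the test): exit the loop once the socket has been closed so the thread stops instead of logging exceptions forever.

```java
new Thread(() -> {
    while (!serverSocket.isClosed()) {
        try {
            // Accept and immediately drop the connection; the test only needs the port to be reachable.
            serverSocket.accept().close();
        } catch (IOException e) {
            if (serverSocket.isClosed()) {
                break; // expected once the test closes the server socket
            }
            e.printStackTrace();
        }
    }
}).start();
```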
…arServiceNameResolver.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
```java
@Slf4j
public class PulsarServiceNameResolver implements ServiceNameResolver {

    private static final int HEALTH_CHECK_TIMEOUT_MS = 5000;
```
If this feature gets added, it should be configurable.
```java
if (!healthCheckScheduled.get()) {
    ScheduledFuture<?> future =
            ((ScheduledExecutorService) executorProvider.getExecutor()).scheduleWithFixedDelay(
                    this::doHealthCheck, 0, 5, TimeUnit.SECONDS);
```
The interval should be configurable. If the interval is 0, there should be no health checks at all.
Ok, I will write a new PIP to add the health check interval and timeout parameters.
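As a sketch of what the configurable parameters might look like (the field and method names below are hypothetical and would be defined by the PIP, while `healthCheckScheduled` and `executorProvider` are reused from the diff above), an interval of 0 would simply skip scheduling:

```java
// Hypothetical fields; the PIP would define the actual names and defaults.
private final int healthCheckIntervalSeconds;  // 0 disables health checks entirely
private final int healthCheckTimeoutMs;

private void maybeScheduleHealthCheck() {
    if (healthCheckIntervalSeconds <= 0) {
        // Interval of 0 (or less): no health checks at all.
        return;
    }
    if (healthCheckScheduled.compareAndSet(false, true)) {
        ((ScheduledExecutorService) executorProvider.getExecutor()).scheduleWithFixedDelay(
                this::doHealthCheck, 0, healthCheckIntervalSeconds, TimeUnit.SECONDS);
    }
}
```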
```java
private static boolean checkAddress(InetSocketAddress address) {
    try (Socket socket = new Socket()) {
        socket.connect(new InetSocketAddress(address.getHostName(), address.getPort()), HEALTH_CHECK_TIMEOUT_MS);
        return true;
    } catch (Exception e) {
        log.error("Health check error, failed to connect to {}, error:{}", address, e.getMessage());
        return false;
    }
}
```
In the Pulsar code base, Java's Socket API is avoided; the Netty API is used instead. Another detail is that Netty's DNS resolver is used, and it has its own DNS cache. Making a test connection where the address is resolved through the same DNS resolver cache would be more useful.
In this case, the Netty API could be used synchronously, so there wouldn't be a need to switch the health checks to a fully asynchronous style.
The Netty DNS resolver (AddressResolver<InetSocketAddress>) for the client is created in ConnectionPool class. It would be necessary to use the same instance so that the client wouldn't have 2 separate DNS caches and 2 different DNS configurations.
Ok, I will fix it.
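A minimal sketch of what a Netty-based check could look like (assumptions: the resolver gets access to the client's NIO `EventLoopGroup` and the `AddressResolver<InetSocketAddress>` created in ConnectionPool; none of the method or parameter names below come from the PR):

```java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.resolver.AddressResolver;

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

static boolean checkAddress(InetSocketAddress unresolved,
                            EventLoopGroup eventLoopGroup,
                            AddressResolver<InetSocketAddress> dnsResolver,
                            int timeoutMs) {
    try {
        // Resolve through the client's own DNS resolver so the same cache and configuration are used.
        InetSocketAddress resolved = dnsResolver.resolve(unresolved).get(timeoutMs, TimeUnit.MILLISECONDS);
        ChannelFuture connectFuture = new Bootstrap()
                .group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMs)
                .handler(new ChannelInboundHandlerAdapter())
                .connect(resolved);
        boolean connected = connectFuture.awaitUninterruptibly(timeoutMs) && connectFuture.isSuccess();
        connectFuture.channel().close();
        return connected;
    } catch (Exception e) {
        return false;
    }
}
```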
lhotari left a comment:
The general design is problematic since the health checking will keep running and creating TCP/IP connections that are immediately closed. This will cause additional load in the overall system, including the endpoints (proxies / brokers). Additionally, opening and closing a TCP/IP connection keeps the local port occupied in the TIME_WAIT state for some time (2*MSL, 60s-240s depending on the OS and its config). SO_REUSEADDR/SO_REUSEPORT doesn't prevent port occupation since it doesn't help bypass TIME_WAIT restrictions for outbound client connections to the same 4-tuple (local ip, local port, remote ip, remote port).
Before actually implementing this health check feature, it would be necessary to describe the issue that is currently caused by not having the health check, and to primarily address that issue instead of implementing this solution in this PR.
Replied in #22934 (comment) about a better way to solve the actual problem.
lhotari left a comment:
Please read #22934 (comment). A different type of solution would solve the problem in a better way, since this health checking solution causes its own problems and overhead as explained in previous comments.
Ok, I will start a new PR for the better solution mentioned at #22934 (comment), and I will close the current PR.
…nt for multi-endpoint serviceUrls (#24394)
Fixes #22934 (comment)
Main Issue: #22934 (comment)
Implementation: #24387
### Motivation
As #22934 and #22933 mention, when most of the nodes in the serviceUrl are down (but at least one node is still available), creating consumers and producers through PulsarClient will most likely fail. This is not the expected behavior: if the code is robust enough, the client should still work normally as long as one node is available. Therefore, this PIP optimizes the logic to remove unavailable nodes through a feedback mechanism and improve the success rate of PulsarClient requests. Note that #22935 removed faulty nodes through a periodic health check mechanism, but that approach brought new problems (frequent connection creation and increased system load), so it was abandoned. See #22934 (comment) for more details.
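For illustration only, a rough sketch of what feedback-based exclusion could look like (all names below are invented; the actual implementation is in #24387 and is likely different): the connection layer reports failed endpoints, and the resolver avoids them for a while instead of probing proactively.

```java
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

// Hypothetical resolver that skips recently failed endpoints without any periodic probing.
public class FeedbackAwareResolver {
    private static final long RETRY_AFTER_MS = 30_000;

    private final List<InetSocketAddress> endpoints;
    private final Map<InetSocketAddress, Long> failedAt = new ConcurrentHashMap<>();

    public FeedbackAwareResolver(List<InetSocketAddress> endpoints) {
        this.endpoints = endpoints;
    }

    // Called by the connection layer when a connect attempt to this endpoint fails.
    public void reportFailure(InetSocketAddress address) {
        failedAt.put(address, System.currentTimeMillis());
    }

    // Prefer endpoints without a recent failure; if all failed recently, fall back to the full list.
    public InetSocketAddress resolveHost() {
        long now = System.currentTimeMillis();
        List<InetSocketAddress> healthy = endpoints.stream()
                .filter(a -> {
                    Long t = failedAt.get(a);
                    return t == null || now - t > RETRY_AFTER_MS;
                })
                .collect(Collectors.toList());
        List<InetSocketAddress> candidates = healthy.isEmpty() ? endpoints : healthy;
        return candidates.get(ThreadLocalRandom.current().nextInt(candidates.size()));
    }
}
```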
Main Issue: #22934
Motivation
Refer to issue: #22934
Modifications
Verifying this change
This change added tests and can be verified by running the new test in CreateConsumerProducerTest and the updated PulsarServiceNameResolverTest.
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
doc / doc-required / doc-not-needed / doc-complete
Matching PR in forked repository
PR in forked repository: AuroraTwinkle#4