@poorbarcode
Contributor

@poorbarcode poorbarcode commented Aug 1, 2025

Motivation

Background of scheduleWithFixedDelay and scheduleAtFixedRate

  • scheduleAtFixedRate: Executes the task at fixed intervals (period), measured from the start of each execution, regardless of task execution time.
    • If task execution time ≤ period: Runs normally at the specified interval (e.g., every 1 second).
    • If task execution time > period: The next execution starts late, immediately after the previous one finishes, and the missed executions queue up as a backlog (which may lead to consecutive executions).
  • scheduleWithFixedDelay: Waits for the previous execution to complete, then delays by the fixed interval (delay) before the next execution.
    • Regardless of task duration, the interval is always measured from the end of the previous execution.
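
The difference described above can be observed with a plain JDK ScheduledExecutorService. The following is a minimal sketch, not code from this PR: the class name FixedRateVsFixedDelay, the helper gapAfterSlowRunMillis, and the 200 ms / 500 ms timings are all assumptions made for illustration. It makes the first run overrun the interval and measures how long the executor waits before starting the second run.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo class, not part of Pulsar.
class FixedRateVsFixedDelay {

    /**
     * Schedules a task whose first run is slow and returns the gap, in milliseconds,
     * between the end of that first run and the start of the second run.
     */
    static long gapAfterSlowRunMillis(boolean fixedRate, long intervalMs, long slowRunMs) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        AtomicLong firstRunEnd = new AtomicLong();
        AtomicLong secondRunStart = new AtomicLong();
        CountDownLatch secondRunStarted = new CountDownLatch(1);

        Runnable task = () -> {
            long startedAt = System.nanoTime();
            int run = runs.incrementAndGet();
            if (run == 1) {
                sleepQuietly(slowRunMs); // simulate a run that overruns the interval
                firstRunEnd.set(System.nanoTime());
            } else if (run == 2) {
                secondRunStart.set(startedAt);
                secondRunStarted.countDown();
            }
        };

        try {
            if (fixedRate) {
                executor.scheduleAtFixedRate(task, 0, intervalMs, TimeUnit.MILLISECONDS);
            } else {
                executor.scheduleWithFixedDelay(task, 0, intervalMs, TimeUnit.MILLISECONDS);
            }
            secondRunStarted.await();
            return TimeUnit.NANOSECONDS.toMillis(secondRunStart.get() - firstRunEnd.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the second run", e);
        } finally {
            executor.shutdownNow();
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Interval 200 ms, first run takes 500 ms.
        System.out.println("fixed rate gap (ms):  " + gapAfterSlowRunMillis(true, 200, 500));
        System.out.println("fixed delay gap (ms): " + gapAfterSlowRunMillis(false, 200, 500));
    }
}
```

With fixed rate, the second run should start almost immediately after the slow first run, because its scheduled time has already passed. With fixed delay, the gap should be at least the configured delay.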

Issue: broker ran checkMessageExpiry many times in 1s

When system resources are tight, checkMessageExpiry tasks pile up in a large backlog and are then executed many times within a short period, which puts even greater pressure on the broker.

image

Modifications

Use scheduleAtFixedRateNonConcurrently in place of scheduleAtFixedRate. It executes tasks at fixed intervals (period) regardless of task execution time, and never causes backlogs.
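
One way to get a fixed-rate schedule without a backlog is to reschedule the task after each run at the next period boundary that is still in the future, so ticks missed during a slow run are dropped instead of queued. The sketch below illustrates that idea only; it is not the implementation in this PR, and the class NoBacklogFixedRate and its methods nextTickNanos and start are hypothetical names. It also assumes the first run happens one period after start.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not the scheduler added by this PR.
class NoBacklogFixedRate {

    /** Returns the first tick (start + k * period, k >= 1) that is strictly after now. */
    static long nextTickNanos(long startNanos, long periodNanos, long nowNanos) {
        long elapsed = Math.max(0L, nowNanos - startNanos);
        long ticksPassed = elapsed / periodNanos;
        return startNanos + (ticksPassed + 1) * periodNanos;
    }

    /** Starts running the task on the fixed-rate grid, skipping ticks missed by slow runs. */
    static void start(ScheduledExecutorService executor, Runnable task, long period, TimeUnit unit) {
        scheduleNext(executor, task, System.nanoTime(), unit.toNanos(period));
    }

    private static void scheduleNext(ScheduledExecutorService executor, Runnable task,
                                     long startNanos, long periodNanos) {
        long now = System.nanoTime();
        long delay = nextTickNanos(startNanos, periodNanos, now) - now;
        try {
            executor.schedule(() -> {
                try {
                    task.run();
                } catch (Throwable t) {
                    // Keep the schedule alive even if one run fails.
                    System.err.println("Scheduled task failed: " + t);
                } finally {
                    // Only one run is ever pending, so no backlog can build up.
                    scheduleNext(executor, task, startNanos, periodNanos);
                }
            }, delay, TimeUnit.NANOSECONDS);
        } catch (RejectedExecutionException e) {
            // The executor was shut down; stop rescheduling.
        }
    }
}
```

For example, with a 100 ns period and a run that finishes at 250 ns, nextTickNanos returns 300 ns: the ticks at 100 ns and 200 ns are skipped rather than executed back to back.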

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: x

@poorbarcode poorbarcode added this to the 4.1.0 milestone Aug 1, 2025
@poorbarcode poorbarcode self-assigned this Aug 1, 2025
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Aug 1, 2025
@Technoboy-
Contributor

Could we just change checkMessageExpiry from scheduleAtFixedRate to scheduleWithFixedDelay?

Member

@lhotari lhotari left a comment

To avoid making a major behavioral change that could cause breaking changes, we should only make scheduleAtFixedRate -> scheduleWithFixedDelay changes to the tasks where the execution interval is relatively short.

For scheduled tasks that continue to use scheduleAtFixedRate, it would be useful to implement a wrapper that tracks whether an execution is in progress and, if the previous execution is still running, skips the new one and logs a warning. We already have the org.apache.pulsar.common.util.Runnables class in pulsar-common and we could add this logic there. The existing catchingAndLoggingThrowables method could take another argument, boolean skipConcurrentExecution, that would handle skipping a concurrent call to the Runnable instance while the previous execution is happening.
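
A rough sketch of such a wrapper is shown below. It is written as a standalone class with hypothetical names (SkipConcurrentRunnable, skippedCount, runDemo) and prints to System.err instead of using a logger; it is not the existing Runnables API, only an illustration of skipping an overlapping call with an AtomicBoolean.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper, not the existing org.apache.pulsar.common.util.Runnables API.
class SkipConcurrentRunnable implements Runnable {
    private final Runnable delegate;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicInteger skipped = new AtomicInteger();

    SkipConcurrentRunnable(Runnable delegate) {
        this.delegate = delegate;
    }

    @Override
    public void run() {
        // Only one caller can flip the flag from false to true; everyone else skips.
        if (!running.compareAndSet(false, true)) {
            skipped.incrementAndGet();
            System.err.println("Skipping execution: the previous run is still in progress");
            return;
        }
        try {
            delegate.run();
        } catch (Throwable t) {
            System.err.println("Unexpected throwable caught: " + t);
        } finally {
            running.set(false);
        }
    }

    int skippedCount() {
        return skipped.get();
    }

    /** Runs one overlapping call and one later call; returns {executed, skipped}. */
    static int[] runDemo() {
        CountDownLatch entered = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger executed = new AtomicInteger();
        SkipConcurrentRunnable guarded = new SkipConcurrentRunnable(() -> {
            executed.incrementAndGet();
            entered.countDown();
            awaitQuietly(release);
        });
        Thread worker = new Thread(guarded, "slow-run");
        worker.start();
        awaitQuietly(entered); // the first run is now in progress
        guarded.run();         // overlapping call: skipped
        release.countDown();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        guarded.run();         // the previous run has finished: executes
        return new int[] {executed.get(), guarded.skippedCount()};
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] result = runDemo();
        System.out.println("executed=" + result[0] + ", skipped=" + result[1]);
    }
}
```

The flag is reset in a finally block so that a failing run cannot leave the task permanently skipped.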

WDYT?

@dao-jun
Member

dao-jun commented Aug 4, 2025

I think maybe we don't need to replace all scheduleWithFixedDelay with scheduleWithFixedRate. For tasks that take a long time, I'm afraid that replacing them with scheduleWithFixedRate will put a high workload on the broker?

@lhotari
Member

lhotari commented Aug 4, 2025

I think maybe we don't need to replace all scheduleWithFixedDelay with scheduleWithFixedRate. For tasks that take a long time, I'm afraid that replacing them with scheduleWithFixedRate will put a high workload on the broker?

@dao-jun I don't think that anyone is suggesting to replace scheduleWithFixedDelay with scheduleAtFixedRate. It's the other way around that is in this PR.

@poorbarcode poorbarcode requested a review from lhotari August 4, 2025 09:56
@lhotari lhotari changed the title from "[improve][broker]Call scheduleWithFixedDelay instead of scheduleWithFixedRate" to "[improve][broker]Call scheduleWithFixedDelay instead of scheduleAtFixedRate" Aug 4, 2025
@lhotari
Member

lhotari commented Aug 4, 2025

@poorbarcode The description refers to scheduleWithFixedRate. The correct method name is scheduleAtFixedRate.

Contributor

@BewareMyPower BewareMyPower left a comment

This PR involves many changes.

Share the comparison of these two methods from Gemini AI:

scheduleAtFixedRate is best for:
Time-sensitive tasks that need a fixed schedule (e.g., cron jobs, hourly chimes).

scheduleWithFixedDelay is best for:
Tasks where a fixed pause between executions is important, especially if task duration is unpredictable.

The choice between scheduleAtFixedRate and scheduleWithFixedDelay depends entirely on your specific use case.

Since the issue is exactly with checkMessageExpiry, please only change its related logic.

@BewareMyPower
Contributor

Even with this change, it's nearly impossible to prevent new code from calling scheduleAtFixedRate. It's unfair to say one is definitely better than the other.

@lhotari
Member

lhotari commented Aug 4, 2025

This PR involves many changes.

Share the comparison of these two methods from Gemini AI:

scheduleAtFixedRate is best for:
Time-sensitive tasks that need a fixed schedule (e.g., cron jobs, hourly chimes).
scheduleWithFixedDelay is best for:
Tasks where a fixed pause between executions is important, especially if task duration is unpredictable.
The choice between scheduleAtFixedRate and scheduleWithFixedDelay depends entirely on your specific use case.

Since the issue is exactly with checkMessageExpiry, please only change its related logic.

@BewareMyPower I think that analysis misses the detail that scheduleWithFixedDelay will prevent concurrent execution. My suggestion would be to continue to use scheduleAtFixedRate, but add the solution to Runnables class to prevent concurrent execution which could cause many issues. (explained in my previous comment)

@BewareMyPower
Contributor

BewareMyPower commented Aug 5, 2025

Actually the original analysis includes that point, but it was so long that I didn't include it.

However, back to the issue: users should not configure a short interval like 1s, and in fact they cannot, because the interval can only be configured through messageExpiryCheckIntervalInMinutes, whose time unit is minutes. Even with the smallest config (1 minute), could scheduleAtFixedRate be an actual issue? If so, it means BrokerService#checkMessageExpiry could take more than 1 minute, which is crazy. Instead of switching to scheduleWithFixedDelay, we should investigate why it could take so long. If it cannot be optimized, it should not be fixed on the Pulsar side. In that case, it is an operational issue that should be fixed in one of the following ways:

  • Increase the CPU (since it means there are a lot of topics to be served)
  • Increase the number of brokers (reduce the number of topics processed by each broker)
  • Increase the messageExpiryCheckIntervalInMinutes config (to avoid frequent time consuming operation)

P.S. From the screenshot, message expiry check is scheduled every 5 minutes, which is a default value. It also shows no queued tasks being executed immediately.

scheduleWithFixedDelay might be able to avoid queuing scheduled tasks in extreme cases, but from all existing info I can see, it does not solve an actual issue.

@lhotari
Member

lhotari commented Aug 5, 2025

scheduleWithFixedDelay might be able to avoid queuing scheduled tasks in extreme cases, but from all existing info I can see, it does not solve an actual issue.

@BewareMyPower I agree that it doesn't fix the actual issue. That's why I'd recommend using scheduleAtFixedRate with the wrapper that optionally logs a warning when the previous execution is still running. In that case, it could be caused by a deadlock, for example. Logging would help in investigating and fixing the root cause. It would also be possible to track the thread that is in execution and log the stack trace of the thread that is stuck in executing the task.

Without preventing concurrent job execution, the problem would get worse, since another thread would start executing the same task concurrently. That could already happen in cases where we have very short delays and, let's say, use synchronization to run the task one at a time. All subsequent tasks would be backlogged in the executor, which is a problem that should be solved. As I understand it, this is the problem that @poorbarcode would like to address.
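
The idea of tracking the executing thread and logging its stack trace could look roughly like the sketch below. The class StuckTaskReporter and its methods are hypothetical names used only for illustration, and the report is printed to System.err rather than written through a logger. It relies only on the standard Thread#getStackTrace method.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch, not code from this PR.
class StuckTaskReporter implements Runnable {
    private final Runnable delegate;
    // Holds the thread that is currently executing the task, or null when idle.
    private final AtomicReference<Thread> runner = new AtomicReference<>();
    private volatile String lastReport = "";

    StuckTaskReporter(Runnable delegate) {
        this.delegate = delegate;
    }

    @Override
    public void run() {
        if (!runner.compareAndSet(null, Thread.currentThread())) {
            Thread busy = runner.get();
            if (busy != null) {
                lastReport = describe(busy);
                System.err.println(lastReport);
            }
            return; // skip this run, the previous one has not finished
        }
        try {
            delegate.run();
        } finally {
            runner.set(null);
        }
    }

    String lastReport() {
        return lastReport;
    }

    /** Formats the name and current stack trace of the thread still running the task. */
    static String describe(Thread thread) {
        StringBuilder sb = new StringBuilder("Previous run is still executing on thread ")
                .append(thread.getName());
        for (StackTraceElement frame : thread.getStackTrace()) {
            sb.append(System.lineSeparator()).append("    at ").append(frame);
        }
        return sb.toString();
    }

    /** Blocks one run on a latch, triggers an overlapping call, and returns the report. */
    static String runDemo() {
        CountDownLatch entered = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        StuckTaskReporter guarded = new StuckTaskReporter(() -> {
            entered.countDown();
            awaitQuietly(release);
        });
        Thread worker = new Thread(guarded, "stuck-worker");
        worker.start();
        awaitQuietly(entered);
        guarded.run(); // overlapping call: reports where the worker is blocked
        release.countDown();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return guarded.lastReport();
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

In a deadlock scenario, the reported frames would point at the lock or call the stuck thread is waiting on, which is the information needed to investigate the root cause.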

@BewareMyPower
Contributor

Yes, adding a wrapper of scheduleAtFixedRate makes sense

@poorbarcode
Contributor Author

poorbarcode commented Aug 5, 2025

Yes, adding a wrapper of scheduleAtFixedRate makes sense
That's why I'd recommend using scheduleAtFixedRate with the wrapper that optionally logs a warning when the previous execution is still running.

@BewareMyPower @lhotari Thanks for your suggestion, I will improve the implementation

@poorbarcode
Contributor Author

Yes, adding a wrapper of scheduleAtFixedRate makes sense
That's why I'd recommend using scheduleAtFixedRate with the wrapper that optionally logs a warning when the previous execution is still running.

@BewareMyPower @lhotari Thanks for your suggestion, I will improve the implementation

Finished, please review again

@BewareMyPower BewareMyPower dismissed their stale review August 7, 2025 02:24

code changed

@poorbarcode poorbarcode requested a review from lhotari August 7, 2025 08:27
@codecov-commenter

codecov-commenter commented Aug 7, 2025

Codecov Report

❌ Patch coverage is 71.65354% with 36 lines in your changes missing coverage. Please review.
✅ Project coverage is 74.30%. Comparing base (bbc6224) to head (da0c2be).
⚠️ Report is 1263 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...l/SingleThreadNonConcurrentFixedRateScheduler.java | 67.28% | 20 Missing and 15 partials ⚠️ |
| ...rg/apache/pulsar/broker/service/BrokerService.java | 95.00% | 1 Missing ⚠️ |
Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #24596      +/-   ##
============================================
+ Coverage     73.57%   74.30%   +0.73%     
- Complexity    32624    33185     +561     
============================================
  Files          1877     1882       +5     
  Lines        139502   146862    +7360     
  Branches      15299    16875    +1576     
============================================
+ Hits         102638   109133    +6495     
- Misses        28908    29052     +144     
- Partials       7956     8677     +721     
| Flag | Coverage Δ |
|---|---|
| inttests | 26.59% <53.54%> (+2.01%) ⬆️ |
| systests | 23.29% <53.54%> (-1.03%) ⬇️ |
| unittests | 73.79% <71.65%> (+0.95%) ⬆️ |

Flags with carried forward coverage won't be shown. Click here to find out more.

| Files with missing lines | Coverage Δ |
|---|---|
| ...rg/apache/pulsar/broker/service/BrokerService.java | 83.63% <95.00%> (+2.85%) ⬆️ |
| ...l/SingleThreadNonConcurrentFixedRateScheduler.java | 67.28% <67.28%> (ø) |

... and 1125 files with indirect coverage changes

@poorbarcode poorbarcode requested review from lhotari and removed request for lhotari August 7, 2025 10:47
@poorbarcode poorbarcode changed the title from "[improve][broker]Call scheduleWithFixedDelay instead of scheduleAtFixedRate" to "[improve][broker]Call scheduleAtFixedRateNonConcurrently instead of scheduleAtFixedRate for scheduled tasks" Aug 7, 2025
@poorbarcode poorbarcode changed the title from "[improve][broker]Call scheduleAtFixedRateNonConcurrently instead of scheduleAtFixedRate for scheduled tasks" to "[improve][broker]Call scheduleAtFixedRateNonConcurrently for scheduled tasks, instead of scheduleAtFixedRate" Aug 7, 2025
@coderzc coderzc modified the milestones: 4.1.0, 4.2.0 Sep 1, 2025
@coderzc
Member

coderzc commented Sep 10, 2025

Modifications
Call scheduleWithFixedDelay instead of scheduleWithFixedRate

Please update the change description.

@poorbarcode
Contributor Author

@coderzc

Please update the change description.

Thanks for mentioning this. I have updated the description, please review again.

@coderzc
Member

coderzc commented Sep 24, 2025

@lhotari please review this PR again

@lhotari lhotari merged commit f41687c into apache:master Sep 25, 2025
58 of 59 checks passed
poorbarcode added a commit that referenced this pull request Sep 26, 2025
…d tasks, instead of scheduleAtFixedRate (#24596)

(cherry picked from commit f41687c)
poorbarcode added a commit that referenced this pull request Sep 26, 2025
…d tasks, instead of scheduleAtFixedRate (#24596)

(cherry picked from commit f41687c)
manas-ctds pushed a commit to datastax/pulsar that referenced this pull request Sep 29, 2025
…d tasks, instead of scheduleAtFixedRate (apache#24596)

(cherry picked from commit f41687c)
(cherry picked from commit c604530)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Sep 29, 2025
…d tasks, instead of scheduleAtFixedRate (apache#24596)

(cherry picked from commit f41687c)
(cherry picked from commit c604530)
walkinggo pushed a commit to walkinggo/pulsar that referenced this pull request Oct 8, 2025

7 participants