Do not create new Executor every time createRunner is called #32272
OutputManager outputManager,
DoFnSchemaInformation doFnSchemaInformation,
Map<String, PCollectionView<?>> sideInputMapping) {
  if (this.ses == null) {
Added a debug log in both branches and ran a sample job; saw:
INFO 2024-08-21T18:47:53.569Z creating new sdf executor
INFO 2024-08-21T18:47:55.698Z Reuse old sdf executor
INFO 2024-08-21T18:47:56.812Z Reuse old sdf executor
INFO 2024-08-21T18:47:57.919Z Reuse old sdf executor
INFO 2024-08-21T18:47:59.030Z Reuse old sdf executor
INFO 2024-08-21T18:48:00.138Z Reuse old sdf executor
INFO 2024-08-21T18:48:00.699Z Reuse old sdf executor
INFO 2024-08-21T18:48:01.871Z Reuse old sdf executor
INFO 2024-08-21T18:48:03.044Z Reuse old sdf executor
INFO 2024-08-21T18:48:04.121Z Reuse old sdf executor
INFO 2024-08-21T18:48:04.715Z Reuse old sdf executor
INFO 2024-08-21T18:48:05.821Z Reuse old sdf executor
...
This indicates the change is effective: the executor is created once and reused on every later call.
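The check in the diff excerpt amounts to lazy initialization: create the executor on the first `createRunner` call and hand back the same instance afterwards. Below is a minimal sketch, assuming a hypothetical `RunnerFactory` whose no-argument `createRunner()` stands in for the real method (the real parameters and runner construction are omitted) and which prints the same two messages as the debug log above. Like the excerpt, it does no synchronization.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

class RunnerFactory {
  // Shared executor, created lazily on the first createRunner() call (hypothetical field).
  private ScheduledExecutorService ses;

  // Hypothetical stand-in for createRunner(...): returns the executor a new runner would use.
  ScheduledExecutorService createRunner() {
    if (this.ses == null) {
      System.out.println("creating new sdf executor");
      this.ses = Executors.newSingleThreadScheduledExecutor();
    } else {
      System.out.println("Reuse old sdf executor");
    }
    return this.ses;
  }

  public static void main(String[] args) {
    RunnerFactory factory = new RunnerFactory();
    ScheduledExecutorService first = factory.createRunner(); // prints "creating new sdf executor"
    ScheduledExecutorService second = factory.createRunner(); // prints "Reuse old sdf executor"
    System.out.println(first == second); // prints "true"
    first.shutdown();
  }
}
```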
Internal tracker: 361080602
Example Dataflow jobs:
Before: 2024-08-21_11_14_45-8135795605757551727; memory usage increases over time (it will ultimately OOM due to the thread leak).
After: 2024-08-21_11_39_15-12114433774010769671; memory usage is flat.
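The leak comes from each per-call executor starting its own worker thread once it receives work, with nothing ever shutting it down. The sketch below is a hypothetical harness, not Beam code: it counts thread creations through a `ThreadFactory` for ten "runners" that each get a fresh executor versus ten that share one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ThreadLeakDemo {
  // Counts how many threads it has created; threads are daemons so the demo can always exit.
  static final class CountingThreadFactory implements ThreadFactory {
    final AtomicInteger created = new AtomicInteger();

    @Override
    public Thread newThread(Runnable r) {
      created.incrementAndGet();
      Thread t = new Thread(r);
      t.setDaemon(true);
      return t;
    }
  }

  // Per-call executor: every "runner" gets a fresh executor that is never shut down.
  static int threadsCreatedPerCall(int runners) {
    CountingThreadFactory factory = new CountingThreadFactory();
    List<ScheduledExecutorService> leaked = new ArrayList<>();
    for (int i = 0; i < runners; i++) {
      ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor(factory);
      ses.schedule(() -> {}, 1, TimeUnit.MILLISECONDS); // first task starts a worker thread
      leaked.add(ses);
    }
    int created = factory.created.get();
    leaked.forEach(ScheduledExecutorService::shutdownNow); // clean-up the per-call code never does
    return created;
  }

  // Shared executor: all runners schedule onto the same instance.
  static int threadsCreatedShared(int runners) {
    CountingThreadFactory factory = new CountingThreadFactory();
    ScheduledExecutorService shared = Executors.newSingleThreadScheduledExecutor(factory);
    for (int i = 0; i < runners; i++) {
      shared.schedule(() -> {}, 1, TimeUnit.MILLISECONDS);
    }
    int created = factory.created.get();
    shared.shutdownNow();
    return created;
  }

  public static void main(String[] args) {
    System.out.println("per-call: " + threadsCreatedPerCall(10)); // per-call: 10
    System.out.println("shared: " + threadsCreatedShared(10)); // shared: 1
  }
}
```

In a long-running worker the per-call count keeps growing with every `createRunner` call, which matches the rising memory in the "before" job.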
R: @scwhittle
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment "assign set of reviewers".
Assigning reviewers. If you would like to opt out of this review, comment …
R: @chamikaramj added as fallback since no labels match configuration.
Available commands:
The PR bot will only process comments in the main thread (not review comments).
Map<String, PCollectionView<?>> sideInputMapping) {
  if (this.ses == null) {
    this.ses =
        Executors.newSingleThreadScheduledExecutor(
I don't think we want a single-threaded executor here: I believe the factory vends many different DoFnRunners, which will want some parallel threads for splits.
Changed to use Executors.newScheduledThreadPool.
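A sketch of why the pool matters, assuming a hypothetical harness in which several jobs must be running at the same moment (a stand-in for runners needing parallel threads for splits). Each job counts down a shared `CountDownLatch` and then waits for it. A pool of four workers (the size is an assumption chosen so the result does not depend on the machine's CPU count) lets all four jobs meet, while a single-threaded executor runs them one at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class PoolParallelismDemo {
  // Submits `tasks` jobs that each wait (up to timeoutMs) for all jobs to have started.
  // Returns how many jobs observed every job running; shuts the executor down afterwards.
  static int countRendezvous(ScheduledExecutorService ses, int tasks, long timeoutMs)
      throws InterruptedException, ExecutionException {
    CountDownLatch allStarted = new CountDownLatch(tasks);
    List<Future<Boolean>> results = new ArrayList<>();
    for (int i = 0; i < tasks; i++) {
      results.add(
          ses.submit(
              () -> {
                allStarted.countDown();
                return allStarted.await(timeoutMs, TimeUnit.MILLISECONDS);
              }));
    }
    int succeeded = 0;
    for (Future<Boolean> f : results) {
      if (f.get()) {
        succeeded++;
      }
    }
    ses.shutdown();
    return succeeded;
  }

  public static void main(String[] args) throws Exception {
    // Four workers: all four jobs run at the same time, so every wait succeeds.
    System.out.println(countRendezvous(Executors.newScheduledThreadPool(4), 4, 5_000)); // 4
    // One worker: jobs run sequentially; only the last one sees the latch reach zero.
    System.out.println(countRendezvous(Executors.newSingleThreadScheduledExecutor(), 4, 100)); // 1
  }
}
```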
OutputManager outputManager,
DoFnSchemaInformation doFnSchemaInformation,
Map<String, PCollectionView<?>> sideInputMapping) {
  if (this.ses == null) {
Is this factory possibly called concurrently? If so, it might need some synchronization.
I am not sure, but it is now guarded with an AtomicReference.
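One way to guard lazy creation with an `AtomicReference` is a compare-and-set loop: racing callers may each build a candidate executor, but only one `compareAndSet(null, candidate)` succeeds and the losers shut down their spare. The sketch below is an assumption about how such a guard could look, not the merged code; `SharedExecutorHolder`, its methods, and the pool size of 2 are hypothetical. `shutdownAndReset()` mirrors the "reset executorService after shutdown" commit by clearing the reference so that a later call builds a fresh executor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;

class SharedExecutorHolder {
  private final AtomicReference<ScheduledExecutorService> ses = new AtomicReference<>();

  // Returns the shared executor, creating it if needed. Only one compareAndSet can win;
  // a caller that loses the race discards its candidate and retries.
  ScheduledExecutorService getOrCreate() {
    while (true) {
      ScheduledExecutorService current = ses.get();
      if (current != null) {
        return current;
      }
      ScheduledExecutorService candidate = Executors.newScheduledThreadPool(2);
      if (ses.compareAndSet(null, candidate)) {
        return candidate;
      }
      candidate.shutdown(); // lost the race; the candidate never ran a task
    }
  }

  // Shuts the current executor down and clears the reference for a later re-creation.
  void shutdownAndReset() {
    ScheduledExecutorService current = ses.getAndSet(null);
    if (current != null) {
      current.shutdown();
    }
  }

  // Calls getOrCreate() from `threads` threads at once; true if every call saw one instance.
  static boolean allCallersSeeSameExecutor(SharedExecutorHolder holder, int threads)
      throws Exception {
    ExecutorService callers = Executors.newFixedThreadPool(threads);
    try {
      Callable<ScheduledExecutorService> call = holder::getOrCreate;
      List<Future<ScheduledExecutorService>> results = new ArrayList<>();
      for (int i = 0; i < threads; i++) {
        results.add(callers.submit(call));
      }
      ScheduledExecutorService first = results.get(0).get();
      for (Future<ScheduledExecutorService> f : results) {
        if (f.get() != first) {
          return false;
        }
      }
      return true;
    } finally {
      callers.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    SharedExecutorHolder holder = new SharedExecutorHolder();
    System.out.println(allCallersSeeSameExecutor(holder, 8)); // true
    ScheduledExecutorService before = holder.getOrCreate();
    holder.shutdownAndReset();
    System.out.println(holder.getOrCreate() != before); // true: a fresh executor after reset
    holder.shutdownAndReset();
  }
}
```

A caller that fetched the executor just before a reset can still end up holding a shut-down instance; the sketch does not try to prevent that.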
  if (this.ses == null) {
    this.ses =
        Executors.newSingleThreadScheduledExecutor(
This executor isn't used below yet.
...ava/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
...c/main/java/org/apache/beam/runners/samza/runtime/SplittableParDoProcessKeyedElementsOp.java
scwhittle left a comment:
Thanks for fixing these!
OutputManager outputManager,
DoFnSchemaInformation doFnSchemaInformation,
Map<String, PCollectionView<?>> sideInputMapping) {
  if (this.ses.get() == null) {
Remembered a cleaner way to do this:
// Created lazily on the first get() and reused on every later call.
private final Supplier<ScheduledExecutorService> sesSupplier =
    Suppliers.memoize(
        () ->
            Executors.newScheduledThreadPool(
                Runtime.getRuntime().availableProcessors(),
                new ThreadFactoryBuilder().setNameFormat("df-sdf-executor-%d").build()));
and then just call sesSupplier.get() below.
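The suggestion relies on Guava's `Suppliers.memoize`, which computes the value on the first `get()` and caches it. To show the idea without the Guava dependency, here is a hand-rolled stand-in (`Memoize.memoize` and `Memoize.namedThreads` are hypothetical; the first uses double-checked locking and the second replaces `ThreadFactoryBuilder` with a plain `ThreadFactory`). The `df-sdf-executor-%d` name format is taken from the suggestion above.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class Memoize {
  // Hypothetical stand-in for Guava's Suppliers.memoize: the delegate runs at most once,
  // assuming it never returns null (a null result would be recomputed on the next get()).
  static <T> Supplier<T> memoize(Supplier<T> delegate) {
    return new Supplier<T>() {
      private volatile T value;

      @Override
      public T get() {
        T result = value;
        if (result == null) {
          synchronized (this) {
            result = value;
            if (result == null) {
              result = delegate.get();
              value = result;
            }
          }
        }
        return result;
      }
    };
  }

  // Names threads df-sdf-executor-0, df-sdf-executor-1, ... (stand-in for ThreadFactoryBuilder).
  static ThreadFactory namedThreads(String format) {
    AtomicInteger counter = new AtomicInteger();
    return r -> new Thread(r, String.format(format, counter.getAndIncrement()));
  }

  // Counts how many times the pool factory actually ran.
  static final AtomicInteger poolsCreated = new AtomicInteger();

  static final Supplier<ScheduledExecutorService> sesSupplier =
      memoize(
          () -> {
            poolsCreated.incrementAndGet();
            return Executors.newScheduledThreadPool(
                Runtime.getRuntime().availableProcessors(), namedThreads("df-sdf-executor-%d"));
          });

  public static void main(String[] args) {
    ScheduledExecutorService a = sesSupplier.get();
    ScheduledExecutorService b = sesSupplier.get();
    System.out.println(a == b); // true
    System.out.println(poolsCreated.get()); // 1
    a.shutdown();
  }
}
```

Compared with the AtomicReference guard, the memoized supplier keeps the lazy-initialization logic out of `createRunner`, but it offers no way to reset the cached executor after shutdown.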
Thanks, good to learn. For now, merging this before the release cut.
* Do not create new Executor every time createRunner is called
* Reset executorService after shutdown
* Switch to use newScheduledThreadPool; guard ses with AtomicReference
* Partially revert changes on Flink and Samza runners