-
Notifications
You must be signed in to change notification settings - Fork 270
fix: Simplify CometShuffleMemoryAllocator logic, rename classes, remove config #1485
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
… to the test allocator.
spark/src/main/java/org/apache/spark/shuffle/comet/CometTestShuffleMemoryAllocator.java
Show resolved
Hide resolved
spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java
Outdated
Show resolved
Hide resolved
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1485 +/- ##
============================================
+ Coverage 56.12% 58.56% +2.43%
- Complexity 976 1014 +38
============================================
Files 119 123 +4
Lines 11743 12259 +516
Branches 2251 2304 +53
============================================
+ Hits 6591 7179 +588
+ Misses 4012 3932 -80
- Partials 1140 1148 +8 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
| // CometShuffleMemoryAllocator stores pages in TaskMemoryManager which is not singleton, | ||
| // but one instance per task. So we need to create a new instance for each task. | ||
| return new CometShuffleMemoryAllocator(taskMemoryManager, pageSize); | ||
| if (taskMemoryManager.getTungstenMemoryMode() != MemoryMode.OFF_HEAP) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm still confused about this. This seems to imply that we only perform columnar shuffle when off-heap mode is enabled?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tested with TPC-H without using off-heap and ran into this:
Caused by: java.lang.IllegalArgumentException: CometShuffleMemoryAllocator should be used with off-heap memory mode, but got ON_HEAP
at org.apache.spark.shuffle.comet.CometShuffleMemoryAllocator.getInstance(CometShuffleMemoryAllocator.java:74)
at org.apache.spark.sql.comet.execution.shuffle.CometDiskBlockWriter.<init>(CometDiskBlockWriter.java:142)
at org.apache.spark.sql.comet.execution.shuffle.CometBypassMergeSortShuffleWriter.write(CometBypassMergeSortShuffleWriter.java:181)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In Comet 0.3.0 we had a different implementation of CometShuffleMemoryAllocator that managed its own memory separately from Spark:
/**
* A simple memory allocator used by `CometShuffleExternalSorter` to allocate memory blocks which
* store serialized rows. We don't rely on Spark memory allocator because we need to allocate
* off-heap memory no matter memory mode is on-heap or off-heap. This allocator is configured with
* fixed size of memory, and it will throw `SparkOutOfMemoryError` if the memory is not enough.
*
* <p>Some methods are copied from `org.apache.spark.unsafe.memory.TaskMemoryManager` with
* modifications. Most modifications are to remove the dependency on the configured memory mode.
*/
public final class CometShuffleMemoryAllocator extends MemoryConsumer {
In Comet 0.4.0 we forced use of off-heap memory and in Comet 0.5.0 we reverted that decision but inadvertently started using the test version of the allocator.
Perhaps we must re-instate this earlier version to support the on-heap use case. I am not sure, though. I wonder if @viirya can make any recommendation on this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, the original implementation was renamed to CometTestShuffleMemoryAllocator. So maybe we need to rename this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea, this memory allocator implementation doesn't change across the versions. It was just renamed to CometTestShuffleMemoryAllocator to reflect the decision to use it for test only purpose at the time.
spark/src/main/java/org/apache/spark/shuffle/comet/CometBoundedShuffleMemoryAllocator.java
Show resolved
Hide resolved
spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java
Outdated
Show resolved
Hide resolved
spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java
Outdated
Show resolved
Hide resolved
…related to shuffle memory allocator after conversation with @andygrove. Behavior now is on-heap enabled -> CometBoundedShuffleMemory Allocator. Off-heap enabled -> CometUnifiedShuffleMemoryAllocator
andygrove
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is looking good. I ran some quick tests locally and did not find any issues. I will test more thoroughly once the test regressions are fixed.
…unified memory with bounded shuffle memory allocator, which is no longer a valid config.
| CometConf.COMET_EXEC_ENABLED.defaultValue.get) | ||
| val unified_memory = CometSparkSessionExtensions.cometUnifiedMemoryManagerEnabled(conf) | ||
|
|
||
| comet_enabled && (comet_exec_shuffle || comet_exec) && !unified_memory |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is much clearer
# Conflicts: # spark/src/main/scala/org/apache/spark/Plugins.scala
spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java
Outdated
Show resolved
Hide resolved
andygrove
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks @mbutrovich!
Which issue does this PR close?
Closes #1438.
Rationale for this change
We want to use the unified shuffle memory allocator in unified memory mode (off-heap) and the bounded shuffle memory allocator in on-heap mode.
What changes are included in this PR?
How are these changes tested?
Existing tests.