Skip to content

Conversation

@mbutrovich
Copy link
Contributor

@mbutrovich mbutrovich commented Mar 7, 2025

Which issue does this PR close?

Closes #1438.

Rationale for this change

We want to use the unified shuffle memory allocator in unified memory mode (off-heap) and the bounded shuffle memory allocator in on-heap mode.

What changes are included in this PR?

  • Rename both shuffle memory allocators to make their behavior more clear.
  • Simplify shuffle memory allocator instantiation
  • Remove config to get bounded allocator
  • Remove test for config that we no longer support (unified memory with bounded allocator)

How are these changes tested?

Existing tests.

@codecov-commenter
Copy link

codecov-commenter commented Mar 7, 2025

Codecov Report

Attention: Patch coverage is 56.66667% with 13 lines in your changes missing coverage. Please review.

Project coverage is 58.56%. Comparing base (f09f8af) to head (daa6c61).
Report is 75 commits behind head on main.

Files with missing lines Patch % Lines
...ark/shuffle/comet/CometShuffleMemoryAllocator.java 12.50% 6 Missing and 1 partial ⚠️
...ffle/comet/CometUnifiedShuffleMemoryAllocator.java 66.66% 3 Missing and 1 partial ⚠️
...ffle/comet/CometBoundedShuffleMemoryAllocator.java 0.00% 1 Missing ⚠️
...park/src/main/scala/org/apache/spark/Plugins.scala 88.88% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1485      +/-   ##
============================================
+ Coverage     56.12%   58.56%   +2.43%     
- Complexity      976     1014      +38     
============================================
  Files           119      123       +4     
  Lines         11743    12259     +516     
  Branches       2251     2304      +53     
============================================
+ Hits           6591     7179     +588     
+ Misses         4012     3932      -80     
- Partials       1140     1148       +8     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@mbutrovich mbutrovich changed the title fix: Only instantiate CometTestShuffleMemoryAllocator in testing fix: Adjust CometTestShuffleMemoryAllocator instantiation Mar 10, 2025
// CometShuffleMemoryAllocator stores pages in TaskMemoryManager which is not singleton,
// but one instance per task. So we need to create a new instance for each task.
return new CometShuffleMemoryAllocator(taskMemoryManager, pageSize);
if (taskMemoryManager.getTungstenMemoryMode() != MemoryMode.OFF_HEAP) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still confused about this. This seems to imply that we only perform columnar shuffle when off-heap mode is enabled?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested with TPC-H without using off-heap and ran into this:

Caused by: java.lang.IllegalArgumentException: CometShuffleMemoryAllocator should be used with off-heap memory mode, but got ON_HEAP
	at org.apache.spark.shuffle.comet.CometShuffleMemoryAllocator.getInstance(CometShuffleMemoryAllocator.java:74)
	at org.apache.spark.sql.comet.execution.shuffle.CometDiskBlockWriter.<init>(CometDiskBlockWriter.java:142)
	at org.apache.spark.sql.comet.execution.shuffle.CometBypassMergeSortShuffleWriter.write(CometBypassMergeSortShuffleWriter.java:181)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Comet 0.3.0 we had a different implementation of CometShuffleMemoryAllocator that managed its own memory separately from Spark:

/**
 * A simple memory allocator used by `CometShuffleExternalSorter` to allocate memory blocks which
 * store serialized rows. We don't rely on Spark memory allocator because we need to allocate
 * off-heap memory no matter memory mode is on-heap or off-heap. This allocator is configured with
 * fixed size of memory, and it will throw `SparkOutOfMemoryError` if the memory is not enough.
 *
 * <p>Some methods are copied from `org.apache.spark.unsafe.memory.TaskMemoryManager` with
 * modifications. Most modifications are to remove the dependency on the configured memory mode.
 */
public final class CometShuffleMemoryAllocator extends MemoryConsumer {

In Comet 0.4.0 we forced use of off-heap memory and in Comet 0.5.0 we reverted that decision but inadvertently started using the test version of the allocator.

Perhaps we must re-instate this earlier version to support the on-heap use case. I am not sure, though. I wonder if @viirya can make any recommendation on this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, the original implementation was renamed to CometTestShuffleMemoryAllocator. So maybe we need to rename this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, this memory allocator implementation doesn't change across the versions. It was just renamed to CometTestShuffleMemoryAllocator to reflect the decision to use it for test only purpose at the time.

@mbutrovich mbutrovich changed the title fix: Adjust CometTestShuffleMemoryAllocator instantiation fix: Rename CometTestShuffleMemoryAllocator since it can be used outside of tests Mar 12, 2025
@mbutrovich mbutrovich requested a review from andygrove March 12, 2025 13:57
…related to shuffle memory allocator after conversation with @andygrove. Behavior now is on-heap enabled -> CometBoundedShuffleMemory Allocator. Off-heap enabled -> CometUnifiedShuffleMemoryAllocator
@mbutrovich mbutrovich changed the title fix: Rename CometTestShuffleMemoryAllocator since it can be used outside of tests fix: Simplify CometShuffleMemoryAllocator logic, rename classes, remove config Mar 12, 2025
Copy link
Member

@andygrove andygrove left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is looking good. I ran some quick tests locally and did not find any issues. I will test more thoroughly once the test regressions are fixed.

…unified memory with bounded shuffle memory allocator, which is no longer a valid config.
CometConf.COMET_EXEC_ENABLED.defaultValue.get)
val unified_memory = CometSparkSessionExtensions.cometUnifiedMemoryManagerEnabled(conf)

comet_enabled && (comet_exec_shuffle || comet_exec) && !unified_memory
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is much clearer

# Conflicts:
#	spark/src/main/scala/org/apache/spark/Plugins.scala
Copy link
Member

@andygrove andygrove left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks @mbutrovich!

@andygrove andygrove merged commit 4ea60ce into apache:main Mar 13, 2025
78 checks passed
@mbutrovich mbutrovich deleted the test_allocator branch April 11, 2025 17:21
coderfender pushed a commit to coderfender/datafusion-comet that referenced this pull request Dec 13, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Columnar shuffle uses wrong memory allocator in unified memory mode

4 participants