Support ZSTD #254
Conversation
PTAL @jerqi
Codecov Report
```diff
@@              Coverage Diff              @@
##             master     #254      +/-   ##
============================================
- Coverage     59.71%   59.56%    -0.15%
- Complexity     1377     1381        +4
============================================
  Files           166      171        +5
  Lines          8918     8983       +65
  Branches        853      859        +6
============================================
+ Hits           5325     5351       +26
- Misses         3318     3353       +35
- Partials        275      279        +4
```
```java
int uncompressedLen = compressedBlock.getUncompressLength();
if (uncompressedData == null || uncompressedData.capacity() < uncompressedLen) {
  uncompressedData = ByteBuffer.allocate(uncompressedLen);
```
In the original implementation, the ByteBuffer was destroyed and recreated for every block, so an off-heap ByteBuffer was used to avoid frequent GC.
In this PR we recycle the ByteBuffer, so I think there is no need for off-heap memory now. Maybe we should add off-heap support in the next PR; a rough sketch of the recycling pattern is below.
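A compact illustration of the recycling pattern being discussed; the field name and capacity check mirror the quoted diff, while the wrapper class and method are hypothetical:

```java
import java.nio.ByteBuffer;

// Illustrative sketch, not the PR's code: the same on-heap ByteBuffer is reused
// across blocks and only re-allocated when a block needs more capacity, so
// allocation (and GC) pressure stays low even without off-heap memory.
class UncompressedBufferHolder {
  private ByteBuffer uncompressedData;

  ByteBuffer bufferFor(int uncompressedLen) {
    if (uncompressedData == null || uncompressedData.capacity() < uncompressedLen) {
      uncompressedData = ByteBuffer.allocate(uncompressedLen);
    }
    uncompressedData.clear();
    uncompressedData.limit(uncompressedLen);
    return uncompressedData;
  }
}
```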
PTAL @jerqi. This is the difference from the original implementation.
It's ok for me.
Should we add the documentation?
Done
Can you provide a test report of JVM memory usage?
```java
  return builder.stringConf();
}

public static RssConf toRssConf(SparkConf sparkConf) {
```
Why change the conf design in this ZSTD PR?
I want the compression factory to be accessible from both MR and Spark to create the concrete codec, which will be initialized from the specified conf, so there are two choices:
- Use a shareable RssConf, as in this PR
- Introduce an extra compression config bean (I think there is no need to do so)

Besides, I want to refactor the code of the MR/Spark client conf entry; this PR does part of that work. Please refer to #200. A hedged sketch of the shared-RssConf option is shown below.
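A minimal sketch of what the shared-conf conversion could look like; the spark.rss. prefix, the SparkConfConverter class name, and the RssConf#setString setter are assumptions for illustration, not necessarily the PR's exact code:

```java
import org.apache.spark.SparkConf;
import org.apache.uniffle.common.config.RssConf;
import scala.Tuple2;

// Hypothetical converter: copy Spark-side "spark.rss.*" entries into a shared
// RssConf so the compression codec can be configured the same way from both
// the Spark and MR clients.
public class SparkConfConverter {
  private static final String SPARK_RSS_CONFIG_PREFIX = "spark.rss.";

  public static RssConf toRssConf(SparkConf sparkConf) {
    RssConf rssConf = new RssConf();
    for (Tuple2<String, String> entry : sparkConf.getAll()) {
      String key = entry._1();
      if (key.startsWith(SPARK_RSS_CONFIG_PREFIX)) {
        // Strip the Spark-specific prefix so the key matches the client conf names.
        rssConf.setString(key.substring(SPARK_RSS_CONFIG_PREFIX.length()), entry._2());
      }
    }
    return rssConf;
  }
}
```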
Updated @frankliee
Gentle ping @frankliee @jerqi
Do you have any other concerns? Please let me know @jerqi @frankliee
```java
package org.apache.uniffle.common.compression;

public interface Compressor {
```
Let me do a simple review of the Hadoop codec.
I don't find the compressor and decompressor mixed into one interface in Hadoop.
I mean that compress/decompress could share the same interface for the user.
For example, CompressionCodec has createOutputStream (for compression) and createInputStream (for decompression).
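For reference, a hedged sketch of that Hadoop pattern; the GzipCodec choice and the round-trip harness are illustrative, not part of this PR:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;

// One CompressionCodec object serves both directions, so callers never deal
// with Compressor/Decompressor instances directly.
public class HadoopCodecExample {
  public static void main(String[] args) throws Exception {
    CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, new Configuration());

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (OutputStream out = codec.createOutputStream(bytes)) {                    // compress
      out.write("hello uniffle".getBytes(StandardCharsets.UTF_8));
    }
    try (InputStream in =
        codec.createInputStream(new ByteArrayInputStream(bytes.toByteArray()))) { // decompress
      System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8));
    }
  }
}
```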
So you mean that I need to create similar Zstd/LZ4CompressionCodec classes to implement the Compressor and Decompressor interfaces?
If so, it will be hard to initialize the corresponding fields for a specific compressor or decompressor, like `this.lz4Factory = LZ4Factory.fastestInstance();`.
Please let me know if I'm wrong.
You could provide only a Codec instead of CompressionFactory, which hides the inner compressor and decompressor.
The user could then use the Codec directly to compress or decompress data, without needing to touch the compressor and decompressor.
I proposed a new commit according to your idea: 866b642.
Did I get your point? @frankliee
I prefer this style:

```
abstract class Codec {
  private static class Compressor {}
  private static class Decompressor {}

  private getCompressor()     // for lazy init
  private getDecompressor()

  public compress()
  public decompress()
}

ZSTDCodec extends Codec {}
LZ4Codec extends Codec {}
```
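A more concrete, hedged sketch of the LZ4 side of this style, assuming the lz4-java library (LZ4Factory is mentioned earlier in this thread); class and method signatures are illustrative rather than the PR's final API:

```java
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4FastDecompressor;

// Sketch only: one public Codec type per algorithm, with the library-specific
// compressor/decompressor created lazily and hidden from callers.
abstract class Codec {
  public abstract byte[] compress(byte[] src);
  public abstract byte[] decompress(byte[] src, int uncompressedLen);
}

class Lz4Codec extends Codec {
  private LZ4Compressor compressor;
  private LZ4FastDecompressor decompressor;

  private LZ4Compressor getCompressor() {
    if (compressor == null) {
      compressor = LZ4Factory.fastestInstance().fastCompressor();
    }
    return compressor;
  }

  private LZ4FastDecompressor getDecompressor() {
    if (decompressor == null) {
      decompressor = LZ4Factory.fastestInstance().fastDecompressor();
    }
    return decompressor;
  }

  @Override
  public byte[] compress(byte[] src) {
    return getCompressor().compress(src);
  }

  @Override
  public byte[] decompress(byte[] src, int uncompressedLen) {
    return getDecompressor().decompress(src, uncompressedLen);
  }
}
```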
Emm... OK. I will follow this project style.
PTAL @jerqi
This problem will be fixed in the next PR; we could merge this one first. I have pushed the latest commit, could you help review @frankliee @jerqi? If there are any problems, I can do a quick fix this weekend. Latest commit changelog:
Gentle ping @jerqi @frankliee
Updated @frankliee. Could you help review again?
LGTM, thanks for your contributions.
### What changes were proposed in this pull request?
This PR adds the support of snappy compression/decompression based on the example of #254.

### Why are the changes needed?
Add a new feature.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT






What changes were proposed in this pull request?
Why are the changes needed?
ZSTD offers a good tradeoff between compression ratio and compression/decompression speed. To reduce the size of stored shuffle data, it is worth supporting this compression algorithm (a hedged usage sketch is shown below).
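A minimal round-trip sketch using the zstd-jni bindings; whether this PR uses zstd-jni and these exact calls is an assumption for illustration:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import com.github.luben.zstd.Zstd;

// Illustrative only: compress and decompress a byte[] with ZSTD.
public class ZstdRoundTrip {
  public static void main(String[] args) {
    byte[] original = "shuffle block payload".getBytes(StandardCharsets.UTF_8);
    byte[] compressed = Zstd.compress(original);
    // Decompression needs the uncompressed length, which the shuffle client
    // tracks per block (see getUncompressLength() in the diff quoted above).
    byte[] restored = Zstd.decompress(compressed, original.length);
    System.out.println(Arrays.equals(original, restored));  // prints: true
  }
}
```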
Does this PR introduce any user-facing change?
Yes
How was this patch tested?
Manual tests and UTs