
Conversation

@zuston
Member

@zuston zuston commented Oct 8, 2022

What changes were proposed in this pull request?

  1. Introduce the ZSTD compression
  2. Introduce the abstract interface of codec
  3. Recycle the buffer to optimize the performance

Why are the changes needed?

ZSTD offers a good tradeoff between compression ratio and compression/decompression speed. To reduce the size of stored shuffle data, it is worth supporting this algorithm.

Does this PR introduce any user-facing change?

Yes

How was this patch tested?

Manual tests and UTs
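The three changes above can be sketched together as a minimal codec abstraction. This is a hypothetical sketch: `Codec` and `NoOpCodec` appear in this PR's diff, but the signatures below are assumptions for illustration, not the merged code.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical shape of the abstract codec interface this PR introduces.
abstract class Codec {
    // Compress src and return the compressed bytes.
    public abstract byte[] compress(byte[] src);

    // Decompress src into dest, writing uncompressedLen bytes at destOffset.
    public abstract void decompress(ByteBuffer src, int uncompressedLen,
                                    ByteBuffer dest, int destOffset);
}

// Pass-through codec: a useful baseline for tests and benchmarks.
class NoOpCodec extends Codec {
    @Override
    public byte[] compress(byte[] src) {
        return src; // no compression applied
    }

    @Override
    public void decompress(ByteBuffer src, int uncompressedLen,
                           ByteBuffer dest, int destOffset) {
        byte[] tmp = new byte[uncompressedLen];
        src.duplicate().get(tmp); // duplicate() leaves src's position untouched
        dest.position(destOffset);
        dest.put(tmp);
    }
}
```

A ZSTD implementation would subclass `Codec` the same way, delegating to the zstd library.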

@zuston
Member Author

zuston commented Oct 8, 2022

Terasort Compression Benchmark

100GB terasort

(benchmark result screenshots c1–c4)

@zuston zuston requested a review from jerqi October 8, 2022 07:49
@zuston
Member Author

zuston commented Oct 8, 2022

PTAL @jerqi

@codecov-commenter

codecov-commenter commented Oct 8, 2022

Codecov Report

Merging #254 (4557866) into master (47effb2) will decrease coverage by 0.14%.
The diff coverage is 63.54%.

@@             Coverage Diff              @@
##             master     #254      +/-   ##
============================================
- Coverage     59.71%   59.56%   -0.15%     
- Complexity     1377     1381       +4     
============================================
  Files           166      171       +5     
  Lines          8918     8983      +65     
  Branches        853      859       +6     
============================================
+ Hits           5325     5351      +26     
- Misses         3318     3353      +35     
- Partials        275      279       +4     
Impacted Files Coverage Δ
...rg/apache/hadoop/mapred/RssMapOutputCollector.java 0.00% <0.00%> (ø)
.../java/org/apache/hadoop/mapreduce/RssMRConfig.java 23.07% <0.00%> (-51.93%) ⬇️
...pache/hadoop/mapreduce/task/reduce/RssShuffle.java 0.00% <0.00%> (ø)
.../java/org/apache/spark/shuffle/RssSparkConfig.java 90.90% <0.00%> (-5.87%) ⬇️
...ava/org/apache/uniffle/common/RssShuffleUtils.java 0.00% <ø> (-95.66%) ⬇️
...g/apache/uniffle/common/compression/NoOpCodec.java 0.00% <0.00%> (ø)
...g/apache/uniffle/common/compression/ZstdCodec.java 72.22% <72.22%> (ø)
...a/org/apache/uniffle/common/compression/Codec.java 80.00% <80.00%> (ø)
...e/spark/shuffle/reader/RssShuffleDataIterator.java 90.54% <84.61%> (+1.80%) ⬆️
...rg/apache/uniffle/common/config/RssClientConf.java 90.90% <90.90%> (ø)
... and 5 more


@zuston zuston requested a review from jerqi October 9, 2022 02:02

int uncompressedLen = compressedBlock.getUncompressLength();
if (uncompressedData == null || uncompressedData.capacity() < uncompressedLen) {
uncompressedData = ByteBuffer.allocate(uncompressedLen);
Member Author

In the original implementation, the ByteBuffer was destroyed and recreated on every decompression, so it used an off-heap ByteBuffer to avoid frequent GC.

In this PR we recycle the ByteBuffer, so I think there is no need for off-heap memory now. Maybe we should add off-heap support in the next PR.
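The recycling pattern described here can be sketched as follows. The class and method names are illustrative, not from the PR; the reuse condition mirrors the diff context above.

```java
import java.nio.ByteBuffer;

// Keep one on-heap buffer per reader and only reallocate when a block
// needs more capacity than the current buffer holds, avoiding the
// per-block allocate/GC churn of the original implementation.
class RecyclingBuffer {
    private ByteBuffer uncompressedData;

    ByteBuffer obtain(int uncompressedLen) {
        if (uncompressedData == null || uncompressedData.capacity() < uncompressedLen) {
            uncompressedData = ByteBuffer.allocate(uncompressedLen);
        }
        uncompressedData.clear(); // reset position/limit for reuse
        return uncompressedData;
    }
}
```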

Member Author

PTAL @jerqi. This is the difference from the original implementation.

Contributor

It's ok for me.

@jerqi
Contributor

jerqi commented Oct 9, 2022

Should we add the document?

@zuston
Member Author

zuston commented Oct 9, 2022

Should we add the document?

Done

@zuston zuston requested a review from jerqi October 9, 2022 05:39


@zuston zuston requested a review from jerqi October 10, 2022 03:35
@jerqi jerqi requested a review from frankliee October 10, 2022 03:58
@frankliee
Contributor

frankliee commented Oct 10, 2022

Can you provide the test report of JVM memory usage?
If the new compressor uses much more memory, it will increase the risk of OOM.

return builder.stringConf();
}

public static RssConf toRssConf(SparkConf sparkConf) {
Contributor

Why change conf design in this ZSTD PR?

Member Author

I want to make the compressorFactory accessible to both MR and Spark so it can create the concrete codec initialized from the specified conf. That leaves two choices:

  1. Use a shareable RssConf, like this PR does
  2. Introduce an extra compression config bean (I think there is no need to do so)

Besides, I want to refactor the MR/Spark client conf entry code; this PR does part of that work. Please refer to #200

@zuston
Member Author

zuston commented Oct 10, 2022

Can you provide the test report of JVM memory usage? If the new compressor uses much more memory, it will increase the risk of OOM.

The monitoring screenshots are as follows; I don't see any obvious difference.

(memory usage screenshots x1 and x2)

@zuston zuston requested a review from frankliee October 10, 2022 08:46
@zuston
Member Author

zuston commented Oct 10, 2022

Updated @frankliee

@zuston
Member Author

zuston commented Oct 12, 2022

Gentle ping @frankliee @jerqi

@zuston
Member Author

zuston commented Oct 13, 2022

Do you have any other concerns? Please let me know @jerqi @frankliee


package org.apache.uniffle.common.compression;

public interface Compressor {
Contributor

Do we need to merge Compressor and Decompressor into one Codec interface, like Hadoop does?
It is more concise and avoids mixing different pairs of Compressor and Decompressor.

@jerqi @zuston

Member Author

Let me do a simple review about hadoop codec.

Contributor

I mean that compress/decompress could share the same interface from the user's point of view.
For example, Hadoop's CompressionCodec has createOutputStream (for compression) and createInputStream (for decompression).

Member Author

@zuston zuston Oct 17, 2022

So you mean that I should create similar Zstd/LZ4CompressionCodec classes that implement the Compressor and Decompressor interfaces?

If so, it will be hard to initialize the fields specific to a compressor or decompressor, like this.lz4Factory = LZ4Factory.fastestInstance();.

Please let me know if I'm wrong.

Contributor

You could provide only a Codec instead of a CompressionFactory, hiding the inner compressor and decompressor.
The user can then use the Codec directly to compress or decompress data, without touching the compressor and decompressor themselves.

Member Author

I pushed a new commit based on your idea: 866b642

Do I get your point? @frankliee

Contributor

I prefer this style

abstract class Codec {
  private static class Compressor {}
  private static class Decompressor {}

  private getCompressor()    // for lazy init
  private getDecompressor()

  public compress()
  public decompress()
}

ZSTDCodec extends Codec {}
LZ4Codec extends Codec {}
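A concrete, hedged rendering of this lazy-init style, using the JDK's Deflater/Inflater as stand-in compressor/decompressor (the PR itself wires in ZSTD and LZ4; the class below is illustrative, not project code):

```java
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

abstract class Codec {
    public abstract byte[] compress(byte[] src);
    public abstract byte[] decompress(byte[] src, int uncompressedLen);
}

class DeflateCodec extends Codec {
    // Lazily initialized delegates, mirroring getCompressor()/getDecompressor().
    private Deflater compressor;
    private Inflater decompressor;

    private Deflater getCompressor() {
        if (compressor == null) {
            compressor = new Deflater();
        }
        return compressor;
    }

    private Inflater getDecompressor() {
        if (decompressor == null) {
            decompressor = new Inflater();
        }
        return decompressor;
    }

    @Override
    public byte[] compress(byte[] src) {
        Deflater d = getCompressor();
        d.reset();
        d.setInput(src);
        d.finish();
        // Generous buffer: deflate can expand tiny or incompressible inputs.
        byte[] buf = new byte[src.length + 64];
        int n = d.deflate(buf);
        byte[] out = new byte[n];
        System.arraycopy(buf, 0, out, 0, n);
        return out;
    }

    @Override
    public byte[] decompress(byte[] src, int uncompressedLen) {
        Inflater i = getDecompressor();
        i.reset();
        i.setInput(src);
        byte[] out = new byte[uncompressedLen];
        try {
            i.inflate(out);
        } catch (DataFormatException e) {
            throw new IllegalStateException("corrupt compressed block", e);
        }
        return out;
    }
}
```

Callers only see `compress`/`decompress`; the paired compressor and decompressor never leak out of the codec.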

Member Author

Emm.... OK. I will follow this project style.

@zuston zuston requested a review from jerqi October 18, 2022 08:26
@zuston
Member Author

zuston commented Oct 18, 2022

PTAL @jerqi

@zuston
Member Author

zuston commented Oct 21, 2022

Bug: I found that zstd has no decompressByteArray method in zstd version 1.3.2-2, which ships with Spark 2.4.6.

To be compatible with the older version, I think I should use reflection to check for it.

This problem will be fixed in the next PR. We could merge this one first.

I have updated the latest commit; could you help review @frankliee @jerqi? If there is any problem, I think I can do a quick fix this weekend.

Latest commit changelog:

  1. Introduce the abstract class Codec
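The reflection check mentioned above could look like the sketch below. The probe itself is a hypothetical helper; the zstd class and method names in the usage note come from this discussion and are not verified here.

```java
// Reflection-based capability probe: report whether a class exposes a
// method with the given signature, returning false (instead of throwing)
// when the class or method is absent so a caller can fall back to an
// older code path.
final class MethodProbe {
    static boolean hasMethod(String className, String methodName, Class<?>... paramTypes) {
        try {
            Class.forName(className).getMethod(methodName, paramTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            return false;
        }
    }
}
```

A caller could then probe something like `MethodProbe.hasMethod("com.github.luben.zstd.Zstd", "decompressByteArray", ...)` once at startup and pick the compatible decompression path.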

@zuston
Member Author

zuston commented Oct 24, 2022

Gentle ping @jerqi @frankliee

@zuston
Member Author

zuston commented Oct 26, 2022

Updated @frankliee . Could you help review again?

@zuston zuston requested a review from frankliee October 26, 2022 06:57
@frankliee
Contributor

LGTM, thanks for your contributions.

@frankliee frankliee merged commit 01def93 into apache:master Oct 26, 2022
jerqi pushed a commit that referenced this pull request Nov 6, 2022
### What changes were proposed in this pull request?
This PR adds the support of snappy compression/decompression based on the example of #254. 

### Why are the changes needed?
Add a new feature.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT