fix(core): Try to exhaust Zstd's internal buffers when they might contain unconsumed data. (fixes #976).#977
Conversation
|
Warning Rate limit exceeded@junhaoliao has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 20 minutes and 47 seconds before requesting another review. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. 📒 Files selected for processing (1)
""" WalkthroughThe decompression logic in the Zstd Decompressor was refactored to centralize and simplify buffer refilling through a new helper method. Error handling and frame status tracking were unified and improved, and a new private member was introduced to indicate if more data remains in the current frame. Changes
Sequence Diagram(s)sequenceDiagram
participant Caller
participant Decompressor
participant ReaderInterface
Caller->>Decompressor: try_read()
Decompressor->>Decompressor: check compressed buffer
alt buffer exhausted and no more frame data
Decompressor->>Decompressor: refill_compressed_stream_block()
alt input type is ReaderInterface
Decompressor->>ReaderInterface: try_read()
ReaderInterface-->>Decompressor: data/error
end
end
Decompressor->>Decompressor: decompress()
Decompressor->>Decompressor: update m_zstd_frame_might_have_more_data based on decompress result
Decompressor->>Caller: return success or error code
Suggested reviewers
✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
| if (m_compressed_stream_block.pos == m_compressed_stream_block.size | ||
| && false == m_frame_has_more_data) | ||
| { | ||
| auto const error_code{refill_compressed_buffer()}; |
There was a problem hiding this comment.
i did the refactoring as i was navigating through the long function. let me know if it's better to revert the function extraction.
There was a problem hiding this comment.
that I extracted a helper method. i think it's good for clarity though it's not strictly required for the fix
@kirkrodrigues I thought your comment https://github.com/y-scope/clp/pull/977/files#r2133962400 was posted on this thread. sorry for the confusion
| ); | ||
| return ErrorCode_Failure; | ||
| } | ||
| m_frame_has_more_data = (ret > 0); |
There was a problem hiding this comment.
adding a test case into components/core/tests/test-StreamingCompression.cpp is possible but may require some significant amount of refactoring of the current cases. let me know if i should proceed with the changes
There was a problem hiding this comment.
that I extracted a helper method. i think it's good for clarity though it's not strictly required for the fix
There was a problem hiding this comment.
You extracted a helper method for the test cases?
There was a problem hiding this comment.
sorry I was super blind... thought I was looking at another comment
a case that best illustrates the issue is that it writes a small chunk (less than ZSTD_BLOCKSIZE) of compressed data, creates a decompressor with a truncated view of the compressed data, and only expects correctness of certain leading uncompressed bytes with no error, until the rest in the frame is fully consumed
in the current test code, the "compress" and "decompress_and_compare" helpers use for-loops to write and expect full correctness of more than one chunk of data. There's also no negative tests. I was thinking we should extract the compressor / file reader / decompressor creation logic so that we don't necessarily repeat those across different cases
anyways, let me clean up the test source file on my end, and then we can discuss whether / how much we want to check things in
There was a problem hiding this comment.
my attempt to add the test case and refactor:
#include <algorithm>
#include <array>
#include <cstring>
#include <memory>
#include <numeric>
#include <string>
#include <string_view>
#include <utility>
#include <boost/filesystem/operations.hpp>
#include <catch2/catch.hpp>
#include <ystdlib/containers/Array.hpp>
#include <zstd.h>
#include "../src/clp/ErrorCode.hpp"
#include "../src/clp/FileWriter.hpp"
#include "../src/clp/ReadOnlyMemoryMappedFile.hpp"
#include "../src/clp/streaming_compression/Compressor.hpp"
#include "../src/clp/streaming_compression/Decompressor.hpp"
#include "../src/clp/streaming_compression/lzma/Compressor.hpp"
#include "../src/clp/streaming_compression/lzma/Decompressor.hpp"
#include "../src/clp/streaming_compression/passthrough/Compressor.hpp"
#include "../src/clp/streaming_compression/passthrough/Decompressor.hpp"
#include "../src/clp/streaming_compression/zstd/Compressor.hpp"
#include "../src/clp/streaming_compression/zstd/Decompressor.hpp"
using clp::ErrorCode_Success;
using clp::FileWriter;
using clp::streaming_compression::Compressor;
using clp::streaming_compression::Decompressor;
using std::string;
using std::string_view;
using ystdlib::containers::Array;
namespace {
constexpr string_view cCompressedFilePath{"test_streaming_compressed_file.bin"};
constexpr size_t cBufferSize{128L * 1024 * 1024}; // 128MB
constexpr auto cCompressionChunkSizes = std::to_array<size_t>(
{0,
cBufferSize / 100,
cBufferSize / 50,
cBufferSize / 25,
cBufferSize / 10,
cBufferSize / 5,
cBufferSize / 2,
cBufferSize}
);
auto fill_buffer(Array<char>& buffer, size_t alphabet_length) -> void;
auto compress(
std::unique_ptr<Compressor> compressor,
char const* src,
std::span<size_t const> const& chunk_sizes
) -> void;
auto decompress_and_compare(
std::unique_ptr<Decompressor> decompressor,
Array<char> const& uncompressed_buffer,
Array<char>& decompressed_buffer
) -> void;
auto fill_buffer(Array<char>& buffer, size_t const alphabet_length) -> void {
for (size_t i{0}; i < buffer.size(); ++i) {
buffer.at(i) = static_cast<char>(('a' + (i % alphabet_length)));
}
}
auto compress(
std::unique_ptr<Compressor> compressor,
char const* src,
std::span<size_t const> const& chunk_sizes
) -> void {
FileWriter file_writer;
file_writer.open(string(cCompressedFilePath), FileWriter::OpenMode::CREATE_FOR_WRITING);
compressor->open(file_writer);
for (auto const chunk_size : chunk_sizes) {
compressor->write(src, chunk_size);
}
compressor->close();
file_writer.close();
}
auto decompress_and_compare(
std::unique_ptr<Decompressor> decompressor,
Array<char> const& uncompressed_buffer,
Array<char>& decompressed_buffer
) -> void {
clp::ReadOnlyMemoryMappedFile const memory_mapped_compressed_file{string(cCompressedFilePath)};
auto const compressed_file_view{memory_mapped_compressed_file.get_view()};
decompressor->open(compressed_file_view.data(), compressed_file_view.size());
size_t num_uncompressed_bytes{0};
for (auto const chunk_size : cCompressionChunkSizes) {
// Clear the buffer to ensure that we are not comparing values from a previous test
std::ranges::fill(decompressed_buffer.begin(), decompressed_buffer.end(), 0);
REQUIRE(
(ErrorCode_Success
== decompressor->get_decompressed_stream_region(
num_uncompressed_bytes,
decompressed_buffer.data(),
chunk_size
))
);
REQUIRE(std::equal(
uncompressed_buffer.begin(),
uncompressed_buffer.begin() + chunk_size,
decompressed_buffer.begin()
));
num_uncompressed_bytes += chunk_size;
}
REQUIRE(
(std::accumulate(
cCompressionChunkSizes.cbegin(),
cCompressionChunkSizes.cend(),
size_t{0}
)
== num_uncompressed_bytes)
);
}
} // namespace
TEST_CASE("StreamingCompression", "[StreamingCompression]") {
constexpr size_t cAlphabetLength{26};
std::unique_ptr<Compressor> compressor;
std::unique_ptr<Decompressor> decompressor;
Array<char> decompressed_buffer(cBufferSize);
Array<char> uncompressed_buffer(cBufferSize);
fill_buffer(uncompressed_buffer, cBufferSize);
SECTION("ZStd single phase compression") {
compressor = std::make_unique<clp::streaming_compression::zstd::Compressor>();
compress(std::move(compressor), uncompressed_buffer.data(), cCompressionChunkSizes);
decompressor = std::make_unique<clp::streaming_compression::zstd::Decompressor>();
decompress_and_compare(std::move(decompressor), uncompressed_buffer, decompressed_buffer);
}
SECTION("Passthrough compression") {
compressor = std::make_unique<clp::streaming_compression::passthrough::Compressor>();
compress(std::move(compressor), uncompressed_buffer.data(), cCompressionChunkSizes);
decompressor = std::make_unique<clp::streaming_compression::passthrough::Decompressor>();
decompress_and_compare(std::move(decompressor), uncompressed_buffer, decompressed_buffer);
}
SECTION("LZMA compression") {
compressor = std::make_unique<clp::streaming_compression::lzma::Compressor>();
compress(std::move(compressor), uncompressed_buffer.data(), cCompressionChunkSizes);
decompressor = std::make_unique<clp::streaming_compression::lzma::Decompressor>();
}
boost::filesystem::remove(string(cCompressedFilePath));
}
TEST_CASE("SmallZstdFile", "[SmallZstdFile]") {
constexpr size_t cAlphabetLength{26};
std::unique_ptr<Compressor> compressor;
std::unique_ptr<Decompressor> decompressor;
Array<char> decompressed_buffer(cAlphabetLength);
Array<char> uncompressed_buffer(cAlphabetLength);
fill_buffer(uncompressed_buffer, cAlphabetLength);
SECTION("ZStd single phase compression") {
compressor = std::make_unique<clp::streaming_compression::zstd::Compressor>();
auto const single_chunk{std::to_array<size_t>({cAlphabetLength})};
compress(std::move(compressor), uncompressed_buffer.data(), single_chunk);
decompressor = std::make_unique<clp::streaming_compression::zstd::Decompressor>();
clp::ReadOnlyMemoryMappedFile const memory_mapped_compressed_file{
string(cCompressedFilePath)
};
auto const compressed_file_view{memory_mapped_compressed_file.get_view()};
decompressor->open(
compressed_file_view.data(),
// Truncate the EOF byte to let the decompressor consume the whole first frame on
// the first read.
compressed_file_view.size() - 1
);
// Suppose after the truncate, the decompressor can still read half of the first frame.
constexpr auto cValidReadLength{cAlphabetLength / 2};
for (size_t i{0}; i < cValidReadLength; ++i) {
REQUIRE(
(ErrorCode_Success
== decompressor->try_read_exact_length(decompressed_buffer.begin() + i, 1))
);
}
REQUIRE(std::equal(
uncompressed_buffer.begin(),
uncompressed_buffer.begin() + cValidReadLength,
decompressed_buffer.begin()
));
}
}| * @return ErrorCode_Success on success | ||
| * @return ErrorCode_EndOfFile if no more data is available | ||
| */ | ||
| [[nodiscard]] auto refill_compressed_buffer() -> ErrorCode; |
There was a problem hiding this comment.
How about refill_compressed_stream_block to match the variable name?
| ZSTD_inBuffer m_compressed_stream_block{}; | ||
|
|
||
| size_t m_decompressed_stream_pos{0ULL}; | ||
| bool m_frame_has_more_data{false}; |
There was a problem hiding this comment.
How about m_zstd_frame_has_more_data?
| if (ErrorCode_Success != error_code) { | ||
| num_bytes_read = decompressed_stream_block.pos; | ||
| return decompressed_stream_block.pos > 0 ? ErrorCode_Success : error_code; |
There was a problem hiding this comment.
If error_code isn't ErrorCode_EndOfFile, then I don't think it's appropriate to return ErrorCode_Success to the caller.
| num_bytes_read = decompressed_stream_block.pos; | ||
| return decompressed_stream_block.pos > 0 ? ErrorCode_Success : error_code; |
There was a problem hiding this comment.
I think another bug in the existing code is that we don't update m_decompressed_stream_pos before we return here. Can you add it?
| } | ||
| } | ||
|
|
||
| auto Decompressor::reset_stream() -> void { |
There was a problem hiding this comment.
We need to set m_frame_has_more_data to false in this method.
| ); | ||
| return ErrorCode_Failure; | ||
| } | ||
| m_frame_has_more_data = (ret > 0); |
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (2)
components/core/src/clp/streaming_compression/zstd/Decompressor.cpp (2)
228-243:⚠️ Potential issue
reset_stream()still doesn’t clearm_frame_has_more_dataPast feedback noted that failing to reset this flag can leave the decompressor thinking a frame is still open when we’ve actually rewound the stream, leading to incorrect refill/loop logic.
Add an explicit reset:ZSTD_initDStream(m_decompression_stream); m_decompressed_stream_pos = 0; +m_frame_has_more_data = false;
40-47:⚠️ Potential issueUpdate decompressed‐stream position before early return
When
refill_compressed_buffer()signalsEndOfFile, the function may bail out here after having already produceddecompressed_stream_block.posbytes.
m_decompressed_stream_posis not updated in this branch, so callers will observe a stale stream position.if (ErrorCode_Success != error_code) { + // Advance logical position by the bytes we *did* manage to produce + m_decompressed_stream_pos += decompressed_stream_block.pos; num_bytes_read = decompressed_stream_block.pos; return decompressed_stream_block.pos > 0 ? ErrorCode_Success : error_code; }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
components/core/src/clp/streaming_compression/zstd/Decompressor.cpp(2 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
`**/*.{cpp,hpp,java,js,jsx,tpp,ts,tsx}`: - Prefer `false == ` rather than `!`.
**/*.{cpp,hpp,java,js,jsx,tpp,ts,tsx}: - Preferfalse == <expression>rather than!<expression>.
components/core/src/clp/streaming_compression/zstd/Decompressor.cpp
🧬 Code Graph Analysis (1)
components/core/src/clp/streaming_compression/zstd/Decompressor.cpp (5)
components/core/src/clp/streaming_compression/zstd/Decompressor.hpp (2)
buf(63-64)OperationFailed(23-24)components/core/src/clp/FileReader.hpp (1)
buf(58-58)components/core/src/clp/LibarchiveFileReader.hpp (2)
buf(63-63)buf(106-106)components/core/src/clp/StringReader.hpp (1)
buf(59-59)components/core/src/clp/streaming_compression/Decompressor.hpp (1)
OperationFailed(17-18)
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: ubuntu-jammy-lint
- GitHub Check: centos-stream-9-dynamic-linked-bins
- GitHub Check: centos-stream-9-static-linked-bins
- GitHub Check: ubuntu-jammy-dynamic-linked-bins
- GitHub Check: ubuntu-jammy-static-linked-bins
- GitHub Check: lint-check (ubuntu-latest)
- GitHub Check: build-macos (macos-13, false)
| auto const error_code = m_reader->try_read( | ||
| read_buffer.data(), | ||
| read_buffer.size(), | ||
| m_read_buffer_length | ||
| ); | ||
|
|
||
| if (ErrorCode_Success != error_code) { | ||
| return error_code; | ||
| } |
There was a problem hiding this comment.
🛠️ Refactor suggestion
Guard against zero-byte reads to avoid tight loops
ReaderInterface::try_read may legally return ErrorCode_Success with m_read_buffer_length == 0 (e.g. non-blocking I/O).
The current logic will keep re-calling ZSTD_decompressStream with an empty input buffer, resulting in an infinite loop and 100 % CPU.
if (ErrorCode_Success != error_code) {
return error_code;
}
+m_compressed_stream_block.pos = 0;
+m_compressed_stream_block.size = m_read_buffer_length;
+
+// Treat a successful but zero-byte read as EOF to break the outer loop
+if (0 == m_read_buffer_length) {
+ return ErrorCode_EndOfFile;
+}
-m_compressed_stream_block.pos = 0;
-m_compressed_stream_block.size = m_read_buffer_length;
return ErrorCode_Success;📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| auto const error_code = m_reader->try_read( | |
| read_buffer.data(), | |
| read_buffer.size(), | |
| m_read_buffer_length | |
| ); | |
| if (ErrorCode_Success != error_code) { | |
| return error_code; | |
| } | |
| auto const error_code = m_reader->try_read( | |
| read_buffer.data(), | |
| read_buffer.size(), | |
| m_read_buffer_length | |
| ); | |
| if (ErrorCode_Success != error_code) { | |
| return error_code; | |
| } | |
| m_compressed_stream_block.pos = 0; | |
| m_compressed_stream_block.size = m_read_buffer_length; | |
| // Treat a successful but zero-byte read as EOF to break the outer loop | |
| if (0 == m_read_buffer_length) { | |
| return ErrorCode_EndOfFile; | |
| } | |
| return ErrorCode_Success; |
🤖 Prompt for AI Agents
In components/core/src/clp/streaming_compression/zstd/Decompressor.cpp around
lines 209 to 217, the code does not handle the case where try_read returns
ErrorCode_Success but sets m_read_buffer_length to zero, which can cause an
infinite loop. Add a guard after the try_read call to check if
m_read_buffer_length is zero, and if so, return an appropriate error code or
handle it to prevent the loop from continuing with an empty input buffer.
…d_buffer() is not EndOfFile.
…ed_stream_block for consistency
Co-authored-by: kirkrodrigues <2454684+kirkrodrigues@users.noreply.github.com>
There was a problem hiding this comment.
Actionable comments posted: 2
♻️ Duplicate comments (1)
components/core/src/clp/streaming_compression/zstd/Decompressor.cpp (1)
213-227: 🛠️ Refactor suggestionZero-byte reads still unhandled – risk of another infinite loop
A prior review (see outdated comment) already pointed out thatReaderInterface::try_readmay legally returnErrorCode_Successwithm_read_buffer_length == 0.
That scenario is still not handled: we resetpos/sizeto zero and immediately returnErrorCode_Success, after whichtry_read()will re-enter the outer loop and spin forever.auto const error_code = m_reader->try_read( read_buffer.data(), read_buffer.size(), m_read_buffer_length ); - if (ErrorCode_Success != error_code) { + if (ErrorCode_Success != error_code) { return error_code; } + + // Break potential busy-loops on successful but empty reads + if (0 == m_read_buffer_length) { + return ErrorCode_EndOfFile; + }This is the exact safeguard proposed earlier but not yet merged.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (2)
components/core/src/clp/streaming_compression/zstd/Decompressor.cpp(3 hunks)components/core/src/clp/streaming_compression/zstd/Decompressor.hpp(2 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
`**/*.{cpp,hpp,java,js,jsx,tpp,ts,tsx}`: - Prefer `false == ` rather than `!`.
**/*.{cpp,hpp,java,js,jsx,tpp,ts,tsx}: - Preferfalse == <expression>rather than!<expression>.
components/core/src/clp/streaming_compression/zstd/Decompressor.hppcomponents/core/src/clp/streaming_compression/zstd/Decompressor.cpp
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: ubuntu-jammy-lint
- GitHub Check: ubuntu-jammy-dynamic-linked-bins
- GitHub Check: centos-stream-9-static-linked-bins
- GitHub Check: centos-stream-9-dynamic-linked-bins
- GitHub Check: ubuntu-jammy-static-linked-bins
- GitHub Check: lint-check (ubuntu-latest)
- GitHub Check: build-macos (macos-15, true)
| if (m_compressed_stream_block.pos == m_compressed_stream_block.size | ||
| && false == m_zstd_frame_has_more_data) | ||
| { | ||
| auto const error_code{refill_compressed_stream_block()}; | ||
| if (ErrorCode_Success != error_code) { | ||
| m_decompressed_stream_pos += decompressed_stream_block.pos; | ||
|
|
||
| num_bytes_read = decompressed_stream_block.pos; | ||
| if (ErrorCode_EndOfFile == error_code && decompressed_stream_block.pos > 0) { |
There was a problem hiding this comment.
Frame-continuation check inverted – decompressor can busy-loop forever
refill_compressed_stream_block() is only invoked when
false == m_zstd_frame_has_more_data.
According to the zstd manual, ret > 0 (i.e. true for m_zstd_frame_has_more_data) means more compressed bytes are required to finish the current frame.
When the compressed buffer is exhausted and m_zstd_frame_has_more_data is true, we must fetch more input.
As written, the loop keeps calling ZSTD_decompressStream with an empty input buffer, resulting in a tight infinite loop and 100 % CPU.
- if (m_compressed_stream_block.pos == m_compressed_stream_block.size
- && false == m_zstd_frame_has_more_data)
+ // Refill whenever the input buffer is empty – regardless of frame state.
+ if (m_compressed_stream_block.pos == m_compressed_stream_block.size)This change honours both normal frame boundaries and the “needs-more-input” contract.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if (m_compressed_stream_block.pos == m_compressed_stream_block.size | |
| && false == m_zstd_frame_has_more_data) | |
| { | |
| auto const error_code{refill_compressed_stream_block()}; | |
| if (ErrorCode_Success != error_code) { | |
| m_decompressed_stream_pos += decompressed_stream_block.pos; | |
| num_bytes_read = decompressed_stream_block.pos; | |
| if (ErrorCode_EndOfFile == error_code && decompressed_stream_block.pos > 0) { | |
| // Refill whenever the input buffer is empty – regardless of frame state. | |
| if (m_compressed_stream_block.pos == m_compressed_stream_block.size) | |
| { | |
| auto const error_code{refill_compressed_stream_block()}; | |
| if (ErrorCode_Success != error_code) { | |
| m_decompressed_stream_pos += decompressed_stream_block.pos; | |
| num_bytes_read = decompressed_stream_block.pos; | |
| if (ErrorCode_EndOfFile == error_code && decompressed_stream_block.pos > 0) { |
🤖 Prompt for AI Agents
In components/core/src/clp/streaming_compression/zstd/Decompressor.cpp around
lines 40 to 48, the condition checking m_zstd_frame_has_more_data is inverted,
causing refill_compressed_stream_block() to be called only when no more data is
needed, leading to a busy loop. Fix this by changing the condition to call
refill_compressed_stream_block() when m_zstd_frame_has_more_data is true and the
compressed stream block is fully consumed, ensuring more input is fetched as
required by the zstd frame continuation contract.
There was a problem hiding this comment.
@junhaoliao I think the rabbit might be correct about this? If Zstd_decompressStream exhausts m_compressed_stream_block but Zstd_decompressStream still returns > 0, then we should try to read more from the underlying data, right?
There was a problem hiding this comment.
Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!
There was a problem hiding this comment.
https://facebook.github.io/zstd/zstd_manual.html#Chapter9
@return...
any other value > 0, which means there is still some decoding or flushing to do to complete current frame
assuming ZSTD_decompressStream() implementation is correct, it should return > 0 only if a future call of it is allowed (and expected). in that case, we don't have to load new data into the input buffer
There was a problem hiding this comment.
I understand your interpretation (I think that's what I thought initially too), but unfortunately I think they mean it very literally:
- if the return is
>0, then the frame hasn't been fully decoded. If at this point the input buffer has been exhausted, then the only way to decode more of the frame would be to provide more input---which is why the return value is a hint for how much input to provide. - if
output.pos == output.size, then there might be more data that Zstd_decompressStream can provide, without providing more input data.
I think this sentence also agrees with that interpretation:
The function tries to flush all data decoded immediately, respecting output buffer size.
Fwiw, I wrote a simple unit test to check the interpretation and it does seem like we'll get into an infinite loop if we hit this case.
There was a problem hiding this comment.
if output.pos == output.size, then there might be more data that Zstd_decompressStream can provide, without providing more input data.
i updated
m_zstd_frame_has_more_data = decompressed_stream_block.pos == decompressed_stream_block.size;
which now strictly follows the recommendation of the zstd manual
| /** | ||
| * Refills m_compressed_stream_block with data from the underlying input medium. | ||
| * | ||
| * @return ErrorCode_Success on success | ||
| * @return ErrorCode_EndOfFile if no more data is available | ||
| * @return Forwards `ReaderInterface::try_read`'s return values. | ||
| */ | ||
| [[nodiscard]] auto refill_compressed_stream_block() -> ErrorCode; | ||
|
|
There was a problem hiding this comment.
🧹 Nitpick (assertive)
Minor doc nit – clarify propagated error codes
Consider explicitly mentioning that ErrorCode_Corrupt, ErrorCode_NotReady, etc., can also be forwarded from the underlying ReaderInterface::try_read to prevent ambiguity for future maintainers.
🤖 Prompt for AI Agents
In components/core/src/clp/streaming_compression/zstd/Decompressor.hpp around
lines 123 to 131, the method documentation for refill_compressed_stream_block
does not explicitly list all possible error codes forwarded from
ReaderInterface::try_read. Update the comment to explicitly mention that error
codes like ErrorCode_Corrupt, ErrorCode_NotReady, and others from
ReaderInterface::try_read can also be propagated, to provide clearer guidance
for future maintainers.
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (2)
components/core/src/clp/streaming_compression/zstd/Decompressor.cpp (2)
40-41:⚠️ Potential issueFix inverted frame continuation logic to prevent infinite loop.
The condition
false == m_zstd_frame_has_more_datais inverted. According to the zstd manual, whenZSTD_decompressStreamreturns> 0, it means more compressed input is needed to finish the current frame. The buffer should be refilled when it's exhausted, regardless of the frame state.Apply this diff to fix the logic:
- if (m_compressed_stream_block.pos == m_compressed_stream_block.size - && false == m_zstd_frame_has_more_data) + // Refill whenever the input buffer is empty + if (m_compressed_stream_block.pos == m_compressed_stream_block.size)
219-226:⚠️ Potential issueAdd guard against zero-byte reads to prevent infinite loops.
The code doesn't handle the case where
try_readreturnsErrorCode_Successbut setsm_read_buffer_lengthto 0, which can cause an infinite loop when the decompressor repeatedly processes empty input.Apply this diff to add the guard:
if (ErrorCode_Success != error_code) { return error_code; } +// Treat a successful but zero-byte read as EOF +if (0 == m_read_buffer_length) { + return ErrorCode_EndOfFile; +} + m_compressed_stream_block.pos = 0; m_compressed_stream_block.size = m_read_buffer_length; return ErrorCode_Success;
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
components/core/src/clp/streaming_compression/zstd/Decompressor.cpp(3 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
`**/*.{cpp,hpp,java,js,jsx,tpp,ts,tsx}`: - Prefer `false == ` rather than `!`.
**/*.{cpp,hpp,java,js,jsx,tpp,ts,tsx}: - Preferfalse == <expression>rather than!<expression>.
components/core/src/clp/streaming_compression/zstd/Decompressor.cpp
🧠 Learnings (1)
components/core/src/clp/streaming_compression/zstd/Decompressor.cpp (1)
Learnt from: Bill-hbrhbr
PR: y-scope/clp#614
File: components/core/tests/test-StreamingCompression.cpp:45-54
Timestamp: 2024-12-04T15:50:37.827Z
Learning: In `components/core/tests/test-StreamingCompression.cpp`, within the `compress` function, compressing the same data repeatedly by passing the same `src` pointer without advancing is intentional to test the compressor with the same data multiple times.
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: ubuntu-jammy-static-linked-bins
- GitHub Check: centos-stream-9-static-linked-bins
- GitHub Check: centos-stream-9-dynamic-linked-bins
- GitHub Check: ubuntu-jammy-dynamic-linked-bins
- GitHub Check: lint-check (ubuntu-latest)
- GitHub Check: build-macos (macos-14, true)
- GitHub Check: build-macos (macos-15, false)
- GitHub Check: build-macos (macos-14, false)
- GitHub Check: build-macos (macos-15, true)
🔇 Additional comments (1)
components/core/src/clp/streaming_compression/zstd/Decompressor.cpp (1)
246-246: Good addition to reset frame state.Properly resetting
m_zstd_frame_has_more_datato false ensures clean state when the stream is reset.
| ); | ||
| return ErrorCode_Failure; | ||
| } | ||
| m_zstd_frame_has_more_data = decompressed_stream_block.pos == decompressed_stream_block.size; |
There was a problem hiding this comment.
Incorrect logic for setting frame continuation flag.
The frame continuation flag should be set based on the return value of ZSTD_decompressStream, not whether the output buffer is full. According to the zstd manual, ret > 0 indicates more data is needed to complete the current frame.
Replace line 68 with:
- m_zstd_frame_has_more_data = decompressed_stream_block.pos == decompressed_stream_block.size;
+ m_zstd_frame_has_more_data = ret > 0;🤖 Prompt for AI Agents
In components/core/src/clp/streaming_compression/zstd/Decompressor.cpp at line
68, the logic for setting m_zstd_frame_has_more_data is incorrect because it
checks if the output buffer is full instead of the return value of
ZSTD_decompressStream. Update the assignment to set m_zstd_frame_has_more_data
to true if the return value from ZSTD_decompressStream is greater than zero,
indicating more data is needed to complete the frame.
| ZSTD_inBuffer m_compressed_stream_block{}; | ||
|
|
||
| size_t m_decompressed_stream_pos{0ULL}; | ||
| bool m_zstd_frame_has_more_data{false}; |
There was a problem hiding this comment.
more accurately, this should be m_zstd_frame_might_has_more_data
There was a problem hiding this comment.
I agree. Let's use m_zstd_frame_might_have_more_data (grammar).
kirkrodrigues
left a comment
There was a problem hiding this comment.
Let's defer the unit tests to another PR where we can make them more exhaustive and robust.
For the PR title, how about:
fix(core): Try to exhaust Zstd's internal buffers when they might contain unconsumed data. (fixes #976).
| m_decompressed_stream_pos += decompressed_stream_block.pos; | ||
|
|
||
| num_bytes_read = decompressed_stream_block.pos; |
There was a problem hiding this comment.
| m_decompressed_stream_pos += decompressed_stream_block.pos; | |
| num_bytes_read = decompressed_stream_block.pos; | |
| m_decompressed_stream_pos += decompressed_stream_block.pos; | |
| num_bytes_read = decompressed_stream_block.pos; | |
Just feels like those two operations should be grouped together.
There was a problem hiding this comment.
i thought we intentionally grouped the num_bytes_read assignment and the return statement together since num_bytes_read is an output of the method. shall we also group the ones at the end of this method body?
There was a problem hiding this comment.
for now i also made the changes at the bottom of the method and removed the "Update decompression stream position" comment which i feel is unnecessary. we can revert if you disagree
| ZSTD_inBuffer m_compressed_stream_block{}; | ||
|
|
||
| size_t m_decompressed_stream_pos{0ULL}; | ||
| bool m_zstd_frame_has_more_data{false}; |
There was a problem hiding this comment.
I agree. Let's use m_zstd_frame_might_have_more_data (grammar).
Co-authored-by: kirkrodrigues <2454684+kirkrodrigues@users.noreply.github.com>
…ight_have_more_data` for clarity
@coderabbitai create an issue to track the unit tests improvement |
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain@junhaoliao I'll create a GitHub issue to track the unit tests improvement for the zstd decompression functionality. 🏁 Script executed: Length of output: 1182 Perfect! I've successfully created GitHub issue #1009 to track the unit tests improvement for the zstd decompression functionality. Issue created: #1009 The issue includes:
This will help ensure the unit tests improvement work doesn't get forgotten and provides a clear roadmap for making the tests more exhaustive and robust. |
…tain unconsumed data. (fixes y-scope#976). (y-scope#977)
Description
ZSTD_decompressStream: http://facebook.github.io/zstd/zstd_manual.html#Chapter9Checklist
breaking change.
Validation performed
clp::streaming_compression::zstd::Decompressorprematurely returns EOF when buffered data remains #976 . Ran all cases and observed success.[yscope-contrib-guidelines]: https://docs.yscope.com/dev-guide/contrib-guides-overview.html
Summary by CodeRabbit
Summary by CodeRabbit