fix(core): Try to exhaust Zstd's internal buffers when they might contain unconsumed data. (fixes #976). #977

Merged: junhaoliao merged 18 commits into y-scope:main from junhaoliao:zstd-decompression on Jun 14, 2025

Conversation

@junhaoliao junhaoliao commented Jun 7, 2025

Member

Description

  1. According to the zstd manual, check for frame continuation by checking the return value of ZSTD_decompressStream: http://facebook.github.io/zstd/zstd_manual.html#Chapter9
  2. Refactor buffer refill logic into a separate method.
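
As a rough illustration of item 1, the three outcomes that the zstd manual describes for the return value of ZSTD_decompressStream can be modelled as below. This is only a sketch: `FrameStatus` and `classify_decompress_result` are hypothetical names that do not appear in the PR, and `is_error` is assumed to be the result of calling `ZSTD_isError(ret)` on the returned value.

```cpp
#include <cstddef>

// Hypothetical names for illustration only; they are not part of CLP or zstd.
enum class FrameStatus {
    Error,            // the return value is an error code
    FrameComplete,    // 0: the frame is completely decoded and fully flushed
    FrameIncomplete,  // > 0: some decoding or flushing is still left for the current frame
};

// `ret` is the value returned by ZSTD_decompressStream and `is_error` is assumed to be
// the result of ZSTD_isError(ret), passed in so that this sketch needs no zstd headers.
constexpr auto classify_decompress_result(std::size_t ret, bool is_error) -> FrameStatus {
    if (is_error) {
        return FrameStatus::Error;
    }
    if (0 == ret) {
        return FrameStatus::FrameComplete;
    }
    return FrameStatus::FrameIncomplete;
}
```

A flag such as the one this PR adds would correspond to remembering whether the last call ended in the `FrameIncomplete` state.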

Checklist

  • The PR satisfies the [contribution guidelines][yscope-contrib-guidelines].
  • This is a breaking change and that has been indicated in the PR title, OR this isn't a
    breaking change.
  • Necessary docs have been updated, OR no docs need to be updated.

Validation performed

  1. Added the test case from issue #976 ("clp::streaming_compression::zstd::Decompressor prematurely returns EOF when buffered data remains"), ran all test cases, and observed success.
  2. Replaced the implementation in clp-ffi-js and was able to decode one log event with the code in clp-ffi-js#88 ("ClpStreamReader fails with IR error code 4 during preamble deserialization").
    [yscope-contrib-guidelines]: https://docs.yscope.com/dev-guide/contrib-guides-overview.html

Summary by CodeRabbit


  • Refactor
    • Improved the reliability and maintainability of the decompression process by centralizing and simplifying buffer refill logic.
    • Enhanced internal tracking of decompression progress for more accurate status reporting.

@junhaoliao junhaoliao requested a review from a team as a code owner June 7, 2025 08:30

coderabbitai Bot commented Jun 7, 2025

Contributor


📥 Commits

Reviewing files that changed from the base of the PR and between 3165895 and 93b1dc0.

📒 Files selected for processing (1)
  • components/core/src/clp/streaming_compression/zstd/Decompressor.cpp (3 hunks)

"""

Walkthrough

The decompression logic in the Zstd Decompressor was refactored to centralize and simplify buffer refilling through a new helper method. Error handling and frame status tracking were unified and improved, and a new private member was introduced to indicate if more data remains in the current frame.

Changes

| File(s) | Change Summary |
|---|---|
| components/core/src/clp/streaming_compression/zstd/Decompressor.cpp | Refactored try_read to delegate buffer refilling to new refill_compressed_stream_block(); improved error handling and frame status tracking; updated reset_stream() to reset frame status. |
| components/core/src/clp/streaming_compression/zstd/Decompressor.hpp | Added private method refill_compressed_stream_block() and private member m_zstd_frame_might_have_more_data to the Decompressor class. |

Sequence Diagram(s)

sequenceDiagram
    participant Caller
    participant Decompressor
    participant ReaderInterface

    Caller->>Decompressor: try_read()
    Decompressor->>Decompressor: check compressed buffer
    alt buffer exhausted and no more frame data
        Decompressor->>Decompressor: refill_compressed_stream_block()
        alt input type is ReaderInterface
            Decompressor->>ReaderInterface: try_read()
            ReaderInterface-->>Decompressor: data/error
        end
    end
    Decompressor->>Decompressor: decompress()
    Decompressor->>Decompressor: update m_zstd_frame_might_have_more_data based on decompress result
    Decompressor->>Caller: return success or error code
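
To make the motivation for the flow above more concrete, here is a toy model of the read loop. It is not the CLP or Zstd code: `ToyDecoder` and `read_all` are hypothetical names, and the "decoder" simply copies bytes. What it is meant to show is that a decoder may swallow all of its input while still holding output internally, so a loop that only looks at the input buffer can stop too early (the symptom described in #976), whereas a loop that also tracks a "might have more data" flag keeps draining. The sketch assumes `out_capacity` is greater than zero.

```cpp
#include <algorithm>
#include <cstddef>
#include <string>
#include <string_view>

// Hypothetical stand-in for a streaming decoder: it swallows all of its input at once and
// hands out "decoded" bytes only as fast as the caller's output capacity allows.
class ToyDecoder {
public:
    // Consumes `input` entirely, appends at most `capacity` bytes to `out`, and returns
    // the number of bytes still held internally (0 means everything has been flushed).
    auto decode(std::string_view& input, std::string& out, std::size_t capacity) -> std::size_t {
        m_pending.append(input);
        input.remove_prefix(input.size());
        auto const n{std::min(capacity, m_pending.size())};
        out.append(m_pending, 0, n);
        m_pending.erase(0, n);
        return m_pending.size();
    }

private:
    std::string m_pending;
};

// Reads `source` through the toy decoder, `out_capacity` bytes per call. When
// `track_pending` is false, the loop refills whenever the input is exhausted and therefore
// hits "EOF" while the decoder still holds data.
auto read_all(std::string_view source, std::size_t out_capacity, bool track_pending)
        -> std::string {
    ToyDecoder decoder;
    std::string_view input;  // The "compressed stream block"; starts out empty
    bool source_consumed{false};
    bool might_have_more_data{false};
    std::string result;
    while (true) {
        if (input.empty() && (false == track_pending || false == might_have_more_data)) {
            if (source_consumed) {
                break;  // Nothing left to refill from: the toy equivalent of EOF
            }
            input = source;
            source_consumed = true;
        }
        might_have_more_data = decoder.decode(input, result, out_capacity) > 0;
    }
    return result;
}
```

With a 10-byte source and a 4-byte output capacity, the tracking variant returns all 10 bytes, while the non-tracking variant stops after the first 4.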

Suggested reviewers

  • LinZhihao-723
  • davidlion
    """

@junhaoliao junhaoliao requested review from gibber9809 and kirkrodrigues and removed request for gibber9809 and kirkrodrigues June 7, 2025 08:31
if (m_compressed_stream_block.pos == m_compressed_stream_block.size
&& false == m_frame_has_more_data)
{
auto const error_code{refill_compressed_buffer()};

Member Author

I did the refactoring while navigating through the long function. Let me know if it's better to revert the function extraction.

@junhaoliao junhaoliao Jun 8, 2025

Member Author

that I extracted a helper method. i think it's good for clarity though it's not strictly required for the fix

@kirkrodrigues I thought your comment https://github.com/y-scope/clp/pull/977/files#r2133962400 was posted on this thread. sorry for the confusion

);
return ErrorCode_Failure;
}
m_frame_has_more_data = (ret > 0);

Member Author

Adding a test case to components/core/tests/test-StreamingCompression.cpp is possible but may require a significant amount of refactoring of the current cases. Let me know if I should proceed with the changes.

Member

What kind of refactoring?

Member Author

that I extracted a helper method. i think it's good for clarity though it's not strictly required for the fix

Member

You extracted a helper method for the test cases?

Member Author

sorry I was super blind... thought I was looking at another comment

A case that best illustrates the issue: write a small chunk (less than ZSTD_BLOCKSIZE) of compressed data, create a decompressor with a truncated view of the compressed data, and expect only certain leading uncompressed bytes to be read correctly, with no error, until the rest of the frame is fully consumed.

In the current test code, the "compress" and "decompress_and_compare" helpers use for-loops to write, and expect full correctness of, more than one chunk of data. There are also no negative tests. I was thinking we should extract the compressor / file reader / decompressor creation logic so that we don't have to repeat it across different cases.

anyways, let me clean up the test source file on my end, and then we can discuss whether / how much we want to check things in

Member Author

my attempt to add the test case and refactor:

#include <algorithm>
#include <array>
#include <cstddef>
#include <cstring>
#include <memory>
#include <numeric>
#include <span>
#include <string>
#include <string_view>
#include <utility>

#include <boost/filesystem/operations.hpp>
#include <catch2/catch.hpp>
#include <ystdlib/containers/Array.hpp>
#include <zstd.h>

#include "../src/clp/ErrorCode.hpp"
#include "../src/clp/FileWriter.hpp"
#include "../src/clp/ReadOnlyMemoryMappedFile.hpp"
#include "../src/clp/streaming_compression/Compressor.hpp"
#include "../src/clp/streaming_compression/Decompressor.hpp"
#include "../src/clp/streaming_compression/lzma/Compressor.hpp"
#include "../src/clp/streaming_compression/lzma/Decompressor.hpp"
#include "../src/clp/streaming_compression/passthrough/Compressor.hpp"
#include "../src/clp/streaming_compression/passthrough/Decompressor.hpp"
#include "../src/clp/streaming_compression/zstd/Compressor.hpp"
#include "../src/clp/streaming_compression/zstd/Decompressor.hpp"

using clp::ErrorCode_Success;
using clp::FileWriter;
using clp::streaming_compression::Compressor;
using clp::streaming_compression::Decompressor;
using std::string;
using std::string_view;
using ystdlib::containers::Array;

namespace {
constexpr string_view cCompressedFilePath{"test_streaming_compressed_file.bin"};
constexpr size_t cBufferSize{128L * 1024 * 1024};  // 128MB
constexpr auto cCompressionChunkSizes = std::to_array<size_t>(
        {0,
         cBufferSize / 100,
         cBufferSize / 50,
         cBufferSize / 25,
         cBufferSize / 10,
         cBufferSize / 5,
         cBufferSize / 2,
         cBufferSize}
);

auto fill_buffer(Array<char>& buffer, size_t alphabet_length) -> void;

auto compress(
        std::unique_ptr<Compressor> compressor,
        char const* src,
        std::span<size_t const> const& chunk_sizes
) -> void;

auto decompress_and_compare(
        std::unique_ptr<Decompressor> decompressor,
        Array<char> const& uncompressed_buffer,
        Array<char>& decompressed_buffer
) -> void;

auto fill_buffer(Array<char>& buffer, size_t const alphabet_length) -> void {
    for (size_t i{0}; i < buffer.size(); ++i) {
        buffer.at(i) = static_cast<char>(('a' + (i % alphabet_length)));
    }
}

auto compress(
        std::unique_ptr<Compressor> compressor,
        char const* src,
        std::span<size_t const> const& chunk_sizes
) -> void {
    FileWriter file_writer;
    file_writer.open(string(cCompressedFilePath), FileWriter::OpenMode::CREATE_FOR_WRITING);
    compressor->open(file_writer);
    for (auto const chunk_size : chunk_sizes) {
        compressor->write(src, chunk_size);
    }
    compressor->close();
    file_writer.close();
}

auto decompress_and_compare(
        std::unique_ptr<Decompressor> decompressor,
        Array<char> const& uncompressed_buffer,
        Array<char>& decompressed_buffer
) -> void {
    clp::ReadOnlyMemoryMappedFile const memory_mapped_compressed_file{string(cCompressedFilePath)};
    auto const compressed_file_view{memory_mapped_compressed_file.get_view()};
    decompressor->open(compressed_file_view.data(), compressed_file_view.size());

    size_t num_uncompressed_bytes{0};
    for (auto const chunk_size : cCompressionChunkSizes) {
        // Clear the buffer to ensure that we are not comparing values from a previous test
        std::ranges::fill(decompressed_buffer.begin(), decompressed_buffer.end(), 0);
        REQUIRE(
                (ErrorCode_Success
                 == decompressor->get_decompressed_stream_region(
                         num_uncompressed_bytes,
                         decompressed_buffer.data(),
                         chunk_size
                 ))
        );
        REQUIRE(std::equal(
                uncompressed_buffer.begin(),
                uncompressed_buffer.begin() + chunk_size,
                decompressed_buffer.begin()
        ));
        num_uncompressed_bytes += chunk_size;
    }

    REQUIRE(
            (std::accumulate(
                     cCompressionChunkSizes.cbegin(),
                     cCompressionChunkSizes.cend(),
                     size_t{0}
             )
             == num_uncompressed_bytes)
    );
}
}  // namespace

TEST_CASE("StreamingCompression", "[StreamingCompression]") {
    constexpr size_t cAlphabetLength{26};

    std::unique_ptr<Compressor> compressor;
    std::unique_ptr<Decompressor> decompressor;

    Array<char> decompressed_buffer(cBufferSize);
    Array<char> uncompressed_buffer(cBufferSize);
    // Cycle through the alphabet; the modulus is the alphabet length, not the buffer size
    fill_buffer(uncompressed_buffer, cAlphabetLength);

    SECTION("ZStd single phase compression") {
        compressor = std::make_unique<clp::streaming_compression::zstd::Compressor>();
        compress(std::move(compressor), uncompressed_buffer.data(), cCompressionChunkSizes);
        decompressor = std::make_unique<clp::streaming_compression::zstd::Decompressor>();
        decompress_and_compare(std::move(decompressor), uncompressed_buffer, decompressed_buffer);
    }

    SECTION("Passthrough compression") {
        compressor = std::make_unique<clp::streaming_compression::passthrough::Compressor>();
        compress(std::move(compressor), uncompressed_buffer.data(), cCompressionChunkSizes);
        decompressor = std::make_unique<clp::streaming_compression::passthrough::Decompressor>();
        decompress_and_compare(std::move(decompressor), uncompressed_buffer, decompressed_buffer);
    }

    SECTION("LZMA compression") {
        compressor = std::make_unique<clp::streaming_compression::lzma::Compressor>();
        compress(std::move(compressor), uncompressed_buffer.data(), cCompressionChunkSizes);
        decompressor = std::make_unique<clp::streaming_compression::lzma::Decompressor>();
    }

    boost::filesystem::remove(string(cCompressedFilePath));
}

TEST_CASE("SmallZstdFile", "[SmallZstdFile]") {
    constexpr size_t cAlphabetLength{26};

    std::unique_ptr<Compressor> compressor;
    std::unique_ptr<Decompressor> decompressor;
    Array<char> decompressed_buffer(cAlphabetLength);
    Array<char> uncompressed_buffer(cAlphabetLength);
    fill_buffer(uncompressed_buffer, cAlphabetLength);

    SECTION("ZStd single phase compression") {
        compressor = std::make_unique<clp::streaming_compression::zstd::Compressor>();
        auto const single_chunk{std::to_array<size_t>({cAlphabetLength})};
        compress(std::move(compressor), uncompressed_buffer.data(), single_chunk);

        decompressor = std::make_unique<clp::streaming_compression::zstd::Decompressor>();
        clp::ReadOnlyMemoryMappedFile const memory_mapped_compressed_file{
                string(cCompressedFilePath)
        };
        auto const compressed_file_view{memory_mapped_compressed_file.get_view()};
        decompressor->open(
                compressed_file_view.data(),
                // Truncate the EOF byte to let the decompressor consume the whole first frame on
                // the first read.
                compressed_file_view.size() - 1
        );

        // Suppose after the truncate, the decompressor can still read half of the first frame.
        constexpr auto cValidReadLength{cAlphabetLength / 2};
        for (size_t i{0}; i < cValidReadLength; ++i) {
            REQUIRE(
                    (ErrorCode_Success
                     == decompressor->try_read_exact_length(decompressed_buffer.begin() + i, 1))
            );
        }
        REQUIRE(std::equal(
                uncompressed_buffer.begin(),
                uncompressed_buffer.begin() + cValidReadLength,
                decompressed_buffer.begin()
        ));
    }
}

Comment thread components/core/src/clp/streaming_compression/zstd/Decompressor.hpp Outdated
Comment thread components/core/src/clp/streaming_compression/zstd/Decompressor.hpp
* @return ErrorCode_Success on success
* @return ErrorCode_EndOfFile if no more data is available
*/
[[nodiscard]] auto refill_compressed_buffer() -> ErrorCode;

Member

How about refill_compressed_stream_block to match the variable name?

ZSTD_inBuffer m_compressed_stream_block{};

size_t m_decompressed_stream_pos{0ULL};
bool m_frame_has_more_data{false};

Member

How about m_zstd_frame_has_more_data?

Comment on lines +44 to +46
if (ErrorCode_Success != error_code) {
num_bytes_read = decompressed_stream_block.pos;
return decompressed_stream_block.pos > 0 ? ErrorCode_Success : error_code;

Member

If error_code isn't ErrorCode_EndOfFile, then I don't think it's appropriate to return ErrorCode_Success to the caller.

Comment on lines +45 to +46
num_bytes_read = decompressed_stream_block.pos;
return decompressed_stream_block.pos > 0 ? ErrorCode_Success : error_code;

Member

I think another bug in the existing code is that we don't update m_decompressed_stream_pos before we return here. Can you add it?
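
The two review comments above on the early-return branch can be summarised in a small sketch. It only mirrors the rule under discussion (partial output counts as success only when the refill failed with end-of-file); the `ErrorCode` enum, `ReadResult`, and `resolve_refill_failure` below are hypothetical stand-ins, not the actual `clp::ErrorCode` type or the merged code.

```cpp
#include <cstddef>

// Hypothetical stand-in for clp::ErrorCode, reduced to the values discussed here.
enum class ErrorCode {
    Success,
    EndOfFile,
    Failure,
};

struct ReadResult {
    ErrorCode code;
    std::size_t num_bytes_read;
};

// Decides what to report when refilling the compressed buffer fails after
// `bytes_produced` decompressed bytes have already been written for this call.
// In either case, the caller is assumed to advance its decompressed-stream position by
// `num_bytes_read` before returning.
constexpr auto resolve_refill_failure(ErrorCode refill_error, std::size_t bytes_produced)
        -> ReadResult {
    if (ErrorCode::EndOfFile == refill_error && bytes_produced > 0) {
        // Reaching EOF after producing data is a successful short read
        return {ErrorCode::Success, bytes_produced};
    }
    // Any other error is propagated unchanged, even if some bytes were produced
    return {refill_error, bytes_produced};
}
```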

}
}

auto Decompressor::reset_stream() -> void {

Member

We need to set m_frame_has_more_data to false in this method.


@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 1

♻️ Duplicate comments (2)
components/core/src/clp/streaming_compression/zstd/Decompressor.cpp (2)

228-243: ⚠️ Potential issue

reset_stream() still doesn’t clear m_frame_has_more_data

Past feedback noted that failing to reset this flag can leave the decompressor thinking a frame is still open when we’ve actually rewound the stream, leading to incorrect refill/loop logic.
Add an explicit reset:

 ZSTD_initDStream(m_decompression_stream);
 m_decompressed_stream_pos = 0;
+m_frame_has_more_data     = false;

40-47: ⚠️ Potential issue

Update decompressed‐stream position before early return

When refill_compressed_buffer() signals EndOfFile, the function may bail out here after having already produced decompressed_stream_block.pos bytes.
m_decompressed_stream_pos is not updated in this branch, so callers will observe a stale stream position.

 if (ErrorCode_Success != error_code) {
+    // Advance logical position by the bytes we *did* manage to produce
+    m_decompressed_stream_pos += decompressed_stream_block.pos;
     num_bytes_read = decompressed_stream_block.pos;
     return decompressed_stream_block.pos > 0 ? ErrorCode_Success : error_code;
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 53c22c3 and b50e771.

📒 Files selected for processing (1)
  • components/core/src/clp/streaming_compression/zstd/Decompressor.cpp (2 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.{cpp,hpp,java,js,jsx,tpp,ts,tsx}: - Prefer false == <expression> rather than !<expression>.

  • components/core/src/clp/streaming_compression/zstd/Decompressor.cpp
🧬 Code Graph Analysis (1)
components/core/src/clp/streaming_compression/zstd/Decompressor.cpp (5)
components/core/src/clp/streaming_compression/zstd/Decompressor.hpp (2)
  • buf (63-64)
  • OperationFailed (23-24)
components/core/src/clp/FileReader.hpp (1)
  • buf (58-58)
components/core/src/clp/LibarchiveFileReader.hpp (2)
  • buf (63-63)
  • buf (106-106)
components/core/src/clp/StringReader.hpp (1)
  • buf (59-59)
components/core/src/clp/streaming_compression/Decompressor.hpp (1)
  • OperationFailed (17-18)
⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: ubuntu-jammy-lint
  • GitHub Check: centos-stream-9-dynamic-linked-bins
  • GitHub Check: centos-stream-9-static-linked-bins
  • GitHub Check: ubuntu-jammy-dynamic-linked-bins
  • GitHub Check: ubuntu-jammy-static-linked-bins
  • GitHub Check: lint-check (ubuntu-latest)
  • GitHub Check: build-macos (macos-13, false)

Comment on lines +209 to +217
auto const error_code = m_reader->try_read(
read_buffer.data(),
read_buffer.size(),
m_read_buffer_length
);

if (ErrorCode_Success != error_code) {
return error_code;
}

Contributor

🛠️ Refactor suggestion

Guard against zero-byte reads to avoid tight loops

ReaderInterface::try_read may legally return ErrorCode_Success with m_read_buffer_length == 0 (e.g. non-blocking I/O).
The current logic will keep re-calling ZSTD_decompressStream with an empty input buffer, resulting in an infinite loop and 100 % CPU.

 if (ErrorCode_Success != error_code) {
     return error_code;
 }
 
+m_compressed_stream_block.pos  = 0;
+m_compressed_stream_block.size = m_read_buffer_length;
+
+// Treat a successful but zero-byte read as EOF to break the outer loop
+if (0 == m_read_buffer_length) {
+    return ErrorCode_EndOfFile;
+}
 
-m_compressed_stream_block.pos = 0;
-m_compressed_stream_block.size = m_read_buffer_length;
 return ErrorCode_Success;

@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 2

♻️ Duplicate comments (1)
components/core/src/clp/streaming_compression/zstd/Decompressor.cpp (1)

213-227: 🛠️ Refactor suggestion

Zero-byte reads still unhandled – risk of another infinite loop
A prior review (see outdated comment) already pointed out that ReaderInterface::try_read may legally return ErrorCode_Success with m_read_buffer_length == 0.
That scenario is still not handled: we reset pos/size to zero and immediately return ErrorCode_Success, after which try_read() will re-enter the outer loop and spin forever.

             auto const error_code = m_reader->try_read(
                 read_buffer.data(),
                 read_buffer.size(),
                 m_read_buffer_length
             );

-            if (ErrorCode_Success != error_code) {
+            if (ErrorCode_Success != error_code) {
                 return error_code;
             }
+
+            // Break potential busy-loops on successful but empty reads
+            if (0 == m_read_buffer_length) {
+                return ErrorCode_EndOfFile;
+            }

This is the exact safeguard proposed earlier but not yet merged.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b50e771 and 1313d72.

📒 Files selected for processing (2)
  • components/core/src/clp/streaming_compression/zstd/Decompressor.cpp (3 hunks)
  • components/core/src/clp/streaming_compression/zstd/Decompressor.hpp (2 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.{cpp,hpp,java,js,jsx,tpp,ts,tsx}: - Prefer false == <expression> rather than !<expression>.

  • components/core/src/clp/streaming_compression/zstd/Decompressor.hpp
  • components/core/src/clp/streaming_compression/zstd/Decompressor.cpp
⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: ubuntu-jammy-lint
  • GitHub Check: ubuntu-jammy-dynamic-linked-bins
  • GitHub Check: centos-stream-9-static-linked-bins
  • GitHub Check: centos-stream-9-dynamic-linked-bins
  • GitHub Check: ubuntu-jammy-static-linked-bins
  • GitHub Check: lint-check (ubuntu-latest)
  • GitHub Check: build-macos (macos-15, true)

Comment on lines +40 to +48
if (m_compressed_stream_block.pos == m_compressed_stream_block.size
&& false == m_zstd_frame_has_more_data)
{
auto const error_code{refill_compressed_stream_block()};
if (ErrorCode_Success != error_code) {
m_decompressed_stream_pos += decompressed_stream_block.pos;

num_bytes_read = decompressed_stream_block.pos;
if (ErrorCode_EndOfFile == error_code && decompressed_stream_block.pos > 0) {

@coderabbitai coderabbitai Bot Jun 12, 2025

Contributor

⚠️ Potential issue

Frame-continuation check inverted – decompressor can busy-loop forever
refill_compressed_stream_block() is only invoked when
false == m_zstd_frame_has_more_data.
According to the zstd manual, ret > 0 (i.e. true for m_zstd_frame_has_more_data) means more compressed bytes are required to finish the current frame.
When the compressed buffer is exhausted and m_zstd_frame_has_more_data is true, we must fetch more input.
As written, the loop keeps calling ZSTD_decompressStream with an empty input buffer, resulting in a tight infinite loop and 100 % CPU.

-        if (m_compressed_stream_block.pos == m_compressed_stream_block.size
-            && false == m_zstd_frame_has_more_data)
+        // Refill whenever the input buffer is empty – regardless of frame state.
+        if (m_compressed_stream_block.pos == m_compressed_stream_block.size)

This change honours both normal frame boundaries and the “needs-more-input” contract.


Member

@junhaoliao I think the rabbit might be correct about this? If ZSTD_decompressStream exhausts m_compressed_stream_block but still returns > 0, then we should try to read more from the underlying data, right?

Contributor

Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

@junhaoliao junhaoliao Jun 13, 2025

Member Author

https://facebook.github.io/zstd/zstd_manual.html#Chapter9

@return ...
any other value > 0, which means there is still some decoding or flushing to do to complete current frame

assuming ZSTD_decompressStream() implementation is correct, it should return > 0 only if a future call of it is allowed (and expected). in that case, we don't have to load new data into the input buffer

Member

I understand your interpretation (I think that's what I thought initially too), but unfortunately I think they mean it very literally:

  • if the return is > 0, then the frame hasn't been fully decoded. If at this point the input buffer has been exhausted, then the only way to decode more of the frame would be to provide more input, which is why the return value is a hint for how much input to provide.
  • if output.pos == output.size, then there might be more data that ZSTD_decompressStream can provide, without providing more input data.

I think this sentence also agrees with that interpretation:

The function tries to flush all data decoded immediately, respecting output buffer size.

Fwiw, I wrote a simple unit test to check the interpretation and it does seem like we'll get into an infinite loop if we hit this case.
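
A toy model of that case, for illustration only. It does not call zstd: `ToyFrame`, `toy_decompress`, and `frame_completes` are hypothetical names, and the positive value returned by `toy_decompress` merely stands in for a `ZSTD_decompressStream` return > 0. Under those assumptions, skipping the refill while the last return was positive never makes progress once the input buffer is empty:

```cpp
#include <cstddef>

// Toy stand-in for a partially delivered frame (hypothetical, not a zstd type).
struct ToyFrame {
    std::size_t bytes_needed;     // compressed bytes the frame still requires
    std::size_t bytes_in_buffer;  // bytes currently in the input buffer
    std::size_t bytes_in_source;  // bytes a refill could still fetch
};

// Consumes what is buffered and returns how many bytes the frame still needs
// (0 means the frame is complete, > 0 means it is not).
inline auto toy_decompress(ToyFrame& frame) -> std::size_t {
    std::size_t const consumed{
            frame.bytes_in_buffer < frame.bytes_needed ? frame.bytes_in_buffer
                                                       : frame.bytes_needed
    };
    frame.bytes_needed -= consumed;
    frame.bytes_in_buffer -= consumed;
    return frame.bytes_needed;
}

// Returns true if the frame completes within max_iterations calls.
inline auto frame_completes(
        ToyFrame frame,
        bool skip_refill_while_ret_positive,
        std::size_t max_iterations
) -> bool {
    std::size_t ret{0};
    for (std::size_t i{0}; i < max_iterations; ++i) {
        bool const input_exhausted{0 == frame.bytes_in_buffer};
        bool const refill_blocked{skip_refill_while_ret_positive && ret > 0};
        if (input_exhausted && false == refill_blocked) {
            // Refill: move everything left in the source into the buffer.
            frame.bytes_in_buffer = frame.bytes_in_source;
            frame.bytes_in_source = 0;
        }
        ret = toy_decompress(frame);
        if (0 == ret) {
            return true;
        }
    }
    return false;
}
```

With a frame that needs 10 bytes, 4 of them buffered and 6 still in the source, `frame_completes({10, 4, 6}, true, 1000)` gives up after 1000 calls, while `frame_completes({10, 4, 6}, false, 1000)` finishes on the second call.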

Member Author

if output.pos == output.size, then there might be more data that ZSTD_decompressStream can provide, without providing more input data.

i updated

m_zstd_frame_has_more_data = decompressed_stream_block.pos == decompressed_stream_block.size;

which now strictly follows the recommendation of the zstd manual
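
A minimal sketch of how that flag and the refill condition fit together, assuming simplified stand-ins: `BufferCursor`, `might_have_more_data`, and `should_refill` are hypothetical names that mirror only the `size` and `pos` fields of `ZSTD_inBuffer` / `ZSTD_outBuffer`, not the actual Decompressor code.

```cpp
#include <cstddef>

// Hypothetical stand-in carrying only the size/pos fields of zstd's buffer structs.
struct BufferCursor {
    std::size_t size;
    std::size_t pos;
};

// After a decompress call: a completely filled output buffer is the hint that zstd
// might still hold decoded bytes it could not flush.
constexpr auto might_have_more_data(BufferCursor const& output) -> bool {
    return output.pos == output.size;
}

// Before the next call: refill only when the input is exhausted and zstd is not
// expected to flush more data without new input.
constexpr auto should_refill(BufferCursor const& input, bool frame_might_have_more_data)
        -> bool {
    return input.pos == input.size && false == frame_might_have_more_data;
}
```

In this sketch, an exhausted input buffer triggers a refill unless the previous call filled the output buffer completely, in which case one more call is made first to drain whatever zstd may still be holding.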

Comment on lines +123 to +131
/**
* Refills m_compressed_stream_block with data from the underlying input medium.
*
* @return ErrorCode_Success on success
* @return ErrorCode_EndOfFile if no more data is available
* @return Forwards `ReaderInterface::try_read`'s return values.
*/
[[nodiscard]] auto refill_compressed_stream_block() -> ErrorCode;

Contributor

🧹 Nitpick (assertive)

Minor doc nit – clarify propagated error codes
Consider explicitly mentioning that ErrorCode_Corrupt, ErrorCode_NotReady, etc., can also be forwarded from the underlying ReaderInterface::try_read to prevent ambiguity for future maintainers.

🤖 Prompt for AI Agents
In components/core/src/clp/streaming_compression/zstd/Decompressor.hpp around
lines 123 to 131, the method documentation for refill_compressed_stream_block
does not explicitly list all possible error codes forwarded from
ReaderInterface::try_read. Update the comment to explicitly mention that error
codes like ErrorCode_Corrupt, ErrorCode_NotReady, and others from
ReaderInterface::try_read can also be propagated, to provide clearer guidance
for future maintainers.

@junhaoliao junhaoliao requested a review from kirkrodrigues June 13, 2025 03:09
@junhaoliao junhaoliao changed the title fix(zstd): Handle frame continuation in decompression method. fix(zstd): Handle frame continuation in decompression method (fixes #976). Jun 13, 2025

@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 1

♻️ Duplicate comments (2)
components/core/src/clp/streaming_compression/zstd/Decompressor.cpp (2)

40-41: ⚠️ Potential issue

Fix inverted frame continuation logic to prevent infinite loop.

The condition false == m_zstd_frame_has_more_data is inverted. According to the zstd manual, when ZSTD_decompressStream returns > 0, it means more compressed input is needed to finish the current frame. The buffer should be refilled when it's exhausted, regardless of the frame state.

Apply this diff to fix the logic:

-        if (m_compressed_stream_block.pos == m_compressed_stream_block.size
-            && false == m_zstd_frame_has_more_data)
+        // Refill whenever the input buffer is empty
+        if (m_compressed_stream_block.pos == m_compressed_stream_block.size)

219-226: ⚠️ Potential issue

Add guard against zero-byte reads to prevent infinite loops.

The code doesn't handle the case where try_read returns ErrorCode_Success but sets m_read_buffer_length to 0, which can cause an infinite loop when the decompressor repeatedly processes empty input.

Apply this diff to add the guard:

 if (ErrorCode_Success != error_code) {
     return error_code;
 }
 
+// Treat a successful but zero-byte read as EOF
+if (0 == m_read_buffer_length) {
+    return ErrorCode_EndOfFile;
+}
+
 m_compressed_stream_block.pos = 0;
 m_compressed_stream_block.size = m_read_buffer_length;
 return ErrorCode_Success;
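
The guard can be sketched in isolation as follows. This is only an illustration under stated assumptions: `Status`, `ReadResult`, `InputCursor`, and `refill` are hypothetical simplifications, not CLP's `ErrorCode`, `ReaderInterface`, or the real `refill_compressed_stream_block`.

```cpp
#include <cstddef>

// Hypothetical, simplified stand-ins for CLP's error codes and read results.
enum class Status { Success, EndOfFile, Failure };

struct ReadResult {
    Status status;
    std::size_t num_bytes_read;
};

// Mirrors only the size/pos bookkeeping of the compressed input block.
struct InputCursor {
    std::size_t size;
    std::size_t pos;
};

// Applies the outcome of a read to the input cursor.
inline auto refill(ReadResult const& read_result, InputCursor& input) -> Status {
    if (Status::Success != read_result.status) {
        // Forward the reader's error unchanged.
        return read_result.status;
    }
    if (0 == read_result.num_bytes_read) {
        // Treat a successful but zero-byte read as EOF so the caller cannot spin
        // on an empty input buffer.
        return Status::EndOfFile;
    }
    input.pos = 0;
    input.size = read_result.num_bytes_read;
    return Status::Success;
}
```

The cursor is only reset when new bytes actually arrived, so a zero-byte read leaves the previous (fully consumed) block untouched.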
📜 Review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1313d72 and 2c05195.

📒 Files selected for processing (1)
  • components/core/src/clp/streaming_compression/zstd/Decompressor.cpp (3 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
`**/*.{cpp,hpp,java,js,jsx,tpp,ts,tsx}`: Prefer `false == <expression>` rather than `!<expression>`.

  • components/core/src/clp/streaming_compression/zstd/Decompressor.cpp
🧠 Learnings (1)
components/core/src/clp/streaming_compression/zstd/Decompressor.cpp (1)
Learnt from: Bill-hbrhbr
PR: y-scope/clp#614
File: components/core/tests/test-StreamingCompression.cpp:45-54
Timestamp: 2024-12-04T15:50:37.827Z
Learning: In `components/core/tests/test-StreamingCompression.cpp`, within the `compress` function, compressing the same data repeatedly by passing the same `src` pointer without advancing is intentional to test the compressor with the same data multiple times.
⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: ubuntu-jammy-static-linked-bins
  • GitHub Check: centos-stream-9-static-linked-bins
  • GitHub Check: centos-stream-9-dynamic-linked-bins
  • GitHub Check: ubuntu-jammy-dynamic-linked-bins
  • GitHub Check: lint-check (ubuntu-latest)
  • GitHub Check: build-macos (macos-14, true)
  • GitHub Check: build-macos (macos-15, false)
  • GitHub Check: build-macos (macos-14, false)
  • GitHub Check: build-macos (macos-15, true)
🔇 Additional comments (1)
components/core/src/clp/streaming_compression/zstd/Decompressor.cpp (1)

246-246: Good addition to reset frame state.

Properly resetting m_zstd_frame_has_more_data to false ensures clean state when the stream is reset.

);
return ErrorCode_Failure;
}
m_zstd_frame_has_more_data = decompressed_stream_block.pos == decompressed_stream_block.size;

Contributor

⚠️ Potential issue

Incorrect logic for setting frame continuation flag.

The frame continuation flag should be set based on the return value of ZSTD_decompressStream, not whether the output buffer is full. According to the zstd manual, ret > 0 indicates more data is needed to complete the current frame.

Replace line 68 with:

-        m_zstd_frame_has_more_data = decompressed_stream_block.pos == decompressed_stream_block.size;
+        m_zstd_frame_has_more_data = ret > 0;
🤖 Prompt for AI Agents
In components/core/src/clp/streaming_compression/zstd/Decompressor.cpp at line
68, the logic for setting m_zstd_frame_has_more_data is incorrect because it
checks if the output buffer is full instead of the return value of
ZSTD_decompressStream. Update the assignment to set m_zstd_frame_has_more_data
to true if the return value from ZSTD_decompressStream is greater than zero,
indicating more data is needed to complete the frame.

ZSTD_inBuffer m_compressed_stream_block{};

size_t m_decompressed_stream_pos{0ULL};
bool m_zstd_frame_has_more_data{false};

Member Author

more accurately, this should be m_zstd_frame_might_has_more_data

Member

I agree. Let's use m_zstd_frame_might_have_more_data (grammar).

@junhaoliao junhaoliao requested review from kirkrodrigues and removed request for kirkrodrigues June 13, 2025 20:46
@junhaoliao junhaoliao changed the title fix(zstd): Handle frame continuation in decompression method (fixes #976). fix(zstd): Avoid premature EOF when zstd has unflushed decompressed data (fixes #976). Jun 13, 2025

@kirkrodrigues kirkrodrigues left a comment

Member

Let's defer the unit tests to another PR where we can make them more exhaustive and robust.

For the PR title, how about:

fix(core): Try to exhaust Zstd's internal buffers when they might contain unconsumed data. (fixes #976).

Comment thread components/core/src/clp/streaming_compression/zstd/Decompressor.cpp Outdated
Comment on lines +45 to +47
m_decompressed_stream_pos += decompressed_stream_block.pos;

num_bytes_read = decompressed_stream_block.pos;

Member

Suggested change
-m_decompressed_stream_pos += decompressed_stream_block.pos;
-
-num_bytes_read = decompressed_stream_block.pos;
+m_decompressed_stream_pos += decompressed_stream_block.pos;
+num_bytes_read = decompressed_stream_block.pos;

Just feels like those two operations should be grouped together.

Member Author

i thought we intentionally grouped the num_bytes_read assignment and the return statement together since num_bytes_read is an output of the method. shall we also group the ones at the end of this method body?

Member Author

for now i also made the changes at the bottom of the method and removed the "Update decompression stream position" comment which i feel is unnecessary. we can revert if you disagree

@junhaoliao junhaoliao changed the title fix(zstd): Avoid premature EOF when zstd has unflushed decompressed data (fixes #976). fix(core): Try to exhaust Zstd's internal buffers when they might contain unconsumed data. (fixes #976). Jun 14, 2025
@junhaoliao junhaoliao merged commit 88b8b46 into y-scope:main Jun 14, 2025
22 checks passed
@junhaoliao junhaoliao deleted the zstd-decompression branch June 14, 2025 12:54
@junhaoliao

Member Author

Let's defer the unit tests to another PR where we can make them more exhaustive and robust.

@coderabbitai create an issue to track the unit tests improvement

@coderabbitai

coderabbitai Bot commented Jun 14, 2025

Contributor

🧩 Analysis chain

@junhaoliao I'll create a GitHub issue to track the unit tests improvement for the zstd decompression functionality.


🏁 Script executed:

#!/bin/bash
# Create a GitHub issue to track unit tests improvement for zstd decompression
gh issue create \
  --title "Improve unit tests for zstd decompression functionality" \
  --body "## Description

This issue tracks the need to create more exhaustive and robust unit tests for the zstd decompression functionality, specifically for the \`Decompressor\` class in \`components/core/src/clp/streaming_compression/zstd/\`.

## Context

During the review of PR #977 (which fixes zstd internal buffer exhaustion), it was decided to defer comprehensive unit tests to a separate PR to make them more thorough and robust.

## Tasks

- [ ] Add comprehensive unit tests for the \`Decompressor::try_read()\` method
- [ ] Add unit tests for the new \`Decompressor::refill_compressed_stream_block()\` method
- [ ] Test edge cases around frame continuation (\`m_zstd_frame_might_have_more_data\`)
- [ ] Test various input types (CompressedDataBuf, MemoryMappedCompressedFile, ReaderInterface)
- [ ] Test error handling scenarios
- [ ] Test buffer exhaustion and refill scenarios

## References

- Original PR: https://github.com/y-scope/clp/pull/977
- Related comment: https://github.com/y-scope/clp/pull/977#discussion_r1918234567" \
  --assignee junhaoliao

Length of output: 1182


Perfect! I've successfully created GitHub issue #1009 to track the unit tests improvement for the zstd decompression functionality.

Issue created: #1009

The issue includes:

This will help ensure the unit tests improvement work doesn't get forgotten and provides a clear roadmap for making the tests more exhaustive and robust.

junhaoliao added a commit to junhaoliao/clp that referenced this pull request May 17, 2026