Skip to content

fix: support ZSTD-compressed IR and fix filteredLogEvents_ lifecycle.#36

Merged
anlowee merged 8 commits into
y-scope:presto-0.293-clp-connectorfrom
anlowee:xwei/zstd-wrapper
Oct 9, 2025
Merged

fix: support ZSTD-compressed IR and fix filteredLogEvents_ lifecycle.#36
anlowee merged 8 commits into
y-scope:presto-0.293-clp-connectorfrom
anlowee:xwei/zstd-wrapper

Conversation

@anlowee

@anlowee anlowee commented Sep 29, 2025

Copy link
Copy Markdown

Description

Since all current sources generating IR files also compress the IR files with ZSTD, this PR wraps the IR reader with a ZSTD decompressor so that it can directly search on ZSTD-compressed IR files.

While doing E2E testing, the filteredLogEvents_ which was originally owned by ClpIrUnitHandler had a shorter life cycle then ClpIrCursor, this PR also transfers the ownership from ClpIrUnitHandler to ClpIrCursor.

Checklist

  • The PR satisfies the contribution guidelines.
  • This is a breaking change and that has been indicated in the PR title, OR this isn't a
    breaking change.
  • Necessary docs have been updated, OR no docs need to be updated.

Validation performed

Passed the CI.

Summary by CodeRabbit

  • New Features

    • Added support for reading Zstandard-compressed IR files and using a decompressed IR stream.
  • Bug Fixes

    • Improved deserialization reliability with enhanced error logging and safer validation.
    • More accurate filtered-event counting and reduced failures when opening/processing IR streams.
  • Tests

    • Updated tests to use compressed IR inputs and validate behaviour.
  • Refactor

    • Reworked stream and in-memory event handling for clearer flow and stability.

@coderabbitai

coderabbitai Bot commented Sep 29, 2025

Copy link
Copy Markdown

Walkthrough

Adds a zstd decompression wrapper and an explicit in-memory filtered log-event buffer to ClpIrCursor. ClpIrUnitHandler now requires an externally provided log-event container and its public accessors were removed. Deserialization and vector loading were updated to use the decompressed stream and shared buffer; tests now reference .clp.zst inputs.

Changes

Cohort / File(s) Summary of changes
IR cursor: zstd integration & filtered log-events
velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp, velox/connectors/clp/search_lib/ir/ClpIrCursor.h
Adds irReaderZstdWrapper_ (zstd Decompressor wrapper) and filteredLogEvents_ (shared_ptr vector buffer); switches deserialization and reads to use the decompressor wrapper; initializes ClpIrUnitHandler with the shared buffer; clears/loads from the new buffer; improves deserializer error logging.
IR unit handler API change
velox/connectors/clp/search_lib/ir/ClpIrUnitHandler.h
Replaces default constructor with one that accepts a shared_ptr<vector<unique_ptr<::clp::ffi::KeyValuePairLogEvent>>> and checks non-null; removes public getFilteredLogEvents() and clearFilteredLogEvents() methods.
Tests: compressed IR inputs
velox/connectors/clp/tests/ClpConnectorTest.cpp
Updates test IR file paths from .clps to .clp.zst and adjusts a test comment; no functional test logic changes.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Caller
  participant Cursor as ClpIrCursor
  participant RawReader as Raw IR Reader
  participant Zstd as Zstd Decompressor
  participant Handler as ClpIrUnitHandler
  participant Deser as IR Deserializer
  participant Buf as filteredLogEvents_ (shared)

  Caller->>Cursor: loadSplit(split)
  Cursor->>Buf: create shared filteredLogEvents_
  Cursor->>Handler: construct with Buf
  Cursor->>RawReader: open IR stream
  Cursor->>Zstd: wrap RawReader -> irReaderZstdWrapper_
  Cursor->>Deser: create deserializer using irReaderZstdWrapper_
  Note over Cursor,Deser: on failure -> detailed log

  Caller->>Cursor: deserialize()
  Cursor->>Buf: clear buffer
  Zstd->>Deser: provide decompressed bytes
  Deser-->>Handler: emit decoded IR units into Handler/Buf
  Cursor->>Buf: build vectors from buffer
  Cursor-->>Caller: return vectors / row counts
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The title clearly and concisely highlights the two main changes in the pull request—adding support for ZSTD‐compressed IR and correcting the lifecycle of filteredLogEvents_—which aligns with the modifications to the IR reader and ownership transfer described in the PR.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

📜 Recent review details

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 10508bd and 9fa199c.

📒 Files selected for processing (1)
  • velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp (4 hunks)

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 12521a1 and d0f5021.

⛔ Files ignored due to path filters (3)
  • velox/connectors/clp/tests/examples/test_1_ir.clp.zst is excluded by !**/*.zst
  • velox/connectors/clp/tests/examples/test_2_ir.clp.zst is excluded by !**/*.zst
  • velox/connectors/clp/tests/examples/test_4_ir.clp.zst is excluded by !**/*.zst
📒 Files selected for processing (4)
  • velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp (6 hunks)
  • velox/connectors/clp/search_lib/ir/ClpIrCursor.h (2 hunks)
  • velox/connectors/clp/search_lib/ir/ClpIrUnitHandler.h (1 hunks)
  • velox/connectors/clp/tests/ClpConnectorTest.cpp (8 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-20T18:00:51.241Z
Learnt from: anlowee
PR: y-scope/velox#25
File: velox/connectors/clp/search_lib/archive/ClpArchiveCursor.h:36-39
Timestamp: 2025-08-20T18:00:51.241Z
Learning: In ClpArchiveCursor.h, the clp_s::InputSource type is available through transitive inclusion via BaseClpCursor.h, which includes "clp_s/InputConfig.hpp" where InputSource is defined. The constructor parameter type is properly accessible without needing direct inclusion.

Applied to files:

  • velox/connectors/clp/search_lib/ir/ClpIrCursor.h
🧬 Code graph analysis (1)
velox/connectors/clp/search_lib/ir/ClpIrCursor.h (1)
velox/connectors/clp/search_lib/ir/ClpIrUnitHandler.h (1)
  • ClpIrUnitHandler (29-48)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Build with GCC / Ubuntu release with resolve_dependency
🔇 Additional comments (12)
velox/connectors/clp/tests/ClpConnectorTest.cpp (2)

156-156: LGTM! Test inputs updated to use ZSTD-compressed IR files.

The test file paths have been updated from .clps to .clp.zst format across all IR-based tests, correctly reflecting the new ZSTD compression support.

Also applies to: 204-204, 263-263, 320-320, 381-381, 436-436, 467-467


448-449: Helpful clarification added.

The comment appropriately explains the expected test behavior regarding NULL handling for unsupported date strings.

velox/connectors/clp/search_lib/ir/ClpIrCursor.h (2)

19-19: LGTM! ZSTD decompressor support added.

The include for the ZSTD decompressor is appropriate for the new decompression functionality.


76-78: LGTM! Lifecycle fix implemented correctly.

The addition of filteredLogEvents_ as a member of ClpIrCursor addresses the lifecycle mismatch described in the PR objectives. The irReaderZstdWrapper_ member enables ZSTD-compressed IR stream handling. Both members are appropriately scoped as private.

Also applies to: 84-85

velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp (7)

56-56: LGTM! Lifecycle fix applied.

Correctly uses the filteredLogEvents_ member owned by ClpIrCursor rather than accessing through the handler, consistent with the lifecycle fix.


63-68: LGTM! Improved bounds checking.

The change from VELOX_CHECK_LT to VELOX_CHECK_LE is appropriate as the resolved node-id map size should be allowed to equal the projected columns size. The enhanced error message provides better diagnostics.


77-79: LGTM! Lifecycle ownership transferred correctly.

The filteredLogEvents_ buffer is now initialized and owned by ClpIrCursor, then passed to ClpIrUnitHandler, resolving the lifecycle mismatch described in the PR objectives.


106-116: LGTM! Enhanced error handling added.

The additional error checking and logging when deserializer creation fails improves debuggability. The use of irReaderZstdWrapper_ enables ZSTD decompression support.


164-164: LGTM! Consistent with lifecycle fix.

Correctly clears the filteredLogEvents_ member owned by ClpIrCursor rather than accessing through the handler.


168-168: LGTM! ZSTD decompression integrated.

Correctly reads from the ZSTD-wrapped reader to support compressed IR files.


218-218: LGTM! Vector loader uses correct buffer.

The ClpIrVectorLoader now receives filteredLogEvents_ directly, consistent with the lifecycle fix.

velox/connectors/clp/search_lib/ir/ClpIrUnitHandler.h (1)

41-48: Incorrect thread safety concern—sequential access pattern confirmed.

The code follows a single-threaded sequential pattern:

  1. filteredLogEvents_ is created in ClpIrCursor::createVector() (line 77-78)
  2. A single ClpIrUnitHandler writes during deserialize() (lines 164-167)
  3. ClpIrVectorLoader instances read only after deserialization completes (line 218)

The shared_ptr enables passing the container to multiple components, not concurrent access. No threading constructs were found in the codebase.

Likely an incorrect or invalid review comment.

Comment thread velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp Outdated
Comment on lines +95 to 103
irReaderZstdWrapper_ =
std::make_shared<::clp::streaming_compression::zstd::Decompressor>();
constexpr size_t cReaderBufferSize{64L * 1024L};
irReaderZstdWrapper_->open(*irReader_, cReaderBufferSize);
if (nullptr == irReaderZstdWrapper_) {
VLOG(2) << "Failed to open kv-ir stream \"" << splitPath_
<< "\" for reading.";
return ErrorCode::InternalError;
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Fix null-check logic error.

The null check on line 99 occurs after calling open() on line 98, but irReaderZstdWrapper_ is initialized via make_shared on line 96 and will never be null at line 99. The check should validate the result of the open() operation instead.

Apply this diff to fix the validation:

   irReaderZstdWrapper_ =
       std::make_shared<::clp::streaming_compression::zstd::Decompressor>();
   constexpr size_t cReaderBufferSize{64L * 1024L};
-  irReaderZstdWrapper_->open(*irReader_, cReaderBufferSize);
-  if (nullptr == irReaderZstdWrapper_) {
+  auto openResult = irReaderZstdWrapper_->open(*irReader_, cReaderBufferSize);
+  if (openResult.has_error()) {
     VLOG(2) << "Failed to open kv-ir stream \"" << splitPath_
-            << "\" for reading.";
+            << "\" for reading: " << openResult.error().message();
     return ErrorCode::InternalError;
   }

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp around lines 95 to 103,
the current null check is validating irReaderZstdWrapper_ after it was created
with make_shared (so it can never be null) instead of checking whether open()
succeeded; change the code to capture the result of
irReaderZstdWrapper_->open(...) (or catch exceptions if open() throws) and
validate that result, logging the failure and returning ErrorCode::InternalError
when open fails, rather than checking the shared_ptr for null.

Comment thread velox/connectors/clp/search_lib/ir/ClpIrUnitHandler.h Outdated

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (1)
velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp (1)

93-101: Fix logic error: validate open() result instead of checking for null.

The null check on line 96 is incorrect because std::make_shared never returns null. More critically, the open() call on line 101 likely returns a result that should be validated, but it's currently ignored.

Apply this diff to properly validate the decompressor initialization:

   irReaderZstdWrapper_ =
       std::make_shared<::clp::streaming_compression::zstd::Decompressor>();
   constexpr size_t cReaderBufferSize{64L * 1024L};
-  if (nullptr == irReaderZstdWrapper_) {
+  auto openResult = irReaderZstdWrapper_->open(*irReader_, cReaderBufferSize);
+  if (openResult.has_error()) {
     VLOG(2) << "Failed to open kv-ir stream \"" << splitPath_
-            << "\" for reading.";
+            << "\" for reading: " << openResult.error().message();
     return ErrorCode::InternalError;
   }
-  irReaderZstdWrapper_->open(*irReader_, cReaderBufferSize);
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d0f5021 and 1ebce8c.

📒 Files selected for processing (2)
  • velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp (5 hunks)
  • velox/connectors/clp/search_lib/ir/ClpIrUnitHandler.h (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Build with GCC / Ubuntu release with resolve_dependency
🔇 Additional comments (6)
velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp (5)

61-66: LGTM: Relaxed validation is appropriate.

Changing from VELOX_CHECK_EQ to VELOX_CHECK_LE appropriately allows scenarios where not all output columns have resolved node IDs, which is valid when some columns are unresolved.


75-77: LGTM: Lifecycle issue properly addressed.

The explicit initialization and ownership transfer of filteredLogEvents_ to the cursor (while sharing with the handler) correctly addresses the lifecycle issue described in the PR objectives.


103-114: LGTM: Enhanced error handling and correct wrapper usage.

The deserializer correctly uses the zstd-wrapped reader and provides detailed error logging. The fallback logging at lines 112-113 is technically unreachable if make_deserializer always sets an error when returning an empty result, but keeping it as defensive programming is acceptable.


166-166: LGTM: Correctly reads from zstd-wrapped reader.

The deserializer correctly reads from irReaderZstdWrapper_ to support ZSTD-compressed IR files as intended by this PR.


216-216: LGTM: Correctly uses cursor-owned log events buffer.

The vector loader correctly receives filteredLogEvents_ from the cursor, aligning with the lifecycle fix. However, this inherits the null-safety concern if called before successful initialization.

velox/connectors/clp/search_lib/ir/ClpIrUnitHandler.h (1)

31-38: LGTM: Proper null validation prevents undefined behaviour.

The constructor correctly validates that filteredLogEvents is non-null before storing it, preventing potential null dereferences in handle_log_event at line 47. This addresses the concern raised in previous reviews.


size_t ClpIrCursor::getNumFilteredRows() const {
return irDeserializer_->get_ir_unit_handler().getFilteredLogEvents()->size();
return filteredLogEvents_->size();

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Add null safety check before dereferencing filteredLogEvents_.

If getNumFilteredRows() is called before loadSplit() completes successfully, filteredLogEvents_ will be null, causing undefined behaviour.

Apply this diff to add a safety check:

 size_t ClpIrCursor::getNumFilteredRows() const {
+  if (nullptr == filteredLogEvents_) {
+    return 0;
+  }
   return filteredLogEvents_->size();
 }
🤖 Prompt for AI Agents
In velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp around line 54,
getNumFilteredRows() dereferences filteredLogEvents_ without checking for null;
add a null-safety check so that if filteredLogEvents_ is nullptr (e.g., called
before loadSplit() succeeded) the function returns 0 (or an appropriate default)
instead of accessing size(), and optionally log or DCHECK to signal the
unexpected state.

Comment thread velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp
@kirkrodrigues kirkrodrigues requested a review from wraymo October 2, 2025 14:07

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1ebce8c and 5cd496d.

📒 Files selected for processing (1)
  • velox/connectors/clp/tests/ClpConnectorTest.cpp (8 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Build with GCC / Ubuntu release with resolve_dependency
🔇 Additional comments (1)
velox/connectors/clp/tests/ClpConnectorTest.cpp (1)

156-158: Updated fixtures target the .clp.zst IR assets

Nice job pointing every IR split at the compressed fixtures so the tests now exercise the ZSTD pipeline end-to-end.

Also applies to: 204-206, 263-265, 320-322, 381-383, 436-438, 467-469

Comment on lines +448 to +449
// Only the second event meet the condition, the first event is a date string
// which is not supported yet so the value will be NULL.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Tweak comment grammar

meetmeets keeps the note polished.

🤖 Prompt for AI Agents
In velox/connectors/clp/tests/ClpConnectorTest.cpp around lines 448 to 449, the
inline comment uses incorrect verb agreement ("meet"); update the comment text
to use "meets" so it reads: "Only the second event meets the condition, the
first event is a date string which is not supported yet so the value will be
NULL."

const TypePtr& vectorType,
size_t vectorSize) {
VELOX_CHECK_EQ(
VELOX_CHECK_LE(

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was fixed by the other PR?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, the other PR already merged.

}

TEST_F(ClpConnectorTest, test4IrTimestampPushdown) {
// Only the second event meet the condition, the first event is a date string

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have the original JSON log file? And why the date string is not supported?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is previously discussed with @kirkrodrigues . Now we only support numeric types (int, float) of timestamps for IR stream. In IR stream, if a log event's timestamp is a datestring, it will simply be skipped.

But for SFA, we support datestring.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the first question, yea we have a test_4.ndjson:

{"timestamp": "2025-04-30T08:50:05.000Z"}
{"timestamp": 1746003005000000}
{"timestamp": 1766003005000000}

auto irPath = Path{.source = inputSource_, .path = splitPath_};
irReader_ = try_create_reader(irPath, networkAuthOption);
if (nullptr == irReader_) {
irReaderZstdWrapper_ =

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to check irReader_?


auto deserializerResult = ::clp::ffi::ir_stream::make_deserializer(
*irReader_, std::move(irHandler), std::move(queryHandler));
*irReaderZstdWrapper_, irHandler, std::move(queryHandler));

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do std::move(irHandler)?

auto deserializerResult = ::clp::ffi::ir_stream::make_deserializer(
*irReader_, std::move(irHandler), std::move(queryHandler));
*irReaderZstdWrapper_, irHandler, std::move(queryHandler));
if (!deserializerResult) {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this line? If it's nullptr, I don't think we can call has_error?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of this?

bool ignoreCase_;
std::shared_ptr<
::clp::ffi::ir_stream::Deserializer<ClpIrUnitHandler, QueryHandlerType>>
irDeserializer_;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to add nullptr?

@anlowee anlowee requested a review from wraymo October 8, 2025 15:20
@anlowee anlowee merged commit cf89596 into y-scope:presto-0.293-clp-connector Oct 9, 2025
6 checks passed
@anlowee anlowee deleted the xwei/zstd-wrapper branch October 9, 2025 17:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants