fix: support ZSTD-compressed IR and fix filteredLogEvents_ lifecycle.#36
Conversation
WalkthroughAdds a zstd decompression wrapper and an explicit in-memory filtered log-event buffer to Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor Caller
participant Cursor as ClpIrCursor
participant RawReader as Raw IR Reader
participant Zstd as Zstd Decompressor
participant Handler as ClpIrUnitHandler
participant Deser as IR Deserializer
participant Buf as filteredLogEvents_ (shared)
Caller->>Cursor: loadSplit(split)
Cursor->>Buf: create shared filteredLogEvents_
Cursor->>Handler: construct with Buf
Cursor->>RawReader: open IR stream
Cursor->>Zstd: wrap RawReader -> irReaderZstdWrapper_
Cursor->>Deser: create deserializer using irReaderZstdWrapper_
Note over Cursor,Deser: on failure -> detailed log
Caller->>Cursor: deserialize()
Cursor->>Buf: clear buffer
Zstd->>Deser: provide decompressed bytes
Deser-->>Handler: emit decoded IR units into Handler/Buf
Cursor->>Buf: build vectors from buffer
Cursor-->>Caller: return vectors / row counts
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
📜 Recent review detailsConfiguration used: Path: .coderabbit.yaml Review profile: ASSERTIVE Plan: Pro 📒 Files selected for processing (1)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (3)
velox/connectors/clp/tests/examples/test_1_ir.clp.zstis excluded by!**/*.zstvelox/connectors/clp/tests/examples/test_2_ir.clp.zstis excluded by!**/*.zstvelox/connectors/clp/tests/examples/test_4_ir.clp.zstis excluded by!**/*.zst
📒 Files selected for processing (4)
velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp(6 hunks)velox/connectors/clp/search_lib/ir/ClpIrCursor.h(2 hunks)velox/connectors/clp/search_lib/ir/ClpIrUnitHandler.h(1 hunks)velox/connectors/clp/tests/ClpConnectorTest.cpp(8 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-20T18:00:51.241Z
Learnt from: anlowee
PR: y-scope/velox#25
File: velox/connectors/clp/search_lib/archive/ClpArchiveCursor.h:36-39
Timestamp: 2025-08-20T18:00:51.241Z
Learning: In ClpArchiveCursor.h, the clp_s::InputSource type is available through transitive inclusion via BaseClpCursor.h, which includes "clp_s/InputConfig.hpp" where InputSource is defined. The constructor parameter type is properly accessible without needing direct inclusion.
Applied to files:
velox/connectors/clp/search_lib/ir/ClpIrCursor.h
🧬 Code graph analysis (1)
velox/connectors/clp/search_lib/ir/ClpIrCursor.h (1)
velox/connectors/clp/search_lib/ir/ClpIrUnitHandler.h (1)
ClpIrUnitHandler(29-48)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Build with GCC / Ubuntu release with resolve_dependency
🔇 Additional comments (12)
velox/connectors/clp/tests/ClpConnectorTest.cpp (2)
156-156: LGTM! Test inputs updated to use ZSTD-compressed IR files.The test file paths have been updated from
.clpsto.clp.zstformat across all IR-based tests, correctly reflecting the new ZSTD compression support.Also applies to: 204-204, 263-263, 320-320, 381-381, 436-436, 467-467
448-449: Helpful clarification added.The comment appropriately explains the expected test behavior regarding NULL handling for unsupported date strings.
velox/connectors/clp/search_lib/ir/ClpIrCursor.h (2)
19-19: LGTM! ZSTD decompressor support added.The include for the ZSTD decompressor is appropriate for the new decompression functionality.
76-78: LGTM! Lifecycle fix implemented correctly.The addition of
filteredLogEvents_as a member ofClpIrCursoraddresses the lifecycle mismatch described in the PR objectives. TheirReaderZstdWrapper_member enables ZSTD-compressed IR stream handling. Both members are appropriately scoped as private.Also applies to: 84-85
velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp (7)
56-56: LGTM! Lifecycle fix applied.Correctly uses the
filteredLogEvents_member owned byClpIrCursorrather than accessing through the handler, consistent with the lifecycle fix.
63-68: LGTM! Improved bounds checking.The change from
VELOX_CHECK_LTtoVELOX_CHECK_LEis appropriate as the resolved node-id map size should be allowed to equal the projected columns size. The enhanced error message provides better diagnostics.
77-79: LGTM! Lifecycle ownership transferred correctly.The
filteredLogEvents_buffer is now initialized and owned byClpIrCursor, then passed toClpIrUnitHandler, resolving the lifecycle mismatch described in the PR objectives.
106-116: LGTM! Enhanced error handling added.The additional error checking and logging when deserializer creation fails improves debuggability. The use of
irReaderZstdWrapper_enables ZSTD decompression support.
164-164: LGTM! Consistent with lifecycle fix.Correctly clears the
filteredLogEvents_member owned byClpIrCursorrather than accessing through the handler.
168-168: LGTM! ZSTD decompression integrated.Correctly reads from the ZSTD-wrapped reader to support compressed IR files.
218-218: LGTM! Vector loader uses correct buffer.The
ClpIrVectorLoadernow receivesfilteredLogEvents_directly, consistent with the lifecycle fix.velox/connectors/clp/search_lib/ir/ClpIrUnitHandler.h (1)
41-48: Incorrect thread safety concern—sequential access pattern confirmed.The code follows a single-threaded sequential pattern:
filteredLogEvents_is created inClpIrCursor::createVector()(line 77-78)- A single
ClpIrUnitHandlerwrites duringdeserialize()(lines 164-167)ClpIrVectorLoaderinstances read only after deserialization completes (line 218)The
shared_ptrenables passing the container to multiple components, not concurrent access. No threading constructs were found in the codebase.Likely an incorrect or invalid review comment.
| irReaderZstdWrapper_ = | ||
| std::make_shared<::clp::streaming_compression::zstd::Decompressor>(); | ||
| constexpr size_t cReaderBufferSize{64L * 1024L}; | ||
| irReaderZstdWrapper_->open(*irReader_, cReaderBufferSize); | ||
| if (nullptr == irReaderZstdWrapper_) { | ||
| VLOG(2) << "Failed to open kv-ir stream \"" << splitPath_ | ||
| << "\" for reading."; | ||
| return ErrorCode::InternalError; | ||
| } |
There was a problem hiding this comment.
Fix null-check logic error.
The null check on line 99 occurs after calling open() on line 98, but irReaderZstdWrapper_ is initialized via make_shared on line 96 and will never be null at line 99. The check should validate the result of the open() operation instead.
Apply this diff to fix the validation:
irReaderZstdWrapper_ =
std::make_shared<::clp::streaming_compression::zstd::Decompressor>();
constexpr size_t cReaderBufferSize{64L * 1024L};
- irReaderZstdWrapper_->open(*irReader_, cReaderBufferSize);
- if (nullptr == irReaderZstdWrapper_) {
+ auto openResult = irReaderZstdWrapper_->open(*irReader_, cReaderBufferSize);
+ if (openResult.has_error()) {
VLOG(2) << "Failed to open kv-ir stream \"" << splitPath_
- << "\" for reading.";
+ << "\" for reading: " << openResult.error().message();
return ErrorCode::InternalError;
}Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp around lines 95 to 103,
the current null check is validating irReaderZstdWrapper_ after it was created
with make_shared (so it can never be null) instead of checking whether open()
succeeded; change the code to capture the result of
irReaderZstdWrapper_->open(...) (or catch exceptions if open() throws) and
validate that result, logging the failure and returning ErrorCode::InternalError
when open fails, rather than checking the shared_ptr for null.
There was a problem hiding this comment.
Actionable comments posted: 2
♻️ Duplicate comments (1)
velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp (1)
93-101: Fix logic error: validateopen()result instead of checking for null.The null check on line 96 is incorrect because
std::make_sharednever returns null. More critically, theopen()call on line 101 likely returns a result that should be validated, but it's currently ignored.Apply this diff to properly validate the decompressor initialization:
irReaderZstdWrapper_ = std::make_shared<::clp::streaming_compression::zstd::Decompressor>(); constexpr size_t cReaderBufferSize{64L * 1024L}; - if (nullptr == irReaderZstdWrapper_) { + auto openResult = irReaderZstdWrapper_->open(*irReader_, cReaderBufferSize); + if (openResult.has_error()) { VLOG(2) << "Failed to open kv-ir stream \"" << splitPath_ - << "\" for reading."; + << "\" for reading: " << openResult.error().message(); return ErrorCode::InternalError; } - irReaderZstdWrapper_->open(*irReader_, cReaderBufferSize);
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (2)
velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp(5 hunks)velox/connectors/clp/search_lib/ir/ClpIrUnitHandler.h(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Build with GCC / Ubuntu release with resolve_dependency
🔇 Additional comments (6)
velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp (5)
61-66: LGTM: Relaxed validation is appropriate.Changing from
VELOX_CHECK_EQtoVELOX_CHECK_LEappropriately allows scenarios where not all output columns have resolved node IDs, which is valid when some columns are unresolved.
75-77: LGTM: Lifecycle issue properly addressed.The explicit initialization and ownership transfer of
filteredLogEvents_to the cursor (while sharing with the handler) correctly addresses the lifecycle issue described in the PR objectives.
103-114: LGTM: Enhanced error handling and correct wrapper usage.The deserializer correctly uses the zstd-wrapped reader and provides detailed error logging. The fallback logging at lines 112-113 is technically unreachable if
make_deserializeralways sets an error when returning an empty result, but keeping it as defensive programming is acceptable.
166-166: LGTM: Correctly reads from zstd-wrapped reader.The deserializer correctly reads from
irReaderZstdWrapper_to support ZSTD-compressed IR files as intended by this PR.
216-216: LGTM: Correctly uses cursor-owned log events buffer.The vector loader correctly receives
filteredLogEvents_from the cursor, aligning with the lifecycle fix. However, this inherits the null-safety concern if called before successful initialization.velox/connectors/clp/search_lib/ir/ClpIrUnitHandler.h (1)
31-38: LGTM: Proper null validation prevents undefined behaviour.The constructor correctly validates that
filteredLogEventsis non-null before storing it, preventing potential null dereferences inhandle_log_eventat line 47. This addresses the concern raised in previous reviews.
|
|
||
| size_t ClpIrCursor::getNumFilteredRows() const { | ||
| return irDeserializer_->get_ir_unit_handler().getFilteredLogEvents()->size(); | ||
| return filteredLogEvents_->size(); |
There was a problem hiding this comment.
Add null safety check before dereferencing filteredLogEvents_.
If getNumFilteredRows() is called before loadSplit() completes successfully, filteredLogEvents_ will be null, causing undefined behaviour.
Apply this diff to add a safety check:
size_t ClpIrCursor::getNumFilteredRows() const {
+ if (nullptr == filteredLogEvents_) {
+ return 0;
+ }
return filteredLogEvents_->size();
}🤖 Prompt for AI Agents
In velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp around line 54,
getNumFilteredRows() dereferences filteredLogEvents_ without checking for null;
add a null-safety check so that if filteredLogEvents_ is nullptr (e.g., called
before loadSplit() succeeded) the function returns 0 (or an appropriate default)
instead of accessing size(), and optionally log or DCHECK to signal the
unexpected state.
There was a problem hiding this comment.
Actionable comments posted: 1
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
velox/connectors/clp/tests/ClpConnectorTest.cpp(8 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Build with GCC / Ubuntu release with resolve_dependency
🔇 Additional comments (1)
velox/connectors/clp/tests/ClpConnectorTest.cpp (1)
156-158: Updated fixtures target the .clp.zst IR assetsNice job pointing every IR split at the compressed fixtures so the tests now exercise the ZSTD pipeline end-to-end.
Also applies to: 204-206, 263-265, 320-322, 381-383, 436-438, 467-469
| // Only the second event meet the condition, the first event is a date string | ||
| // which is not supported yet so the value will be NULL. |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Tweak comment grammar
meet → meets keeps the note polished.
🤖 Prompt for AI Agents
In velox/connectors/clp/tests/ClpConnectorTest.cpp around lines 448 to 449, the
inline comment uses incorrect verb agreement ("meet"); update the comment text
to use "meets" so it reads: "Only the second event meets the condition, the
first event is a date string which is not supported yet so the value will be
NULL."
| const TypePtr& vectorType, | ||
| size_t vectorSize) { | ||
| VELOX_CHECK_EQ( | ||
| VELOX_CHECK_LE( |
There was a problem hiding this comment.
Right, the other PR already merged.
| } | ||
|
|
||
| TEST_F(ClpConnectorTest, test4IrTimestampPushdown) { | ||
| // Only the second event meet the condition, the first event is a date string |
There was a problem hiding this comment.
Do we have the original JSON log file? And why the date string is not supported?
There was a problem hiding this comment.
This is previously discussed with @kirkrodrigues . Now we only support numeric types (int, float) of timestamps for IR stream. In IR stream, if a log event's timestamp is a datestring, it will simply be skipped.
But for SFA, we support datestring.
There was a problem hiding this comment.
For the first question, yea we have a test_4.ndjson:
{"timestamp": "2025-04-30T08:50:05.000Z"}
{"timestamp": 1746003005000000}
{"timestamp": 1766003005000000}
| auto irPath = Path{.source = inputSource_, .path = splitPath_}; | ||
| irReader_ = try_create_reader(irPath, networkAuthOption); | ||
| if (nullptr == irReader_) { | ||
| irReaderZstdWrapper_ = |
|
|
||
| auto deserializerResult = ::clp::ffi::ir_stream::make_deserializer( | ||
| *irReader_, std::move(irHandler), std::move(queryHandler)); | ||
| *irReaderZstdWrapper_, irHandler, std::move(queryHandler)); |
| auto deserializerResult = ::clp::ffi::ir_stream::make_deserializer( | ||
| *irReader_, std::move(irHandler), std::move(queryHandler)); | ||
| *irReaderZstdWrapper_, irHandler, std::move(queryHandler)); | ||
| if (!deserializerResult) { |
There was a problem hiding this comment.
Do we need this line? If it's nullptr, I don't think we can call has_error?
| bool ignoreCase_; | ||
| std::shared_ptr< | ||
| ::clp::ffi::ir_stream::Deserializer<ClpIrUnitHandler, QueryHandlerType>> | ||
| irDeserializer_; |
Description
Since all current sources generating IR files also compress the IR files with ZSTD, this PR wraps the IR reader with a ZSTD decompressor so that it can directly search on ZSTD-compressed IR files.
While doing E2E testing, the
filteredLogEvents_which was originally owned byClpIrUnitHandlerhad a shorter life cycle thenClpIrCursor, this PR also transfers the ownership fromClpIrUnitHandlertoClpIrCursor.Checklist
breaking change.
Validation performed
Passed the CI.
Summary by CodeRabbit
New Features
Bug Fixes
Tests
Refactor