Optimize streaming DataFrame execution for 80% memory reduction#545
Merged
Conversation
…urces Previously, tail(n) on remote ClickHouse tables would download the entire table via _get_df(), making it O(N) for large tables. Now for positive n with SQL sources: - Uses count_rows() (SQL COUNT(*)) to get total rows efficiently - Uses offset(total - n).limit(n) to fetch only the last N rows - Falls back to pandas path for negative n or non-SQL sources Adds _can_use_sql_tail() helper to check if optimization is safe (requires SQL source and all lazy ops must be SQL-expressible). Adds 21 new tests covering tail correctness, edge cases, SQL optimization verification, and remote ClickHouse integration.
…wnload For SQL-backed DataStores (files, remote ClickHouse), these methods now use SQL queries instead of loading all data: - info(): Uses COUNT(*) + LIMIT 0 probe + COUNT(col) for non-null counts - describe(): Single SQL query with COUNT/avg/stddevSamp/min/max/quantileExactInclusive - sample(n): Uses ORDER BY rand() LIMIT n Falls back to pandas for unsupported cases (replace=True, weights, random_state, include='all').
…cates/memory_usage
…state
When use(database, table) was called, it set _connection_mode='table'
but didn't create _table_function or set table_name. This caused
_require_table_context() to pass while SQL generation failed due to
missing FROM clause source.
Add _bind_table_context() method that properly sets table_name and
creates _table_function for remote sources. Call this from use() when
a table argument is provided (2-arg and 3-arg cases).
Now ds.use('db', 'tbl').head() works correctly.
…pushdown helper - Fix drop_duplicates(subset=...) to generate LIMIT 1 BY in SQL instead of ignoring subset in SQL path - Add _distinct_subset field to track subset columns for SQL generation - Extract repeated has_non_sql_ops check to _can_sql_pushdown() helper - Add LazyDistinct to query planner's _can_push_op_to_sql() for SQL pushdown - Add 21 tests for SQL generation, correctness, and log verification
…ushdown check Fix ClickHouse SQL syntax by moving LIMIT 1 BY before LIMIT/OFFSET in: - core.py:_generate_select_sql() - sql_executor.py:build_simple_sql() - sql_executor.py:build_sql_with_row_order_subquery() - sql_executor.py:assemble_sql() Refactor memory_usage() to use existing _can_sql_pushdown() helper instead of duplicating the has_non_sql_ops check inline. Add 12 SQL verification tests for clause ordering, _can_sql_pushdown with LazyDistinct, and extended log capture scenarios.
Prevent unbounded ORDER BY from being pushed to SQL, which would force
remote servers to sort entire tables before returning results.
- ORDER BY only pushed to SQL when LIMIT follows in operation chain
- ORDER BY before GROUP BY with non-order-sensitive aggs (sum, mean, count)
is classified as pandas (semantically meaningless sort dropped)
- Fix groupby.agg('sum') string argument being passed as agg_dict
This optimization is critical for remote ClickHouse tables where
unbounded ORDER BY would be extremely expensive.
Fix bug where ds["users"] in database mode (after ds.use("mydb"))
fell through to column access instead of table selection. The inner
condition requiring dot notation or connection mode was too restrictive.
Add comprehensive tests for use() -> table() state management covering
all connection/database/table mode transitions and independence.
Add comprehensive tests verifying ClickHouse-specific data types preserve data through DataStore -> pandas DataFrame conversion: - Decimal(18,4) -> float64 (precision loss documented for large values) - FixedString(N) -> str with null padding - Enum8 -> str labels - UUID -> uuid.UUID objects - IPv4/IPv6 -> ipaddress objects - LowCardinality(String) -> transparent str - Nullable(Int64) -> pandas Int64 nullable dtype - DateTime64(3, 'UTC') -> datetime64[ns, UTC] with subsecond precision - UInt64 -> uint64 (no precision loss for values > 2^53) - Map(String, String) -> dict - Array(String) -> numpy array
…kHouse errors Add translate_remote_error() to convert raw chdb/ClickHouse exceptions into actionable user messages with hints for: authentication failures, unknown database/table, access denied, DNS errors, connection refused, timeouts, and SQL syntax errors. Applied at query execution points in connection.py and core.py. Includes 35 new tests covering all error categories.
Move 'import re' to top of file with other imports and add translate_remote_error, _extract_clickhouse_error_code, _extract_host_from_error to __all__ export list.
…e, date
Add _format_insert_value() to properly handle:
- float('nan')/np.nan/pd.NaT → NULL (was rendering as invalid SQL)
- datetime → quoted string '2024-01-15 10:30:00'
- date → quoted string '2024-01-15'
Consolidate formatting logic between _generate_insert_sql() and insert().
Add comprehensive test suite for INSERT INTO remote ClickHouse (32 tests).
Fix three critical bugs in _rewrite_table_references: - SQL keywords (JOIN, WHERE, UNION, etc.) incorrectly captured as table aliases - CTE references incorrectly rewritten as remote tables - Backtick-quoted identifiers not supported Add comprehensive _SQL_ALIAS_KEYWORDS list with negative lookahead to prevent keyword capture. Extract CTE names before rewriting to preserve references. Extend regex to match backtick-quoted table names. Add 46 tests covering JOINs, CTEs, subqueries, UNION, aliases, quoted identifiers, and complex analytics queries.
ORDER BY following GROUP BY operates on aggregated (small) result set, so SQL sort is cheap. Previously, ORDER BY without LIMIT was rejected from SQL pushdown, causing unnecessary segment boundaries and pandas fallback after groupby().agg().sort_values() chains. Add test_remote_lazy_chain_pushdown.py verifying complete SQL pushdown for multi-step lazy chains on remote ClickHouse.
Add 44 tests verifying mixed operations between SQL-backed and DataFrame-backed DataStores: - Cross-source merge (inner/left/right/outer, both directions) - DataFrame-only merge (inner, no-match, many-to-many, NaN keys) - Concat (two DataStores, explicit materialization, mixed DS+DF) - isin with local list (SQL table, DataFrame, empty, strings) - Chained operations (merge->filter, merge->assign->filter, etc.) - assign() + filter on SQL results - SQL table join operations - Complex workflows (filter->merge->assign->sort, isin->merge->groupby)
…te tables Replace manual `SELECT * FROM (subquery) LIMIT 0` probe in `__repr__` with existing optimized `dtypes` and `columns` properties, preventing hangs when working with remote ClickHouse tables.
Remove forbidden hasattr pattern in test_groupby_agg_sum and test_groupby_agg_mean per project rules that prohibit hasattr checks in test code.
Add comprehensive tests verifying operations on external data sources (MySQL, PostgreSQL, SQLite, remote ClickHouse, MongoDB, Redis) do not inject rowNumberInAllBlocks() and generate correct SQL for filter/sort/ agg chains. Covers preserves_row_order() behavior, SQL clause generation, and edge cases.
Add 18 tests for 100k+ row data to verify deterministic row order: - filter→assign→sort→head chain repeated 5x for consistency - dropna (non-contiguous index) + assign + groupby alignment - rank(method='first') tie-breaking matches pandas on large data - PythonTableFunction path vs direct execution path comparison
Fix count() to execute with LIMIT when head()/limit() is applied, matching pandas behavior. Previously count() unconditionally stripped LIMIT, causing incorrect results for ds.head(3).count(). Add 27 tests verifying: - COUNT subquery never contains redundant ORDER BY - count_rows() uses SQL COUNT(*) for pure SQL chains - PANDAS_FILTER chains fall back to DataFrame execution - Empty filter results return 0/zero Series - LIMIT + count_rows/count executes then counts
Add 50 tests verifying DataFrame operations with non-contiguous indices (step slicing, dropna, filter, sample) work correctly through chDB SQL execution. Tests cover filter+sort, assign, groupby+agg, and complex chained operations. Validates _prepare_df_for_chdb() properly resets and restores indices.
Add 29 new tests across 5 test classes to verify that pristine DataStore metadata access (.columns/.dtypes/.shape/.size/.empty) does not trigger full data loading: - TestZeroOverheadDataFrameSource: mock-verify _execute() not called - TestZeroOverheadFileSource: verify parquet/CSV metadata fast path - TestProbeDtypesLimit0: verify LIMIT 0 query generation - TestPostFilterMetadata: verify filtered DataStore uses execution path - TestRemoteDescribeNotSelectStar: verify DESCRIBE/schema() path for remote
Add comprehensive tests for ClickHouse Cloud connection auto-detection: - explicit secure=False override for cloud hosts - normalize_clickhouse_connection() idempotency (4 scenarios) - URI format parsing with cloud host detection - URI secure=false override for cloud hosts
- Fix var mapping: varPop → varSamp to match pandas sample variance (ddof=1) - Add nunique mapping to uniqExact for distinct count support - Add comprehensive test coverage for all groupby agg functions in both named_agg and dict_agg modes (60 new tests)
Add 30 tests covering: - JSON accessor chain determinism (extract + filter) - Nested subquery determinism (filter on computed columns) - Concat determinism matching pandas order - Large scale concat (1000+ rows) - UNION ALL source data determinism - Mixed operations chains All tests verify results are non-flaky by running 5x iterations.
Fix InCondition.to_sql() for pandas-compatible NULL semantics: - isin([]) -> '1=0' (always false), notin([]) -> '1=1' (always true) - isin([nan, 1, 2]) -> '(col IS NULL OR col IN (1,2))' - notin([nan, 1]) -> '(col IS NOT NULL AND col NOT IN (1))' - notin([1, 2]) -> '(col NOT IN (1,2) OR col IS NULL)' to include NULL rows Fix Literal(nan).to_sql() to generate 'NULL' instead of invalid 'nan'. Add comprehensive boundary tests for empty lists, NULL handling, single-row groupby, empty DataFrame operations, and all-NULL aggregation.
Add 125 exploratory tests covering under-tested pandas compatibility areas: select_dtypes, where/mask, query(), eval(), compare(), update(), nunique, cumsum/diff/pct_change, any/all, merge with indicator, pivot_table with margins, apply axis=1, rolling/expanding/ewm, melt/explode/transpose/isin, null handling, value_counts, describe, duplicates, empty/single-row DataFrames, groupby named agg, concat, head/tail/sample, rename/drop chains. Fix dtype mismatch when where/mask with other=None introduces NaN: pandas converts non-nullable int to float64, but chDB SQL returns nullable Int64. Now correctly converts to float64 only when input was non-nullable integer.
…tswith/endswith dtype Add 190 new exploratory tests covering: - rename, replace, nlargest/nsmallest, rank operations - cumulative ops with NaN, assign, select_dtypes, duplicated - value_counts, clip, diff, chained operations - all-NaN columns, single-row DataFrames, groupby with NaN keys - where/mask, string operations, iloc, head/tail edge cases - between, min/max, apply, concat, pivot_table Fix str.startswith() and str.endswith() to return bool dtype instead of uint8 by wrapping ClickHouse's startsWith/endsWith with toBool(), following the same pattern used by isna()/notna().
…y index Add comprehensive exploratory test batch 103 covering under-tested areas: - eval/query with complex expressions - clip boundary conditions - replace with dicts, regex, NaN - combine_first with different columns - groupby transform/filter/apply edge cases - cumulative operations (cumprod with zero, cumsum with NaN) - nlargest/nsmallest with ties and NaN - idxmin/idxmax edge cases - pipe chaining, between inclusive variations - DataFrame.map/applymap, iterrows/itertuples - Complex chained operations - Type coercion, describe, empty/single-row DataFrames - Multi-column groupby, arithmetic, unique/nunique - drop_duplicates, sort_values, fillna, dropna, isna/notna, abs Fix empty groupby results to set group keys as index when as_index=True to match pandas behavior.
…t paths - Extract _build_limit_by_clause() in sql_executor.py (3 inline LIMIT 1 BY blocks → 1 helper) - Extract _all_lazy_ops_are_sql_compatible() in core.py (3 near-identical SQL-check methods → shared predicate + thin wrappers) - Extract _build_count_subquery() in core.py (count/count_rows ORDER BY stripping dedup) - Extract _probe_schema() in core.py (4 inline LIMIT 0 schema probes → 1 helper) - Remove ~15 unnecessary hasattr(self, '_get_df'/'_wrap_result') guards (always True on DataStore) - Add logging to _bind_table_context silent catch; upgrade SQL fallback log level to info - Remove private helpers from __all__ in exceptions.py - Move math/datetime imports to module level in core.py - Fix test file paths: use os.path.dirname(__file__) instead of hardcoded relative paths
…ote sources
count_rows() and count() wrapped to_sql() in SELECT COUNT(*) FROM (...),
creating nested subqueries over remote() table functions that hang in
chDB's embedded ClickHouse engine.
Replace the subquery approach with flat SQL generation:
- Simple cases: SELECT count() FROM {source} WHERE {conditions}
- Complex + remote: fall back to len(_execute()) which generates flat SQL
- Complex + local: keep existing subquery (no regression)
Also fixes:
- count() alias collision: COUNT("value") AS "value" conflicted with
WHERE "value" > N in ClickHouse; removed aliases from flat path
- self.columns clearing state: capture table source and WHERE before
columns access which triggers execution on non-pristine sources
- assign() + filter: computed columns excluded from flat SQL path since
WHERE may reference columns that don't exist in the raw source
- loc[] WHERE: extract_clauses_from_ops captures WHERE from _lazy_ops,
fixing latent bug where loc[] conditions were ignored by count
e3f40a5 to
eadb5e2
Compare
…eak by 80%
Replace synchronous query("DataFrame") with send_query("DataFrame") +
pd.concat in Connection.execute() for database/file queries. This avoids
ChunkCollectorOutputFormat accumulating all result chunks in C++ memory
before building the DataFrame.
Simulation on 500M rows (50M matching) showed:
- MemoryTracker peak: 12.68 GiB -> 2.53 GiB (80% reduction)
- Process RSS peak: 14.28 GiB -> 4.54 GiB (68% reduction)
- Query time: 10.0s -> 7.0s (30% faster)
The _execute_df_query() path (Python table function) remains synchronous
because send_query() cannot find __df__ across stack frames.
Includes config toggle set_streaming_df(bool) as safety escape hatch,
and automatic fallback for DDL/empty results.
bedd457 to
50e6def
Compare
… pandas 3.x compat - __del__: make no-op to avoid dropping self._conn reference which triggers chdb's own __del__ -> close() -> EmbeddedServer shutdown, aborting in-flight streaming queries on other Connection objects - _query_df_streaming: only fallback to sync query() for read-only statements (SELECT/DESCRIBE/SHOW/EXPLAIN/WITH) when 0 chunks returned; return empty DataFrame for DDL/INSERT to prevent double execution - test_fixedstring_dtype_is_object: accept both object and StringDtype for pandas 3.x compatibility
50e6def to
f0d473c
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
query("DataFrame")withsend_query("DataFrame")+pd.concatinConnection.execute()for database/file queriesset_streaming_df()config toggle as safety escape hatchBenchmark Results (500M rows, 50M matching)
Test plan