Skip to content

Optimize streaming DataFrame execution for 80% memory reduction#545

Merged
auxten merged 35 commits into
mainfrom
supervisor/explore_07
Mar 19, 2026
Merged

Optimize streaming DataFrame execution for 80% memory reduction#545
auxten merged 35 commits into
mainfrom
supervisor/explore_07

Conversation

@auxten

@auxten auxten commented Mar 19, 2026

Copy link
Copy Markdown
Member

Summary

  • Replace synchronous query("DataFrame") with send_query("DataFrame") + pd.concat in Connection.execute() for database/file queries
  • Reduces C++ ChunkCollectorOutputFormat memory accumulation by streaming chunks one at a time
  • Includes set_streaming_df() config toggle as safety escape hatch
  • Automatic fallback for DDL statements and empty result sets

Benchmark Results (500M rows, 50M matching)

Metric Before After Improvement
MemoryTracker peak 12.68 GiB 2.53 GiB 80% reduction
Process RSS peak 14.28 GiB 4.54 GiB 68% reduction

Test plan

  • 9 new streaming tests (sync vs streaming equivalence, filters, aggregation, empty/single-row, config toggle, DataFrame source)
  • Full test suite: 10691 passed, 0 failed
  • Benchmark regression check on 100K/1M/10M rows — small result overhead ~10-15%, large result sets faster

auxten added 30 commits March 19, 2026 00:04
…urces

Previously, tail(n) on remote ClickHouse tables would download the
entire table via _get_df(), making it O(N) for large tables.

Now for positive n with SQL sources:
- Uses count_rows() (SQL COUNT(*)) to get total rows efficiently
- Uses offset(total - n).limit(n) to fetch only the last N rows
- Falls back to pandas path for negative n or non-SQL sources

Adds _can_use_sql_tail() helper to check if optimization is safe
(requires SQL source and all lazy ops must be SQL-expressible).

Adds 21 new tests covering tail correctness, edge cases, SQL
optimization verification, and remote ClickHouse integration.
…wnload

For SQL-backed DataStores (files, remote ClickHouse), these methods
now use SQL queries instead of loading all data:

- info(): Uses COUNT(*) + LIMIT 0 probe + COUNT(col) for non-null counts
- describe(): Single SQL query with COUNT/avg/stddevSamp/min/max/quantileExactInclusive
- sample(n): Uses ORDER BY rand() LIMIT n

Falls back to pandas for unsupported cases (replace=True, weights,
random_state, include='all').
…state

When use(database, table) was called, it set _connection_mode='table'
but didn't create _table_function or set table_name. This caused
_require_table_context() to pass while SQL generation failed due to
missing FROM clause source.

Add _bind_table_context() method that properly sets table_name and
creates _table_function for remote sources. Call this from use() when
a table argument is provided (2-arg and 3-arg cases).

Now ds.use('db', 'tbl').head() works correctly.
…pushdown helper

- Fix drop_duplicates(subset=...) to generate LIMIT 1 BY in SQL instead
  of ignoring subset in SQL path
- Add _distinct_subset field to track subset columns for SQL generation
- Extract repeated has_non_sql_ops check to _can_sql_pushdown() helper
- Add LazyDistinct to query planner's _can_push_op_to_sql() for SQL pushdown
- Add 21 tests for SQL generation, correctness, and log verification
…ushdown check

Fix ClickHouse SQL syntax by moving LIMIT 1 BY before LIMIT/OFFSET in:
- core.py:_generate_select_sql()
- sql_executor.py:build_simple_sql()
- sql_executor.py:build_sql_with_row_order_subquery()
- sql_executor.py:assemble_sql()

Refactor memory_usage() to use existing _can_sql_pushdown() helper
instead of duplicating the has_non_sql_ops check inline.

Add 12 SQL verification tests for clause ordering, _can_sql_pushdown
with LazyDistinct, and extended log capture scenarios.
Prevent unbounded ORDER BY from being pushed to SQL, which would force
remote servers to sort entire tables before returning results.

- ORDER BY only pushed to SQL when LIMIT follows in operation chain
- ORDER BY before GROUP BY with non-order-sensitive aggs (sum, mean, count)
  is classified as pandas (semantically meaningless sort dropped)
- Fix groupby.agg('sum') string argument being passed as agg_dict

This optimization is critical for remote ClickHouse tables where
unbounded ORDER BY would be extremely expensive.
Fix bug where ds["users"] in database mode (after ds.use("mydb"))
fell through to column access instead of table selection. The inner
condition requiring dot notation or connection mode was too restrictive.

Add comprehensive tests for use() -> table() state management covering
all connection/database/table mode transitions and independence.
Add comprehensive tests verifying ClickHouse-specific data types
preserve data through DataStore -> pandas DataFrame conversion:
- Decimal(18,4) -> float64 (precision loss documented for large values)
- FixedString(N) -> str with null padding
- Enum8 -> str labels
- UUID -> uuid.UUID objects
- IPv4/IPv6 -> ipaddress objects
- LowCardinality(String) -> transparent str
- Nullable(Int64) -> pandas Int64 nullable dtype
- DateTime64(3, 'UTC') -> datetime64[ns, UTC] with subsecond precision
- UInt64 -> uint64 (no precision loss for values > 2^53)
- Map(String, String) -> dict
- Array(String) -> numpy array
…kHouse errors

Add translate_remote_error() to convert raw chdb/ClickHouse exceptions into
actionable user messages with hints for: authentication failures, unknown
database/table, access denied, DNS errors, connection refused, timeouts,
and SQL syntax errors. Applied at query execution points in connection.py
and core.py. Includes 35 new tests covering all error categories.
Move 'import re' to top of file with other imports and add
translate_remote_error, _extract_clickhouse_error_code,
_extract_host_from_error to __all__ export list.
…e, date

Add _format_insert_value() to properly handle:
- float('nan')/np.nan/pd.NaT → NULL (was rendering as invalid SQL)
- datetime → quoted string '2024-01-15 10:30:00'
- date → quoted string '2024-01-15'

Consolidate formatting logic between _generate_insert_sql() and insert().
Add comprehensive test suite for INSERT INTO remote ClickHouse (32 tests).
Fix three critical bugs in _rewrite_table_references:
- SQL keywords (JOIN, WHERE, UNION, etc.) incorrectly captured as table aliases
- CTE references incorrectly rewritten as remote tables
- Backtick-quoted identifiers not supported

Add comprehensive _SQL_ALIAS_KEYWORDS list with negative lookahead to prevent
keyword capture. Extract CTE names before rewriting to preserve references.
Extend regex to match backtick-quoted table names.

Add 46 tests covering JOINs, CTEs, subqueries, UNION, aliases, quoted
identifiers, and complex analytics queries.
ORDER BY following GROUP BY operates on aggregated (small) result set,
so SQL sort is cheap. Previously, ORDER BY without LIMIT was rejected
from SQL pushdown, causing unnecessary segment boundaries and pandas
fallback after groupby().agg().sort_values() chains.

Add test_remote_lazy_chain_pushdown.py verifying complete SQL pushdown
for multi-step lazy chains on remote ClickHouse.
Add 44 tests verifying mixed operations between SQL-backed and
DataFrame-backed DataStores:
- Cross-source merge (inner/left/right/outer, both directions)
- DataFrame-only merge (inner, no-match, many-to-many, NaN keys)
- Concat (two DataStores, explicit materialization, mixed DS+DF)
- isin with local list (SQL table, DataFrame, empty, strings)
- Chained operations (merge->filter, merge->assign->filter, etc.)
- assign() + filter on SQL results
- SQL table join operations
- Complex workflows (filter->merge->assign->sort, isin->merge->groupby)
…te tables

Replace manual `SELECT * FROM (subquery) LIMIT 0` probe in `__repr__` with
existing optimized `dtypes` and `columns` properties, preventing hangs when
working with remote ClickHouse tables.
Remove forbidden hasattr pattern in test_groupby_agg_sum and
test_groupby_agg_mean per project rules that prohibit hasattr
checks in test code.
Add comprehensive tests verifying operations on external data sources
(MySQL, PostgreSQL, SQLite, remote ClickHouse, MongoDB, Redis) do not
inject rowNumberInAllBlocks() and generate correct SQL for filter/sort/
agg chains. Covers preserves_row_order() behavior, SQL clause generation,
and edge cases.
Add 18 tests for 100k+ row data to verify deterministic row order:
- filter→assign→sort→head chain repeated 5x for consistency
- dropna (non-contiguous index) + assign + groupby alignment
- rank(method='first') tie-breaking matches pandas on large data
- PythonTableFunction path vs direct execution path comparison
Fix count() to execute with LIMIT when head()/limit() is applied,
matching pandas behavior. Previously count() unconditionally stripped
LIMIT, causing incorrect results for ds.head(3).count().

Add 27 tests verifying:
- COUNT subquery never contains redundant ORDER BY
- count_rows() uses SQL COUNT(*) for pure SQL chains
- PANDAS_FILTER chains fall back to DataFrame execution
- Empty filter results return 0/zero Series
- LIMIT + count_rows/count executes then counts
Add 50 tests verifying DataFrame operations with non-contiguous indices
(step slicing, dropna, filter, sample) work correctly through chDB SQL
execution. Tests cover filter+sort, assign, groupby+agg, and complex
chained operations. Validates _prepare_df_for_chdb() properly resets
and restores indices.
Add 29 new tests across 5 test classes to verify that pristine DataStore
metadata access (.columns/.dtypes/.shape/.size/.empty) does not trigger
full data loading:

- TestZeroOverheadDataFrameSource: mock-verify _execute() not called
- TestZeroOverheadFileSource: verify parquet/CSV metadata fast path
- TestProbeDtypesLimit0: verify LIMIT 0 query generation
- TestPostFilterMetadata: verify filtered DataStore uses execution path
- TestRemoteDescribeNotSelectStar: verify DESCRIBE/schema() path for remote
Add comprehensive tests for ClickHouse Cloud connection auto-detection:
- explicit secure=False override for cloud hosts
- normalize_clickhouse_connection() idempotency (4 scenarios)
- URI format parsing with cloud host detection
- URI secure=false override for cloud hosts
- Fix var mapping: varPop → varSamp to match pandas sample variance (ddof=1)
- Add nunique mapping to uniqExact for distinct count support
- Add comprehensive test coverage for all groupby agg functions in both
  named_agg and dict_agg modes (60 new tests)
Add 30 tests covering:
- JSON accessor chain determinism (extract + filter)
- Nested subquery determinism (filter on computed columns)
- Concat determinism matching pandas order
- Large scale concat (1000+ rows)
- UNION ALL source data determinism
- Mixed operations chains

All tests verify results are non-flaky by running 5x iterations.
Fix InCondition.to_sql() for pandas-compatible NULL semantics:
- isin([]) -> '1=0' (always false), notin([]) -> '1=1' (always true)
- isin([nan, 1, 2]) -> '(col IS NULL OR col IN (1,2))'
- notin([nan, 1]) -> '(col IS NOT NULL AND col NOT IN (1))'
- notin([1, 2]) -> '(col NOT IN (1,2) OR col IS NULL)' to include NULL rows

Fix Literal(nan).to_sql() to generate 'NULL' instead of invalid 'nan'.

Add comprehensive boundary tests for empty lists, NULL handling,
single-row groupby, empty DataFrame operations, and all-NULL aggregation.
Add 125 exploratory tests covering under-tested pandas compatibility
areas: select_dtypes, where/mask, query(), eval(), compare(), update(),
nunique, cumsum/diff/pct_change, any/all, merge with indicator,
pivot_table with margins, apply axis=1, rolling/expanding/ewm,
melt/explode/transpose/isin, null handling, value_counts, describe,
duplicates, empty/single-row DataFrames, groupby named agg, concat,
head/tail/sample, rename/drop chains.

Fix dtype mismatch when where/mask with other=None introduces NaN:
pandas converts non-nullable int to float64, but chDB SQL returns
nullable Int64. Now correctly converts to float64 only when input
was non-nullable integer.
…tswith/endswith dtype

Add 190 new exploratory tests covering:
- rename, replace, nlargest/nsmallest, rank operations
- cumulative ops with NaN, assign, select_dtypes, duplicated
- value_counts, clip, diff, chained operations
- all-NaN columns, single-row DataFrames, groupby with NaN keys
- where/mask, string operations, iloc, head/tail edge cases
- between, min/max, apply, concat, pivot_table

Fix str.startswith() and str.endswith() to return bool dtype instead
of uint8 by wrapping ClickHouse's startsWith/endsWith with toBool(),
following the same pattern used by isna()/notna().
…y index

Add comprehensive exploratory test batch 103 covering under-tested areas:
- eval/query with complex expressions
- clip boundary conditions
- replace with dicts, regex, NaN
- combine_first with different columns
- groupby transform/filter/apply edge cases
- cumulative operations (cumprod with zero, cumsum with NaN)
- nlargest/nsmallest with ties and NaN
- idxmin/idxmax edge cases
- pipe chaining, between inclusive variations
- DataFrame.map/applymap, iterrows/itertuples
- Complex chained operations
- Type coercion, describe, empty/single-row DataFrames
- Multi-column groupby, arithmetic, unique/nunique
- drop_duplicates, sort_values, fillna, dropna, isna/notna, abs

Fix empty groupby results to set group keys as index when as_index=True
to match pandas behavior.
auxten added 2 commits March 19, 2026 15:22
…t paths

- Extract _build_limit_by_clause() in sql_executor.py (3 inline LIMIT 1 BY blocks → 1 helper)
- Extract _all_lazy_ops_are_sql_compatible() in core.py (3 near-identical SQL-check methods → shared predicate + thin wrappers)
- Extract _build_count_subquery() in core.py (count/count_rows ORDER BY stripping dedup)
- Extract _probe_schema() in core.py (4 inline LIMIT 0 schema probes → 1 helper)
- Remove ~15 unnecessary hasattr(self, '_get_df'/'_wrap_result') guards (always True on DataStore)
- Add logging to _bind_table_context silent catch; upgrade SQL fallback log level to info
- Remove private helpers from __all__ in exceptions.py
- Move math/datetime imports to module level in core.py
- Fix test file paths: use os.path.dirname(__file__) instead of hardcoded relative paths
…ote sources

count_rows() and count() wrapped to_sql() in SELECT COUNT(*) FROM (...),
creating nested subqueries over remote() table functions that hang in
chDB's embedded ClickHouse engine.

Replace the subquery approach with flat SQL generation:
- Simple cases: SELECT count() FROM {source} WHERE {conditions}
- Complex + remote: fall back to len(_execute()) which generates flat SQL
- Complex + local: keep existing subquery (no regression)

Also fixes:
- count() alias collision: COUNT("value") AS "value" conflicted with
  WHERE "value" > N in ClickHouse; removed aliases from flat path
- self.columns clearing state: capture table source and WHERE before
  columns access which triggers execution on non-pristine sources
- assign() + filter: computed columns excluded from flat SQL path since
  WHERE may reference columns that don't exist in the raw source
- loc[] WHERE: extract_clauses_from_ops captures WHERE from _lazy_ops,
  fixing latent bug where loc[] conditions were ignored by count
@auxten auxten changed the title feat(datastore): streaming DataFrame execution for 80% memory reduction Optimize streaming DataFrame execution for 80% memory reduction Mar 19, 2026
@auxten auxten force-pushed the supervisor/explore_07 branch from e3f40a5 to eadb5e2 Compare March 19, 2026 11:44
…eak by 80%

Replace synchronous query("DataFrame") with send_query("DataFrame") +
pd.concat in Connection.execute() for database/file queries. This avoids
ChunkCollectorOutputFormat accumulating all result chunks in C++ memory
before building the DataFrame.

Simulation on 500M rows (50M matching) showed:
- MemoryTracker peak: 12.68 GiB -> 2.53 GiB (80% reduction)
- Process RSS peak: 14.28 GiB -> 4.54 GiB (68% reduction)
- Query time: 10.0s -> 7.0s (30% faster)

The _execute_df_query() path (Python table function) remains synchronous
because send_query() cannot find __df__ across stack frames.

Includes config toggle set_streaming_df(bool) as safety escape hatch,
and automatic fallback for DDL/empty results.
@auxten auxten force-pushed the supervisor/explore_07 branch 6 times, most recently from bedd457 to 50e6def Compare March 19, 2026 13:57
… pandas 3.x compat

- __del__: make no-op to avoid dropping self._conn reference which
  triggers chdb's own __del__ -> close() -> EmbeddedServer shutdown,
  aborting in-flight streaming queries on other Connection objects
- _query_df_streaming: only fallback to sync query() for read-only
  statements (SELECT/DESCRIBE/SHOW/EXPLAIN/WITH) when 0 chunks returned;
  return empty DataFrame for DDL/INSERT to prevent double execution
- test_fixedstring_dtype_is_object: accept both object and StringDtype
  for pandas 3.x compatibility
@auxten auxten force-pushed the supervisor/explore_07 branch from 50e6def to f0d473c Compare March 19, 2026 14:27
@auxten auxten merged commit 7d10955 into main Mar 19, 2026
4 of 6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants