feat(results): add export() method and --output-format CLI flag #540

Merged
nabinchha merged 12 commits into NVIDIA-NeMo:main from przemekboruta:feat/dataset-export
May 6, 2026

Conversation

@przemekboruta

@przemekboruta przemekboruta commented Apr 13, 2026

Contributor

Summary

Closes #566

  • Adds DatasetCreationResults.export(path, format=) supporting jsonl, csv, and parquet formats
  • Adds DatasetCreationResults.count_records() which reads record counts from Parquet file metadata only (no data pages loaded, no OOM risk)
  • Adds --output-format / -f flag to the data-designer create CLI command with click.Choice validation; writes <dataset-name>.<format> alongside the parquet batch files
  • Output format is inferred from the file extension by default (case-insensitive); format= is an optional explicit override (e.g. write a .txt file as JSONL)
  • SUPPORTED_EXPORT_FORMATS is derived from get_args(ExportFormat) so the Literal and the tuple cannot drift apart
  • Unsupported format raises InvalidFileFormatError (consistent with project error conventions)
  • JSONL export uses date_format="iso" for consistent datetime serialization across formats
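
The single-source-of-truth point above can be illustrated with a minimal sketch. The alias and tuple names mirror the ones mentioned in this PR, but the definitions here are a standalone approximation and `is_supported` is a hypothetical helper, not part of the change:

```python
from typing import Literal, get_args

# Stand-in for the alias described in the PR; the real one lives in results.py.
ExportFormat = Literal["jsonl", "csv", "parquet"]

# Derived from the Literal, so adding a format in one place updates both.
SUPPORTED_EXPORT_FORMATS: tuple[str, ...] = get_args(ExportFormat)


def is_supported(fmt: str) -> bool:
    """Hypothetical helper: True when fmt is one of the Literal's members."""
    return fmt in SUPPORTED_EXPORT_FORMATS
```

Because `get_args` returns the Literal's members in declaration order, the tuple can also feed a CLI choice list without a second hand-maintained copy.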

Implementation notes

Streaming export (no OOM risk)

export() never materialises the full dataset in memory. Instead of calling load_dataset(), it reads the individual batch parquet files from artifact_storage.final_dataset_path one at a time and appends each to the output file, keeping peak memory proportional to a single batch regardless of dataset size:

  • JSONL / CSV — each batch is appended to the output file; CSV header is written only on the first batch
  • Parquet - pyarrow.parquet.ParquetWriter is used to write each batch as a row group into one output file; schemas are unified with pa.unify_schemas(promote_options="permissive") to handle minor numeric type drift (e.g. int64 → float64 across batches); truly incompatible schemas (different column names) raise InvalidFileFormatError
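
The append-per-batch pattern for the text formats can be sketched with the standard library alone. This is an analogy, not the PR's code: batches are modelled as lists of dicts instead of Parquet files, and `stream_jsonl` / `stream_csv` are hypothetical names.

```python
import csv
import json
from pathlib import Path
from typing import Iterable

Batch = list[dict[str, object]]


def stream_jsonl(batches: Iterable[Batch], output: Path) -> None:
    """Append each batch as JSON lines; only one batch is held at a time."""
    with output.open("w", encoding="utf-8") as f:
        for batch in batches:
            for record in batch:
                f.write(json.dumps(record) + "\n")


def stream_csv(batches: Iterable[Batch], output: Path) -> None:
    """Write the header once, then append rows batch by batch."""
    with output.open("w", encoding="utf-8", newline="") as f:
        writer = None
        for batch in batches:
            if not batch:
                continue  # an empty batch contributes no rows
            if writer is None:
                # Column order is taken from the first non-empty batch.
                writer = csv.DictWriter(f, fieldnames=list(batch[0]))
                writer.writeheader()
            writer.writerows(batch)
```

Passing a generator as `batches` keeps peak memory proportional to one batch, which is the property the PR claims for the real Parquet-backed helpers.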

count_records() (no OOM risk)

count_records() reads row counts from Parquet file metadata only — no data pages are loaded:

sum(pq.read_metadata(f).num_rows for f in batch_files)

The CLI uses count_records() instead of len(load_dataset()) to report the final record count, avoiding loading the full dataset into memory just for a count.

Format inference

results.export("output.jsonl")          # inferred from extension
results.export("output.csv")            # inferred from extension
results.export("output.parquet")        # inferred from extension
results.export("output.txt", format="jsonl")  # explicit override
results.export("output.JSONL")          # case-insensitive extension
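
A minimal sketch of how such inference could work, assuming an explicit `format=` always wins and the extension is lowercased before lookup. `resolve_format`, `SUPPORTED`, and the local `InvalidFileFormatError` class are hypothetical stand-ins, not the project's actual code:

```python
from __future__ import annotations

from pathlib import Path

SUPPORTED = ("jsonl", "csv", "parquet")


class InvalidFileFormatError(ValueError):
    """Local stand-in for the project's error type of the same name."""


def resolve_format(path: str | Path, format: str | None = None) -> str:
    """Prefer an explicit format; otherwise use the lowercased extension."""
    fmt = format if format is not None else Path(path).suffix.lstrip(".").lower()
    if fmt not in SUPPORTED:
        raise InvalidFileFormatError(
            f"Unsupported export format {fmt!r}; expected one of {', '.join(SUPPORTED)}"
        )
    return fmt
```

Under these assumptions, `resolve_format("output.JSONL")` and `resolve_format("output.txt", format="jsonl")` both resolve to `jsonl`, while an unknown extension with no override raises.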

CLI:

data-designer create config.yaml --output-format jsonl
data-designer create config.yaml -n 500 -f csv
# Output file: <artifact-path>/<dataset-name>/<dataset-name>.<format>

Test plan

  • test_export_writes_file — parametrized over all 3 formats
  • test_export_jsonl_content — each line is valid JSON, all records present
  • test_export_csv_content — single header row, full record count
  • test_export_parquet_content — DataFrame round-trip across two batches
  • test_export_infers_format_from_extension — format omitted, inferred from .jsonl
  • test_export_explicit_format_overrides_extension - .txt written as JSONL
  • test_export_parquet_schema_unification — two batches with diverging column types (int64/float64) merged correctly via permissive promotion
  • test_export_parquet_incompatible_schemas_raises — different column names across batches raises InvalidFileFormatError
  • test_export_uppercase_extension_is_recognised - .JSONL treated same as .jsonl
  • test_export_unknown_extension_raises — raises InvalidFileFormatError
  • test_export_unsupported_explicit_format_raises — raises InvalidFileFormatError
  • test_export_no_batch_files_raises — raises ArtifactStorageError
  • test_export_returns_path_object — returns Path for str input
  • test_count_records — reads row counts from Parquet metadata, no data pages loaded
  • test_run_create_with_output_format_happy_path — export called with dataset-name-derived path
  • test_run_create_export_failure_exits — export exception cleans up partial file and exits with code 1
  • Existing CLI delegation tests updated for new output_format parameter

Adds DatasetCreationResults.export(path, format=) supporting jsonl,
csv, and parquet. The CLI create command gains --output-format / -f
which writes dataset.<format> alongside the parquet batch files.
@przemekboruta przemekboruta requested a review from a team as a code owner April 13, 2026 19:26
@greptile-apps

greptile-apps Bot commented Apr 13, 2026

Contributor

Greptile Summary

This PR adds a DatasetCreationResults.export() method supporting JSONL, CSV, and Parquet output via a streaming multi-batch approach, a count_records() method that reads row counts from Parquet metadata only, and a --output-format / -f CLI flag to data-designer create. The CLI also switches from loading the full dataset to count_records() for the final record-count message.

  • export() streams individual batch Parquet files one at a time, keeping peak memory proportional to a single batch; Parquet output unifies schemas across batches using pa.unify_schemas(promote_options="permissive") to tolerate numeric type drift.
  • count_records() reads row counts from Parquet file metadata only (pq.read_metadata(f).num_rows), avoiding a full dataset load.
  • CLI adds click.Choice-validated --output-format / -f flag; the export file is written to <artifact-path>/<dataset-name>/<dataset-name>.<format> after generation completes.

Confidence Score: 5/5

The new export and count_records logic is well-scoped, streaming export keeps memory bounded, and the test suite covers all format paths including edge cases like schema unification and incompatible column names.

All three export paths are exercised by tests, schema unification for Parquet handles type drift and incompatible columns, and partial-file cleanup on export failure is correctly implemented and tested. The CLI change is additive and does not touch existing generation logic.

No files require special attention.

Important Files Changed

| Filename | Overview |
| --- | --- |
| packages/data-designer/src/data_designer/interface/results.py | Adds export(), count_records(), and three private streaming helpers; ExportFormat and SUPPORTED_EXPORT_FORMATS are cleanly derived from a single Literal type. |
| packages/data-designer/src/data_designer/cli/controllers/generation_controller.py | Replaces len(load_dataset()) with count_records() and appends an optional export step with cleanup on failure; ordering of console output slightly reordered (success message moved after export). |
| packages/data-designer/src/data_designer/cli/commands/create.py | Adds --output-format / -f typer option backed by click.Choice(list(SUPPORTED_EXPORT_FORMATS)); forwards the value to the controller. |
| packages/data-designer/tests/interface/test_results.py | Comprehensive parametrized tests covering all formats, format inference, schema unification, incompatible schemas, case-insensitive extensions, and error paths. |
| packages/data-designer/tests/cli/controllers/test_generation_controller.py | Adds happy-path and failure-path tests for --output-format; updates existing mock helpers to use count_records instead of load_dataset. |
| packages/data-designer/tests/cli/commands/test_create_command.py | Updates delegation tests for the new output_format parameter and adds a dedicated test for the --output-format forwarding path. |
| packages/data-designer/tests/cli/test_main.py | Minor update adding output_format=None to the dispatch assertion. |

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    CLI["CLI: data-designer create\n--output-format fmt"] --> VAL["click.Choice validates format"]
    VAL --> RC["run_create()"]
    RC --> LOAD["_load_config()"]
    LOAD --> CREATE["data_designer.create()"]
    CREATE --> COUNT["results.count_records()\nreads Parquet metadata only"]
    COUNT --> ANALYSIS["results.load_analysis()"]
    ANALYSIS --> FMT{output_format set?}
    FMT -- No --> SUCCESS["print_success(N records)"]
    FMT -- Yes --> EXPORT_PATH["build export_path"]
    EXPORT_PATH --> EXPORT["results.export(export_path)"]
    EXPORT --> INFER["infer format from extension"]
    INFER --> FMTCHECK{format supported?}
    FMTCHECK -- No --> FMTERR["raise InvalidFileFormatError"]
    FMTCHECK -- Yes --> BATCHCHECK{batch files found?}
    BATCHCHECK -- No --> BATCHERR["raise ArtifactStorageError"]
    BATCHCHECK -- Yes --> FMTROUTE{format}
    FMTROUTE -- jsonl --> JSONL["_export_jsonl()"]
    FMTROUTE -- csv --> CSV["_export_csv()"]
    FMTROUTE -- parquet --> PQ["_export_parquet()"]
    JSONL --> DONE["return Path"]
    CSV --> DONE
    PQ --> DONE
    DONE --> SUCCESS
    EXPORT -- Exception --> CLEANUP["unlink partial file + Exit 1"]

Reviews (11): Last reviewed commit: "Merge branch 'main' into feat/dataset-ex..."

Comment thread packages/data-designer/src/data_designer/cli/controllers/generation_controller.py Outdated

@andreatgretel andreatgretel left a comment

Contributor

Thanks for this @przemekboruta - export() fills a real gap and the core implementation is clean. A few things to tighten up before merging:

  • Per CONTRIBUTING.md, features should have an associated issue. Could you open one retroactively and link it here with Closes #NNN?
  • The success message / export ordering and double dataset load need fixing (see inline comments)
  • Need controller-level tests for the new output_format path - happy path, bad format rejection, and export failure
  • A couple of style guide items flagged inline (error types, import placement)

Left details in inline comments. Nice catch on the lazy import fix in the third commit.

Comment thread packages/data-designer/src/data_designer/cli/controllers/generation_controller.py Outdated
Comment thread packages/data-designer/src/data_designer/cli/controllers/generation_controller.py Outdated
Comment thread packages/data-designer/src/data_designer/interface/results.py Outdated
Comment thread packages/data-designer/src/data_designer/interface/results.py Outdated
Comment thread packages/data-designer/tests/interface/test_results.py Outdated
Comment thread packages/data-designer/src/data_designer/interface/results.py Outdated
…, import hygiene

- Derive SUPPORTED_EXPORT_FORMATS from get_args(ExportFormat) so the two can't drift apart
- Replace ValueError with InvalidFileFormatError in export() — consistent with project error conventions
- Add date_format="iso" to to_json() for consistent datetime serialization across formats
- Add click.Choice(SUPPORTED_EXPORT_FORMATS) to --output-format CLI option for parse-time
  validation, better --help output, and tab completion
- Fix double load_dataset() in run_create: inline len() so the DataFrame ref dies before export
- Move success message after the export block to avoid "Dataset created" followed by "Export failed"
- Move imports to module level in test_results.py (json, Path, lazy already imported)
- Add controller-level tests for output_format happy path, bad format rejection, and export failure
Comment thread packages/data-designer/src/data_designer/interface/results.py Outdated
@przemekboruta

Contributor Author

hey @andreatgretel! thanks for the review. All points seem to be addressed right now.

@github-actions

Contributor

Issue #566 has been triaged. The linked issue check is being re-evaluated.

andreatgretel
andreatgretel previously approved these changes Apr 22, 2026

@andreatgretel andreatgretel left a comment

Contributor

Thanks @przemekboruta, all review comments are addressed - approving!

We should be able to merge soon. Seeking one more approval - cc @NVIDIA-NeMo/data-designer-reviewers.

Comment thread packages/data-designer/src/data_designer/interface/results.py
@nabinchha

Contributor

Heads up on OOM risk with large datasets

The proposed export() method should avoid calling load_dataset() internally. Today, load_dataset() materializes the entire dataset into a single pandas DataFrame via pd.read_parquet() -- for a large partitioned dataset, this will OOM.

The data is already on disk as individual batch files (batch_00000.parquet, batch_00001.parquet, ...), so export() can stream batch-by-batch into a single output file without ever holding the full dataset in memory:

  • JSONL / CSV -- iterate over batch files, append each chunk to the output file. These formats are naturally appendable, so this produces one valid file with memory proportional to a single batch.
  • Parquet -- use pyarrow.parquet.ParquetWriter to write each batch as a row group into one output file.

One nuance for the parquet path: batch files can have slightly mismatched schemas (e.g., a column inferred as int64 in one batch and float64 in another). The fix is cheap -- read just the metadata from each batch file upfront with pq.read_schema(), merge with pa.unify_schemas(), and cast each batch to the unified schema before writing.

Example implementation sketch:

from __future__ import annotations

from pathlib import Path

import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd


SUPPORTED_FORMATS = {"jsonl", "csv", "parquet"}


def export(self, path: str | Path) -> Path:
    output = Path(path)
    fmt = output.suffix.lstrip(".")

    if fmt not in SUPPORTED_FORMATS:
        raise ValueError(
            f"Unsupported file extension {output.suffix!r}. "
            # Sort the set so the error message is deterministic.
            f"Use one of: {', '.join(sorted(SUPPORTED_FORMATS))}"
        )

    batch_files = sorted(self.artifact_storage.final_dataset_path.glob("*.parquet"))

    if not batch_files:
        raise FileNotFoundError("No batch parquet files found.")

    if fmt == "jsonl":
        _export_jsonl(batch_files, output)
    elif fmt == "csv":
        _export_csv(batch_files, output)
    elif fmt == "parquet":
        _export_parquet(batch_files, output)

    return output


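def _export_jsonl(batch_files: list[Path], output: Path) -> None:
    with output.open("w") as f:
        for batch_file in batch_files:
            chunk = pd.read_parquet(batch_file)
            content = chunk.to_json(orient="records", lines=True)
            # Add a newline only if to_json() did not already end with one,
            # so no blank line appears between batches.
            f.write(content if content.endswith("\n") else content + "\n")


def _export_csv(batch_files: list[Path], output: Path) -> None:
    for i, batch_file in enumerate(batch_files):
        chunk = pd.read_parquet(batch_file)
        # Truncate on the first batch so a re-run does not append to stale output.
        chunk.to_csv(output, mode="w" if i == 0 else "a", header=(i == 0), index=False)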


def _export_parquet(batch_files: list[Path], output: Path) -> None:
    schemas = [pq.read_schema(f) for f in batch_files]
    unified = pa.unify_schemas(schemas)

    with pq.ParquetWriter(output, unified) as writer:
        for batch_file in batch_files:
            table = pq.read_table(batch_file)
            table = table.cast(unified)
            writer.write_table(table)

Usage:

results.export("output.jsonl")
results.export("output.csv")
results.export("output.parquet")

…atasets

- Rewrite export() to read batch parquet files one at a time instead of
  materialising the full dataset via load_dataset(); peak memory is now
  proportional to a single batch regardless of dataset size
- Infer output format from file extension by default; format= parameter
  kept as an explicit override (e.g. writing .txt as JSONL)
- _export_parquet unifies schemas across batches (pa.unify_schemas) to
  handle type drift (e.g. int64 vs float64 in the same column)
- Drop format= from the controller's export() call — path already carries
  the correct extension
- Rewrite export tests around real batch parquet files (stub_batch_dir
  fixture); add tests for multi-batch output, schema unification, unknown
  extension, empty batch directory, and explicit format override
@przemekboruta

Contributor Author

Hey @nabinchha! I totally agree with your proposal and I've implemented it. Looking forward to your feedback

PR description updated

@przemekboruta przemekboruta requested a review from nabinchha April 22, 2026 19:05

@nabinchha nabinchha left a comment

Contributor

Thanks @przemekboruta! One blocker on memory safety and a few small wins to tidy up before merge.

Summary

Adds DatasetCreationResults.export(path, format=) (jsonl/csv/parquet) that streams batch parquet files one at a time to bound peak memory, plus --output-format/-f on data-designer create. Matches the stated intent; extension-based inference with explicit override is a nice ergonomic touch.

Findings

Critical — Let's fix these before merge

packages/data-designer/src/data_designer/cli/controllers/generation_controller.py:161 — CLI path still OOMs on large datasets, defeating the streaming export

  • What: num_records = len(results.load_dataset()) reads every batch parquet file into a single in-memory DataFrame just to print the row count, and it runs before results.export(). For any dataset big enough to need streaming, the CLI OOMs on this line before the streaming export gets a chance to run.
  • Why: The PR's headline property is "peak memory proportional to a single batch regardless of dataset size." The programmatic API honors that — results.export(...) called directly from Python is safe. But data-designer create ... --output-format X (the most visible entry point, and the one users of #566 are most likely to hit) does not. This isn't a nit: the motivating use case is broken through the CLI.
  • Suggestion: Count rows from parquet metadata — no data pages loaded:
    batch_files = sorted(results.artifact_storage.final_dataset_path.glob("batch_*.parquet"))
    num_records = sum(lazy.pq.read_metadata(f).num_rows for f in batch_files)
    Cleaner option: expose a count_records() (or similar) helper on DatasetCreationResults / ArtifactStorage so the CLI doesn't have to know about batch file layout. Either way, this should land in this PR rather than a follow-up, since it's tied to the claim the PR is making.

Warnings — Worth addressing

packages/data-designer/src/data_designer/cli/controllers/generation_controller.py:131-137 — Redundant validation now that click.Choice is in place

  • What: create.py:44 uses click_type=click.Choice(list(SUPPORTED_EXPORT_FORMATS)), which means Click rejects any invalid --output-format at parse time with its own error message — the run_create controller can never be reached with a bad value via the CLI. The controller-level check (and the local import at line 131) is dead code from the CLI's perspective.
  • Why: It's a small YAGNI violation (the controller isn't a documented public API, the only caller is create_command) and the style guide asks for imports at module level — create.py already imports SUPPORTED_EXPORT_FORMATS at the top, so lazy-loading doesn't buy anything here either. It also means we maintain two parallel error messages for the same condition.
  • Suggestion: Either (a) drop lines 131-137 and the corresponding test_run_create_invalid_output_format_exits test, or (b) if we want the controller to remain independently callable with defensive validation, move the import to module level and keep the check. I'd lean toward (a) since click.Choice already gives better --help output and tab completion.

packages/data-designer/src/data_designer/interface/results.py:229-234 - _export_parquet leaks raw pyarrow errors on genuinely incompatible schemas

  • What: pa.unify_schemas(schemas) handles numeric widening fine (tested), but raises pyarrow.lib.ArrowInvalid if batches have differing column names or truly incompatible types. Likewise table.cast(unified_schema) can raise on an unsupported cast. Those escape unwrapped.
  • Why: Per AGENTS.md / STYLEGUIDE.md, third-party exceptions at module boundaries should be normalized to data_designer error types. Callers writing except InvalidFileFormatError (or except DataDesignerError) will miss these, and the raw pyarrow traceback leaks an implementation detail of the engine into user-facing error handling.
  • Suggestion: Wrap the unify/cast block:
    try:
        unified_schema = lazy.pa.unify_schemas(schemas)
    except lazy.pa.lib.ArrowInvalid as e:
        raise InvalidFileFormatError(f"Cannot unify batch schemas for parquet export: {e}") from e

Suggestions — Take it or leave it

packages/data-designer/src/data_designer/interface/results.py:138 — File extension matching is case-sensitive

  • What: path.suffix.lstrip(".") returns "JSONL" for output.JSONL, which fails the format lookup.
  • Suggestion: path.suffix.lstrip(".").lower() — tiny ergonomic win, no downside.

packages/data-designer/src/data_designer/interface/results.py:211-212 — The trailing-newline guard in _export_jsonl is probably unnecessary

  • What: to_json(orient="records", lines=True) always emits a trailing \n in supported pandas versions, so if not content.endswith("\n") never fires. Not harmful, just noise.
  • Suggestion: Either drop the check, or add a one-line comment noting which pandas behavior it guards against so a future reader doesn't puzzle over it.

What Looks Good

  • Schema unification for parquet type drift — the pa.unify_schemas + table.cast approach is exactly the right move, and the dedicated test_export_parquet_schema_unification test (int64 → float64 across batches) nails the motivating case. Nice catch during the rewrite.
  • Streaming helpers are cleanly factored - _export_jsonl / _export_csv / _export_parquet are small, single-purpose, and easy to reason about in isolation. The export() dispatcher stays a readable 15ish lines.
  • Error-type choices are consistent with the codebase - InvalidFileFormatError for format problems, ArtifactStorageError for "no batch files" is the right split. And the docstring → raised-type fix in commit 50a93d98 is a good correctness catch before merge.
  • Extension-inference-with-override - results.export("output.csv") vs. results.export("output.txt", format="jsonl") is a clean API. SUPPORTED_EXPORT_FORMATS derived from get_args(ExportFormat) means the Literal and tuple can't drift.
  • Test coverage is thorough — parametrized format coverage, content round-trips, multi-batch streaming, schema unification, extension inference, explicit override, empty batch dir, unsupported format, and the controller-level happy/sad paths. Good mix.

Verdict

Needs changes — the pre-export load_dataset() on line 161 breaks the PR's own memory-safety claim for the CLI path and should be fixed in this PR. The redundant controller validation is a smaller cleanup worth bundling in. The rest are genuine take-it-or-leave-it suggestions.


This review was generated by an AI assistant.

…g, UX

- Replace load_dataset() with count_records() in CLI to avoid OOM on
  large datasets; add count_records() method using pq.read_metadata
  (reads file metadata only, no data pages loaded)
- Remove redundant format validation in controller — click.Choice in
  create.py already rejects invalid values at parse time; dead code
  removed along with corresponding test
- Wrap pa.unify_schemas / table.cast ArrowInvalid as InvalidFileFormatError
  to normalize third-party exceptions at module boundaries per AGENTS.md
- Lowercase file extension before format lookup so .JSONL/.CSV/.PARQUET
  are accepted without error
- Add clarifying comment to trailing-newline guard in _export_jsonl
- Add tests: count_records(), uppercase extension, incompatible schemas
@przemekboruta przemekboruta requested a review from nabinchha April 25, 2026 09:51
@nabinchha

Contributor

@przemekboruta, looks like tests are failing in CI. We will need to get those resolved to move this along.

…th bug

- Use promote_options="permissive" in pa.unify_schemas so minor numeric
  type drift (int64 vs float64) is handled by promotion instead of raising
- Also catch ArrowTypeError from unify_schemas and ValueError from
  table.cast() — the actual exception types thrown by pyarrow for these
  cases (ArrowInvalid alone is not sufficient)
- Wrap base_dataset_path in Path() in generation_controller.run_create
  to guard against callers that return a str (mock returns str, Path
  does not support / with str operands)
- Update test_export_parquet_incompatible_schemas_raises to match the
  new error source: with permissive unification, different-column-name
  batches fail at cast() not at unify_schemas(), so the match string
  changes from "Cannot unify batch schemas" to "Cannot cast batch"

@nabinchha nabinchha left a comment

Contributor

Thanks for putting this through so many rounds, @przemekboruta — the streaming rewrite plus all the prior-review fixes have left this in really good shape. A handful of things worth tightening before merge below.

Summary

Adds DatasetCreationResults.export(path, format=) and a count_records() helper, plus --output-format/-f on the data-designer create CLI. Both export() and count_records() are explicitly designed to keep peak memory bounded (batch-at-a-time streaming for export, parquet metadata only for counting), which matches the stated intent and addresses the OOM concerns from earlier rounds. Linter passes cleanly on all changed files.

Findings

Warnings — Worth addressing

packages/data-designer/src/data_designer/cli/controllers/generation_controller.py:163-170 — Partial file left on disk if export fails

  • What: _export_jsonl opens the output file with mode="w" and streams batches into it; if the third batch raises (disk full, encoding error, etc.), the partial output file stays behind. Same for CSV (append mode after the first batch) and parquet (ParquetWriter leaves a footerless file when the context manager exits via exception).
  • Why: The user sees Export failed: ... and exits with code 1, but a stale dataset.jsonl sitting next to the batch dir is misleading on retry — for CSV/JSONL it looks valid until you hit the truncation point, and for parquet it'll fail to read with a confusing footer error. This is the kind of thing that bites people when they re-run after fixing the underlying issue.
  • Suggestion: Write to a .tmp sibling and os.replace() on success, or wrap the export in a try/except that path.unlink(missing_ok=True) before re-raising:
    try:
        results.export(export_path)
    except Exception as e:
        export_path.unlink(missing_ok=True)
        print_error(f"Export failed: {e}")
        raise typer.Exit(code=1)
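
The other option mentioned in the suggestion, writing to a .tmp sibling and publishing it with os.replace(), might look roughly like the sketch below. `export_atomically` and its `write` callback are hypothetical names for illustration; the PR itself went with the unlink-on-failure variant.

```python
import os
from pathlib import Path
from typing import Callable


def export_atomically(write: Callable[[Path], None], final_path: Path) -> Path:
    """Run write() against a temporary sibling, then publish it with os.replace()."""
    tmp_path = final_path.with_name(final_path.name + ".tmp")
    try:
        write(tmp_path)
        # Atomic when both paths are on the same filesystem.
        os.replace(tmp_path, final_path)
    except Exception:
        # Never leave a truncated file behind for the next run to trip over.
        tmp_path.unlink(missing_ok=True)
        raise
    return final_path
```

With this shape a reader never observes a half-written export at the final path: either the previous file (if any) is still there, or the complete new one is.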

packages/data-designer/src/data_designer/interface/results.py:250, 257 - pa.lib.ArrowInvalid reaches into a private submodule

  • What: pyarrow exposes ArrowInvalid and ArrowTypeError directly at the top level (pa.ArrowInvalid, pa.ArrowTypeError); pa.lib is the underlying definition site but isn't part of the public surface. The two are aliases today, but lib is conventionally treated as private and has been reorganized between pyarrow versions before.
  • Why: We just spent the last review round normalizing third-party exceptions at the boundary per AGENTS.md — leaning on a private submodule path for the exception types we're catching partly undoes that hardening. If pyarrow ever moves these out of lib, our except clauses silently stop matching and _export_parquet starts leaking raw pyarrow errors again.
  • Suggestion: Swap to the public alias:
    except (lazy.pa.ArrowInvalid, lazy.pa.ArrowTypeError) as e:
        ...
    except (lazy.pa.ArrowInvalid, ValueError) as e:
        ...

packages/data-designer/src/data_designer/interface/results.py:223-225 - _export_jsonl trailing-newline guard comment is inaccurate

  • What: The comment says "Guard against empty batches where to_json() omits the trailing newline." Verified locally: in current pandas, an empty DataFrame returns "\n" from to_json(orient="records", lines=True), and a non-empty one always ends with "\n". The branch never fires, and the comment describes a behavior that doesn't exist.
  • Why: A future reader puzzling over this guard will be misled and may end up "fixing" empty-batch handling based on a phantom assumption. Two related sub-issues: (a) the guard is effectively dead code, (b) for empty batches the current code still writes the lone "\n" to disk, which produces a blank line in the JSONL output for every empty batch — harmless to most parsers, but noisy.
  • Suggestion: Either drop the guard and the comment entirely, or rephrase to something accurate ("Defensive: pandas has always emitted a trailing newline here, but guard against future versions changing that") and add if content: so empty batches don't emit a phantom blank line.

packages/data-designer/src/data_designer/cli/controllers/generation_controller.py:153 - num_records parameter is shadowed by count_records()

  • What: num_records enters run_create() as the requested count (used in the header at line 137), then gets reassigned to the actual count returned by count_records() and reused in the success message at line 173 — same name, two different meanings.
  • Why: It's a real readability snag in a method that's already 60 lines and threading a couple of related concepts. A future maintainer touching this method has to mentally track that num_records flips meaning halfway through, and it's the kind of thing that's easy to mis-edit (e.g. moving the success message earlier and accidentally printing the requested count instead of the actual count).
  • Suggestion: Rename the second one — actual_record_count = results.count_records() — and use that on line 173.

packages/data-designer/src/data_designer/cli/commands/create.py:48 - --dataset-name doesn't influence the export filename

  • What: The help text documents <artifact-path>/<dataset-name>/dataset.<format>, so the literal dataset filename is intentional and visible. But for a user running data-designer create cfg.yaml -d my_data -f jsonl, getting my_data/dataset.jsonl instead of my_data/my_data.jsonl is asymmetric — --dataset-name controls the directory but not the file inside it.
  • Why: This is the user-facing entry point for the feature, so the convention we set here will stick. Users will reasonably expect --dataset-name to flow through to the filename, and the current behavior makes the flag feel half-wired. Easier to align it now than to change the convention later once people depend on the path.
  • Suggestion: Use <dataset-name>.<format> for the export filename and update the help text to match:
    export_path = Path(results.artifact_storage.base_dataset_path) / f"{dataset_name}.{output_format}"
    Worth bumping the controller test to assert the dataset-name-derived path too.

What Looks Good

  • Memory safety is now end-to-end - count_records() via parquet metadata + batch-streaming export() means both the programmatic and CLI paths are O(one batch) in peak memory. The prior reviewers' OOM concerns were addressed thoroughly, including the subtle CLI-only path that bypassed the streaming export.
  • Error normalization is on point — wrapping ArrowInvalid/ArrowTypeError/ValueError from pyarrow into InvalidFileFormatError at the boundary, and choosing ArtifactStorageError vs InvalidFileFormatError for the right cases, lines up cleanly with the AGENTS.md "errors normalize at boundaries" invariant. from e chaining preserves the original traceback too.
  • SUPPORTED_EXPORT_FORMATS derived from get_args(ExportFormat) — small detail, but it eliminates the class of bug where the Literal and tuple drift apart over time.
  • Test coverage is genuinely thorough — multi-batch streaming for all three formats, schema unification with type drift, incompatible-schema rejection, extension inference (lower + upper case), explicit format override, empty batch dir, unsupported format, plus controller-level happy/sad paths. stub_batch_dir is a clean fixture and reused well.
  • promote_options="permissive" + cast — handling int64↔float64 drift across batches without raising is exactly the right default for synthetic data, and the dedicated test pins down both the success case and the "cast actually fails" path post-permissive-unification.
  • Iterative review responsiveness — the commit history reads as a careful pass through every prior review point (error types, lazy imports, click.Choice, double load_dataset, count_records, schema permissive promotion), which made this review easy.
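
As a rough illustration of the get_args(ExportFormat) point and the extension-inference behavior listed above, here is a minimal sketch. The helper name resolve_export_format and the stand-in InvalidFileFormatError class are assumptions made for this example; they are not taken from the PR's code.

```python
from pathlib import Path
from typing import Literal, Optional, Union, get_args

ExportFormat = Literal["jsonl", "csv", "parquet"]
# Derived from the Literal, so the tuple and the type cannot drift apart.
SUPPORTED_EXPORT_FORMATS = get_args(ExportFormat)


class InvalidFileFormatError(ValueError):
    """Stand-in for the project's error type (assumed to live elsewhere)."""


def resolve_export_format(path: Union[str, Path], format: Optional[str] = None) -> str:
    """Hypothetical helper: an explicit format wins, else infer from the extension."""
    # The extension is lowercased so ".JSONL" and ".jsonl" resolve the same way.
    resolved = format if format is not None else Path(path).suffix.lstrip(".").lower()
    if resolved not in SUPPORTED_EXPORT_FORMATS:
        raise InvalidFileFormatError(
            f"Unsupported export format {resolved!r}; expected one of {SUPPORTED_EXPORT_FORMATS}"
        )
    return resolved
```

Adding a fourth format to the Literal would then extend the tuple automatically, which is the drift-proofing the bullet refers to.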

Verdict

Needs changes — nothing architectural, but the items above are worth pulling into this PR rather than punting:

  • pa.lib → pa exception references (one-line swap, undoes a piece of our error-normalization work otherwise)
  • partial-file cleanup on export failure (real retry footgun)
  • the misleading _export_jsonl comment (and either drop the dead guard or fix it)
  • rename the second num_records to a clearer name
  • decide on the export filename convention (dataset.<format> vs <dataset-name>.<format>) before this lands

This review was generated by an AI assistant.

- Use public pa.ArrowInvalid/ArrowTypeError instead of pa.lib.* in _export_parquet
- Drop dead trailing-newline guard in _export_jsonl; skip empty batches with `if content`
- Rename num_records→actual_record_count after count_records() call to avoid shadowing
- Unlink partial export file before re-raising on export failure in run_create
- Export filename now uses dataset_name (<dataset-name>.<format>) instead of literal "dataset"
- Update help text and tests to match new export filename convention
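
The partial-file cleanup in the list above could look roughly like the sketch below. The wrapper name export_with_cleanup and the write callback are hypothetical; the real change lives in run_create and may be structured differently.

```python
from pathlib import Path
from typing import Callable


def export_with_cleanup(export_path: Path, write: Callable[[Path], None]) -> Path:
    """Hypothetical wrapper: run `write`, removing any partial file on failure."""
    try:
        write(export_path)
    except Exception:
        # Remove the half-written file so a retry starts from a clean slate,
        # then re-raise the original error unchanged.
        export_path.unlink(missing_ok=True)
        raise
    return export_path
```

Using missing_ok=True keeps the cleanup from masking the original error when the writer failed before creating the file.
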
@przemekboruta przemekboruta requested a review from nabinchha May 6, 2026 17:13
przemekboruta added a commit to przemekboruta/DataDesigner that referenced this pull request May 6, 2026
…I flag, fix edge cases

C1: drop commit 0bdf24a — remove export() / --output-format from this PR; that feature
    belongs to NVIDIA-NeMo#540 which has a superior streaming implementation
C2: add --resume / -r flag to data-designer create CLI, thread ResumeMode through
    GenerationController.run_create() into DataDesigner.create()
C3: fix already-complete warning text — replace stale "Remove resume=True" with
    "Use resume=ResumeMode.NEVER" in _build_with_resume and _build_async
C4: fix docstrings — ALWAYS does NOT raise when no checkpoint exists (silently
    restarts from scratch); clarify num_records >= actual semantics
C5: sync artifact_storage.resume = NEVER when no-metadata fallback fires so both
    state holders agree after the downgrade
C6: fix return_value=False → _ConfigCompatibility.INCOMPATIBLE in IF_POSSIBLE test;
    drop 3 direct _find_completed_row_group_ids tests (private API, covered by build())
W1: add logger.warning when builder_config.json is absent (silent COMPATIBLE was footgun)
W2: narrow except Exception → (OSError, json.JSONDecodeError, ValidationError)
W3: run make check-all-fix — ruff reformatted test_if_possible_starts_fresh_when_directory_is_empty

@nabinchha nabinchha left a comment

Contributor

Thanks @przemekboruta!

@nabinchha nabinchha merged commit 0afe287 into NVIDIA-NeMo:main May 6, 2026
49 checks passed
przemekboruta added a commit to przemekboruta/DataDesigner that referenced this pull request May 7, 2026
Resolved conflicts in CLI create command and tests — kept both
--resume (PR NVIDIA-NeMo#526) and --output-format (PR NVIDIA-NeMo#540) parameters
throughout create.py, generation_controller.py, and all test files.
nabinchha added a commit that referenced this pull request May 8, 2026
#526)

* docs: add implementation plan for resume mechanism

Fixes #525

* feat(storage): add resume flag and clear_partial_results()

- ArtifactStorage gains a `resume: bool = False` field
- resolved_dataset_name skips timestamp logic when resume=True,
  returning the existing dataset folder name as-is
- Raises ArtifactStorageError on resume=True when the target folder
  is absent or empty (no data to resume from)
- New clear_partial_results() removes in-flight partial results
  left over from an interrupted run

Fixes #525

* feat(batch-manager): add start_batch param to start()

DatasetBatchManager.start() now accepts:
- start_batch: int = 0  — first batch index to process
- initial_actual_num_records: int = 0  — records already on disk

Both default to 0 so all existing call sites are unaffected.

Fixes #525

* feat(builder): implement resume logic in DatasetBuilder

- build() gains a resume: bool = False parameter
- _load_resume_state() reads metadata.json and validates that
  num_records and buffer_size match the original run
- _build_with_resume() skips completed batches, clears in-flight
  partial results, and continues from the first incomplete batch
- Raises DatasetGenerationError with clear messages for:
  - missing metadata.json (interrupted before first batch completes)
  - num_records mismatch
  - buffer_size mismatch
  - DATA_DESIGNER_ASYNC_ENGINE=1 (not yet supported)
- Logs a warning and returns early when dataset is already complete

Fixes #525

* feat(interface): expose resume on DataDesigner.create()

- create() gains resume: bool = False
- _create_resource_provider() passes resume to ArtifactStorage
- builder.build() receives the resume flag

Fixes #525

* test: add tests for resume mechanism

Covers:
- ArtifactStorage.resolved_dataset_name with resume=True
- ArtifactStorage.clear_partial_results()
- DatasetBatchManager.start() with start_batch and
  initial_actual_num_records
- DatasetBuilder.build(resume=True): missing metadata, num_records
  mismatch, buffer_size mismatch, already-complete detection

Fixes #525

* feat(builder): extend resume to async engine (DATA_DESIGNER_ASYNC_ENGINE=1)

- Add _find_completed_row_group_ids() to scan parquet-files/ for already-written
  row groups by parsing batch_*.parquet filenames
- _build_async() now accepts resume=True: loads metadata, finds completed row groups,
  clears partial results, and logs progress; returns early if all row groups are done
- _prepare_async_run() accepts skip_row_groups, initial_actual_num_records, and
  initial_total_num_batches so the scheduler only processes remaining row groups
  and RowGroupBufferManager starts from the correct counts
- RowGroupBufferManager.__init__ gains initial_actual_num_records and
  initial_total_num_batches params to seed the counters on resume
- finalize_row_group closure now writes incremental metadata after each checkpoint
  so any run (resume or not) can be resumed if interrupted mid-way
- Remove the guard that rejected resume=True with DATA_DESIGNER_ASYNC_ENGINE=1
- Add tests for all new paths
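
A minimal sketch of the filename scan described above, assuming batch files are named batch_<integer>.parquet. The exact naming and padding are assumptions; the commit only says batch_*.parquet filenames are parsed. This is a standalone function for illustration, not the project's _find_completed_row_group_ids.

```python
import re
from pathlib import Path

# Assumed filename shape: "batch_" + digits + ".parquet".
_BATCH_FILE_RE = re.compile(r"^batch_(\d+)\.parquet$")


def find_completed_row_group_ids(parquet_dir: Path) -> set[int]:
    """Return the row-group ids that already have a parquet file on disk."""
    if not parquet_dir.is_dir():
        return set()
    completed = set()
    for path in parquet_dir.glob("batch_*.parquet"):
        match = _BATCH_FILE_RE.match(path.name)
        if match:  # ignore files that only loosely match the glob
            completed.add(int(match.group(1)))
    return completed
```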

* fix(builder): skip after-generation processors when resume finds dataset already complete

_build_with_resume and _build_async now return False when the dataset is already
complete (early-return path), True otherwise. build() skips
_processor_runner.run_after_generation() on False, preventing processors from
calling shutil.rmtree and rewriting an already-finalized dataset.

Fixes the issue raised in review: greptile P1 comment on PR #526.

* fix(builder): use filesystem count for initial_total_num_batches on async resume

Metadata can lag by one row group if a crash occurs between
move_partial_result_to_final_file_path and write_metadata. Using
len(completed_ids) from the filesystem scan instead of
state.num_completed_batches ensures the final metadata reflects the
actual number of parquet files present, not the potentially stale
metadata count.

* feat(results): add export() method and --output-format CLI flag

Adds DatasetCreationResults.export(path, format=) supporting jsonl,
csv, and parquet. The CLI create command gains --output-format / -f
which writes dataset.<format> alongside the parquet batch files.

* fix(builder): handle resume when metadata.json missing (interrupted before first batch)

When a run is interrupted before any row group or batch completes, metadata.json
is never written. Previously resume=True would raise DatasetGenerationError in
this case. Now build() detects the missing file, logs an info message, clears
any leftover partial results and falls back to a clean fresh run.

This is the common scenario for small datasets (fewer records than buffer_size)
where all records fit in a single row group.

* docs(interface): fix resume docstring — async engine is supported

* fix(builder): derive initial_actual_num_records from filesystem in async resume

In the crash window (row group written to disk but write_metadata crashed before
updating the file), both initial_total_num_batches and initial_actual_num_records
now use the filesystem-discovered completed_ids as source of truth.  Previously
initial_actual_num_records was read from potentially stale metadata, causing
actual_num_records in the final metadata to be undercounted by one row group.

Also adds a test covering the partial-resume crash-window scenario.

* feat(resume): replace resume: bool with ResumeMode enum (NEVER/ALWAYS/IF_POSSIBLE)

- Introduces ResumeMode(StrEnum) in artifact_storage.py for use across all layers
- Replaces resume: bool with resume: ResumeMode in DatasetBuilder.build(),
  DataDesigner.create(), ArtifactStorage, and _build_async()
- Adds _check_resume_config_compatibility() using config fingerprints to support
  IF_POSSIBLE: falls back to a fresh run when config has changed since last run
- Relaxes num_records validation from strict equality to num_records >= actual_num_records,
  allowing dataset extension on resume; buffer_size must still match exactly
- Preserves exception chain with 'from exc' on FileNotFoundError in _load_resume_state
- Exports ResumeMode from data_designer.interface for users to import
- Adds skip_row_groups assertion test and IF_POSSIBLE storage behavior tests
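
The relaxed validation rule above (num_records may grow, buffer_size may not change) can be sketched as follows. ResumeState, validate_resume, and the stand-in error class are hypothetical names for this example only.

```python
from dataclasses import dataclass


class DatasetGenerationError(Exception):
    """Stand-in for the project's error type."""


@dataclass(frozen=True)
class ResumeState:
    """Hypothetical subset of what metadata.json records about the prior run."""

    actual_num_records: int
    buffer_size: int


def validate_resume(num_records: int, buffer_size: int, state: ResumeState) -> int:
    """Return how many records remain to be generated, or raise if resume is invalid."""
    if buffer_size != state.buffer_size:
        raise DatasetGenerationError(
            f"buffer_size mismatch: {buffer_size} != {state.buffer_size}"
        )
    if num_records < state.actual_num_records:
        raise DatasetGenerationError(
            f"num_records={num_records} is below the {state.actual_num_records} records already on disk"
        )
    # Equal means nothing is left to do; greater means the dataset is being extended.
    return num_records - state.actual_num_records
```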

* fix(resume): invalidate resolved_dataset_name cache when IF_POSSIBLE downgrades to NEVER

ArtifactStorage's Pydantic model validator accesses base_dataset_path at
construction time, caching resolved_dataset_name under IF_POSSIBLE semantics
before build() can set resume=NEVER. Pop the stale cache entry so the property
re-resolves with the correct NEVER semantics (timestamped directory).

Also fixes _check_resume_config_compatibility() to use artifact_path/dataset_name
directly instead of base_dataset_path, and adds a regression test covering the
cache-bypass scenario.

* fix(builder): move partial-completion warning before return in _build_async

* fix(builder): IF_POSSIBLE now starts fresh when no dataset directory exists

_check_resume_config_compatibility returned True when config_path was absent,
even when the dataset directory itself didn't exist. This caused IF_POSSIBLE to
upgrade to ALWAYS, which then raised ArtifactStorageError on the first-ever run
because ALWAYS requires an existing directory.

Fix: return False early when the dataset directory is absent. Also sets
actual_num_records on mock buffer managers in two async resume tests that
started failing after the partial-completion warning block was made reachable.

* fix(builder): use original target_num_records in async resume record count

When extending a non-aligned run (e.g. original num_records=5, buffer_size=2, extended to
num_records=7), the last completed row group has 1 record, not buffer_size=2. Using the new
num_records in the formula would overcount: min(2, 7-2*2)=2 instead of min(2, 5-2*2)=1.

Fix: capture state from _load_resume_state (previously discarded) and pass
state.target_num_records into the sum formula. Added target_num_records field to
_ResumeState, populated from metadata.json.

Test: test_build_async_resume_initial_actual_num_records_uses_original_target
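
The arithmetic in this commit message can be checked with a few lines of Python. The function name row_group_size is an illustrative stand-in; the new target of 7 is the value used in the message's own example.

```python
def row_group_size(rg_id: int, buffer_size: int, target_num_records: int) -> int:
    """Records in row group `rg_id` for a run that targets `target_num_records`."""
    return min(buffer_size, target_num_records - rg_id * buffer_size)


original_target, new_target, buffer_size = 5, 7, 2
last_completed_rg = 2  # third (and last) row group of the original run

# Sized against the original target, the last completed group holds 1 record.
correct = row_group_size(last_completed_rg, buffer_size, original_target)
# Sized against the new target, the same group would be counted as 2 records.
overcounted = row_group_size(last_completed_rg, buffer_size, new_target)
```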

* fix(builder): IF_POSSIBLE starts fresh on empty dataset directory

Empty directory (crash between mkdir and first file write) was treated as
compatible — _check_resume_config_compatibility returned True, IF_POSSIBLE
upgraded to ALWAYS, which then raised ArtifactStorageError.

Fix: treat empty directory the same as missing — return False from
_check_resume_config_compatibility when any(dir.iterdir()) is False.

Test: test_if_possible_starts_fresh_when_directory_is_empty
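
A small sketch of the "missing or empty means no prior dataset" check described above. The helper name has_prior_dataset is hypothetical; the commit applies the same any(dir.iterdir()) test inside _check_resume_config_compatibility.

```python
from pathlib import Path


def has_prior_dataset(dataset_dir: Path) -> bool:
    """True only when the directory exists and contains at least one entry."""
    # any() stops at the first entry, so large directories are not fully listed.
    return dataset_dir.is_dir() and any(dataset_dir.iterdir())
```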

* fix(builder): ALWAYS raises DatasetGenerationError on config fingerprint mismatch

ResumeMode.ALWAYS was documented to raise when column/model config changed, but
_check_resume_config_compatibility() was only called in the IF_POSSIBLE branch.
A user resuming with ALWAYS after changing the config would silently mix records
from two different configs.

Fix:
- Refactor _check_resume_config_compatibility() to return _ConfigCompatibility
  enum (COMPATIBLE / INCOMPATIBLE / NO_PRIOR_DATASET) instead of bool so callers
  can distinguish 'no prior run' from 'configs differ'
- Call the check for both ALWAYS and IF_POSSIBLE before _write_builder_config()
- ALWAYS + INCOMPATIBLE → DatasetGenerationError
- IF_POSSIBLE + INCOMPATIBLE → silent fresh start (existing behaviour)
- IF_POSSIBLE + NO_PRIOR_DATASET → silent fresh start (existing behaviour)

Test: test_build_resume_always_raises_on_config_mismatch
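
The decision table in this commit can be summarized in a short sketch. The enum string values, the function name resolve_resume_mode, and the stand-in error class are assumptions. The ALWAYS + NO_PRIOR_DATASET case is passed through unchanged here because the message above does not spell it out.

```python
from enum import Enum


class ResumeMode(str, Enum):
    NEVER = "never"
    ALWAYS = "always"
    IF_POSSIBLE = "if_possible"


class ConfigCompatibility(Enum):
    COMPATIBLE = "compatible"
    INCOMPATIBLE = "incompatible"
    NO_PRIOR_DATASET = "no_prior_dataset"


class DatasetGenerationError(Exception):
    """Stand-in for the project's error type."""


def resolve_resume_mode(mode: ResumeMode, compat: ConfigCompatibility) -> ResumeMode:
    """Map the requested mode plus the compatibility check to an effective mode."""
    if mode is ResumeMode.NEVER:
        return ResumeMode.NEVER
    if mode is ResumeMode.ALWAYS:
        if compat is ConfigCompatibility.INCOMPATIBLE:
            raise DatasetGenerationError("config changed since the previous run; cannot resume")
        return ResumeMode.ALWAYS
    # IF_POSSIBLE: resume only when a compatible prior dataset exists,
    # otherwise fall back to a fresh run without raising.
    if compat is ConfigCompatibility.COMPATIBLE:
        return ResumeMode.ALWAYS
    return ResumeMode.NEVER
```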

* fix(resume): address nabinchha review — drop export collision, add CLI flag, fix edge cases

C1: drop commit 0bdf24a — remove export() / --output-format from this PR; that feature
    belongs to #540 which has a superior streaming implementation
C2: add --resume / -r flag to data-designer create CLI, thread ResumeMode through
    GenerationController.run_create() into DataDesigner.create()
C3: fix already-complete warning text — replace stale "Remove resume=True" with
    "Use resume=ResumeMode.NEVER" in _build_with_resume and _build_async
C4: fix docstrings — ALWAYS does NOT raise when no checkpoint exists (silently
    restarts from scratch); clarify num_records >= actual semantics
C5: sync artifact_storage.resume = NEVER when no-metadata fallback fires so both
    state holders agree after the downgrade
C6: fix return_value=False → _ConfigCompatibility.INCOMPATIBLE in IF_POSSIBLE test;
    drop 3 direct _find_completed_row_group_ids tests (private API, covered by build())
W1: add logger.warning when builder_config.json is absent (silent COMPATIBLE was footgun)
W2: narrow except Exception → (OSError, json.JSONDecodeError, ValidationError)
W3: run make check-all-fix — ruff reformatted test_if_possible_starts_fresh_when_directory_is_empty

* fix(builder): replace stdlib StrEnum with project compat shim for Python 3.10

* fix(builder): guard extension row groups in initial_actual_num_records formula on async resume

When extending an async run (num_records > state.target_num_records) and a crash
occurs after an extension row group is written to disk but before write_metadata,
the formula `min(buffer_size, state.target_num_records - rg_id * buffer_size)` yields
a zero or negative value for any extension row group (rg_id * buffer_size >= target), making
initial_actual_num_records silently undercount. The RowGroupBufferManager then starts
at the wrong offset, and the final metadata reports an incorrect actual_num_records
with a false partial-completion warning.

Fix: use state.target_num_records for original row groups and num_records for extension
row groups (guarded by rg_id * buffer_size < state.target_num_records). Covers the
scenario with a new regression test.

* fix(builder): pre-compute row-group list in _build_async to fix sizes on non-aligned extension resume

The partitioning loop in _prepare_async_run decremented remaining by
min(buffer_size, remaining) for every row group, including skipped ones.
For a non-aligned original run (e.g. target=5, buffer_size=2, last group
has 1 record), the loop deducted 2 for the skipped last group, leaving
remaining one short.  Extension row groups received smaller sizes than
intended, so the generated dataset was silently short by the deficit and
a false partial-completion warning fired.

Fix: pre-compute the full row-group list with correct per-group sizes in
_build_async where state.target_num_records is available, then pass it to
_prepare_async_run as precomputed_row_groups (replacing the skip_row_groups
param). Original groups use min(buffer_size, target - rg*bs); extension
groups use min(buffer_size, extension_records - ext_idx*bs).

Also updates the skip_row_groups test to assert on precomputed_row_groups
and adds a regression test for the non-aligned extension case.
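
The pre-computation described above might be sketched like this, using the two formulas quoted in the message. The function name, its signature, and the (row_group_id, size) return shape are assumptions, and the guard against shrinking targets is included only to keep the sketch safe to call.

```python
import math


def precompute_row_groups(
    original_target: int, num_records: int, buffer_size: int
) -> list[tuple[int, int]]:
    """Return (row_group_id, size) pairs for the original run plus its extension."""
    if num_records < original_target:
        raise ValueError("num_records must not be below the original target")

    # Original groups keep the boundaries of the first run.
    num_original_groups = math.ceil(original_target / buffer_size)
    groups = [
        (rg, min(buffer_size, original_target - rg * buffer_size))
        for rg in range(num_original_groups)
    ]

    # Extension groups are sized from the extension alone and get fresh ids.
    extension_records = num_records - original_target
    num_extension_groups = math.ceil(extension_records / buffer_size)
    groups += [
        (num_original_groups + ext_idx, min(buffer_size, extension_records - ext_idx * buffer_size))
        for ext_idx in range(num_extension_groups)
    ]
    return groups
```

With target=5 and buffer_size=2 extended to 9 records, the sizes sum to exactly 9, so no deficit is carried over from the short last original group.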

* chore: remove stale implementation plan for #525

The plan described the initial resume: bool design which has since been
replaced by the full ResumeMode enum (NEVER/ALWAYS/IF_POSSIBLE), async
engine support, filesystem reconciliation, and config compatibility checks.
The PR description is the authoritative record of what shipped.

* fix(engine): fix false 'already complete' when extension fits in last group's slack

original_target=5, buffer_size=2 produces 3 groups [2,2,1]. Extending to
num_records=6: ceil(6/2)=3 equalled len(completed_ids)=3, triggering the
already-complete branch on both the async and sync paths — returning the
5-record dataset silently.

Fix (async): replace ceil(num_records/bs) with
  num_original_groups + ceil(extension_records/bs)
so any extension always adds new groups beyond num_original_groups.

Fix (sync): add num_records_list param to DatasetBatchManager.start() and
pass the correct per-batch sizes in _build_with_resume, giving the batch
manager the right total batch count (4 instead of 3 in the example).
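
The group-count arithmetic from the example above, written out as a quick check. Variable names are illustrative only.

```python
import math

original_target, num_records, buffer_size = 5, 6, 2
completed_groups = math.ceil(original_target / buffer_size)  # groups [2, 2, 1]

# Naive total: the extra record appears to fit in the last group's slack.
naive_total_groups = math.ceil(num_records / buffer_size)

# Fixed total: extension records always get their own groups.
extension_records = num_records - original_target
fixed_total_groups = completed_groups + math.ceil(extension_records / buffer_size)

# The "already complete" branch fires when the totals match the completed count.
naive_says_complete = naive_total_groups == completed_groups
fixed_says_complete = fixed_total_groups == completed_groups
```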

* fix(engine): raise error when num_records is below original target on resume

Prevents negative extension_records in async path which silently truncated
the dataset and corrupted metadata without triggering a partial-completion warning.

* fix(storage): refresh MediaStorage path after IF_POSSIBLE → NEVER downgrade

When build() detected an incompatible config and downgraded resume from
IF_POSSIBLE to NEVER, _media_storage.base_path remained bound to the
original directory while all other path properties resolved to the new
timestamped directory — causing broken image references in image-column runs.

* fix(engine): preserve original_target_num_records across extension resume writes

After finalize_row_group successfully wrote incremental metadata during an
extension run, target_num_records in metadata was updated to the extension
target. A subsequent resume would read this as the original target, making
_rg_size() incorrect for all row groups and silently corrupting actual_num_records.

Stores original_target_num_records as an immutable field in metadata so the
original group boundaries are always recoverable regardless of how many
incremental writes have occurred.

---------

Co-authored-by: Nabin Mulepati <nmulepati@nvidia.com>

Development

Successfully merging this pull request may close these issues.

feat: add export() method and --output-format CLI flag to DatasetCreationResults

3 participants