feat(results): add export() method and --output-format CLI flag#540
Conversation
Adds DatasetCreationResults.export(path, format=) supporting jsonl, csv, and parquet. The CLI create command gains --output-format / -f which writes dataset.<format> alongside the parquet batch files.
Greptile SummaryThis PR adds a
|
| Filename | Overview |
|---|---|
| packages/data-designer/src/data_designer/interface/results.py | Adds export(), count_records(), and three private streaming helpers; ExportFormat and SUPPORTED_EXPORT_FORMATS are cleanly derived from a single Literal type. |
| packages/data-designer/src/data_designer/cli/controllers/generation_controller.py | Replaces len(load_dataset()) with count_records() and appends an optional export step with cleanup on failure; ordering of console output slightly reordered (success message moved after export). |
| packages/data-designer/src/data_designer/cli/commands/create.py | Adds --output-format / -f typer option backed by click.Choice(list(SUPPORTED_EXPORT_FORMATS)); forwards the value to the controller. |
| packages/data-designer/tests/interface/test_results.py | Comprehensive parametrized tests covering all formats, format inference, schema unification, incompatible schemas, case-insensitive extensions, and error paths. |
| packages/data-designer/tests/cli/controllers/test_generation_controller.py | Adds happy-path and failure-path tests for --output-format; updates existing mock helpers to use count_records instead of load_dataset. |
| packages/data-designer/tests/cli/commands/test_create_command.py | Updates delegation tests for the new output_format parameter and adds a dedicated test for the --output-format forwarding path. |
| packages/data-designer/tests/cli/test_main.py | Minor update adding output_format=None to the dispatch assertion. |
Flowchart
%%{init: {'theme': 'neutral'}}%%
flowchart TD
CLI["CLI: data-designer create\n--output-format fmt"] --> VAL["click.Choice validates format"]
VAL --> RC["run_create()"]
RC --> LOAD["_load_config()"]
LOAD --> CREATE["data_designer.create()"]
CREATE --> COUNT["results.count_records()\nreads Parquet metadata only"]
COUNT --> ANALYSIS["results.load_analysis()"]
ANALYSIS --> FMT{output_format set?}
FMT -- No --> SUCCESS["print_success(N records)"]
FMT -- Yes --> EXPORT_PATH["build export_path"]
EXPORT_PATH --> EXPORT["results.export(export_path)"]
EXPORT --> INFER["infer format from extension"]
INFER --> FMTCHECK{format supported?}
FMTCHECK -- No --> FMTERR["raise InvalidFileFormatError"]
FMTCHECK -- Yes --> BATCHCHECK{batch files found?}
BATCHCHECK -- No --> BATCHERR["raise ArtifactStorageError"]
BATCHCHECK -- Yes --> FMTROUTE{format}
FMTROUTE -- jsonl --> JSONL["_export_jsonl()"]
FMTROUTE -- csv --> CSV["_export_csv()"]
FMTROUTE -- parquet --> PQ["_export_parquet()"]
JSONL --> DONE["return Path"]
CSV --> DONE
PQ --> DONE
DONE --> SUCCESS
EXPORT -- Exception --> CLEANUP["unlink partial file + Exit 1"]
Reviews (11): Last reviewed commit: "Merge branch 'main' into feat/dataset-ex..." | Re-trigger Greptile
andreatgretel
left a comment
There was a problem hiding this comment.
Thanks for this @przemekboruta - export() fills a real gap and the core implementation is clean. A few things to tighten up before merging:
- Per CONTRIBUTING.md, features should have an associated issue. Could you open one retroactively and link it here with
Closes #NNN? - The success message / export ordering and double dataset load need fixing (see inline comments)
- Need controller-level tests for the new
output_formatpath - happy path, bad format rejection, and export failure - A couple of style guide items flagged inline (error types, import placement)
Left details in inline comments. Nice catch on the lazy import fix in the third commit.
…, import hygiene - Derive SUPPORTED_EXPORT_FORMATS from get_args(ExportFormat) so the two can't drift apart - Replace ValueError with InvalidFileFormatError in export() — consistent with project error conventions - Add date_format="iso" to to_json() for consistent datetime serialization across formats - Add click.Choice(SUPPORTED_EXPORT_FORMATS) to --output-format CLI option for parse-time validation, better --help output, and tab completion - Fix double load_dataset() in run_create: inline len() so the DataFrame ref dies before export - Move success message after the export block to avoid "Dataset created" followed by "Export failed" - Move imports to module level in test_results.py (json, Path, lazy already imported) - Add controller-level tests for output_format happy path, bad format rejection, and export failure
|
hey @andreatgretel! thanks for the review. All points seem to be addressed right now. |
|
Issue #566 has been triaged. The linked issue check is being re-evaluated. |
andreatgretel
left a comment
There was a problem hiding this comment.
Thanks @przemekboruta, all review comments are addressed - approving!
We should be able to merge soon. Seeking one more approval - cc @NVIDIA-NeMo/data-designer-reviewers.
|
Heads up on OOM risk with large datasets The proposed The data is already on disk as individual batch files (
One nuance for the parquet path: batch files can have slightly mismatched schemas (e.g., a column inferred as Example implementation sketch: from __future__ import annotations
from pathlib import Path
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
SUPPORTED_FORMATS = {"jsonl", "csv", "parquet"}
def export(self, path: str | Path) -> Path:
output = Path(path)
fmt = output.suffix.lstrip(".")
if fmt not in SUPPORTED_FORMATS:
raise ValueError(
f"Unsupported file extension {output.suffix!r}. "
f"Use one of: {', '.join(SUPPORTED_FORMATS)}"
)
batch_files = sorted(self.artifact_storage.final_dataset_path.glob("*.parquet"))
if not batch_files:
raise FileNotFoundError("No batch parquet files found.")
if fmt == "jsonl":
_export_jsonl(batch_files, output)
elif fmt == "csv":
_export_csv(batch_files, output)
elif fmt == "parquet":
_export_parquet(batch_files, output)
return output
def _export_jsonl(batch_files: list[Path], output: Path) -> None:
with output.open("w") as f:
for batch_file in batch_files:
chunk = pd.read_parquet(batch_file)
f.write(chunk.to_json(orient="records", lines=True))
f.write("\n")
def _export_csv(batch_files: list[Path], output: Path) -> None:
for i, batch_file in enumerate(batch_files):
chunk = pd.read_parquet(batch_file)
chunk.to_csv(output, mode="a", header=(i == 0), index=False)
def _export_parquet(batch_files: list[Path], output: Path) -> None:
schemas = [pq.read_schema(f) for f in batch_files]
unified = pa.unify_schemas(schemas)
with pq.ParquetWriter(output, unified) as writer:
for batch_file in batch_files:
table = pq.read_table(batch_file)
table = table.cast(unified)
writer.write_table(table)Usage: results.export("output.jsonl")
results.export("output.csv")
results.export("output.parquet") |
…atasets - Rewrite export() to read batch parquet files one at a time instead of materialising the full dataset via load_dataset(); peak memory is now proportional to a single batch regardless of dataset size - Infer output format from file extension by default; format= parameter kept as an explicit override (e.g. writing .txt as JSONL) - _export_parquet unifies schemas across batches (pa.unify_schemas) to handle type drift (e.g. int64 vs float64 in the same column) - Drop format= from the controller's export() call — path already carries the correct extension - Rewrite export tests around real batch parquet files (stub_batch_dir fixture); add tests for multi-batch output, schema unification, unknown extension, empty batch directory, and explicit format override
|
Hey @nabinchha! I totally agree with your proposal and I've implemented it. Looking forward to your feedback PR description updated |
nabinchha
left a comment
There was a problem hiding this comment.
Thanks @przemekboruta! One blocker on memory safety and a few small wins to tidy up before merge.
Summary
Adds DatasetCreationResults.export(path, format=) (jsonl/csv/parquet) that streams batch parquet files one at a time to bound peak memory, plus --output-format/-f on data-designer create. Matches the stated intent; extension-based inference with explicit override is a nice ergonomic touch.
Findings
Critical — Let's fix these before merge
packages/data-designer/src/data_designer/cli/controllers/generation_controller.py:161 — CLI path still OOMs on large datasets, defeating the streaming export
- What:
num_records = len(results.load_dataset())reads every batch parquet file into a single in-memory DataFrame just to print the row count, and it runs beforeresults.export(). For any dataset big enough to need streaming, the CLI OOMs on this line before the streaming export gets a chance to run. - Why: The PR's headline property is "peak memory proportional to a single batch regardless of dataset size." The programmatic API honors that —
results.export(...)called directly from Python is safe. Butdata-designer create ... --output-format X(the most visible entry point, and the one users of #566 are most likely to hit) does not. This isn't a nit: the motivating use case is broken through the CLI. - Suggestion: Count rows from parquet metadata — no data pages loaded:
Cleaner option: expose a
batch_files = sorted(results.artifact_storage.final_dataset_path.glob("batch_*.parquet")) num_records = sum(lazy.pq.read_metadata(f).num_rows for f in batch_files)
count_records()(or similar) helper onDatasetCreationResults/ArtifactStorageso the CLI doesn't have to know about batch file layout. Either way, this should land in this PR rather than a follow-up, since it's tied to the claim the PR is making.
Warnings — Worth addressing
packages/data-designer/src/data_designer/cli/controllers/generation_controller.py:131-137 — Redundant validation now that click.Choice is in place
- What:
create.py:44usesclick_type=click.Choice(list(SUPPORTED_EXPORT_FORMATS)), which means Click rejects any invalid--output-formatat parse time with its own error message — therun_createcontroller can never be reached with a bad value via the CLI. The controller-level check (and the local import at line 131) is dead code from the CLI's perspective. - Why: It's a small YAGNI violation (the controller isn't a documented public API, the only caller is
create_command) and the style guide asks for imports at module level —create.pyalready importsSUPPORTED_EXPORT_FORMATSat the top, so lazy-loading doesn't buy anything here either. It also means we maintain two parallel error messages for the same condition. - Suggestion: Either (a) drop lines 131-137 and the corresponding
test_run_create_invalid_output_format_exitstest, or (b) if we want the controller to remain independently callable with defensive validation, move the import to module level and keep the check. I'd lean toward (a) sinceclick.Choicealready gives better--helpoutput and tab completion.
packages/data-designer/src/data_designer/interface/results.py:229-234 — _export_parquet leaks raw pyarrow errors on genuinely incompatible schemas
- What:
pa.unify_schemas(schemas)handles numeric widening fine (tested), but raisespyarrow.lib.ArrowInvalidif batches have differing column names or truly incompatible types. Likewisetable.cast(unified_schema)can raise on an unsupported cast. Those escape unwrapped. - Why: Per AGENTS.md / STYLEGUIDE.md, third-party exceptions at module boundaries should be normalized to
data_designererror types. Callers writingexcept InvalidFileFormatError(orexcept DataDesignerError) will miss these, and the raw pyarrow traceback leaks an implementation detail of the engine into user-facing error handling. - Suggestion: Wrap the unify/cast block:
try: unified_schema = lazy.pa.unify_schemas(schemas) except lazy.pa.lib.ArrowInvalid as e: raise InvalidFileFormatError(f"Cannot unify batch schemas for parquet export: {e}") from e
Suggestions — Take it or leave it
packages/data-designer/src/data_designer/interface/results.py:138 — File extension matching is case-sensitive
- What:
path.suffix.lstrip(".")returns"JSONL"foroutput.JSONL, which fails the format lookup. - Suggestion:
path.suffix.lstrip(".").lower()— tiny ergonomic win, no downside.
packages/data-designer/src/data_designer/interface/results.py:211-212 — The trailing-newline guard in _export_jsonl is probably unnecessary
- What:
to_json(orient="records", lines=True)always emits a trailing\nin supported pandas versions, soif not content.endswith("\n")never fires. Not harmful, just noise. - Suggestion: Either drop the check, or add a one-line comment noting which pandas behavior it guards against so a future reader doesn't puzzle over it.
What Looks Good
- Schema unification for parquet type drift — the
pa.unify_schemas+table.castapproach is exactly the right move, and the dedicatedtest_export_parquet_schema_unificationtest (int64 → float64 across batches) nails the motivating case. Nice catch during the rewrite. - Streaming helpers are cleanly factored —
_export_jsonl/_export_csv/_export_parquetare small, single-purpose, and easy to reason about in isolation. Theexport()dispatcher stays a readable 15ish lines. - Error-type choices are consistent with the codebase —
InvalidFileFormatErrorfor format problems,ArtifactStorageErrorfor "no batch files" is the right split. And the docstring → raised-type fix in commit50a93d98is a good correctness catch before merge. - Extension-inference-with-override —
results.export("output.csv")vs.results.export("output.txt", format="jsonl")is a clean API.SUPPORTED_EXPORT_FORMATSderived fromget_args(ExportFormat)means the Literal and tuple can't drift. - Test coverage is thorough — parametrized format coverage, content round-trips, multi-batch streaming, schema unification, extension inference, explicit override, empty batch dir, unsupported format, and the controller-level happy/sad paths. Good mix.
Verdict
Needs changes — the pre-export load_dataset() on line 161 breaks the PR's own memory-safety claim for the CLI path and should be fixed in this PR. The redundant controller validation is a smaller cleanup worth bundling in. The rest are genuine take-it-or-leave-it suggestions.
This review was generated by an AI assistant.
…g, UX - Replace load_dataset() with count_records() in CLI to avoid OOM on large datasets; add count_records() method using pq.read_metadata (reads file metadata only, no data pages loaded) - Remove redundant format validation in controller — click.Choice in create.py already rejects invalid values at parse time; dead code removed along with corresponding test - Wrap pa.unify_schemas / table.cast ArrowInvalid as InvalidFileFormatError to normalize third-party exceptions at module boundaries per AGENTS.md - Lowercase file extension before format lookup so .JSONL/.CSV/.PARQUET are accepted without error - Add clarifying comment to trailing-newline guard in _export_jsonl - Add tests: count_records(), uppercase extension, incompatible schemas
|
@przemekboruta, looks like tests are failing in CI. We will need to get those resolved to move this along. |
…th bug - Use promote_options="permissive" in pa.unify_schemas so minor numeric type drift (int64 vs float64) is handled by promotion instead of raising - Also catch ArrowTypeError from unify_schemas and ValueError from table.cast() — the actual exception types thrown by pyarrow for these cases (ArrowInvalid alone is not sufficient) - Wrap base_dataset_path in Path() in generation_controller.run_create to guard against callers that return a str (mock returns str, Path does not support / with str operands) - Update test_export_parquet_incompatible_schemas_raises to match the new error source: with permissive unification, different-column-name batches fail at cast() not at unify_schemas(), so the match string changes from "Cannot unify batch schemas" to "Cannot cast batch"
nabinchha
left a comment
There was a problem hiding this comment.
Thanks for putting this through so many rounds, @przemekboruta — the streaming rewrite plus all the prior-review fixes have left this in really good shape. A handful of things worth tightening before merge below.
Summary
Adds DatasetCreationResults.export(path, format=) and a count_records() helper, plus --output-format/-f on the data-designer create CLI. Both export() and count_records() are explicitly designed to keep peak memory bounded (batch-at-a-time streaming for export, parquet metadata only for counting), which matches the stated intent and addresses the OOM concerns from earlier rounds. Linter passes cleanly on all changed files.
Findings
Warnings — Worth addressing
packages/data-designer/src/data_designer/cli/controllers/generation_controller.py:163-170 — Partial file left on disk if export fails
- What:
_export_jsonlopens the output file withmode="w"and streams batches into it; if the third batch raises (disk full, encoding error, etc.), the partial output file stays behind. Same for CSV (append mode after the first batch) and parquet (ParquetWriterleaves a footerless file when the context manager exits via exception). - Why: The user sees
Export failed: ...and exits with code 1, but a staledataset.jsonlsitting next to the batch dir is misleading on retry — for CSV/JSONL it looks valid until you hit the truncation point, and for parquet it'll fail to read with a confusing footer error. This is the kind of thing that bites people when they re-run after fixing the underlying issue. - Suggestion: Write to a
.tmpsibling andos.replace()on success, or wrap the export in atry/exceptthatpath.unlink(missing_ok=True)before re-raising:try: results.export(export_path) except Exception as e: export_path.unlink(missing_ok=True) print_error(f"Export failed: {e}") raise typer.Exit(code=1)
packages/data-designer/src/data_designer/interface/results.py:250, 257 — pa.lib.ArrowInvalid reaches into a private submodule
- What: pyarrow exposes
ArrowInvalidandArrowTypeErrordirectly at the top level (pa.ArrowInvalid,pa.ArrowTypeError);pa.libis the underlying definition site but isn't part of the public surface. The two are aliases today, butlibis conventionally treated as private and has been reorganized between pyarrow versions before. - Why: We just spent the last review round normalizing third-party exceptions at the boundary per AGENTS.md — leaning on a private submodule path for the exception types we're catching partly undoes that hardening. If pyarrow ever moves these out of
lib, ourexceptclauses silently stop matching and_export_parquetstarts leaking raw pyarrow errors again. - Suggestion: Swap to the public alias:
except (lazy.pa.ArrowInvalid, lazy.pa.ArrowTypeError) as e: ... except (lazy.pa.ArrowInvalid, ValueError) as e: ...
packages/data-designer/src/data_designer/interface/results.py:223-225 — _export_jsonl trailing-newline guard comment is inaccurate
- What: The comment says "Guard against empty batches where
to_json()omits the trailing newline." Verified locally: in current pandas, an emptyDataFramereturns"\n"fromto_json(orient="records", lines=True), and a non-empty one always ends with"\n". The branch never fires, and the comment describes a behavior that doesn't exist. - Why: A future reader puzzling over this guard will be misled and may end up "fixing" empty-batch handling based on a phantom assumption. Two related sub-issues: (a) the guard is effectively dead code, (b) for empty batches the current code still writes the lone
"\n"to disk, which produces a blank line in the JSONL output for every empty batch — harmless to most parsers, but noisy. - Suggestion: Either drop the guard and the comment entirely, or rephrase to something accurate ("Defensive: pandas has always emitted a trailing newline here, but guard against future versions changing that") and add
if content:so empty batches don't emit a phantom blank line.
packages/data-designer/src/data_designer/cli/controllers/generation_controller.py:153 — num_records parameter is shadowed by count_records()
- What:
num_recordsentersrun_create()as the requested count (used in the header at line 137), then gets reassigned to the actual count returned bycount_records()and reused in the success message at line 173 — same name, two different meanings. - Why: It's a real readability snag in a method that's already 60 lines and threading a couple of related concepts. A future maintainer touching this method has to mentally track that
num_recordsflips meaning halfway through, and it's the kind of thing that's easy to mis-edit (e.g. moving the success message earlier and accidentally printing the requested count instead of the actual count). - Suggestion: Rename the second one —
actual_record_count = results.count_records()— and use that on line 173.
packages/data-designer/src/data_designer/cli/commands/create.py:48 — --dataset-name doesn't influence the export filename
- What: The help text documents
<artifact-path>/<dataset-name>/dataset.<format>, so the literaldatasetfilename is intentional and visible. But for a user runningdata-designer create cfg.yaml -d my_data -f jsonl, gettingmy_data/dataset.jsonlinstead ofmy_data/my_data.jsonlis asymmetric —--dataset-namecontrols the directory but not the file inside it. - Why: This is the user-facing entry point for the feature, so the convention we set here will stick. Users will reasonably expect
--dataset-nameto flow through to the filename, and the current behavior makes the flag feel half-wired. Easier to align it now than to change the convention later once people depend on the path. - Suggestion: Use
<dataset-name>.<format>for the export filename and update the help text to match:Worth bumping the controller test to assert the dataset-name-derived path too.export_path = Path(results.artifact_storage.base_dataset_path) / f"{dataset_name}.{output_format}"
What Looks Good
- Memory safety is now end-to-end —
count_records()via parquet metadata + batch-streamingexport()means both the programmatic and CLI paths are O(one batch) in peak memory. The prior reviewers' OOM concerns were addressed thoroughly, including the subtle CLI-only path that bypassed the streaming export. - Error normalization is on point — wrapping
ArrowInvalid/ArrowTypeError/ValueErrorfrom pyarrow intoInvalidFileFormatErrorat the boundary, and choosingArtifactStorageErrorvsInvalidFileFormatErrorfor the right cases, lines up cleanly with the AGENTS.md "errors normalize at boundaries" invariant.from echaining preserves the original traceback too. SUPPORTED_EXPORT_FORMATSderived fromget_args(ExportFormat)— small detail, but it eliminates the class of bug where theLiteraland tuple drift apart over time.- Test coverage is genuinely thorough — multi-batch streaming for all three formats, schema unification with type drift, incompatible-schema rejection, extension inference (lower + upper case), explicit format override, empty batch dir, unsupported format, plus controller-level happy/sad paths.
stub_batch_diris a clean fixture and reused well. promote_options="permissive"+ cast — handling int64↔float64 drift across batches without raising is exactly the right default for synthetic data, and the dedicated test pins down both the success case and the "cast actually fails" path post-permissive-unification.- Iterative review responsiveness — the commit history reads as a careful pass through every prior review point (error types, lazy imports, click.Choice, double load_dataset, count_records, schema permissive promotion), which made this review easy.
Verdict
Needs changes — nothing architectural, but the items above are worth pulling into this PR rather than punting:
pa.lib→paexception references (one-line swap, undoes a piece of our error-normalization work otherwise)- partial-file cleanup on export failure (real retry footgun)
- the misleading
_export_jsonlcomment (and either drop the dead guard or fix it) - rename the second
num_recordsto a clearer name - decide on the export filename convention (
dataset.<format>vs<dataset-name>.<format>) before this lands
This review was generated by an AI assistant.
- Use public pa.ArrowInvalid/ArrowTypeError instead of pa.lib.* in _export_parquet - Drop dead trailing-newline guard in _export_jsonl; skip empty batches with `if content` - Rename num_records→actual_record_count after count_records() call to avoid shadowing - Unlink partial export file before re-raising on export failure in run_create - Export filename now uses dataset_name (<dataset-name>.<format>) instead of literal "dataset" - Update help text and tests to match new export filename convention
…I flag, fix edge cases C1: drop commit 0bdf24a — remove export() / --output-format from this PR; that feature belongs to NVIDIA-NeMo#540 which has a superior streaming implementation C2: add --resume / -r flag to data-designer create CLI, thread ResumeMode through GenerationController.run_create() into DataDesigner.create() C3: fix already-complete warning text — replace stale "Remove resume=True" with "Use resume=ResumeMode.NEVER" in _build_with_resume and _build_async C4: fix docstrings — ALWAYS does NOT raise when no checkpoint exists (silently restarts from scratch); clarify num_records >= actual semantics C5: sync artifact_storage.resume = NEVER when no-metadata fallback fires so both state holders agree after the downgrade C6: fix return_value=False → _ConfigCompatibility.INCOMPATIBLE in IF_POSSIBLE test; drop 3 direct _find_completed_row_group_ids tests (private API, covered by build()) W1: add logger.warning when builder_config.json is absent (silent COMPATIBLE was footgun) W2: narrow except Exception → (OSError, json.JSONDecodeError, ValidationError) W3: run make check-all-fix — ruff reformatted test_if_possible_starts_fresh_when_directory_is_empty
nabinchha
left a comment
There was a problem hiding this comment.
Thanks @przemekboruta!
Resolved conflicts in CLI create command and tests — kept both --resume (PR NVIDIA-NeMo#526) and --output-format (PR NVIDIA-NeMo#540) parameters throughout create.py, generation_controller.py, and all test files.
#526) * docs: add implementation plan for resume mechanism Fixes #525 * feat(storage): add resume flag and clear_partial_results() - ArtifactStorage gains a `resume: bool = False` field - resolved_dataset_name skips timestamp logic when resume=True, returning the existing dataset folder name as-is - Raises ArtifactStorageError on resume=True when the target folder is absent or empty (no data to resume from) - New clear_partial_results() removes in-flight partial results left over from an interrupted run Fixes #525 * feat(batch-manager): add start_batch param to start() DatasetBatchManager.start() now accepts: - start_batch: int = 0 — first batch index to process - initial_actual_num_records: int = 0 — records already on disk Both default to 0 so all existing call sites are unaffected. Fixes #525 * feat(builder): implement resume logic in DatasetBuilder - build() gains a resume: bool = False parameter - _load_resume_state() reads metadata.json and validates that num_records and buffer_size match the original run - _build_with_resume() skips completed batches, clears in-flight partial results, and continues from the first incomplete batch - Raises DatasetGenerationError with clear messages for: - missing metadata.json (interrupted before first batch completes) - num_records mismatch - buffer_size mismatch - DATA_DESIGNER_ASYNC_ENGINE=1 (not yet supported) - Logs a warning and returns early when dataset is already complete Fixes #525 * feat(interface): expose resume on DataDesigner.create() - create() gains resume: bool = False - _create_resource_provider() passes resume to ArtifactStorage - builder.build() receives the resume flag Fixes #525 * test: add tests for resume mechanism Covers: - ArtifactStorage.resolved_dataset_name with resume=True - ArtifactStorage.clear_partial_results() - DatasetBatchManager.start() with start_batch and initial_actual_num_records - DatasetBuilder.build(resume=True): missing metadata, num_records mismatch, buffer_size mismatch, already-complete detection Fixes #525 * feat(builder): extend resume to async engine (DATA_DESIGNER_ASYNC_ENGINE=1) - Add _find_completed_row_group_ids() to scan parquet-files/ for already-written row groups by parsing batch_*.parquet filenames - _build_async() now accepts resume=True: loads metadata, finds completed row groups, clears partial results, and logs progress; returns early if all row groups are done - _prepare_async_run() accepts skip_row_groups, initial_actual_num_records, and initial_total_num_batches so the scheduler only processes remaining row groups and RowGroupBufferManager starts from the correct counts - RowGroupBufferManager.__init__ gains initial_actual_num_records and initial_total_num_batches params to seed the counters on resume - finalize_row_group closure now writes incremental metadata after each checkpoint so any run (resume or not) can be resumed if interrupted mid-way - Remove the guard that rejected resume=True with DATA_DESIGNER_ASYNC_ENGINE=1 - Add tests for all new paths * fix(builder): skip after-generation processors when resume finds dataset already complete _build_with_resume and _build_async now return False when the dataset is already complete (early-return path), True otherwise. build() skips _processor_runner.run_after_generation() on False, preventing processors from calling shutil.rmtree and rewriting an already-finalized dataset. Fixes the issue raised in review: greptile P1 comment on PR #526. * fix(builder): use filesystem count for initial_total_num_batches on async resume Metadata can lag by one row group if a crash occurs between move_partial_result_to_final_file_path and write_metadata. Using len(completed_ids) from the filesystem scan instead of state.num_completed_batches ensures the final metadata reflects the actual number of parquet files present, not the potentially stale metadata count. * feat(results): add export() method and --output-format CLI flag Adds DatasetCreationResults.export(path, format=) supporting jsonl, csv, and parquet. The CLI create command gains --output-format / -f which writes dataset.<format> alongside the parquet batch files. * fix(builder): handle resume when metadata.json missing (interrupted before first batch) When a run is interrupted before any row group or batch completes, metadata.json is never written. Previously resume=True would raise DatasetGenerationError in this case. Now build() detects the missing file, logs an info message, clears any leftover partial results and falls back to a clean fresh run. This is the common scenario for small datasets (fewer records than buffer_size) where all records fit in a single row group. * docs(interface): fix resume docstring — async engine is supported * fix(builder): derive initial_actual_num_records from filesystem in async resume In the crash window (row group written to disk but write_metadata crashed before updating the file), both initial_total_num_batches and initial_actual_num_records now use the filesystem-discovered completed_ids as source of truth. Previously initial_actual_num_records was read from potentially stale metadata, causing actual_num_records in the final metadata to be undercounted by one row group. Also adds a test covering the partial-resume crash-window scenario. * feat(resume): replace resume: bool with ResumeMode enum (NEVER/ALWAYS/IF_POSSIBLE) - Introduces ResumeMode(StrEnum) in artifact_storage.py for use across all layers - Replaces resume: bool with resume: ResumeMode in DatasetBuilder.build(), DataDesigner.create(), ArtifactStorage, and _build_async() - Adds _check_resume_config_compatibility() using config fingerprints to support IF_POSSIBLE: falls back to a fresh run when config has changed since last run - Relaxes num_records validation from strict equality to num_records >= actual_num_records, allowing dataset extension on resume; buffer_size must still match exactly - Preserves exception chain with 'from exc' on FileNotFoundError in _load_resume_state - Exports ResumeMode from data_designer.interface for users to import - Adds skip_row_groups assertion test and IF_POSSIBLE storage behavior tests * fix(resume): invalidate resolved_dataset_name cache when IF_POSSIBLE downgrades to NEVER ArtifactStorage's Pydantic model validator accesses base_dataset_path at construction time, caching resolved_dataset_name under IF_POSSIBLE semantics before build() can set resume=NEVER. Pop the stale cache entry so the property re-resolves with the correct NEVER semantics (timestamped directory). Also fixes _check_resume_config_compatibility() to use artifact_path/dataset_name directly instead of base_dataset_path, and adds a regression test covering the cache-bypass scenario. * fix(builder): move partial-completion warning before return in _build_async * fix(builder): IF_POSSIBLE now starts fresh when no dataset directory exists _check_resume_config_compatibility returned True when config_path was absent, even when the dataset directory itself didn't exist. This caused IF_POSSIBLE to upgrade to ALWAYS, which then raised ArtifactStorageError on the first-ever run because ALWAYS requires an existing directory. Fix: return False early when the dataset directory is absent. Also sets actual_num_records on mock buffer managers in two async resume tests that started failing after the partial-completion warning block was made reachable. * fix(builder): use original target_num_records in async resume record count When extending a non-aligned run (e.g. original num_records=5, buffer_size=2), the last completed row group has 1 record, not buffer_size=2. Using new num_records in the formula would overcount: min(2, 7-2*2)=2 instead of min(2, 5-2*2)=1. Fix: capture state from _load_resume_state (previously discarded) and pass state.target_num_records into the sum formula. Added target_num_records field to _ResumeState, populated from metadata.json. Test: test_build_async_resume_initial_actual_num_records_uses_original_target * fix(builder): IF_POSSIBLE starts fresh on empty dataset directory Empty directory (crash between mkdir and first file write) was treated as compatible — _check_resume_config_compatibility returned True, IF_POSSIBLE upgraded to ALWAYS, which then raised ArtifactStorageError. Fix: treat empty directory the same as missing — return False from _check_resume_config_compatibility when any(dir.iterdir()) is False. Test: test_if_possible_starts_fresh_when_directory_is_empty * fix(builder): ALWAYS raises DatasetGenerationError on config fingerprint mismatch ResumeMode.ALWAYS was documented to raise when column/model config changed, but _check_resume_config_compatibility() was only called in the IF_POSSIBLE branch. A user resuming with ALWAYS after changing the config would silently mix records from two different configs. Fix: - Refactor _check_resume_config_compatibility() to return _ConfigCompatibility enum (COMPATIBLE / INCOMPATIBLE / NO_PRIOR_DATASET) instead of bool so callers can distinguish 'no prior run' from 'configs differ' - Call the check for both ALWAYS and IF_POSSIBLE before _write_builder_config() - ALWAYS + INCOMPATIBLE → DatasetGenerationError - IF_POSSIBLE + INCOMPATIBLE → silent fresh start (existing behaviour) - IF_POSSIBLE + NO_PRIOR_DATASET → silent fresh start (existing behaviour) Test: test_build_resume_always_raises_on_config_mismatch * fix(resume): address nabinchha review — drop export collision, add CLI flag, fix edge cases C1: drop commit 0bdf24a — remove export() / --output-format from this PR; that feature belongs to #540 which has a superior streaming implementation C2: add --resume / -r flag to data-designer create CLI, thread ResumeMode through GenerationController.run_create() into DataDesigner.create() C3: fix already-complete warning text — replace stale "Remove resume=True" with "Use resume=ResumeMode.NEVER" in _build_with_resume and _build_async C4: fix docstrings — ALWAYS does NOT raise when no checkpoint exists (silently restarts from scratch); clarify num_records >= actual semantics C5: sync artifact_storage.resume = NEVER when no-metadata fallback fires so both state holders agree after the downgrade C6: fix return_value=False → _ConfigCompatibility.INCOMPATIBLE in IF_POSSIBLE test; drop 3 direct _find_completed_row_group_ids tests (private API, covered by build()) W1: add logger.warning when builder_config.json is absent (silent COMPATIBLE was footgun) W2: narrow except Exception → (OSError, json.JSONDecodeError, ValidationError) W3: run make check-all-fix — ruff reformatted test_if_possible_starts_fresh_when_directory_is_empty * fix(builder): replace stdlib StrEnum with project compat shim for Python 3.10 * fix(builder): guard extension row groups in initial_actual_num_records formula on async resume When extending an async run (num_records > state.target_num_records) and a crash occurs after an extension row group is written to disk but before write_metadata, the formula `min(buffer_size, state.target_num_records - rg_id * buffer_size)` yields a negative value for any extension row group (rg_id * buffer_size >= target), making initial_actual_num_records silently undercount. The RowGroupBufferManager then starts at the wrong offset, and the final metadata reports an incorrect actual_num_records with a false partial-completion warning. Fix: use state.target_num_records for original row groups and num_records for extension row groups (guarded by rg_id * buffer_size < state.target_num_records). Covers the scenario with a new regression test. * fix(builder): pre-compute row-group list in _build_async to fix sizes on non-aligned extension resume The partitioning loop in _prepare_async_run decremented remaining by min(buffer_size, remaining) for every row group, including skipped ones. For a non-aligned original run (e.g. target=5, buffer_size=2, last group has 1 record), the loop deducted 2 for the skipped last group, leaving remaining one short. Extension row groups received smaller sizes than intended, so the generated dataset was silently short by the deficit and a false partial-completion warning fired. Fix: pre-compute the full row-group list with correct per-group sizes in _build_async where state.target_num_records is available, then pass it to _prepare_async_run as precomputed_row_groups (replacing the skip_row_groups param). Original groups use min(buffer_size, target - rg*bs); extension groups use min(buffer_size, extension_records - ext_idx*bs). Also updates the skip_row_groups test to assert on precomputed_row_groups and adds a regression test for the non-aligned extension case. * chore: remove stale implementation plan for #525 The plan described the initial resume: bool design which has since been replaced by the full ResumeMode enum (NEVER/ALWAYS/IF_POSSIBLE), async engine support, filesystem reconciliation, and config compatibility checks. The PR description is the authoritative record of what shipped. * fix(engine): fix false 'already complete' when extension fits in last group's slack original_target=5, buffer_size=2 produces 3 groups [2,2,1]. Extending to num_records=6: ceil(6/2)=3 equalled len(completed_ids)=3, triggering the already-complete branch on both the async and sync paths — returning the 5-record dataset silently. Fix (async): replace ceil(num_records/bs) with num_original_groups + ceil(extension_records/bs) so any extension always adds new groups beyond num_original_groups. Fix (sync): add num_records_list param to DatasetBatchManager.start() and pass the correct per-batch sizes in _build_with_resume, giving the batch manager the right total batch count (4 instead of 3 in the example). * fix(engine): raise error when num_records is below original target on resume Prevents negative extension_records in async path which silently truncated the dataset and corrupted metadata without triggering a partial-completion warning. * fix(storage): refresh MediaStorage path after IF_POSSIBLE → NEVER downgrade When build() detected an incompatible config and downgraded resume from IF_POSSIBLE to NEVER, _media_storage.base_path remained bound to the original directory while all other path properties resolved to the new timestamped directory — causing broken image references in image-column runs. * fix(engine): preserve original_target_num_records across extension resume writes After finalize_row_group successfully wrote incremental metadata during an extension run, target_num_records in metadata was updated to the extension target. A subsequent resume would read this as the original target, making _rg_size() incorrect for all row groups and silently corrupting actual_num_records. Stores original_target_num_records as an immutable field in metadata so the original group boundaries are always recoverable regardless of how many incremental writes have occurred. --------- Co-authored-by: Nabin Mulepati <nmulepati@nvidia.com>
Summary
Closes #566
DatasetCreationResults.export(path, format=)supporting jsonl, csv, and parquet formatsDatasetCreationResults.count_records()which reads record counts from Parquet file metadata only (no data pages loaded, no OOM risk)--output-format/-fflag to thedata-designer createCLI command withclick.Choicevalidation; writes<dataset-name>.<format>alongside the parquet batch filesformat=is an optional explicit override (e.g. write a.txtfile as JSONL)SUPPORTED_EXPORT_FORMATSis derived fromget_args(ExportFormat)so the Literal and the tuple cannot drift apartInvalidFileFormatError(consistent with project error conventions)date_format="iso"for consistent datetime serialization across formatsImplementation notes
Streaming export (no OOM risk)
export()never materialises the full dataset in memory. Instead of callingload_dataset(), it reads the individual batch parquet files fromartifact_storage.final_dataset_pathone at a time and appends each to the output file, keeping peak memory proportional to a single batch regardless of dataset size:pyarrow.parquet.ParquetWriteris used to write each batch as a row group into one output file; schemas are unified withpa.unify_schemas(promote_options="permissive")to handle minor numeric type drift (e.g.int64→float64across batches); truly incompatible schemas (different column names) raiseInvalidFileFormatErrorcount_records()(no OOM risk)count_records()reads row counts from Parquet file metadata only — no data pages are loaded:The CLI uses
count_records()instead oflen(load_dataset())to report the final record count, avoiding loading the full dataset into memory just for a count.Format inference
CLI:
data-designer create config.yaml --output-format jsonl data-designer create config.yaml -n 500 -f csv # Output file: <artifact-path>/<dataset-name>/<dataset-name>.<format>Test plan
test_export_writes_file— parametrized over all 3 formatstest_export_jsonl_content— each line is valid JSON, all records presenttest_export_csv_content— single header row, full record counttest_export_parquet_content— DataFrame round-trip across two batchestest_export_infers_format_from_extension— format omitted, inferred from.jsonltest_export_explicit_format_overrides_extension—.txtwritten as JSONLtest_export_parquet_schema_unification— two batches with diverging column types (int64/float64) merged correctly via permissive promotiontest_export_parquet_incompatible_schemas_raises— different column names across batches raisesInvalidFileFormatErrortest_export_uppercase_extension_is_recognised—.JSONLtreated same as.jsonltest_export_unknown_extension_raises— raisesInvalidFileFormatErrortest_export_unsupported_explicit_format_raises— raisesInvalidFileFormatErrortest_export_no_batch_files_raises— raisesArtifactStorageErrortest_export_returns_path_object— returnsPathfor str inputtest_count_records— reads row counts from Parquet metadata, no data pages loadedtest_run_create_with_output_format_happy_path— export called with dataset-name-derived pathtest_run_create_export_failure_exits— export exception cleans up partial file and exits with code 1output_formatparameter