fix: harden resume checkpoint handling #624
Persist config identity in metadata, make checkpoints atomic, and reject unsafe resume states so interrupted runs do not mix incompatible or post-processed data.
PR #624 Review
Greptile Summary

This PR hardens the resume/checkpoint system for both the sync and async dataset-builder engines. It unifies progress recovery — both engines now scan

| Filename | Overview |
|---|---|
| packages/data-designer-engine/src/data_designer/engine/storage/artifact_storage.py | Adds atomic write (tmp+fsync+os.replace), metadata defaults injection, and a PID-tagged temp filename; logic is correct and the finally unlink is safely a no-op after a successful rename. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py | Introduces unified _recover_progress_from_disk, _post_generation_processed_resume_result, and _has_allow_resize_columns; after-generation now runs unconditionally (no longer gated on generated); ordering of guards in build() is consistent with the normalization of IF_POSSIBLE earlier in the same function. |
| packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py | Rewrites _write_parquet_files to create real parquet files with configurable per-group row counts, adds coverage for all new resume paths (post-processed terminal state, non-contiguous IDs, corrupt metadata, crash-window recovery, partial row-group salvage). |
| packages/data-designer-engine/tests/engine/storage/test_artifact_storage.py | Adds test_artifact_storage_write_metadata_includes_defaults verifying that _metadata_defaults are merged into every write; straightforward and correct. |
| packages/data-designer/src/data_designer/interface/data_designer.py | Splits the single try/except into two separate blocks around _create_dataset_builder and builder.build() so that DeprecationWarning from the engine propagates correctly under strict warning filters. |
| packages/data-designer/src/data_designer/interface/results.py | Documentation-only addition to DatasetCreationResults clarifying the observability scope on resume; no logic changes. |
| architecture/dataset-builders.md | New Resume Checkpointing section accurately documents the unified resume strategy, dual source-of-truth split, and invariants. |
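
The `artifact_storage.py` row above describes the atomic write as tmp + fsync + `os.replace` with a PID-tagged temp filename. A minimal sketch of that pattern follows; the function name `write_json_atomically` and the exact temp-file naming are assumptions for illustration, not the PR's actual code.

```python
import json
import os
from pathlib import Path


def write_json_atomically(path: Path, payload: dict) -> None:
    """Write payload so a reader sees either the old file or the new one, never a partial file."""
    # Hypothetical naming: a PID-tagged temp file in the same directory,
    # so the final rename never crosses a filesystem boundary.
    tmp_path = path.with_name(f"{path.name}.{os.getpid()}.tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as handle:
            json.dump(payload, handle)
            handle.flush()
            os.fsync(handle.fileno())  # push the bytes to disk before renaming
        os.replace(tmp_path, path)  # atomic rename on POSIX filesystems
    finally:
        # After a successful replace the temp file no longer exists,
        # so this unlink is a no-op; after a failure it cleans up.
        tmp_path.unlink(missing_ok=True)
```

A crash before `os.replace` leaves the previous `metadata.json` intact, which is the "crash mid-write" hazard the table row refers to.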
Flowchart
%%{init: {'theme': 'neutral'}}%%
flowchart TD
A["build(num_records, resume)"] --> B{resume mode?}
B -- IF_POSSIBLE --> C["_check_resume_config_compatibility()"]
C -- COMPATIBLE --> D["resume = ALWAYS"]
C -- INCOMPATIBLE/NO_PRIOR --> E["resume = NEVER"]
B -- ALWAYS --> F
B -- NEVER --> F
D --> F["_set_metadata_defaults()\n(inject config fingerprint into every write)"]
E --> F
F --> G["_post_generation_processed_resume_result()"]
G -- "post_gen_started → raise\nnot post_gen processed → None" --> H["_write_builder_config()"]
G -- "post_gen complete + same target → return final_dataset_path" --> Z["✅ no-op return"]
G -- "post_gen complete + wrong target → raise" --> ERR["🛑 DatasetGenerationError"]
H --> I{allow_resize?}
I -- "Yes + ALWAYS → raise" --> ERR
I -- No --> J["_resolve_async_compatibility()"]
J --> K{engine?}
K -- async --> L["_build_async()"]
K -- sync + ALWAYS --> M["_build_with_resume()"]
K -- sync + NEVER --> N["_build fresh batches"]
L --> O["_load_resume_state()\n→ _recover_progress_from_disk(allow_holes=True)\n→ _find_completed_row_groups()"]
M --> O
O --> P{all batches done?}
P -- Yes --> Q["log 'already complete', return"]
P -- No --> R["generate remaining batches"]
Q --> S
R --> S["has_after_generation_processors?"]
S -- No --> T["log_model_usage → return final_path"]
S -- Yes --> U["update_metadata(started)\nrun_after_generation()\nupdate_metadata(complete)"]
U --> T
Reviews (8): Last reviewed commit: "Merge branch 'main' into fix/resume-hard..."
Let IF_POSSIBLE start fresh for resize configs and mark after-generation processing before mutation so interrupted processors cannot be resumed unsafely.
Single-user CLI/notebook flows don't race on the artifact directory, and the timestamped-directory fallback already handles the "ran it twice" case. The lock added complexity (re-entrancy, stale cleanup, the cached-property trap where IF_POSSIBLE→NEVER moves writes to a timestamped directory while the lock stays pinned to the original) for no real protection. Atomic metadata writes still cover the actual hazard (crash mid-write). Also fix a pre-existing test bug in test_initial_actual_num_records_uses_actual_parquet_rows_for_partial_row_group where the mocked scheduler hit the partial-completion path with unconfigured Mock attributes.
* Drop the unreachable ResumeMode.IF_POSSIBLE branch in _post_generation_processed_resume_result. By the time this helper runs, build() has normalised IF_POSSIBLE to ALWAYS or NEVER, so the guard now matches reality. Tighten the docstring to document the three outcomes (no-op return / fall through / raise).
* Split the post-processed extension/raise into two cases. When num_records < prior_target the user just asked for fewer records than already exist; the previous "would mix pre- and post-processor records" message only describes the extension case. Mirror the wording used by _load_resume_state and add a regression test.
* Remove the dead _find_completed_row_group_ids wrapper now that _build_async uses _find_completed_row_groups directly. Rename the related test to match.
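
The normalisation that makes the IF_POSSIBLE branch unreachable can be sketched as below. This follows the flowchart (COMPATIBLE becomes ALWAYS, INCOMPATIBLE or NO_PRIOR becomes NEVER); the enum values, the `Compatibility` type, and the function name `normalize_resume_mode` are assumptions for illustration.

```python
from enum import Enum


class ResumeMode(Enum):
    # Member names match the PR; the string values are assumed.
    NEVER = "never"
    ALWAYS = "always"
    IF_POSSIBLE = "if_possible"


class Compatibility(Enum):
    # Hypothetical result type for the config compatibility check.
    COMPATIBLE = "compatible"
    INCOMPATIBLE = "incompatible"
    NO_PRIOR = "no_prior"


def normalize_resume_mode(requested: ResumeMode, compatibility: Compatibility) -> ResumeMode:
    """Collapse IF_POSSIBLE into ALWAYS or NEVER so later helpers see only two modes."""
    if requested is not ResumeMode.IF_POSSIBLE:
        return requested
    if compatibility is Compatibility.COMPATIBLE:
        return ResumeMode.ALWAYS
    return ResumeMode.NEVER
```

Any helper that runs after this step can assume it never receives `IF_POSSIBLE`, which is why the old branch was dead code.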
Both engines now derive `num_completed_batches` and `actual_num_records` from `parquet-files/batch_*.parquet` via `_recover_progress_from_disk`. `metadata.json` keeps describing the run *configuration* (`buffer_size`, `target_num_records`, `original_target_num_records`, config fingerprint), while the filesystem is the source of truth for *progress*. This closes the sync engine's race window between `move_partial_result_to_final_file_path` and the metadata write that follows it, matching the crash-recovery the async engine already had. The sync engine additionally rejects non-contiguous batch IDs (a hole can only mean external mutation or a directory written by an incompatible engine); the async engine continues to tolerate gaps from out-of-order completion via `allow_holes=True`. Existing sync resume tests now seed parquet files alongside metadata, and two new tests cover the unified behaviour: filesystem progress wins when metadata lags, and sync rejects non-contiguous IDs.
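
A rough sketch of the filesystem scan with the contiguity check described above. It assumes batch files are named `batch_<integer>.parquet` and that IDs start at 0; the function name `find_completed_batch_ids` and the use of `ValueError` are hypothetical, and row counting (which would require reading the parquet files) is left out.

```python
import re
from pathlib import Path

# Assumed file naming: batch_<integer>.parquet
_BATCH_FILE = re.compile(r"^batch_(\d+)\.parquet$")


def find_completed_batch_ids(parquet_dir: Path, *, allow_holes: bool) -> list[int]:
    """Return the sorted batch IDs found on disk, optionally rejecting gaps."""
    ids = sorted(
        int(match.group(1))
        for path in parquet_dir.glob("batch_*.parquet")
        if (match := _BATCH_FILE.match(path.name))
    )
    # Sync-style resume: IDs must be exactly 0..n-1, otherwise something
    # outside the engine touched the directory.
    if not allow_holes and ids != list(range(len(ids))):
        raise ValueError(f"non-contiguous batch IDs on disk: {ids}")
    return ids
```

With `allow_holes=True` (the async case) the gaps are returned as-is so the caller can regenerate only the missing row groups.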
`load_dataset`, `count_records`, `load_analysis`, `export`, and `push_to_hub` all read from the artifact directory, so they reflect the cumulative dataset (original + resume rows). `task_traces`, model-usage logs, and telemetry events are scoped to the current invocation only because the original run's in-memory state is not persisted. Document this in the class docstring, the architecture note, and the Fern resume guide.
Future readers were puzzled by the ``except DeprecationWarning: raise`` short-circuits before the generic generation-error wrappers. Add a comment in ``create()`` (with a back-reference from ``preview()``) to record that strict warning filters (``pytest.warns``, ``-W error::DeprecationWarning``) turn the engine's ``warnings.warn(..., DeprecationWarning)`` calls — most notably the ``allow_resize=True`` deprecation in ``_resolve_async_compatibility`` — into raised exceptions, and we want them to surface untouched instead of being swallowed by ``DataDesignerGenerationError``.
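
The short-circuit can be illustrated with a small self-contained sketch. `GenerationError` and `engine_build` are hypothetical stand-ins for ``DataDesignerGenerationError`` and the engine call; only the ``except DeprecationWarning: raise`` shape is taken from the commit message.

```python
import warnings


class GenerationError(Exception):
    """Hypothetical stand-in for DataDesignerGenerationError."""


def engine_build() -> str:
    # Stand-in for an engine call that emits a deprecation warning.
    warnings.warn("allow_resize=True is deprecated", DeprecationWarning, stacklevel=2)
    return "ok"


def create() -> str:
    try:
        return engine_build()
    except DeprecationWarning:
        # Under strict filters the warning arrives as a raised exception;
        # re-raise it untouched instead of wrapping it below.
        raise
    except Exception as exc:
        raise GenerationError("generation failed") from exc
```

Because `DeprecationWarning` is a subclass of `Exception`, removing the first handler would let the generic wrapper swallow it whenever a filter such as `-W error::DeprecationWarning` is active.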
Thanks for putting this together, @nabinchha!

Summary
This PR hardens resume checkpoints by making metadata writes atomic, persisting config fingerprints, deriving resume progress from parquet files, and treating after-generation-processed datasets as terminal for resume. The implementation matches the stated intent, and the docs updates line up with the new resume model.

Findings
No findings.

What Looks Good

Verdict
Ship it. I verified the changed Python files with

This review was generated by an AI assistant.
self.batch_manager.finish()

if generated:
has_after_generation_processors = self._processor_runner.has_processors_for(ProcessorStage.AFTER_GENERATION)
I think there is still a crash window just before this marker is written. If all row-group parquet files are already on disk but post_generation_state has not been persisted yet, the next resume=ALWAYS returns "already complete", sets generated=False, and skips process_after_generation() entirely. Codex reproduced this with a smoke test by seeding complete parquet files plus metadata with no post-generation marker, then attaching an after-generation processor. Maybe write a pending marker before row generation can become complete, and have resume either finish pending after-generation work or reject ambiguous legacy checkpoints.
Good catch — confirmed. There is a crash window between the final row-group parquet write and the post_generation_state="started" marker write: in that window every row group is on disk, no marker is in metadata, and the on-disk parquet files are still clean (no after-generation processor has touched them). The previous if generated: guard meant the next resume=ALWAYS saw "already complete", set generated=False, and skipped process_after_generation() entirely.
Fixed in 93d47e5 by removing the if generated: guard so after-generation runs unconditionally on the on-disk dataset whenever after-generation processors are configured. The existing short-circuits in _post_generation_processed_resume_result still cover the unsafe directions:
* post_generation_state == "complete" / post_generation_processed=True → return early (already done).
* post_generation_state == "started" → raise (crashed mid-rewrite, parquet files may be partially post-processed).
So the unconditional run only fires when after-generation has demonstrably not been applied to the dataset on disk yet.
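
The three outcomes can be summarised in a small sketch. The metadata keys and values come from the discussion above; the function name `resume_action`, its string return values, and the use of `RuntimeError` (the PR raises `DatasetGenerationError`) are assumptions for illustration, and the target-size check on completed datasets is omitted.

```python
def resume_action(metadata: dict) -> str:
    """Decide what a resume should do based on the post-generation markers in metadata."""
    state = metadata.get("post_generation_state")
    if state == "started":
        # Crashed mid-rewrite: parquet files may be partially post-processed.
        raise RuntimeError("after-generation processing was interrupted; cannot resume safely")
    if state == "complete" or metadata.get("post_generation_processed") is True:
        # After-generation already ran to completion; nothing left to do.
        return "return_existing"
    # No marker at all: the on-disk parquet files are still clean, so it is
    # safe to (re)run after-generation on them.
    return "run_after_generation"
```

The markerless case covers both a fresh checkpoint and the crash window between the final parquet write and the "started" marker write.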
I went with the "finish pending after-generation work" leg of your suggestion rather than the "reject ambiguous legacy checkpoints" leg, since the latter would break users who completed a dataset and later attached an after-generation processor (the same intended behaviour the PR description and Greptile review both call out).
Added test_build_resume_complete_dataset_runs_after_generation_when_no_marker to lock in the new behaviour. Architecture doc and the Fern resume invariants now describe both crash windows explicitly.
andreatgretel left a comment
Requesting changes for the after-generation resume crash window and typed metadata error noted inline.
…on resume

Address review feedback on resume hardening:

* Run after-generation processors unconditionally on the on-disk dataset rather than gating on the generation return value. The previous gate silently skipped after-generation when resume saw every row group already on disk, leaving a crash window between the final parquet write and the ``post_generation_state="started"`` marker write: in that window the dataset is complete but after-generation never ran, and the on-disk parquet files are still clean. The "started" short-circuit still rejects the other direction (crashed mid-rewrite, ambiguous state), so resume only re-runs after-generation when it is safe to do so.
* Raise ``DatasetGenerationError`` (instead of letting a raw ``TypeError`` leak out of ``num_records < prior_target``) when a post-processed dataset's metadata is missing ``target_num_records``. Mirrors the wording used by ``_load_resume_state``.
* Document the new behaviour in ``architecture/dataset-builders.md`` and the Fern resume invariants.

Tests:

* ``test_build_resume_complete_dataset_runs_after_generation_when_no_marker`` covers the closed crash window via the public ``set_processor_runner`` API.
* ``test_build_resume_post_generation_processed_missing_target_raises_clearly`` covers the typed-error gap.
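
The typed-error fix can be sketched as follows. In Python 3, comparing an ``int`` with ``None`` raises ``TypeError``, so the missing key has to be checked before the comparison. The ``DatasetGenerationError`` class here is a local stand-in, and the function name ``check_post_processed_target`` and all message wording are hypothetical.

```python
class DatasetGenerationError(Exception):
    """Local stand-in for the engine's error type."""


def check_post_processed_target(num_records: int, metadata: dict) -> None:
    """Validate a resume request against an already post-processed dataset."""
    prior_target = metadata.get("target_num_records")
    if not isinstance(prior_target, int):
        # Without this guard, `num_records < None` would leak a raw TypeError.
        raise DatasetGenerationError("metadata is missing target_num_records; cannot resume")
    if num_records < prior_target:
        raise DatasetGenerationError(
            f"requested {num_records} records but {prior_target} already exist"
        )
    if num_records > prior_target:
        raise DatasetGenerationError(
            "extending a post-processed dataset would mix pre- and post-processor records"
        )
    # num_records == prior_target: the dataset is already final, nothing to do.
```

Only the equal-target case returns normally, which corresponds to the no-op return of the final dataset path.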
andreatgretel left a comment
Thanks for addressing the review comments. I re-ran the focused resume/post-generation tests and smoke checks on 93d47e53, and the two issues I flagged are fixed.
Codex did flag one remaining edge case: a legacy markerless dataset that had already completed process_after_generation() could have after-generation run again if someone now adds resume=ALWAYS against that existing artifact. I think that is not a big deal in practice since resume is very new and this only affects older completed artifacts without the new post-generation metadata markers.
Approving!
Summary
* `allow_resize=True` and already post-processed datasets.
* `num_completed_batches` and `actual_num_records` from `parquet-files/batch_*.parquet` via `_recover_progress_from_disk`. `metadata.json` keeps describing the run configuration (`buffer_size`, `target_num_records`, `original_target_num_records`, config fingerprint); the filesystem is the source of truth for progress. The sync engine additionally rejects non-contiguous batch IDs.
* `DatasetCreationResults` observability scope on resume: methods that read the artifact directory (`load_dataset`, `count_records`, `load_analysis`, `export`, `push_to_hub`) reflect the full on-disk dataset, while `task_traces`, model-usage logs, and telemetry events are scoped to the current invocation only.
* `IF_POSSIBLE` guard in `_post_generation_processed_resume_result`, split the post-processed raise so `num_records < prior_target` and the extension case get accurate messages, and drop the dead `_find_completed_row_group_ids` helper.

Fixes #623
Test plan
* `uv run pytest packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py -k "resume or allow_resize or post_generation or non_contiguous or recovers_progress"`
* `uv run pytest packages/data-designer-engine/tests/engine/storage/test_artifact_storage.py -k "metadata"`
* `uv run pytest packages/data-designer/tests/interface/test_data_designer.py packages/data-designer/tests/interface/test_results.py`
* `uv run pytest packages/data-designer-engine/tests/` (full engine suite)
* `uv run ruff format --check <touched files>`
* `uv run ruff check <touched files>`
* `fern check` (not run: Fern CLI is not installed in this environment)