fix: harden resume checkpoint handling #624

Merged
nabinchha merged 9 commits into main from fix/resume-hardening-623 on May 11, 2026

Conversation

@nabinchha nabinchha commented May 8, 2026

Contributor

Summary

  • Harden resume checkpoints by persisting config fingerprints in metadata, writing metadata atomically, and rejecting unsafe resume states such as allow_resize=True and already post-processed datasets.
  • Make async resume recover actual persisted row counts from parquet metadata for early-shutdown salvage scenarios.
  • Unify sync + async resume: both engines now derive num_completed_batches and actual_num_records from parquet-files/batch_*.parquet via _recover_progress_from_disk. metadata.json keeps describing the run configuration (buffer_size, target_num_records, original_target_num_records, config fingerprint); the filesystem is the source of truth for progress. The sync engine additionally rejects non-contiguous batch IDs.
  • Document the unified resume strategy in architecture and Fern docs.
  • Document DatasetCreationResults observability scope on resume: methods that read the artifact directory (load_dataset, count_records, load_analysis, export, push_to_hub) reflect the full on-disk dataset, while task_traces, model-usage logs, and telemetry events are scoped to the current invocation only.
  • Address review feedback: collapse the unreachable IF_POSSIBLE guard in _post_generation_processed_resume_result, split the post-processed raise so num_records < prior_target and the extension case get accurate messages, and drop the dead _find_completed_row_group_ids helper.

Fixes #623

Test plan

  • uv run pytest packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py -k "resume or allow_resize or post_generation or non_contiguous or recovers_progress"
  • uv run pytest packages/data-designer-engine/tests/engine/storage/test_artifact_storage.py -k "metadata"
  • uv run pytest packages/data-designer/tests/interface/test_data_designer.py packages/data-designer/tests/interface/test_results.py
  • uv run pytest packages/data-designer-engine/tests/ (full engine suite)
  • uv run ruff format --check <touched files>
  • uv run ruff check <touched files>
  • fern check (not run: Fern CLI is not installed in this environment)

Persist config identity in metadata, make checkpoints atomic, and reject unsafe resume states so interrupted runs do not mix incompatible or post-processed data.
@nabinchha nabinchha requested a review from a team as a code owner May 8, 2026 22:10
github-actions Bot commented May 8, 2026

Contributor

PR #624 Review — fix: harden resume checkpoint handling

Summary

This PR hardens the resume checkpoint pipeline to address several crash-recovery and correctness issues:

  • Atomic metadata writes: metadata.json is now written via tmp file + fsync + os.replace, so a crash mid-write cannot leave a partially written checkpoint.
  • Config-fingerprint in metadata — every metadata write is stamped with config_hash, config_hash_algo, and config_hash_version. Resume compatibility checks read metadata.json first and fall back to builder_config.json only for older artifacts.
  • Single-writer lock — a new ArtifactStorage.dataset_lock() context manager creates an exclusive .data_designer.lock file (O_CREAT | O_EXCL) for the life of a run so two processes cannot write to the same dataset directory.
  • Unsafe-resume rejection — resume now fails fast for allow_resize=True columns (row boundaries are non-deterministic) and for datasets that already completed process_after_generation() (schema/row-count may have changed).
  • Real parquet row counts on resume — the async engine scans parquet-files/batch_*.parquet and reads parquet metadata to recover the actual persisted row count per row-group, which matters for partial-salvage scenarios.
  • Better error on corrupt metadata — a json.JSONDecodeError while loading metadata.json now raises a dedicated DatasetGenerationError instead of being silently treated as "assume compatible."
  • Docs updated in architecture/dataset-builders.md and the Fern docs (architecture-and-performance.mdx, processors.mdx).
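As an illustration of the tmp file + fsync + os.replace pattern named in the first bullet, here is a minimal sketch. It is not the PR's code: the function name, the PID-tagged temp filename, and the rule that explicit metadata keys override the injected defaults are all assumptions made for this example.

```python
import json
import os
from pathlib import Path
from typing import Optional


def write_metadata_atomically(path: Path, metadata: dict, defaults: Optional[dict] = None) -> None:
    """Write JSON so a reader sees the old file or the new one, never a partial one."""
    # Assumption: defaults (e.g. a config fingerprint) are merged first, explicit keys win.
    payload = {**(defaults or {}), **metadata}
    # Hypothetical naming scheme: a sibling temp file tagged with the writer's PID.
    tmp_path = path.with_name(f"{path.name}.{os.getpid()}.tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(payload, f, indent=2)
            f.flush()
            os.fsync(f.fileno())  # force the bytes to disk before the rename
        os.replace(tmp_path, path)  # the rename is atomic on POSIX filesystems
    finally:
        # A no-op after a successful replace; removes the temp file if a step failed.
        tmp_path.unlink(missing_ok=True)
```

Because the temp file lives in the same directory as the target, the rename stays on one filesystem, which is what keeps `os.replace` atomic.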

Findings

🟠 Bug — outer lock caches resolved_dataset_name prematurely

In data_designer.py:268+, create() now acquires the lock before calling builder.build():

with builder.artifact_storage.dataset_lock():
    builder.build(num_records=num_records, resume=resume)

dataset_lock() calls mkdir_if_needed(self.base_dataset_path) (artifact_storage.py:252), which accesses base_dataset_path → the cached_property resolved_dataset_name. This is the exact premature-caching the existing _check_resume_config_compatibility comment warns against ("must NOT access base_dataset_path … which would cache resolved_dataset_name prematurely").

Inside build(), when resume == IF_POSSIBLE and the stored config fingerprint is incompatible, the code pops the cache and re-resolves to a new timestamped directory:

self.artifact_storage.resume = ResumeMode.NEVER
self.artifact_storage.__dict__.pop("resolved_dataset_name", None)

After this switch, base_dataset_path now points at foo_MM-DD-YYYY_HHMMSS/ — but the outer lock file was already created in the original foo/ directory. The new timestamped directory is unlocked, and the lock file in foo/ protects a directory the run never writes to.

In practice two concurrent runs will race to unique timestamps and not collide, so this is unlikely to cause data corruption today. But the lock semantics are broken in the IF_POSSIBLE + incompatible path, which is the single most interesting case for a lock to protect (both runs are targeting the same dataset_name). Worth fixing.

Suggested fix: either (a) move the outer lock inside build() after the compat-check / mode switch and re-acquire a narrower lock around profiling, or (b) expose a helper that resolves the final directory without creating it, lock that, and pass the path through. Option (a) is probably simpler — the inner dataset_lock() already exists.

🟡 Redundant nested lock when called from DataDesigner.create()

With the outer lock in data_designer.py plus the inner dataset_lock() inside DatasetBuilder.build(), the inner lock is effectively a no-op re-entry through _lock_depth in the common path. It only matters when build() is called directly (tests, future callers). That's fine, but the two-layer locking is easy to misread. A comment at either site pointing out the re-entry contract would save the next reader some time.

🟡 _lock_depth counter is not thread-safe

_lock_depth is a bare int; increments/decrements are not atomic. Today DataDesigner runs single-threaded control flow, so this doesn't matter — but if the same ArtifactStorage is ever shared across threads (e.g. a future profiler that reads metadata from a worker thread), re-entrant locks could desynchronize. Worth a one-line docstring note that dataset_lock() is re-entrant per-process but not thread-safe.

🟡 Stale lock requires manual cleanup

dataset_lock() relies on unlink(missing_ok=True) in the finally block. If the process is SIGKILLed or the host crashes, the lock file remains and the next run fails with the "remove the stale lock if no process is active" message. The PID is written to the file, so a future enhancement could auto-detect staleness (e.g. psutil.pid_exists), but the current message is clear and actionable — flagging only as follow-up, not a blocker.
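For readers unfamiliar with the pattern under discussion, the following is a rough sketch of an exclusive lock file created with O_CREAT | O_EXCL, plus the PID-based staleness check floated above. It is an illustration only, not the PR's ArtifactStorage.dataset_lock(): the helper names and error messages are hypothetical, it omits re-entrancy, and the liveness probe uses `os.kill(pid, 0)`, which is only safe on POSIX.

```python
import os
from collections.abc import Iterator
from contextlib import contextmanager
from pathlib import Path


def pid_is_alive(pid: int) -> bool:
    """POSIX-only probe: signal 0 checks that a process exists without signalling it."""
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # the process exists but belongs to another user
    return True


@contextmanager
def dataset_lock(directory: Path) -> Iterator[Path]:
    """Hold an exclusive lock file in `directory` for the duration of the block."""
    directory.mkdir(parents=True, exist_ok=True)
    lock_path = directory / ".data_designer.lock"
    try:
        # O_EXCL makes creation fail if the file already exists.
        fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        owner = lock_path.read_text().strip()
        if owner.isdigit() and not pid_is_alive(int(owner)):
            raise RuntimeError(f"stale lock left by dead process {owner}: {lock_path}") from None
        raise RuntimeError(f"dataset is locked by process {owner or 'unknown'}: {lock_path}") from None
    try:
        with os.fdopen(fd, "w") as f:
            f.write(str(os.getpid()))  # record the owner so staleness can be detected
        yield lock_path
    finally:
        lock_path.unlink(missing_ok=True)
```

The sketch only reports a stale lock rather than deleting it, since PIDs can be reused and automatic removal could race with a live writer.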

🟡 post_generation_processed flag is only set if there are AFTER_GENERATION processors

if self._processor_runner.has_processors_for(ProcessorStage.AFTER_GENERATION):
    self.artifact_storage.update_metadata({"post_generation_processed": True})

If a config has no AFTER_GENERATION processors, the flag is never set, so _post_generation_processed_resume_result never fires. That's correct (nothing was applied). But if a user re-runs the same dataset with a newly added AFTER_GENERATION processor, that processor will run on the completed dataset — which is what the PR description says is intended ("can change row counts, schemas, row-group boundaries"). Please confirm this is the desired behavior; a docstring or test pinning this contract would prevent future regressions.

🟢 _post_generation_processed_resume_result early-return bypasses task_traces initialization

When this method returns early, build() returns the final dataset path without entering the inner with block. data_designer.py:create() then accesses builder.task_traces, builder.actual_num_records, builder.first_non_retryable_error, and builder.early_shutdown on the builder. These need to be initialized to safe defaults (empty list, -1, None, False) before build() is called for the early-return path to work end-to-end. Worth adding an integration-style test that drives the whole DataDesigner.create() flow against a completed+post-processed dataset (the current test only patches _initialize_generators_and_graph on the builder).

🟢 read_metadata() used in two places, different exception-handling contracts

  • _load_resume_state (dataset_builder.py:402) now catches json.JSONDecodeError and raises DatasetGenerationError with "metadata.json is corrupt."
  • _post_generation_processed_resume_result catches (FileNotFoundError, json.JSONDecodeError) and silently returns None.

Catching JSONDecodeError silently in the post-generation check means corrupt metadata slips past this gate and is only caught later in _load_resume_state. That's probably fine (the real resume path hits it), but it's asymmetric with the compat-check path, which raises on JSONDecodeError. Consider routing all corrupt-metadata reads through a single helper so the behavior is consistent.
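One possible shape for such a helper is sketched below. The names `read_metadata_strict` and `CorruptMetadataError` are hypothetical; the latter stands in for the project's DatasetGenerationError, whose constructor is not shown in this thread.

```python
import json
from pathlib import Path
from typing import Optional


class CorruptMetadataError(Exception):
    """Hypothetical stand-in for the project's DatasetGenerationError."""


def read_metadata_strict(path: Path) -> Optional[dict]:
    """Return parsed metadata, None if the file is absent, and raise if it is corrupt."""
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except FileNotFoundError:
        return None  # no prior run: callers decide what "absent" means
    except json.JSONDecodeError as exc:
        # Corruption is never treated as "assume compatible".
        raise CorruptMetadataError(f"{path.name} is corrupt: {exc}") from exc
```

With a single entry point, every caller distinguishes "missing" from "corrupt" the same way.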

🟢 Style nit — typing.Iterator

artifact_storage.py imports Iterator from typing. The codebase already uses modern typing elsewhere; collections.abc.Iterator is preferred for runtime-usable generic aliases on 3.9+. Ruff's UP035 should flag this if it's in the rule set — not worth blocking on, but easy to fix.

🟢 Tests are solid

The new tests cover the important invariants:

  • test_resume_rejects_allow_resize_columns
  • test_build_resume_raises_on_corrupt_metadata
  • test_build_resume_post_generation_processed_same_target_returns_existing_path
  • test_build_resume_post_generation_processed_extension_raises
  • test_initial_actual_num_records_uses_actual_parquet_rows_for_partial_row_group
  • test_artifact_storage_write_metadata_includes_defaults
  • test_artifact_storage_dataset_lock_rejects_second_writer

The parquet fixtures were upgraded from empty text files to real parquet with row counts, which is necessary for the new read_metadata(p).num_rows path. Good hygiene.

Missing coverage:

  • End-to-end DataDesigner.create() flow against a completed+post-processed dataset (sanity-check for the early-return path through builder attributes).
  • Lock behavior across the IF_POSSIBLE + incompatible-config mode switch (this is exactly the bug in the first finding).
  • Thread-reentrancy behavior of _lock_depth. If the contract is "single-threaded", that should be documented or tested.

Docs

  • architecture/dataset-builders.md — new "Resume Checkpointing" section is accurate and matches the code.
  • fern/.../architecture-and-performance.mdx — clean summary of resume modes and invariants.
  • fern/.../processors.mdx — the new warning about process_after_generation() being terminal is clear.

Verdict

Request changes (minor) before merge. The PR is a clear improvement in crash-recovery hardness, and most of the design is well-considered. Please address the outer-lock premature-caching issue (the IF_POSSIBLE + incompatible path leaves the run unlocked) before merging — that's the only finding I'd treat as blocking. The rest are follow-ups or style nits.

greptile-apps Bot commented May 8, 2026

Contributor

Greptile Summary

This PR hardens the resume/checkpoint system for both the sync and async dataset-builder engines. It unifies progress recovery — both engines now scan parquet-files/batch_*.parquet for actual row counts instead of reading stale metadata counters — and closes the narrow crash window where a complete dataset could silently skip process_after_generation.

  • Atomic metadata writes replace the previous in-place json.dump: a tmp-file + fsync + os.replace sequence prevents corrupt checkpoints on crash mid-write, and config fingerprints are injected into every write via _metadata_defaults so compatibility checks no longer require deserializing builder_config.json.
  • Unified _recover_progress_from_disk now drives both engines; the sync engine additionally enforces contiguous batch IDs and rejects allow_resize=True columns, while the async engine tolerates out-of-order completion.
  • _post_generation_processed_resume_result intercepts the terminal post-processed state before any generation begins, surfacing clear DatasetGenerationError messages for the "already complete", "smaller target", and "extension" cases.

Confidence Score: 5/5

Safe to merge — all resume paths are guarded, tested, and consistent between sync and async engines.

The changes are well-scoped and each new invariant (atomic writes, filesystem-as-truth for progress, post-processed terminal state, allow_resize rejection, non-contiguous ID rejection) is covered by a dedicated test. The normalization of IF_POSSIBLE to ALWAYS/NEVER happens before _post_generation_processed_resume_result and the allow_resize guard, keeping the ordering consistent. The atomic write correctly handles every branch in the finally unlink. No logic errors or correctness gaps were found.

No files require special attention.

Important Files Changed

| Filename | Overview |
| --- | --- |
| packages/data-designer-engine/src/data_designer/engine/storage/artifact_storage.py | Adds atomic write (tmp+fsync+os.replace), metadata defaults injection, and a PID-tagged temp filename; logic is correct and the finally unlink is safely a no-op after a successful rename. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py | Introduces unified _recover_progress_from_disk, _post_generation_processed_resume_result, and _has_allow_resize_columns; after-generation now runs unconditionally (no longer gated on generated); ordering of guards in build() is consistent with the normalization of IF_POSSIBLE earlier in the same function. |
| packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py | Rewrites _write_parquet_files to create real parquet files with configurable per-group row counts, adds coverage for all new resume paths (post-processed terminal state, non-contiguous IDs, corrupt metadata, crash-window recovery, partial row-group salvage). |
| packages/data-designer-engine/tests/engine/storage/test_artifact_storage.py | Adds test_artifact_storage_write_metadata_includes_defaults verifying that _metadata_defaults are merged into every write; straightforward and correct. |
| packages/data-designer/src/data_designer/interface/data_designer.py | Splits the single try/except into two separate blocks around _create_dataset_builder and builder.build() so that DeprecationWarning from the engine propagates correctly under strict warning filters. |
| packages/data-designer/src/data_designer/interface/results.py | Documentation-only addition to DatasetCreationResults clarifying the observability scope on resume; no logic changes. |
| architecture/dataset-builders.md | New Resume Checkpointing section accurately documents the unified resume strategy, dual source-of-truth split, and invariants. |

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A["build(num_records, resume)"] --> B{resume mode?}
    B -- IF_POSSIBLE --> C["_check_resume_config_compatibility()"]
    C -- COMPATIBLE --> D["resume = ALWAYS"]
    C -- INCOMPATIBLE/NO_PRIOR --> E["resume = NEVER"]
    B -- ALWAYS --> F
    B -- NEVER --> F
    D --> F["_set_metadata_defaults()\n(inject config fingerprint into every write)"]
    E --> F
    F --> G["_post_generation_processed_resume_result()"]
    G -- "post_gen_started → raise\nnot post_gen processed → None" --> H["_write_builder_config()"]
    G -- "post_gen complete + same target → return final_dataset_path" --> Z["✅ no-op return"]
    G -- "post_gen complete + wrong target → raise" --> ERR["🛑 DatasetGenerationError"]
    H --> I{allow_resize?}
    I -- "Yes + ALWAYS → raise" --> ERR
    I -- No --> J["_resolve_async_compatibility()"]
    J --> K{engine?}
    K -- async --> L["_build_async()"]
    K -- sync + ALWAYS --> M["_build_with_resume()"]
    K -- sync + NEVER --> N["_build fresh batches"]
    L --> O["_load_resume_state()\n→ _recover_progress_from_disk(allow_holes=True)\n→ _find_completed_row_groups()"]
    M --> O
    O --> P{all batches done?}
    P -- Yes --> Q["log 'already complete', return"]
    P -- No --> R["generate remaining batches"]
    Q --> S
    R --> S["has_after_generation_processors?"]
    S -- No --> T["log_model_usage → return final_path"]
    S -- Yes --> U["update_metadata(started)\nrun_after_generation()\nupdate_metadata(complete)"]
    U --> T

Reviews (8): Last reviewed commit: "Merge branch 'main' into fix/resume-hard..."

Comment thread packages/data-designer/src/data_designer/interface/data_designer.py Outdated
nabinchha added 4 commits May 8, 2026 16:35
Let IF_POSSIBLE start fresh for resize configs and mark after-generation processing before mutation so interrupted processors cannot be resumed unsafely.
Single-user CLI/notebook flows don't race on the artifact directory, and
the timestamped-directory fallback already handles the "ran it twice"
case. The lock added complexity (re-entrancy, stale cleanup, the
cached-property trap where IF_POSSIBLE→NEVER moves writes to a
timestamped directory while the lock stays pinned to the original) for
no real protection. Atomic metadata writes still cover the actual hazard
(crash mid-write).

Also fix a pre-existing test bug in
test_initial_actual_num_records_uses_actual_parquet_rows_for_partial_row_group
where the mocked scheduler hit the partial-completion path with
unconfigured Mock attributes.
* Drop the unreachable ResumeMode.IF_POSSIBLE branch in
  _post_generation_processed_resume_result. By the time this helper
  runs, build() has normalised IF_POSSIBLE to ALWAYS or NEVER, so the
  guard now matches reality. Tighten the docstring to document the
  three outcomes (no-op return / fall through / raise).

* Split the post-processed extension/raise into two cases. When
  num_records < prior_target the user just asked for fewer records than
  already exist; the previous "would mix pre- and post-processor
  records" message only describes the extension case. Mirror the
  wording used by _load_resume_state and add a regression test.

* Remove the dead _find_completed_row_group_ids wrapper now that
  _build_async uses _find_completed_row_groups directly. Rename the
  related test to match.
Both engines now derive `num_completed_batches` and `actual_num_records`
from `parquet-files/batch_*.parquet` via `_recover_progress_from_disk`.
`metadata.json` keeps describing the run *configuration* (`buffer_size`,
`target_num_records`, `original_target_num_records`, config fingerprint),
while the filesystem is the source of truth for *progress*. This closes
the sync engine's race window between `move_partial_result_to_final_file_path`
and the metadata write that follows it, matching the crash-recovery the
async engine already had.

The sync engine additionally rejects non-contiguous batch IDs (a hole can
only mean external mutation or a directory written by an incompatible
engine); the async engine continues to tolerate gaps from out-of-order
completion via `allow_holes=True`.

Existing sync resume tests now seed parquet files alongside metadata,
and two new tests cover the unified behaviour: filesystem progress wins
when metadata lags, and sync rejects non-contiguous IDs.
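The filesystem-as-truth recovery described in this commit message can be sketched roughly as follows. This is a minimal illustration, not the project's `_recover_progress_from_disk`: the function name, the `batch_<integer>.parquet` naming with IDs starting at 0, and the injected `count_rows` callable are assumptions made so the example runs with the standard library alone.

```python
import re
from pathlib import Path
from typing import Callable

# Assumption: batch files are named batch_<integer>.parquet, numbered from 0.
BATCH_FILE = re.compile(r"^batch_(\d+)\.parquet$")


def recover_progress(
    parquet_dir: Path,
    count_rows: Callable[[Path], int],
    allow_holes: bool = False,
) -> tuple[list[int], int]:
    """Return (sorted completed batch IDs, total persisted rows) from files on disk."""
    completed: dict[int, Path] = {}
    for path in parquet_dir.glob("batch_*.parquet"):
        match = BATCH_FILE.match(path.name)
        if match:
            completed[int(match.group(1))] = path
    ids = sorted(completed)
    # Sync-style callers reject gaps; async-style callers pass allow_holes=True.
    if not allow_holes and ids != list(range(len(ids))):
        raise ValueError(f"non-contiguous batch IDs on disk: {ids}")
    total_rows = sum(count_rows(completed[i]) for i in ids)
    return ids, total_rows
```

With pyarrow installed, `count_rows` could be `lambda p: pyarrow.parquet.read_metadata(p).num_rows`, which reads only the parquet footer rather than the data pages.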
nabinchha added 2 commits May 8, 2026 17:38
`load_dataset`, `count_records`, `load_analysis`, `export`, and `push_to_hub`
all read from the artifact directory, so they reflect the cumulative dataset
(original + resume rows). `task_traces`, model-usage logs, and telemetry
events are scoped to the current invocation only because the original run's
in-memory state is not persisted. Document this in the class docstring,
the architecture note, and the Fern resume guide.
Future readers were puzzled by the ``except DeprecationWarning: raise``
short-circuits before the generic generation-error wrappers. Add a
comment in ``create()`` (with a back-reference from ``preview()``) to
record that strict warning filters (``pytest.warns``,
``-W error::DeprecationWarning``) turn the engine's
``warnings.warn(..., DeprecationWarning)`` calls — most notably the
``allow_resize=True`` deprecation in ``_resolve_async_compatibility`` —
into raised exceptions, and we want them to surface untouched instead of
being swallowed by ``DataDesignerGenerationError``.
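The control flow this commit message describes can be sketched as below. The names `engine_build`, `create`, and `GenerationError` are hypothetical stand-ins rather than the project's actual functions; only the `except DeprecationWarning: raise` ordering is taken from the text above.

```python
import warnings


class GenerationError(Exception):
    """Hypothetical stand-in for DataDesignerGenerationError."""


def engine_build(allow_resize: bool) -> str:
    """Toy engine call that emits a deprecation warning for allow_resize=True."""
    if allow_resize:
        warnings.warn("allow_resize=True is deprecated", DeprecationWarning, stacklevel=2)
    return "dataset-path"


def create(allow_resize: bool = False) -> str:
    try:
        return engine_build(allow_resize)
    except DeprecationWarning:
        # Under strict filters the warning arrives as an exception. Warning
        # subclasses Exception, so without this clause the generic handler
        # below would wrap it and hide the original type.
        raise
    except Exception as exc:
        raise GenerationError(f"generation failed: {exc}") from exc
```

Running `create(allow_resize=True)` under `warnings.simplefilter("error", DeprecationWarning)` raises `DeprecationWarning` itself rather than `GenerationError`.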
@johnnygreco

Contributor

Thanks for putting this together, @nabinchha!

Summary

This PR hardens resume checkpoints by making metadata writes atomic, persisting config fingerprints, deriving resume progress from parquet files, and treating after-generation-processed datasets as terminal for resume. The implementation matches the stated intent, and the docs updates line up with the new resume model.

Findings

No findings.

What Looks Good

  • The split between metadata as configuration state and parquet files as progress state is clear and closes the stale-metadata crash window.
  • The post-generation processor guard is conservative in the right way: it marks the risky rewrite phase before mutation and avoids re-running processors on already-complete resumed datasets.
  • The regression coverage is strong around config compatibility, missing/corrupt metadata, non-contiguous sync batches, async row counts from parquet metadata, and allow-resize rejection.

Verdict

Ship it. I verified the changed Python files with uv run ruff check and uv run ruff format --check, plus the targeted engine resume/storage tests and the touched interface tests. I could not run fern check because the Fern CLI is not installed in this environment.


This review was generated by an AI assistant.

johnnygreco previously approved these changes May 11, 2026
self.batch_manager.finish()

if generated:
has_after_generation_processors = self._processor_runner.has_processors_for(ProcessorStage.AFTER_GENERATION)

Contributor

I think there is still a crash window just before this marker is written. If all row-group parquet files are already on disk but post_generation_state has not been persisted yet, the next resume=ALWAYS returns "already complete", sets generated=False, and skips process_after_generation() entirely. Codex reproduced this with a smoke test by seeding complete parquet files plus metadata with no post-generation marker, then attaching an after-generation processor. Maybe write a pending marker before row generation can become complete, and have resume either finish pending after-generation work or reject ambiguous legacy checkpoints.

@nabinchha nabinchha May 11, 2026

Contributor Author

Good catch — confirmed. There is a crash window between the final row-group parquet write and the post_generation_state="started" marker write: in that window every row group is on disk, no marker is in metadata, and the on-disk parquet files are still clean (no after-generation processor has touched them). The previous if generated: guard meant the next resume=ALWAYS saw "already complete", set generated=False, and skipped process_after_generation() entirely.

Fixed in 93d47e5 by removing the if generated: guard so after-generation runs unconditionally on the on-disk dataset whenever after-generation processors are configured. The existing short-circuits in _post_generation_processed_resume_result still cover the unsafe directions:

  • post_generation_state == "complete" / post_generation_processed=True → return early (already done).
  • post_generation_state == "started" → raise (crashed mid-rewrite, parquet files may be partially post-processed).

So the unconditional run only fires when after-generation has demonstrably not been applied to the dataset on disk yet.

I went with the "finish pending after-generation work" leg of your suggestion rather than the "reject ambiguous legacy checkpoints" leg, since the latter would break users who completed a dataset and later attached an after-generation processor (the same intended behaviour the PR description and Greptile review both call out).

Added test_build_resume_complete_dataset_runs_after_generation_when_no_marker to lock in the new behaviour. Architecture doc and the Fern resume invariants now describe both crash windows explicitly.
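The marker semantics described in this reply can be summarized as a small decision function. This is a sketch under stated assumptions, not the project's `_post_generation_processed_resume_result`: the enum, the function name, and the use of `RuntimeError` are hypothetical, and the target-size checks for completed datasets are deliberately left out.

```python
from enum import Enum


class ResumeAction(Enum):
    RETURN_EXISTING = "return_existing"  # after-generation already finished
    RUN_AFTER_GENERATION = "run_after_generation"  # clean parquet, processors pending
    NOTHING_TO_RUN = "nothing_to_run"  # clean parquet, no processors configured


def decide_after_generation(metadata: dict, has_processors: bool) -> ResumeAction:
    """Map the persisted post-generation marker to the next resume step."""
    state = metadata.get("post_generation_state")
    if state == "started":
        # Crashed mid-rewrite: files may be partially post-processed, so refuse.
        raise RuntimeError("after-generation was interrupted; cannot resume safely")
    if state == "complete" or metadata.get("post_generation_processed") is True:
        return ResumeAction.RETURN_EXISTING
    # No marker: after-generation has not touched the files on disk yet.
    return ResumeAction.RUN_AFTER_GENERATION if has_processors else ResumeAction.NOTHING_TO_RUN
```

The no-marker branch is the one that closes the crash window: a complete but unprocessed dataset still gets its after-generation pass.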

@andreatgretel andreatgretel left a comment

Contributor

Requesting changes for the after-generation resume crash window and typed metadata error noted inline.

…on resume

Address review feedback on resume hardening:

* Run after-generation processors unconditionally on the on-disk dataset
  rather than gating on the generation return value. The previous gate
  silently skipped after-generation when resume saw every row group
  already on disk, leaving a crash window between the final parquet write
  and the ``post_generation_state="started"`` marker write: in that
  window the dataset is complete but after-generation never ran, and the
  on-disk parquet files are still clean. The "started" short-circuit
  still rejects the other direction (crashed mid-rewrite, ambiguous
  state), so resume only re-runs after-generation when it is safe to do
  so.

* Raise ``DatasetGenerationError`` (instead of letting a raw
  ``TypeError`` leak out of ``num_records < prior_target``) when a
  post-processed dataset's metadata is missing ``target_num_records``.
  Mirrors the wording used by ``_load_resume_state``.

* Document the new behaviour in ``architecture/dataset-builders.md`` and
  the Fern resume invariants.

Tests:

* ``test_build_resume_complete_dataset_runs_after_generation_when_no_marker``
  covers the closed crash window via the public ``set_processor_runner``
  API.
* ``test_build_resume_post_generation_processed_missing_target_raises_clearly``
  covers the typed-error gap.
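To make the typed-error fix concrete, here is a minimal sketch of the target check for an already post-processed dataset. `ResumeError` is a hypothetical stand-in for DatasetGenerationError, and the function name and message wording are invented for illustration.

```python
class ResumeError(Exception):
    """Hypothetical stand-in for the project's DatasetGenerationError."""


def check_post_processed_target(metadata: dict, num_records: int) -> None:
    """Validate a resume request against a dataset that was already post-processed."""
    prior_target = metadata.get("target_num_records")
    if not isinstance(prior_target, int):
        # Without this guard, `num_records < None` would raise a raw TypeError.
        raise ResumeError("metadata is missing target_num_records; cannot resume")
    if num_records < prior_target:
        raise ResumeError(f"requested {num_records} records but {prior_target} already exist")
    if num_records > prior_target:
        raise ResumeError("extending would mix pre- and post-processor records")
    # Same target: nothing to do, the caller can return the existing dataset path.
```

Keeping the smaller-target and extension cases as separate branches lets each produce a message that matches what the user actually asked for.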

@andreatgretel andreatgretel left a comment

Contributor

Thanks for addressing the review comments. I re-ran the focused resume/post-generation tests and smoke checks on 93d47e53, and the two issues I flagged are fixed.

Codex did flag one remaining edge case: a legacy markerless dataset that had already completed process_after_generation() could have after-generation run again if someone now adds resume=ALWAYS against that existing artifact. I think that is not a big deal in practice since resume is very new and this only affects older completed artifacts without the new post-generation metadata markers.

Approving!

@nabinchha nabinchha merged commit bbcd7d3 into main May 11, 2026
49 checks passed
@nabinchha nabinchha deleted the fix/resume-hardening-623 branch May 11, 2026 17:45
Development

Successfully merging this pull request may close these issues.

Harden resume checkpointing and edge cases

3 participants