Skip to content

docs: add plan for workflow chaining#552

Merged
andreatgretel merged 15 commits into
mainfrom
andreatgretel/docs/workflow-chaining
May 12, 2026
Merged

docs: add plan for workflow chaining#552
andreatgretel merged 15 commits into
mainfrom
andreatgretel/docs/workflow-chaining

Conversation

@andreatgretel

@andreatgretel andreatgretel commented Apr 15, 2026

Copy link
Copy Markdown
Contributor

Summary

  • Adds a design plan for workflow chaining - sequencing multiple generation stages where each stage's output seeds the next.
  • Primary use cases: explode (few seeds -> many records), filter-then-enrich, generate-then-judge, multi-turn construction.
  • Secondary benefit: enables full removal of allow_resize and simplification of sync/async engine convergence (deprecation already shipped in chore: async engine readiness - blockers and polish before default #553).

What's in the plan

  • Pipeline class in the interface layer with add_stage(), run(), between-stage callbacks. Reuses the parent DataDesigner so all stages share one ModelRegistry / ThrottleManager.
  • to_config_builder() convenience on results for lightweight notebook chaining.
  • On-disk handoffs between stages via LocalFileSeedSource. In-memory DataFrameSeedSource is reserved for the to_config_builder() notebook ergonomic and is explicitly not a Pipeline.
  • DAG-shaped internal stage model. v1 accepts linear inputs only; Phase 4 adds parallel branches as an additive API change (depends_on=[...]).
  • acreate() engine sidecar. Small additive async API on DataDesigner. Independent of chaining v1; hard dependency for Phase 4. Enables in-process parallel-independent workflows via asyncio.gather.
  • allow_resize removal following the deprecation already in main from chore: async engine readiness - blockers and polish before default #553.
  • Pre-batch processor resize lockdown (fail-fast on row-count changes).
  • Stage-level checkpointing and resume using DataDesignerConfig.fingerprint() (feat(config): add deterministic fingerprint for workflow configs #587) composed with num_records, DD version, and upstream stage fingerprint.

Phases

  1. Phase 1: Pipeline class + to_config_builder() (can ship independently).
  2. Sidecar: acreate() on DataDesigner (independent track; can land before/alongside/after Phase 1).
  3. Phase 2: Remove allow_resize (deprecation already shipped in chore: async engine readiness - blockers and polish before default #553; this phase finishes the removal).
  4. Phase 3: Stage-level resume via per-stage fingerprints.
  5. Phase 4: DAG-shaped stages with parallel branches via asyncio.gather over acreate(). Hard dependency on the sidecar.
  6. Phase 5 (future): Auto-chaining from a single config.

Resolved decisions

  • Stage handoff is always on disk inside Pipeline. In-memory mode reserved for to_config_builder().
  • DAG semantics are designed-in but v1 accepts linear inputs only. Phase 4 ships parallel branches.
  • Pipeline is constructed via dd.pipeline() and reuses the parent DataDesigner across all stages - load-bearing for throttle coordination.

Future considerations (uncommitted)

  • External orchestration for cross-process / distributed execution. The plan's design choices (parent DataDesigner reuse, on-disk handoffs, no new engine surface) compose naturally with such a system.
  • Pipelined execution of dependent stages (streaming seed sources, edge-streaming resume) - flagged so the stage data contract isn't quietly closed off.

Open questions

Preview support, config serialization for auto-chaining, naming, image/media column forwarding, downstream seeding scope.

No code changes - plan document only.

Proposes replacing the in-place allow_resize mechanism with a Pipeline
class that chains multiple generation stages. Each stage gets a fresh
fixed-size tracker, and resize becomes a between-stage concern.
nabinchha added a commit that referenced this pull request May 5, 2026
greptile-apps (PR #594, r3189904028): `ProviderRepository.load`'s
YAML-default `DeprecationWarning` was using `warnings.warn(stacklevel=2)`,
which attributes to whichever data_designer frame called `load()` —
controllers, services, list/reset commands, agent introspection. Every
real call path lands on `data_designer.cli.*`, which falls under
Python's default `ignore::DeprecationWarning` filter and is silenced.
Audit found two more sites with the same problem:

- `DatasetBuilder._resolve_async_compatibility` (`allow_resize` /
  issue #552) — was using `stacklevel=4` to walk past
  `_resolve_async_compatibility -> build/build_preview -> interface ->
  user`. Brittle: any added frame (decorator, async wrapping, the
  `try/except DeprecationWarning: raise` boundary) shifts attribution
  silently. The existing test passed only because it used
  `simplefilter("always") + record=True`, which records warnings
  regardless of attribution.
- `ProviderController._handle_change_default` — was using
  `stacklevel=2`, which lands on the menu dispatcher in the same
  controller module. `print_warning` already shows the message
  visually, but programmatic observers (`pytest.warns`,
  `filterwarnings("error", ...)`) saw a library-attributed entry that
  default filters silenced.

All three migrated to `warn_at_caller` (the helper from 247fa30) so
attribution lands on the user's call site regardless of internal
chain shape. `data_designer` is already in
`DEFAULT_INTERNAL_PREFIXES`, so the walk escapes the entire library
in one pass.

Added attribution regression tests at each site asserting
`warning.filename == __file__`. A future regression to
`warnings.warn(stacklevel=N)` now fails CI instead of silently
silencing the user-facing nudge:

- `test_load_with_yaml_default_attributes_warning_to_caller`
  (test_provider_repository.py)
- `test_resolve_async_compatibility` extended with the same assertion
- `test_handle_change_default_emits_deprecation_warning` rewritten
  from `pytest.warns(...)` to a `catch_warnings(record=True)` block
  that filters for the message and asserts `filename == __file__`
  (`pytest.warns` does not check attribution, so the rewrite is
  required to actually catch the regression).

3,125 tests pass (548 config + 1,923 engine + 654 interface).

Refs #589
nabinchha added a commit that referenced this pull request May 5, 2026
* feat(models): deprecate implicit default provider routing

Emit DeprecationWarning whenever the legacy "implicit default
provider" path is exercised: `ModelConfig.provider=None`, the
registry-level `ModelProviderRegistry.default`, the YAML
`default:` key in `~/.data-designer/model_providers.yaml`, and
the CLI's "Change default provider" workflow.

`resolve_model_provider_registry` skips passing `default=` in the
single-provider case so the common construction path stays quiet.
Multi-provider registries still pass `default` (per
`check_implicit_default`) and warn accordingly.

Update docs, the package README, and test fixtures to specify
`provider=` explicitly on every `ModelConfig`. New tests cover
each warning entry point and pin the post-deprecation happy paths.

Refs #589

Made-with: Cursor

* fix(models): address PR #594 review feedback

Greptile P1: ProviderRepository.load emitted its DeprecationWarning
inside a `try/except Exception` block. Under
`filterwarnings("error", DeprecationWarning)` the warn would raise,
the except would swallow it, and `load()` would silently return None
(losing the registry). Move the warn outside the catch-all so the
strict-warning path no longer drops valid configs.

Greptile P2 / johnnygreco: `_warn_on_implicit_provider` and
`_warn_on_explicit_default` use `stacklevel=2`, which lands inside
pydantic v2's validator dispatch rather than at the user's
`ModelConfig(...)` / `ModelProviderRegistry(...)` call. That broke
both attribution (the source line was unhelpful) and Python's
once-per-location dedup (every call collapsed to the same
pydantic-internal key, suppressing all but the first warning).
Introduce `data_designer.config.utils.warning_helpers.warn_at_caller`,
which walks past the helper, validator, and any pydantic frames to
find the user's call site and emits via `warnings.warn_explicit` with
the user frame's `__warningregistry__`. Keeps attribution accurate
and dedup keyed on the user's (filename, lineno).

johnnygreco: align the `provider_repository.py` warning copy with the
sibling site in `default_model_settings.py` ("specify provider=
explicitly on each ModelConfig instead") so both YAML-default warning
sites give the same migration instruction. The previous wording
pointed users at "ModelConfig entries" inside `model_providers.yaml`,
where ModelConfig entries don't actually live.

johnnygreco: dedup the cascade in `DataDesigner.__init__`. With
`model_providers=None` and a YAML `default:`, the user previously saw
two DeprecationWarnings for the same root cause —
`get_default_provider_name()` warns about the YAML key, then
`resolve_model_provider_registry(...)` re-warns from
`_warn_on_explicit_default`. Suppress the registry-level duplicate in
the YAML-fallback branch via `warnings.catch_warnings()` so users see
exactly one warning per user action.

johnnygreco: tighten `_warn_on_explicit_default` to fire only when
`default is not None`. Passing `default=None` explicitly is
semantically equivalent to omitting it (caller is opting *out* of a
registry-level default), and shouldn't trigger the deprecation
nudge.

johnnygreco: add a `model_validate({...})` regression test for
`ModelConfig` so the deserialization path (legacy on-disk configs)
is pinned alongside the construction path.

Tests:
- Update `test_load_exists` and `test_save` to omit `default=` so the
  roundtrip stops exercising the deprecated YAML-default path
  unguarded (Greptile note).
- Wrap `test_resolve_model_provider_registry_with_explicit_default`,
  `test_get_provider`, and
  `test_init_user_supplied_providers_preserve_first_wins_over_yaml_default`
  in `pytest.warns` so the suite stays green under
  `-W error::DeprecationWarning` (Greptile note).
- Add `test_explicit_default_none_does_not_emit_deprecation_warning`
  to pin the tightened predicate.
- Add `test_init_yaml_default_emits_single_deprecation_warning` to
  pin the cascade-dedup behavior.

Refs #589

Made-with: Cursor

* fix(models): make deprecation warnings visible under default filters

andreatgretel (PR #594): the YAML-default warning in
`get_default_provider_name` and the registry-default warning emitted
from inside DataDesigner helpers were attributing to data_designer
library frames, not user code. Python's default filter chain includes
`ignore::DeprecationWarning`, so library-attributed entries are
silenced — meaning a normal `DataDesigner()` call with a YAML
`default:` set showed nothing, and `resolve_model_provider_registry`
warnings were similarly invisible. Two related changes:

1. `warn_at_caller`: extend the default skip-list from `("pydantic",)`
   to `("pydantic", "pydantic_core", "data_designer")` so the walk
   escapes both pydantic's validator-dispatch frames and data_designer
   helper frames before attributing. Also tighten the prefix predicate
   to exact-or-dotted-prefix matching (`name == p or
   name.startswith(p + ".")`) so e.g. `pydantic_helpers` is not
   falsely matched as part of `pydantic` (johnnygreco nit). Allow
   callers to pass a custom `skip_prefixes` for flexibility. Drop the
   "skip frame 0+1 unconditionally" guard now that prefix matching
   covers it.

2. `get_default_provider_name`: switch from
   `warnings.warn(stacklevel=2)` to `warn_at_caller`. The previous
   stacklevel pointed into `default_model_settings.py`, which is a
   library file → silenced under default filters. Verified the fix
   empirically with `python -W default`: warning is now attributed to
   the user's call site and rendered.

johnnygreco (PR #594): add the missing
`test_explicit_default_none_does_not_emit_deprecation_warning`
regression for the `self.default is not None` predicate landed in
the prior round.

Tests:
- New `test_warning_helpers.py` pins prefix-matching precision
  (rejects `pydantic_helpers` / `data_designer_other`), default
  skip-list contents, attribution past skip-prefix frames, and
  per-call-site dedup behavior.
- `test_get_default_provider_name_warning_attributes_to_user_frame`
  pins andreatgretel's repro for the YAML-default site.
- `test_explicit_default_warning_attributes_to_user_frame` pins the
  multi-frame case: construction goes through
  `resolve_model_provider_registry`, so the walk has to escape both
  pydantic and data_designer before landing on the test file.
- `test_explicit_default_none_does_not_emit_deprecation_warning`
  pins johnnygreco's predicate-tightening regression.

3,124 tests pass (540 config + 1,923 engine + 653 interface; +10 net
from this round).

Refs #589

Made-with: Cursor

* fix(models): apply warn_at_caller to remaining deprecation sites

greptile-apps (PR #594, r3189904028): `ProviderRepository.load`'s
YAML-default `DeprecationWarning` was using `warnings.warn(stacklevel=2)`,
which attributes to whichever data_designer frame called `load()` —
controllers, services, list/reset commands, agent introspection. Every
real call path lands on `data_designer.cli.*`, which falls under
Python's default `ignore::DeprecationWarning` filter and is silenced.
Audit found two more sites with the same problem:

- `DatasetBuilder._resolve_async_compatibility` (`allow_resize` /
  issue #552) — was using `stacklevel=4` to walk past
  `_resolve_async_compatibility -> build/build_preview -> interface ->
  user`. Brittle: any added frame (decorator, async wrapping, the
  `try/except DeprecationWarning: raise` boundary) shifts attribution
  silently. The existing test passed only because it used
  `simplefilter("always") + record=True`, which records warnings
  regardless of attribution.
- `ProviderController._handle_change_default` — was using
  `stacklevel=2`, which lands on the menu dispatcher in the same
  controller module. `print_warning` already shows the message
  visually, but programmatic observers (`pytest.warns`,
  `filterwarnings("error", ...)`) saw a library-attributed entry that
  default filters silenced.

All three migrated to `warn_at_caller` (the helper from 247fa30) so
attribution lands on the user's call site regardless of internal
chain shape. `data_designer` is already in
`DEFAULT_INTERNAL_PREFIXES`, so the walk escapes the entire library
in one pass.

Added attribution regression tests at each site asserting
`warning.filename == __file__`. A future regression to
`warnings.warn(stacklevel=N)` now fails CI instead of silently
silencing the user-facing nudge:

- `test_load_with_yaml_default_attributes_warning_to_caller`
  (test_provider_repository.py)
- `test_resolve_async_compatibility` extended with the same assertion
- `test_handle_change_default_emits_deprecation_warning` rewritten
  from `pytest.warns(...)` to a `catch_warnings(record=True)` block
  that filters for the message and asserts `filename == __file__`
  (`pytest.warns` does not check attribution, so the rewrite is
  required to actually catch the regression).

3,125 tests pass (548 config + 1,923 engine + 654 interface).

Refs #589
…, fingerprint feature available

- Update allow_resize framing: now logs DeprecationWarning and falls back to sync (#553), no longer hard-rejected. Async is default as of #592.
- Reference DataDesignerConfig.fingerprint() (#587) as the per-stage hash for resume invalidation.
- Rename _validate_async_compatibility() to _resolve_async_compatibility() to match current code.
- Mark Phase 2 step 1 as done; list the concrete docs that still need updates.
…ant, on-disk handoffs, DAG-ready, acreate sidecar

- Resolve in-memory vs on-disk handoff to always-on-disk inside Pipeline; reserve in-memory for to_config_builder() notebook ergonomic.
- Add Composability section: parent DataDesigner reuse is a load-bearing API contract for throttle coordination across stages and parallel branches.
- Add Engine API surface section: acreate() as a small additive sidecar, independent of chaining v1 but a hard dependency for Phase 4.
- Promote DAG semantics from "future work" to "designed-in"; add Phase 4 (parallel branches via asyncio.gather over acreate); demote auto-chaining to Phase 5.
- New Resolved decisions section captures the three load-bearing API decisions; trim the Open questions list accordingly.
- Mention possible future external orchestration only as a vague composability constraint, no commitment.
- Soften "Door open for external orchestration" - drop throttle-backend-as-seam framing; cross-reference Future considerations.
- Make acreate() scope explicit (in-process); cross-process orchestration is not the same problem.
- Add Phase 4 scope clarifier - branch parallelism, not stage pipelining.
- New Future considerations section: external orchestration (vague, uncommitted) and pipelined execution of dependent stages.
@andreatgretel andreatgretel marked this pull request as ready for review May 7, 2026 22:06
@andreatgretel andreatgretel requested a review from a team as a code owner May 7, 2026 22:06
@github-actions

github-actions Bot commented May 7, 2026

Copy link
Copy Markdown
Contributor

Review: PR #552 — docs: add plan for workflow chaining

Summary

This PR adds a single planning document at plans/workflow-chaining/workflow-chaining.md (+410/-0). It proposes a Pipeline class in the data_designer.interface layer that sequences multiple DataDesigner.create() calls, hands off between stages via on-disk parquet, and reuses the parent DataDesigner's ModelRegistry / ThrottleManager across stages. It also outlines:

No code changes.

Findings

Architectural alignment — strong

  • Layering is respected. Pipeline lives in data_designer.interface; the engine stays ignorant of pipelines; each stage is a regular DatasetBuilder.build() call. Consistent with the interface → engine → config import direction in AGENTS.md.
  • Config layer stays declarative. The plan explicitly keeps Pipeline imperative in v1 (no new config models), preserving the "declarative config, imperative engine" core principle.
  • Throttle invariant is load-bearing and correctly framed. The rationale for reusing a single DataDesigner (one ModelRegistry → one ThrottleManager → one AIMD state) is the right reason to pin this in the API contract now rather than discover it later. The same argument is extended cleanly to Phase 4 branches.
  • Internal-DAG, linear-API-v1 split. Keeping a DAG stage representation from day one while exposing only linear add_stage() avoids a disruptive restructure at Phase 4. Good forward-compat hygiene.
  • acreate() framed as a sidecar, not a pipeline dependency. Correctly identifies that sequential v1 doesn't need async, but Phase 4 does. The asyncio.wrap_future bridge over the existing singleton event loop is the minimal, correct shape.

Completeness — mostly complete, with a few areas to tighten before implementation

  1. Between-stage callback data contract is under-specified. The signature is (stage_output_path: Path) -> Path, but what the returned path must contain (a parquet-files/ subdirectory? a flat data.parquet? any LocalFileSeedSource-compatible layout?) is left implicit. The two examples are inconsistent: the pipeline reads stage_output_path / "parquet-files" but the callbacks write a flat data.parquet under a new directory. Before implementation, nail down exactly what the callback must produce so LocalFileSeedSource can consume it uniformly.

  2. Callback fingerprinting for resume is acknowledged but unresolved. The Resume Safety section lists callbacks as something that "may have changed between runs," but the fingerprint composition (DataDesignerConfig.fingerprint() + num_records + DD version + upstream stage fingerprint) does not capture the callback. A user who edits their filter predicate and reruns with resume=True will silently get stale filtered data. Options worth noting in the plan: hash the callback source, require a user-supplied version string, or document that callbacks invalidate resume. Better to decide now than post-ship.

  3. Stage fingerprint composition is missing sampling/selection. Phase 3 composes fingerprints from config + num_records + DD version + upstream. But add_stage() also takes sampling_strategy and selection_strategy, which change the data consumed by the stage. These should either be folded into the stage fingerprint or explicitly declared non-fingerprinted. Same question applies to allow_empty.

  4. allow_empty=True short-circuit: result-dict semantics unspecified. If stage 2 of 4 empties out, does results["stage_3"] raise KeyError, return None, or return a DatasetCreationResults with an empty dataset? The user-facing behavior of pipeline.run() in this branch isn't spelled out.

  5. Image/media column forwarding is flagged as an open question but unprioritized. Option (c) "document as unsupported in v1" may be fine, but users with image-producing stages will hit this immediately. Worth an explicit call: ship v1 without media forwarding and document the limitation, or require it in v1.

  6. Docs-audit scope may be incomplete. Phase 2 lists three docs files that reference allow_resize (custom_columns.md, plugins/example.md, agent-rollout-ingestion.md). Tutorials, example notebooks, and the skills/data-designer/ skill instructions aren't mentioned. A grep -r allow_resize across docs/, tutorials/ (if present), skills/, and packages/ before Phase 2 would be cheap insurance.

  7. ArtifactStorage bypass. The pipeline "owns its directory layout directly, bypassing ArtifactStorage's default auto-rename behavior." This is a reasonable choice but implies a new path through ArtifactStorage (or around it). Before implementation, confirm whether this requires an additive ArtifactStorage API (e.g., disable_auto_rename=True) or a fully separate layout manager, and whether the latter risks drift from single-run storage conventions.

  8. Migration example for allow_resize users is light. Phase 2 says "Migration path: users with allow_resize=True columns split their config into a pipeline with a stage boundary at the resize column." A concrete before/after example in the plan (or a tracked doc deliverable) would reduce user friction. Currently only listed under "Tests: verify rejection, migration path examples."

  9. acreate() cancellation semantics. asyncio.wrap_future over concurrent.futures.Future leaves cancellation propagation partial (cancellation of the asyncio wrapper doesn't reliably cancel the underlying future). Given the singleton background event loop, this is probably acceptable, but the plan should note whether cancellation is supported or deliberately not, so expectations don't drift during implementation.

Feasibility — feasible as structured

  • Phase 1 is a thin orchestration layer over existing components (DataDesigner.create(), LocalFileSeedSource, ArtifactStorage). No deep engine surgery required.
  • The sidecar acreate() is a ~one-file addition; the heavy lifting (singleton loop, concurrent future) already exists in _build_async.
  • Phase 2's engine simplifications (_cell_resize_mode, _finalize_fan_out resize branch, _resolve_async_compatibility) all unblock once Phase 1 gives users a migration path. Order is sound.
  • Phase 3 depends on Phase 1 metadata format. The plan calls out "the metadata format in phase 1 should record enough information to support it" — good. Worth making the exact metadata schema a deliverable of Phase 1, not a Phase 3 discovery.
  • Phase 4 additively extends add_stage(depends_on=...) without changing existing call sites. Clean.

Minor observations

  • dd.pipeline() factory is named consistently with dd.create(); good.
  • The to_config_builder() scoping (explicitly not a Pipeline, in-memory only, notebook ergonomic) is well-drawn. The plan is unambiguous that on-disk is the production path.
  • "Resolved decisions" section at the end reiterates choices already made in-body; this is slightly redundant but useful for reviewers skimming the plan as a decision log.
  • Naming question ("Pipeline vs Chain vs WorkflowChain") probably worth resolving before Phase 1 ships, since it's user-visible on dd.pipeline().

Verdict

Strong plan. Architecturally aligned with the project's layering and invariants, feasibility-grounded in existing primitives, and thoughtful about forward compatibility (DAG-internal-v1, acreate-as-sidecar). The one place the plan would benefit from a tightening pass before implementation is the stage data contract and fingerprint scope: callback output layout, callback fingerprinting for resume, and inclusion of sampling_strategy/selection_strategy in stage fingerprints. These are small clarifications, not redesigns. Media forwarding and migration-example concreteness are secondary polish items.

Recommend approving the plan with a note to resolve items 1–3 above before starting Phase 1 implementation.

@greptile-apps

greptile-apps Bot commented May 7, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR adds a design plan for workflow chaining in DataDesigner: a CompositeWorkflow class that sequences multiple generation stages with on-disk handoffs, shared throttling, between-stage callbacks, and stage-level resume. Secondary deliverables include a to_config_builder() notebook ergonomic, removal of allow_resize, and a pre-batch processor resize lockdown.

  • Phase 1 introduces CompositeWorkflow with linear stage composition, deterministic artifact layout (artifacts/<name>/stage-<i>-<name>/), provenance via workflow-metadata.json, and duplicate-name rejection on add_stage().
  • Phase 3 adds fingerprint-based resume (config + num_records + on_success_version + DD version + upstream fingerprint), with explicit handling for completed_empty, skipped_empty_upstream, and callback output path validation.
  • Phase 4 extends to DAG-shaped stages with parallel branches via asyncio.gather over acreate(), but leaves the sync\u2192async bridging mechanism inside workflow.run() unspecified.

Confidence Score: 5/5

Documentation-only change with no executable code; all previously flagged design gaps have been addressed in subsequent commits.

The plan is thorough and self-consistent for Phases 1-3. The two remaining open items (how workflow.run() bridges sync to async in Phase 4, and the exact resume treatment of failed stages) are future-phase concerns that do not affect the v1 implementation. No code ships in this PR.

plans/workflow-chaining/workflow-chaining.md - the Phase 4 async bridge and failed-stage resume policy are unspecified but affect only future phases.

Important Files Changed

Filename Overview
plans/workflow-chaining/workflow-chaining.md New design plan for composite workflow chaining. Most design gaps from earlier reviews (resume identity, duplicate names, DAG fingerprint ordering, callback output validation, allow_empty contract) are explicitly addressed. Two unresolved items remain: Phase 4's sync→async bridge inside workflow.run() is unspecified, and the behavior of a failed stage during resume is ambiguous.

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[DataDesigner.compose_workflow name=required] --> B[CompositeWorkflow]
    B --> C[add_stage name, config, num_records]
    C --> D{Duplicate name?}
    D -- Yes --> E[raise DataDesignerWorkflowError]
    D -- No --> F[Stage added to DAG]
    F --> G[workflow.run]
    G --> H{Phase 3: check workflow-metadata.json}
    H -- fingerprint match + completed --> I[Skip stage]
    H -- fingerprint match + partial --> J[create resume=ALWAYS via #526]
    H -- fingerprint mismatch --> K[Run stage fresh]
    K --> L[DataDesigner.create]
    J --> L
    L --> M{on_success callback?}
    M -- Yes --> N[callback writes resolved path]
    M -- No --> O[stage parquet output]
    N --> O
    O --> P{allow_empty?}
    P -- No, 0 rows --> Q[raise DataDesignerWorkflowError]
    P -- Yes, 0 rows --> R[completed_empty downstream skipped_empty_upstream]
    P -- rows > 0 --> S[completed seeds next stage]
    S --> T[Next stage]
    T --> G
    S --> U[workflow-metadata.json]
Loading
Prompt To Fix All With AI
Fix the following 2 code review issues. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 2
plans/workflow-chaining/workflow-chaining.md:393-408
**`workflow.run()` sync→async bridge unspecified for Phase 4**

Phase 4 says `workflow.run()` will gather parallel branches via `asyncio.gather` over `acreate()` calls internally, but `workflow.run()` is synchronous in v1. The sidecar section says `acreate()` bridges the background-loop future "into the caller's loop via `asyncio.wrap_future`", which requires a running event loop at the call site. A synchronous `workflow.run()` calling `asyncio.gather(...)` would need `asyncio.run(...)` or `loop.run_until_complete(...)`, both of which raise `RuntimeError` if called inside Jupyter's already-running event loop — exactly the environment highlighted as a primary use case.

The plan should specify whether Phase 4 introduces a complementary `workflow.arun()` (leaving `workflow.run()` sync-safe), or whether `workflow.run()` becomes async (a breaking change), or whether it falls back to the same singleton-background-loop pattern already used by `_build_async` for its concurrent.futures bridging. Leaving this unresolved will force the decision at implementation time when API compatibility is harder to change.

### Issue 2 of 2
plans/workflow-chaining/workflow-chaining.md:381-391
**`failed` stage resume treatment is unspecified**

Phase 3 says "skip stages whose fingerprints match and are complete" and delegates partial stages to `create(..., resume=ResumeMode.ALWAYS)`, but it never defines how a `failed` stage is classified. A stage can fail after writing several batches of parquet output (partial) or before writing any output (empty). In the first case `ResumeMode.ALWAYS` would recover intra-stage progress via #526; in the second it's effectively a clean restart. Without an explicit rule, implementers must make this call ad-hoc. The Phase 3 bullet list should add: a `failed` stage with existing partial output in its directory is treated the same as a partial stage (delegate to `ResumeMode.ALWAYS`); a `failed` stage with an empty or missing directory is started fresh.

Reviews (9): Last reviewed commit: "docs: require unique composite workflow ..." | Re-trigger Greptile

Comment thread plans/workflow-chaining/workflow-chaining.md Outdated
Comment thread plans/workflow-chaining/workflow-chaining.md Outdated
Comment thread plans/workflow-chaining/workflow-chaining.md Outdated
Comment thread plans/workflow-chaining/workflow-chaining.md Outdated
Comment thread plans/workflow-chaining/workflow-chaining.md

### Part 1: Pipeline class

A new `Pipeline` class in `data_designer.interface` that orchestrates multi-stage generation.

@johnnygreco johnnygreco May 11, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about using a name like WorkflowChain instead? I'm a bit concerned that Pipeline sounds too much like we are exposing the "Data Designer pipeline builder" (which isn't a thing haha).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it strictly going to be a linked list or can it also be a DAG !?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good call - changed the public api to CompositeWorkflow via data_designer.compose_workflow(name=...). the plan now says v1 exposes linear composition, but the internal stage model is dag-shaped, so this is not strictly a linked list.

@johnnygreco johnnygreco left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't wait for this to be built!

Are you thinking that each stage can have a different RunConfig. Something to think about for the future is a world where different stages can run on different compute either in parallel or serially.

Comment on lines +50 to +52
pipeline.add_stage("personas", config_personas, num_records=100)
pipeline.add_stage("conversations", config_convos, num_records=1000) # explode: 100 -> 1000
pipeline.add_stage("judged", config_judge) # defaults to previous stage's output size

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we assume the seed for the conversations stage is what's in the parquet-files folder generated by personas. What if conversations actually depended on outcome of the schema transform post processor?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clarified the v1 contract: downstream stages seed from the upstream final dataset only. named processor outputs, schema-transform artifacts, dropped columns, and media need an on_success bridge in v1, with first-class artifact seeding called out as future work.

result.to_config_builder(columns=["name", "age", "background"]) # optional column selection
.add_column(name="conversation", column_type="llm_text", prompt="...")
)
result_2 = dd.create(config_convos, num_records=1000)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldn't this re-do all the columns in the config_personas?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

another way, perhaps, is another seed source which can operate from the PreviewResults class to connect the dots?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clarified that to_config_builder() starts a new config seeded from existing results, rather than mutating or resuming the original config. kept PreviewResults in phase 1, and richer result/preview seed ergonomics can build on that.

Comment on lines +83 to +85
**Auto-chaining from a single config (future):**

The engine detects columns that were previously `allow_resize=True` (or a new marker like `stage_boundary=True`) and auto-splits the DAG into stages. This is a convenience layer on top of the explicit API - not required for v1.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably overkill... even for the future!?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed, demoted this out of the phase plan. it is now just a future consideration and explicitly not part of the initial roadmap.


Each stage seeds from the **previous stage's final dataset** - the post-processor output with dropped columns excluded. This is the same DataFrame returned by `DatasetCreationResults.load_dataset()`.

Processor outputs (named processor artifacts) and media assets (images stored on disk with relative paths in the DataFrame) are NOT automatically forwarded. If a downstream stage needs image columns from an upstream stage, the pipeline must resolve image paths relative to the upstream stage's artifact directory. This needs explicit handling - TBD in implementation.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as a bridge, we can have documentation for the downstream stage to use use expression column configs to modify the relative paths of the media files

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added this bridge to the media open question: v1 can document using on_success or expression columns to rewrite relative media paths against the upstream artifact directory.

pipeline.add_stage(
"enriched",
config_enrich,
after=filter_high_quality, # runs on stage output before next stage seeds from it

@nabinchha nabinchha May 11, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps name these to look like a call back?
on_success=filter_high_quality

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, renamed the hook to on_success / on_success_version.

)
```

The callback receives the path to the completed stage's artifact directory (containing `parquet-files/`, `metadata.json`, etc.) and returns a path that the next stage will seed from. This keeps large DataFrames on disk and gives users full control.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where should the call back dump files seems open ended. But should we enforce some structure? Like another sub-folder to live alongside parquet_files?

Another question, how do we envision push to hf to work in this scenario? Only the last stage can push?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a managed callback output convention under <stage-dir>/callback-outputs/<callback-name>/. also made export/push a v1 decision: helpers default to the final stage dataset, while selected-stage or full workflow bundle export stays future work.


**Callback resume policy**: The pipeline does not hash arbitrary Python source or bytecode in v1. `after_version` is the explicit callback identity recorded in `pipeline-metadata.json` and included in the next stage's fingerprint. If `after` is set without `after_version`, that stage is treated as dirty on every resume so a changed callback cannot silently reuse stale transformed data. The resolved path returned by the callback is also recorded as the dependent stage's seed path; a stage seeded from callback output is skippable only if that recorded path still exists and is readable by `LocalFileSeedSource`.

**Empty stage policy**: If a callback filters all rows (or a stage produces zero rows), the pipeline raises `DataDesignerPipelineError` by default. Stages can opt in to empty output with `allow_empty=True` on `add_stage()`, in which case the pipeline marks that stage as `completed_empty` and all downstream stages as `skipped_empty_upstream`. `PipelineResults` still contains every declared stage name: executed stages map to `DatasetCreationResults`, while skipped downstream stages map to `SkippedStageResult` with `status="skipped_empty_upstream"` and `upstream_stage=<stage_name>`. This avoids `KeyError`/`None` ambiguity and gives resume a durable state distinct from normal completion.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if we added on_failure call back? I default it's a noop, but folks can choose to decide how they want to handle it in situations like all rows filtered, etc.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added this as future scope. v1 keeps failure behavior simple and raises by default; a later on_failure hook can support cleanup or custom recovery.

Comment on lines +191 to +195
The `Pipeline` is constructed via `dd.pipeline(name=...)` and holds a reference to the parent `DataDesigner`. Every stage runs `dd.create()` (or `dd.acreate()` once available - see Engine API surface below) on that same instance. This is a load-bearing API contract for two reasons.

**Throttle coordination across stages.** A `DataDesigner` owns one `ModelRegistry`, which owns one `ThrottleManager`. AIMD rate-limit state is per-instance. If the pipeline constructed a fresh `DataDesigner` per stage, each stage would adapt independently and the aggregate request rate against a provider could exceed the configured cap by a multiple of the stage count. The same hazard applies to parallel branches in Phase 4: branches sharing one `DataDesigner` automatically share throttling; branches each holding their own `DataDesigner` silently fragment it. Reusing one instance is the simple, correct default.

**Door open for external orchestration.** The pipeline's choice to reuse one `DataDesigner` is the in-process strategy: shared throttling across stages, branches gathered in the orchestrator process. A cross-process strategy is a separate but compatible model - see Future considerations. v1 only needs to avoid encoding assumptions that would prevent it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the snippets above the a pipeline is a config code.
pipeline = dd.pipeline # because we do import data_designer.config as dd
pipeline.run

What is the contract between the pipeline and the data designer instance since it's planned to be shared."

Something seems off here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch, the examples were confusing because dd usually means data_designer.config. changed them to use data_designer.compose_workflow(...) and clarified that CompositeWorkflow is created from, and holds onto, the parent DataDesigner instance.

@andreatgretel

Copy link
Copy Markdown
Contributor Author

@johnnygreco added this under future considerations and clarified the split: v1 stages can already use different DataDesignerConfig values, including different model configs, while per-stage RunConfig or compute placement stays future work because it needs explicit throttle/artifact/resume rules.

Comment thread plans/workflow-chaining/workflow-chaining.md

@johnnygreco johnnygreco left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛸

@andreatgretel andreatgretel merged commit 2c6e6b5 into main May 12, 2026
49 checks passed
@andreatgretel andreatgretel deleted the andreatgretel/docs/workflow-chaining branch May 14, 2026 01:13
@andreatgretel andreatgretel mentioned this pull request May 14, 2026
6 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants