Skip to content

[model-gateway] feat: add DAG parallel execution support and workflow optimization#14999

Merged
slin1237 merged 1 commit intomainfrom
wfaas-n/2
Dec 12, 2025
Merged

[model-gateway] feat: add DAG parallel execution support and workflow optimization#14999
slin1237 merged 1 commit intomainfrom
wfaas-n/2

Conversation

@slin1237
Copy link
Copy Markdown
Collaborator

  • Add DAG validation with cycle detection and dependency verification
  • Implement StepTracker to consolidate lock state (4 locks → 1)
  • Move DAG validation to registration time for efficiency
  • Add get_context() and is_cancelled() to avoid full state cloning
  • Use std::mem::replace to reduce context cloning overhead
  • Add sequential depends_on chains to all workflow step definitions
  • Update register_workflow to return Result for validation errors
  • Add comprehensive DAG workflow tests

Checklist

… optimizations

- Add DAG validation with cycle detection and dependency verification
- Implement StepTracker to consolidate lock state (4 locks → 1)
- Move DAG validation to registration time for efficiency
- Add get_context() and is_cancelled() to avoid full state cloning
- Use std::mem::replace to reduce context cloning overhead
- Add sequential depends_on chains to all workflow step definitions
- Update register_workflow to return Result for validation errors
- Add comprehensive DAG workflow tests
@gemini-code-assist
Copy link
Copy Markdown
Contributor

Summary of Changes

Hello @slin1237, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a major overhaul to the workflow engine, transforming it into a robust system capable of executing steps in parallel based on a Directed Acyclic Graph (DAG) of dependencies. This enhancement not only improves efficiency by allowing concurrent execution of independent tasks but also bolsters reliability through integrated DAG validation and optimized state management, ensuring workflows are correctly structured and performant.

Highlights

  • DAG Parallel Execution Support: The workflow engine now supports Directed Acyclic Graph (DAG) based parallel execution, allowing independent steps to run concurrently while respecting defined dependencies.
  • Workflow Optimization: Significant optimizations have been implemented, including consolidating lock states into a single StepTracker to reduce contention and improving context handling to avoid full state cloning.
  • DAG Validation: New validation logic has been added to detect cycles and verify dependencies within workflow DAGs, ensuring workflow integrity at registration time.
  • Dependency Definition: Workflow step definitions now include a depends_on mechanism, allowing explicit declaration of sequential dependencies between steps.
  • Comprehensive Testing: New unit tests have been added to cover various DAG scenarios, including parallel execution, dependency chaining, and failure handling.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces support for Directed Acyclic Graph (DAG) based parallel execution of workflow steps, which is a significant improvement over the previous sequential execution model. The changes are well-implemented and include dependency tracking, a new DAG scheduler, validation at registration time, and performance optimizations. The addition of comprehensive tests for the new DAG execution logic is also a great contribution. My review includes a high-severity suggestion to correctly handle a potential deadlock scenario in the scheduler and a medium-severity suggestion to optimize the DAG validation logic for better performance.

Comment on lines +268 to +285
if ready_step_indices.is_empty() && running_count == 0 {
if blocked_by_failure {
self.state_store.update(instance_id, |s| {
s.status = WorkflowStatus::Failed;
})?;

let failed_step = tracker.read().failed.iter().next().cloned();
self.event_bus
.publish(WorkflowEvent::WorkflowFailed {
instance_id,
failed_step: failed_step.unwrap_or_else(|| StepId::new("unknown")),
error: "Workflow failed due to step dependency failure".to_string(),
})
.await;
return Ok(());
}
Ok(StepResult::Failure) | Err(_) => {
// Handle failure based on failure action
match step.on_failure {
FailureAction::FailWorkflow => {
let error_msg = format!("Step {} failed", step.id);
self.state_store.update(instance_id, |s| {
s.status = WorkflowStatus::Failed;
})?;

self.event_bus
.publish(WorkflowEvent::WorkflowFailed {
instance_id,
failed_step: step.id.clone(),
error: error_msg,
})
.await;

return Ok(());
}
FailureAction::ContinueNextStep => {
// Mark step as skipped and continue
self.state_store.update(instance_id, |s| {
if let Some(step_state) = s.step_states.get_mut(&step.id) {
step_state.status = StepStatus::Skipped;
break;
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In the case of a deadlock where no steps are ready to run, none are currently running, but the workflow is not yet complete and no dependencies have failed, the code currently breaks from the loop. This will lead to the workflow being incorrectly marked as 'Completed' if no steps have failed. This scenario should theoretically not happen with a valid, cycle-free DAG, but if it does, it indicates a scheduler bug or an unhandled edge case in the DAG structure. Instead of breaking and implicitly succeeding, it would be safer to explicitly fail the workflow with a descriptive error message.

            if ready_step_indices.is_empty() && running_count == 0 {
                let error_message = if blocked_by_failure {
                    "Workflow failed due to step dependency failure".to_string()
                } else {
                    "Workflow deadlocked. This may indicate a cycle or issue in the workflow definition.".to_string()
                };

                self.state_store.update(instance_id, |s| {
                    s.status = WorkflowStatus::Failed;
                })?;

                let failed_step = tracker.read().failed.iter().next().cloned();
                self.event_bus
                    .publish(WorkflowEvent::WorkflowFailed {
                        instance_id,
                        failed_step: failed_step.unwrap_or_else(|| StepId::new("internal_scheduler")),
                        error: error_message,
                    })
                    .await;
                return Ok(());
            }

Comment on lines +113 to +169
pub fn validate(&self) -> Result<(), String> {
let step_ids: HashSet<_> = self.steps.iter().map(|s| &s.id).collect();

// Check all dependencies exist
for step in &self.steps {
for dep in &step.depends_on {
if !step_ids.contains(dep) {
return Err(format!(
"Step '{}' depends on non-existent step '{}'",
step.id, dep
));
}
}
}

// Check for cycles using DFS
let mut visited = HashSet::new();
let mut rec_stack = HashSet::new();

for step in &self.steps {
if self.has_cycle(&step.id, &mut visited, &mut rec_stack) {
return Err(format!("Cycle detected involving step '{}'", step.id));
}
}

Ok(())
}

/// DFS helper for cycle detection
fn has_cycle(
&self,
step_id: &StepId,
visited: &mut HashSet<StepId>,
rec_stack: &mut HashSet<StepId>,
) -> bool {
if rec_stack.contains(step_id) {
return true; // Back edge found - cycle!
}
if visited.contains(step_id) {
return false; // Already fully processed
}

visited.insert(step_id.clone());
rec_stack.insert(step_id.clone());

// Find the step and check its dependencies
if let Some(step) = self.steps.iter().find(|s| &s.id == step_id) {
for dep in &step.depends_on {
if self.has_cycle(dep, visited, rec_stack) {
return true;
}
}
}

rec_stack.remove(step_id);
false
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The cycle detection logic in validate and has_cycle can be optimized. Currently, has_cycle performs a linear search (self.steps.iter().find(...)) to locate a step by its ID within the recursive calls. This results in a time complexity of roughly O(V*V) for a connected graph, where V is the number of steps, instead of the optimal O(V + E).

You can improve this by creating a HashMap of step IDs to step definitions once in the validate function and passing it down to has_cycle. This will reduce the lookup to O(1) and bring the overall complexity to O(V + E).

    pub fn validate(&self) -> Result<(), String> {
        let steps_map: std::collections::HashMap<_, _> = self.steps.iter().map(|s| (&s.id, s)).collect();

        // Check all dependencies exist
        for step in &self.steps {
            for dep in &step.depends_on {
                if !steps_map.contains_key(dep) {
                    return Err(format!(
                        "Step '{}' depends on non-existent step '{}'",
                        step.id, dep
                    ));
                }
            }
        }

        // Check for cycles using DFS
        let mut visited = HashSet::new();
        let mut rec_stack = HashSet::new();

        for step in &self.steps {
            if !visited.contains(&step.id) {
                if self.has_cycle(&step.id, &steps_map, &mut visited, &mut rec_stack) {
                    return Err(format!("Cycle detected involving step '{}'", step.id));
                }
            }
        }

        Ok(())
    }

    /// DFS helper for cycle detection
    fn has_cycle<'a>(
        &self,
        step_id: &'a StepId,
        steps_map: &std::collections::HashMap<&'a StepId, &'a StepDefinition>,
        visited: &mut HashSet<&'a StepId>,
        rec_stack: &mut HashSet<&'a StepId>,
    ) -> bool {
        if rec_stack.contains(step_id) {
            return true; // Back edge found - cycle!
        }
        if visited.contains(step_id) {
            return false; // Already fully processed
        }

        visited.insert(step_id);
        rec_stack.insert(step_id);

        // Find the step and check its dependencies
        if let Some(step) = steps_map.get(step_id) {
            for dep in &step.depends_on {
                if self.has_cycle(dep, steps_map, visited, rec_stack) {
                    return true;
                }
            }
        }

        rec_stack.remove(step_id);
        false
    }

@slin1237 slin1237 merged commit 306e5b8 into main Dec 12, 2025
67 checks passed
@slin1237 slin1237 deleted the wfaas-n/2 branch December 12, 2025 16:31
slin1237 added a commit that referenced this pull request Dec 12, 2025
Liwansi added a commit to iforgetmyname/sglang that referenced this pull request Dec 13, 2025
…n_eagle3_npu

* 'main' of https://github.com/sgl-project/sglang: (121 commits)
  Super tiny add gsp-fast-prepare (sgl-project#14992)
  Super tiny fix confusing slash_command_handler hint (sgl-project#14976)
  Super tiny remove unused argument (sgl-project#14966)
  [registry] Add a strict mode to model registration (sgl-project#14933)
  Feature/Fix multi lora scheduler blocking issue and evict LoRA None lastly (sgl-project#14795)
  Tune triton fused moe for the case of glm-4.6-fp8 b200 tp4 (sgl-project#15020)
  [model-gateway] refactor: unify worker management into modular workflow structure (sgl-project#15010)
  Update ci permission (sgl-project#15014)
  Refactor of http and engine entrypoints to allow custom override  (sgl-project#14869)
  Add KV4-capable backend flashmla and update server args (sgl-project#14989)
  Revert several PRs (sgl-project#14958)
  Super tiny extract route_typed_request_once (sgl-project#14951)
  Fix CI by reverting incorrect metric check logic (sgl-project#15004)
  [model-gateway] refactor: workflow engine cleanup and minor optimization (sgl-project#15001)
  [model-gateway] fix: handle workflow deadlock and optimize cycle detection (sgl-project#15000)
  [model-gateway] feat: add DAG parallel execution support and workflow optimization (sgl-project#14999)
  [model-gateway] refactor: extract workflow engine to src/workflow module (sgl-project#14996)
  Update CODEOWNERS for multimodal_gen (sgl-project#14995)
  [diffusion] docker: Tiny fix Docker Hub link in installation documentation (sgl-project#14987)
  [PD] Add decode PP event loop for PD disaggregation (sgl-project#14945)
  ...

# Conflicts:
#	python/sglang/srt/model_executor/piecewise_cuda_graph_runner.py
Prozac614 pushed a commit to Prozac614/sglang that referenced this pull request Dec 17, 2025
YChange01 pushed a commit to YChange01/sglang that referenced this pull request Jan 13, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant