Conversation
… optimizations - Add DAG validation with cycle detection and dependency verification - Implement StepTracker to consolidate lock state (4 locks → 1) - Move DAG validation to registration time for efficiency - Add get_context() and is_cancelled() to avoid full state cloning - Use std::mem::replace to reduce context cloning overhead - Add sequential depends_on chains to all workflow step definitions - Update register_workflow to return Result for validation errors - Add comprehensive DAG workflow tests
Summary of ChangesHello @slin1237, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a major overhaul to the workflow engine, transforming it into a robust system capable of executing steps in parallel based on a Directed Acyclic Graph (DAG) of dependencies. This enhancement not only improves efficiency by allowing concurrent execution of independent tasks but also bolsters reliability through integrated DAG validation and optimized state management, ensuring workflows are correctly structured and performant. Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces support for Directed Acyclic Graph (DAG) based parallel execution of workflow steps, which is a significant improvement over the previous sequential execution model. The changes are well-implemented and include dependency tracking, a new DAG scheduler, validation at registration time, and performance optimizations. The addition of comprehensive tests for the new DAG execution logic is also a great contribution. My review includes a high-severity suggestion to correctly handle a potential deadlock scenario in the scheduler and a medium-severity suggestion to optimize the DAG validation logic for better performance.
| if ready_step_indices.is_empty() && running_count == 0 { | ||
| if blocked_by_failure { | ||
| self.state_store.update(instance_id, |s| { | ||
| s.status = WorkflowStatus::Failed; | ||
| })?; | ||
|
|
||
| let failed_step = tracker.read().failed.iter().next().cloned(); | ||
| self.event_bus | ||
| .publish(WorkflowEvent::WorkflowFailed { | ||
| instance_id, | ||
| failed_step: failed_step.unwrap_or_else(|| StepId::new("unknown")), | ||
| error: "Workflow failed due to step dependency failure".to_string(), | ||
| }) | ||
| .await; | ||
| return Ok(()); | ||
| } | ||
| Ok(StepResult::Failure) | Err(_) => { | ||
| // Handle failure based on failure action | ||
| match step.on_failure { | ||
| FailureAction::FailWorkflow => { | ||
| let error_msg = format!("Step {} failed", step.id); | ||
| self.state_store.update(instance_id, |s| { | ||
| s.status = WorkflowStatus::Failed; | ||
| })?; | ||
|
|
||
| self.event_bus | ||
| .publish(WorkflowEvent::WorkflowFailed { | ||
| instance_id, | ||
| failed_step: step.id.clone(), | ||
| error: error_msg, | ||
| }) | ||
| .await; | ||
|
|
||
| return Ok(()); | ||
| } | ||
| FailureAction::ContinueNextStep => { | ||
| // Mark step as skipped and continue | ||
| self.state_store.update(instance_id, |s| { | ||
| if let Some(step_state) = s.step_states.get_mut(&step.id) { | ||
| step_state.status = StepStatus::Skipped; | ||
| break; | ||
| } |
There was a problem hiding this comment.
In the case of a deadlock where no steps are ready to run, none are currently running, but the workflow is not yet complete and no dependencies have failed, the code currently breaks from the loop. This will lead to the workflow being incorrectly marked as 'Completed' if no steps have failed. This scenario should theoretically not happen with a valid, cycle-free DAG, but if it does, it indicates a scheduler bug or an unhandled edge case in the DAG structure. Instead of breaking and implicitly succeeding, it would be safer to explicitly fail the workflow with a descriptive error message.
if ready_step_indices.is_empty() && running_count == 0 {
let error_message = if blocked_by_failure {
"Workflow failed due to step dependency failure".to_string()
} else {
"Workflow deadlocked. This may indicate a cycle or issue in the workflow definition.".to_string()
};
self.state_store.update(instance_id, |s| {
s.status = WorkflowStatus::Failed;
})?;
let failed_step = tracker.read().failed.iter().next().cloned();
self.event_bus
.publish(WorkflowEvent::WorkflowFailed {
instance_id,
failed_step: failed_step.unwrap_or_else(|| StepId::new("internal_scheduler")),
error: error_message,
})
.await;
return Ok(());
}| pub fn validate(&self) -> Result<(), String> { | ||
| let step_ids: HashSet<_> = self.steps.iter().map(|s| &s.id).collect(); | ||
|
|
||
| // Check all dependencies exist | ||
| for step in &self.steps { | ||
| for dep in &step.depends_on { | ||
| if !step_ids.contains(dep) { | ||
| return Err(format!( | ||
| "Step '{}' depends on non-existent step '{}'", | ||
| step.id, dep | ||
| )); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Check for cycles using DFS | ||
| let mut visited = HashSet::new(); | ||
| let mut rec_stack = HashSet::new(); | ||
|
|
||
| for step in &self.steps { | ||
| if self.has_cycle(&step.id, &mut visited, &mut rec_stack) { | ||
| return Err(format!("Cycle detected involving step '{}'", step.id)); | ||
| } | ||
| } | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| /// DFS helper for cycle detection | ||
| fn has_cycle( | ||
| &self, | ||
| step_id: &StepId, | ||
| visited: &mut HashSet<StepId>, | ||
| rec_stack: &mut HashSet<StepId>, | ||
| ) -> bool { | ||
| if rec_stack.contains(step_id) { | ||
| return true; // Back edge found - cycle! | ||
| } | ||
| if visited.contains(step_id) { | ||
| return false; // Already fully processed | ||
| } | ||
|
|
||
| visited.insert(step_id.clone()); | ||
| rec_stack.insert(step_id.clone()); | ||
|
|
||
| // Find the step and check its dependencies | ||
| if let Some(step) = self.steps.iter().find(|s| &s.id == step_id) { | ||
| for dep in &step.depends_on { | ||
| if self.has_cycle(dep, visited, rec_stack) { | ||
| return true; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| rec_stack.remove(step_id); | ||
| false | ||
| } |
There was a problem hiding this comment.
The cycle detection logic in validate and has_cycle can be optimized. Currently, has_cycle performs a linear search (self.steps.iter().find(...)) to locate a step by its ID within the recursive calls. This results in a time complexity of roughly O(V*V) for a connected graph, where V is the number of steps, instead of the optimal O(V + E).
You can improve this by creating a HashMap of step IDs to step definitions once in the validate function and passing it down to has_cycle. This will reduce the lookup to O(1) and bring the overall complexity to O(V + E).
pub fn validate(&self) -> Result<(), String> {
let steps_map: std::collections::HashMap<_, _> = self.steps.iter().map(|s| (&s.id, s)).collect();
// Check all dependencies exist
for step in &self.steps {
for dep in &step.depends_on {
if !steps_map.contains_key(dep) {
return Err(format!(
"Step '{}' depends on non-existent step '{}'",
step.id, dep
));
}
}
}
// Check for cycles using DFS
let mut visited = HashSet::new();
let mut rec_stack = HashSet::new();
for step in &self.steps {
if !visited.contains(&step.id) {
if self.has_cycle(&step.id, &steps_map, &mut visited, &mut rec_stack) {
return Err(format!("Cycle detected involving step '{}'", step.id));
}
}
}
Ok(())
}
/// DFS helper for cycle detection
fn has_cycle<'a>(
&self,
step_id: &'a StepId,
steps_map: &std::collections::HashMap<&'a StepId, &'a StepDefinition>,
visited: &mut HashSet<&'a StepId>,
rec_stack: &mut HashSet<&'a StepId>,
) -> bool {
if rec_stack.contains(step_id) {
return true; // Back edge found - cycle!
}
if visited.contains(step_id) {
return false; // Already fully processed
}
visited.insert(step_id);
rec_stack.insert(step_id);
// Find the step and check its dependencies
if let Some(step) = steps_map.get(step_id) {
for dep in &step.depends_on {
if self.has_cycle(dep, steps_map, visited, rec_stack) {
return true;
}
}
}
rec_stack.remove(step_id);
false
}…n_eagle3_npu * 'main' of https://github.com/sgl-project/sglang: (121 commits) Super tiny add gsp-fast-prepare (sgl-project#14992) Super tiny fix confusing slash_command_handler hint (sgl-project#14976) Super tiny remove unused argument (sgl-project#14966) [registry] Add a strict mode to model registration (sgl-project#14933) Feature/Fix multi lora scheduler blocking issue and evict LoRA None lastly (sgl-project#14795) Tune triton fused moe for the case of glm-4.6-fp8 b200 tp4 (sgl-project#15020) [model-gateway] refactor: unify worker management into modular workflow structure (sgl-project#15010) Update ci permission (sgl-project#15014) Refactor of http and engine entrypoints to allow custom override (sgl-project#14869) Add KV4-capable backend flashmla and update server args (sgl-project#14989) Revert several PRs (sgl-project#14958) Super tiny extract route_typed_request_once (sgl-project#14951) Fix CI by reverting incorrect metric check logic (sgl-project#15004) [model-gateway] refactor: workflow engine cleanup and minor optimization (sgl-project#15001) [model-gateway] fix: handle workflow deadlock and optimize cycle detection (sgl-project#15000) [model-gateway] feat: add DAG parallel execution support and workflow optimization (sgl-project#14999) [model-gateway] refactor: extract workflow engine to src/workflow module (sgl-project#14996) Update CODEOWNERS for multimodal_gen (sgl-project#14995) [diffusion] docker: Tiny fix Docker Hub link in installation documentation (sgl-project#14987) [PD] Add decode PP event loop for PD disaggregation (sgl-project#14945) ... # Conflicts: # python/sglang/srt/model_executor/piecewise_cuda_graph_runner.py
StepTrackerto consolidate lock state (4 locks → 1)get_context()andis_cancelled()to avoid full state cloningstd::mem::replaceto reduce context cloning overheaddepends_onchains to all workflow step definitionsregister_workflowto return Result for validation errorsChecklist