|
6 | 6 | Intelligently collects and judges context information |
7 | 7 | """ |
8 | 8 |
|
| 9 | +import json |
9 | 10 | from typing import List, Dict, Any, Optional |
10 | 11 | from .base import BaseNode |
11 | 12 | from ..core.state import WorkflowState, StreamEvent |
@@ -69,108 +70,79 @@ async def process(self, state: WorkflowState) -> WorkflowState: |
69 | 70 |
|
70 | 71 | # LLM-driven iterative collection process |
71 | 72 | iteration = 0 |
72 | | - tool_history = [] # Record tool call history |
73 | 73 | while iteration < self.max_iterations: |
74 | 74 | iteration += 1 |
75 | 75 | progress = iteration / self.max_iterations |
76 | | - |
77 | 76 | await self.streaming_manager.emit(StreamEvent( |
78 | 77 | type=EventType.RUNNING, |
79 | 78 | content=f"Round {iteration} of intelligent context collection...", |
80 | 79 | stage=WorkflowStage.CONTEXT_GATHERING, |
81 | 80 | progress=progress |
82 | 81 | )) |
83 | | - tool_calls = await self.strategy.analyze_and_plan_tools( |
84 | | - state.intent, |
85 | | - state.contexts, |
86 | | - max_tools=5, |
87 | | - iteration=iteration, |
88 | | - tool_history=tool_history |
89 | | - ) |
90 | | - # # Filter duplicate calls |
91 | | - # if tool_history: |
92 | | - # tool_calls = await self.strategy.filter_duplicate_calls( |
93 | | - # tool_calls, |
94 | | - # tool_history |
95 | | - # ) |
96 | | - if tool_calls: |
97 | | - # 2. Concurrently execute tool calls |
98 | | - await self.streaming_manager.emit(StreamEvent( |
99 | | - type=EventType.RUNNING, |
100 | | - content=f"Concurrently calling {len(tool_calls)} tools...", |
101 | | - stage=WorkflowStage.CONTEXT_GATHERING)) |
102 | | - new_context_items = await self.strategy.execute_tool_calls_parallel(tool_calls) |
103 | | - # Record tool call history |
104 | | - # for call in tool_calls: |
105 | | - # func_name = call.get("function", {}).get("name") |
106 | | - # func_args = call.get("function", {}).get("arguments", {}) |
107 | | - # tool_history.append({ |
108 | | - # "tool_name": func_name, |
109 | | - # "query": func_args.get("query", ""), |
110 | | - # "iteration": iteration |
111 | | - # }) |
112 | | - # 3. Add results to context |
113 | | - for item in new_context_items: |
114 | | - state.contexts.add_item(item) |
115 | | - await self.streaming_manager.emit(StreamEvent( |
116 | | - type=EventType.DONE, |
117 | | - content=f"Collected {len(new_context_items)} new context items in this round", |
118 | | - stage=WorkflowStage.CONTEXT_GATHERING)) |
119 | | - # 4. LLM evaluates sufficiency |
| 82 | + |
| 83 | + # 1. Evaluate sufficiency first (including first iteration) |
120 | 84 | sufficiency = await self.strategy.evaluate_sufficiency( |
121 | 85 | state.contexts, |
122 | 86 | state.intent |
123 | 87 | ) |
124 | 88 | state.contexts.sufficiency = sufficiency |
125 | | - self.logger.info(f"sufficiency {sufficiency}") |
126 | | - |
127 | | - # 5. If there are many context items, filter for relevance |
128 | | - if len(state.contexts.items) > 5: # Only filter if more than 5 items |
| 89 | + |
| 90 | + if sufficiency == ContextSufficiency.SUFFICIENT: |
129 | 91 | await self.streaming_manager.emit(StreamEvent( |
130 | | - type=EventType.RUNNING, |
131 | | - content="Filtering irrelevant context...", |
132 | | - stage=WorkflowStage.CONTEXT_GATHERING |
| 92 | + type=EventType.DONE, |
| 93 | + content=f"Context is sufficient, collected {len(state.contexts.items)} items in total", |
| 94 | + stage=WorkflowStage.CONTEXT_GATHERING, |
| 95 | + progress=1.0 |
133 | 96 | )) |
134 | | - |
135 | | - # Get relevant context IDs |
136 | | - relevant_ids = await self.strategy.filter_relevant_contexts( |
137 | | - state.contexts.items, |
138 | | - state.intent.enhanced_query or state.intent.original_query |
139 | | - ) |
140 | | - |
141 | | - # Convert relevant IDs to a set for quick lookup |
142 | | - relevant_id_set = set(relevant_ids) |
143 | | - |
144 | | - # Mark irrelevant context and keep relevant ones |
145 | | - original_count = len(state.contexts.items) |
146 | | - filtered_items = [] |
147 | | - |
148 | | - for item in state.contexts.items: |
149 | | - if item.id in relevant_id_set: |
150 | | - item.is_relevant = True |
151 | | - filtered_items.append(item) |
152 | | - else: |
153 | | - item.is_relevant = False |
154 | | - item.relevance_reason = "Judged irrelevant to the user's question by the LLM" |
155 | | - |
156 | | - # Update the context collection |
157 | | - state.contexts.items = filtered_items |
158 | | - |
159 | | - # await self.streaming_manager.emit(StreamEvent( |
160 | | - # type=EventType.DONE, |
161 | | - # content=f"Retained {len(filtered_items)} relevant context items after filtering (originally {original_count})", |
162 | | - # stage=WorkflowStage.CONTEXT_GATHERING |
163 | | - # )) |
164 | | - |
165 | | - if sufficiency == ContextSufficiency.SUFFICIENT: |
| 97 | + break |
| 98 | + |
| 99 | + # 2. Analyze information gap and plan tool calls |
| 100 | + tool_calls, _ = await self.strategy.analyze_and_plan_tools( |
| 101 | + state.intent, |
| 102 | + state.contexts, |
| 103 | + iteration=iteration |
| 104 | + ) |
| 105 | + |
| 106 | + if not tool_calls: |
166 | 107 | await self.streaming_manager.emit(StreamEvent( |
167 | 108 | type=EventType.DONE, |
168 | | - content=f"Context collection complete, collected {len(state.contexts.items)} items in total", |
| 109 | + content=f"No more tools to call, ending collection with {len(state.contexts.items)} items", |
169 | 110 | stage=WorkflowStage.CONTEXT_GATHERING, |
170 | 111 | progress=1.0 |
171 | 112 | )) |
172 | 113 | break |
173 | | - elif iteration >= self.max_iterations: |
| 114 | + |
| 115 | + # 3. Execute tool calls concurrently |
| 116 | + await self.streaming_manager.emit(StreamEvent( |
| 117 | + type=EventType.RUNNING, |
| 118 | + content=f"Concurrently calling {len(tool_calls)} tools...", |
| 119 | + stage=WorkflowStage.CONTEXT_GATHERING)) |
| 120 | + new_context_items = await self.strategy.execute_tool_calls_parallel(tool_calls) |
| 121 | + |
| 122 | + # 4. Validate and filter tool results |
| 123 | + await self.streaming_manager.emit(StreamEvent( |
| 124 | + type=EventType.RUNNING, |
| 125 | + content="Validating tool results and filtering relevant contexts...", |
| 126 | + stage=WorkflowStage.CONTEXT_GATHERING |
| 127 | + )) |
| 128 | + validated_items, _ = await self.strategy.validate_and_filter_tool_results( |
| 129 | + tool_calls, |
| 130 | + new_context_items, |
| 131 | + state.intent, |
| 132 | + state.contexts |
| 133 | + ) |
| 134 | + |
| 135 | + # 5. Add validated results to context collection |
| 136 | + for item in validated_items: |
| 137 | + state.contexts.add_item(item) |
| 138 | + |
| 139 | + await self.streaming_manager.emit(StreamEvent( |
| 140 | + type=EventType.DONE, |
| 141 | + content=f"Round {iteration}: Added {len(validated_items)} relevant context items (filtered from {len(new_context_items)} total)", |
| 142 | + stage=WorkflowStage.CONTEXT_GATHERING)) |
| 143 | + |
| 144 | + # Check if reached max iterations |
| 145 | + if iteration >= self.max_iterations: |
174 | 146 | state.contexts.sufficiency = ContextSufficiency.PARTIAL |
175 | 147 | await self.streaming_manager.emit(StreamEvent( |
176 | 148 | type=EventType.DONE, |
|
0 commit comments