Skip to content

Commit 10852a4

Browse files
authored
fix: optimize chat & todo (#35)
1 parent fc0a730 commit 10852a4

File tree

20 files changed

+1291 additions
-941 deletions

config/prompts_en.yaml

Lines changed: 646 additions & 468 deletions
Large diffs are not rendered by default.

config/prompts_zh.yaml

Lines changed: 323 additions & 143 deletions
Large diffs are not rendered by default.

opencontext/context_consumption/context_agent/core/llm_context_strategy.py

Lines changed: 143 additions & 167 deletions
Large diffs are not rendered by default.

opencontext/context_consumption/context_agent/models/enums.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,12 @@ class EventType(str, Enum):
7272

7373

7474
class QueryType(str, Enum):
75-
"""Query type enumeration - four categories"""
75+
"""Query type enumeration - five categories"""
7676
SIMPLE_CHAT = "simple_chat" # Simple chat (daily greetings, small talk, etc.)
7777
DOCUMENT_EDIT = "document_edit" # Document editing and rewriting (preserving existing facts/not introducing new information)
7878
QA_ANALYSIS = "qa_analysis" # Q&A (covering summarization, analysis, and dialogue based on documents and complex context)
7979
CONTENT_GENERATION = "content_generation" # Document content generation/expansion (allowing new information)
80+
CLARIFICATION_NEEDED = "clarification_needed" # Query is too vague or ambiguous, needs user clarification
8081

8182

8283
class ContextSufficiency(str, Enum):

opencontext/context_consumption/context_agent/models/schemas.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,10 @@ def get_summary(self) -> str:
155155

156156
summary_parts = [f"{source.value}: {count}" for source, count in source_counts.items()]
157157
return f"Collected {len(self.items)} context items ({', '.join(summary_parts)}), sufficiency: {self.sufficiency.value}"
158+
159+
def get_chat_history(self) -> List[Dict[str, str]]:
160+
"""Get chat history as a list of dictionaries"""
161+
return [{"role": msg.role, "content": msg.content} for msg in self.chat_history]
158162

159163
@dataclass
160164
class ExecutionStep:

opencontext/context_consumption/context_agent/nodes/context.py

Lines changed: 52 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
Intelligently collects and judges context information
77
"""
88

9+
import json
910
from typing import List, Dict, Any, Optional
1011
from .base import BaseNode
1112
from ..core.state import WorkflowState, StreamEvent
@@ -69,108 +70,79 @@ async def process(self, state: WorkflowState) -> WorkflowState:
6970

7071
# LLM-driven iterative collection process
7172
iteration = 0
72-
tool_history = [] # Record tool call history
7373
while iteration < self.max_iterations:
7474
iteration += 1
7575
progress = iteration / self.max_iterations
76-
7776
await self.streaming_manager.emit(StreamEvent(
7877
type=EventType.RUNNING,
7978
content=f"Round {iteration} of intelligent context collection...",
8079
stage=WorkflowStage.CONTEXT_GATHERING,
8180
progress=progress
8281
))
83-
tool_calls = await self.strategy.analyze_and_plan_tools(
84-
state.intent,
85-
state.contexts,
86-
max_tools=5,
87-
iteration=iteration,
88-
tool_history=tool_history
89-
)
90-
# # Filter duplicate calls
91-
# if tool_history:
92-
# tool_calls = await self.strategy.filter_duplicate_calls(
93-
# tool_calls,
94-
# tool_history
95-
# )
96-
if tool_calls:
97-
# 2. Concurrently execute tool calls
98-
await self.streaming_manager.emit(StreamEvent(
99-
type=EventType.RUNNING,
100-
content=f"Concurrently calling {len(tool_calls)} tools...",
101-
stage=WorkflowStage.CONTEXT_GATHERING))
102-
new_context_items = await self.strategy.execute_tool_calls_parallel(tool_calls)
103-
# Record tool call history
104-
# for call in tool_calls:
105-
# func_name = call.get("function", {}).get("name")
106-
# func_args = call.get("function", {}).get("arguments", {})
107-
# tool_history.append({
108-
# "tool_name": func_name,
109-
# "query": func_args.get("query", ""),
110-
# "iteration": iteration
111-
# })
112-
# 3. Add results to context
113-
for item in new_context_items:
114-
state.contexts.add_item(item)
115-
await self.streaming_manager.emit(StreamEvent(
116-
type=EventType.DONE,
117-
content=f"Collected {len(new_context_items)} new context items in this round",
118-
stage=WorkflowStage.CONTEXT_GATHERING))
119-
# 4. LLM evaluates sufficiency
82+
83+
# 1. Evaluate sufficiency first (including first iteration)
12084
sufficiency = await self.strategy.evaluate_sufficiency(
12185
state.contexts,
12286
state.intent
12387
)
12488
state.contexts.sufficiency = sufficiency
125-
self.logger.info(f"sufficiency {sufficiency}")
126-
127-
# 5. If there are many context items, filter for relevance
128-
if len(state.contexts.items) > 5: # Only filter if more than 5 items
89+
90+
if sufficiency == ContextSufficiency.SUFFICIENT:
12991
await self.streaming_manager.emit(StreamEvent(
130-
type=EventType.RUNNING,
131-
content="Filtering irrelevant context...",
132-
stage=WorkflowStage.CONTEXT_GATHERING
92+
type=EventType.DONE,
93+
content=f"Context is sufficient, collected {len(state.contexts.items)} items in total",
94+
stage=WorkflowStage.CONTEXT_GATHERING,
95+
progress=1.0
13396
))
134-
135-
# Get relevant context IDs
136-
relevant_ids = await self.strategy.filter_relevant_contexts(
137-
state.contexts.items,
138-
state.intent.enhanced_query or state.intent.original_query
139-
)
140-
141-
# Convert relevant IDs to a set for quick lookup
142-
relevant_id_set = set(relevant_ids)
143-
144-
# Mark irrelevant context and keep relevant ones
145-
original_count = len(state.contexts.items)
146-
filtered_items = []
147-
148-
for item in state.contexts.items:
149-
if item.id in relevant_id_set:
150-
item.is_relevant = True
151-
filtered_items.append(item)
152-
else:
153-
item.is_relevant = False
154-
item.relevance_reason = "Judged irrelevant to the user's question by the LLM"
155-
156-
# Update the context collection
157-
state.contexts.items = filtered_items
158-
159-
# await self.streaming_manager.emit(StreamEvent(
160-
# type=EventType.DONE,
161-
# content=f"Retained {len(filtered_items)} relevant context items after filtering (originally {original_count})",
162-
# stage=WorkflowStage.CONTEXT_GATHERING
163-
# ))
164-
165-
if sufficiency == ContextSufficiency.SUFFICIENT:
97+
break
98+
99+
# 2. Analyze information gap and plan tool calls
100+
tool_calls, _ = await self.strategy.analyze_and_plan_tools(
101+
state.intent,
102+
state.contexts,
103+
iteration=iteration
104+
)
105+
106+
if not tool_calls:
166107
await self.streaming_manager.emit(StreamEvent(
167108
type=EventType.DONE,
168-
content=f"Context collection complete, collected {len(state.contexts.items)} items in total",
109+
content=f"No more tools to call, ending collection with {len(state.contexts.items)} items",
169110
stage=WorkflowStage.CONTEXT_GATHERING,
170111
progress=1.0
171112
))
172113
break
173-
elif iteration >= self.max_iterations:
114+
115+
# 3. Execute tool calls concurrently
116+
await self.streaming_manager.emit(StreamEvent(
117+
type=EventType.RUNNING,
118+
content=f"Concurrently calling {len(tool_calls)} tools...",
119+
stage=WorkflowStage.CONTEXT_GATHERING))
120+
new_context_items = await self.strategy.execute_tool_calls_parallel(tool_calls)
121+
122+
# 4. Validate and filter tool results
123+
await self.streaming_manager.emit(StreamEvent(
124+
type=EventType.RUNNING,
125+
content="Validating tool results and filtering relevant contexts...",
126+
stage=WorkflowStage.CONTEXT_GATHERING
127+
))
128+
validated_items, _ = await self.strategy.validate_and_filter_tool_results(
129+
tool_calls,
130+
new_context_items,
131+
state.intent,
132+
state.contexts
133+
)
134+
135+
# 5. Add validated results to context collection
136+
for item in validated_items:
137+
state.contexts.add_item(item)
138+
139+
await self.streaming_manager.emit(StreamEvent(
140+
type=EventType.DONE,
141+
content=f"Round {iteration}: Added {len(validated_items)} relevant context items (filtered from {len(new_context_items)} total)",
142+
stage=WorkflowStage.CONTEXT_GATHERING))
143+
144+
# Check if reached max iterations
145+
if iteration >= self.max_iterations:
174146
state.contexts.sufficiency = ContextSufficiency.PARTIAL
175147
await self.streaming_manager.emit(StreamEvent(
176148
type=EventType.DONE,

opencontext/context_consumption/context_agent/nodes/intent.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ async def process(self, state: WorkflowState) -> WorkflowState:
3333
"""Process intent analysis"""
3434
await self.streaming_manager.emit(StreamEvent(type=EventType.THINKING, content="Analyzing your intent...", stage=WorkflowStage.INTENT_ANALYSIS))
3535
# 1. Classify query type
36-
query_type = await self._classify_query(state.query.text)
36+
query_type = await self._classify_query(state.query.text, state.contexts.get_chat_history())
3737
if not query_type:
3838
await self.streaming_manager.emit(StreamEvent(type=EventType.FAIL, content="Intent analysis failed", stage=WorkflowStage.INTENT_ANALYSIS,
3939
metadata={ "query": state.query.text }))
@@ -59,12 +59,12 @@ async def process(self, state: WorkflowState) -> WorkflowState:
5959
# )
6060
return state
6161

62-
async def _classify_query(self, query: str) -> QueryType:
62+
async def _classify_query(self, query: str, chat_history: List[Dict[str, str]]) -> QueryType:
6363
"""Use LLM to classify query types, including confidence assessment and fallback strategies"""
6464
prompt_group = get_prompt_group("chat_workflow.query_classification")
6565
messages = [
6666
{"role": "system", "content": prompt_group['system']},
67-
{"role": "user", "content": prompt_group['user'].format(query=query)}
67+
{"role": "user", "content": prompt_group['user'].format(query=query, chat_history=json.dumps(chat_history))}
6868
]
6969
response = await generate_with_messages_async(
7070
messages,

opencontext/context_consumption/generation/smart_todo_manager.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ class TodoTask:
3737
assignee: Optional[str] = None
3838
participants: List[str] = field(default_factory=list)
3939
context_reference: Optional[str] = None
40+
reason: Optional[str] = None
41+
created_at: Optional[str] = None
42+
4043

4144

4245
class SmartTodoManager:
@@ -79,22 +82,23 @@ def generate_todo_tasks(self, start_time: int, end_time: int) -> Optional[str]:
7982
# 2. Get regular context data
8083
contexts = self._get_task_relevant_contexts(start_time, end_time, activity_insights)
8184
# 3. Get historical todo completion status
82-
historical_todos = self._get_historical_todos(days=1)
85+
historical_todos = self._get_historical_todos()
8386
# 4. Synthesize all information to generate high-quality todos
8487
tasks = self._extract_tasks_from_contexts_enhanced(
8588
contexts, start_time, end_time, activity_insights, historical_todos)
8689

8790
if not tasks:
88-
logger.info("No clear tasks were identified from the activity.")
8991
return None
9092
# Store in the SQLite todo table
9193
todo_ids = []
9294
for task in tasks:
9395
participants_str = ""
9496
if task.get('participants') and len(task['participants']) > 0:
9597
participants_str = ",".join(task['participants'])
96-
98+
9799
content = task.get('description', '')
100+
reason = task.get('reason', '')
101+
logger.info(f"Generated Todo Task: {task}")
98102
urgency = self._map_priority_to_urgency(task.get('priority', 'normal'))
99103

100104
deadline = None
@@ -107,12 +111,13 @@ def generate_todo_tasks(self, start_time: int, end_time: int) -> Optional[str]:
107111
deadline = datetime.datetime.strptime(task['due_date'], '%Y-%m-%d')
108112
except:
109113
pass
110-
114+
111115
todo_id = self.storage.insert_todo(
112116
content=content,
113117
urgency=urgency,
114118
end_time=deadline,
115-
assignee=participants_str
119+
assignee=participants_str,
120+
reason=reason
116121
)
117122
todo_ids.append(todo_id)
118123

@@ -172,12 +177,12 @@ def _get_recent_activity_insights(self, start: int, end: int) -> Dict[str, Any]:
172177
logger.exception(f"Failed to get activity insights: {e}")
173178
return {}
174179

175-
def _get_historical_todos(self, days: int = 7) -> List[Dict[str, Any]]:
180+
def _get_historical_todos(self, days: int = 7, limit: int = 50) -> List[Dict[str, Any]]:
176181
"""Get historical todo records.
177182
"""
178183
try:
179184
start_time = datetime.datetime.now() - datetime.timedelta(days=days)
180-
todos = self.storage.get_todos(limit=50, start_time=start_time)
185+
todos = self.storage.get_todos(limit=limit, start_time=start_time)
181186
return todos
182187
except Exception as e:
183188
logger.exception(f"Failed to get historical todos: {e}")
@@ -291,7 +296,8 @@ def _post_process_tasks(self, tasks: List[Dict]) -> List[Dict]:
291296
'assignee': task.get('assignee', ''), # Task assignee
292297
'participants': task.get('participants', []), # List of participants
293298
'context_reference': task.get('context_reference', ''),
294-
'created_at': datetime.datetime.now().isoformat()
299+
'created_at': datetime.datetime.now().isoformat(),
300+
'reason': task.get('reason', '')
295301
}
296302

297303
# Process the deadline

opencontext/managers/consumption_manager.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,6 @@ def check_and_generate_daily_report():
216216
self._task_timers['report'].start()
217217

218218
check_and_generate_daily_report()
219-
logger.info(f"Daily report generation timer started, checking every minute, generation time: {self._daily_report_time}")
220219

221220
def _start_activity_timer(self):
222221
"""Start activity recording timer"""

0 commit comments

Comments
 (0)