Skip to content

feat(resources): add resource watch scheduling and status tracking#709

Merged
qin-ctx merged 13 commits intomainfrom
feature/add_resource_watch
Mar 18, 2026
Merged

feat(resources): add resource watch scheduling and status tracking#709
qin-ctx merged 13 commits intomainfrom
feature/add_resource_watch

Conversation

@myysy
Copy link
Copy Markdown
Collaborator

@myysy myysy commented Mar 17, 2026

Description

This PR adds a “resource watch” capability that automatically re-processes resources at a configurable interval. It introduces persisted watch tasks, a background scheduler to run due tasks safely, and end-to-end support (API/SDK/CLI) for enabling, updating, canceling.

Related Issue

Type of Change

  • Bug fix (non-breaking change that fixes an issue)
  • New feature (non-breaking change that adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation update
  • Refactoring (no functional changes)
  • Performance improvement
  • Test update

Changes Made

  • Implemented persisted watch tasks with:
    • create/update/delete/list operations
    • URI conflict prevention for watched targets
    • per-tenant/user permission checks
    • tracking last_execution_time and next_execution_time
  • Implemented a background watch scheduler that:
    • loads watch tasks on startup
    • periodically runs due tasks with concurrency control
    • updates next/last execution timestamps after each run
    • deactivates tasks when the watched resource path no longer exists
  • Wired watch support into resource ingestion:
    • watch_interval (minutes) supported end-to-end (server API + Python client + CLI)
    • scheduler execution path avoids recursive watch-task creation
  • Added watch status querying to return:
    • whether a resource is being watched
    • interval, task id, next/last execution timestamps
  • Updated docs and added an example demonstrating watch usage.

Testing

  • I have added tests that prove my fix is effective or that my feature works
  • New and existing unit tests pass locally with my changes
  • I have tested this on the following platforms:
    • Linux
    • macOS
    • Windows

Checklist

  • My code follows the project's coding style
  • I have performed a self-review of my code
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • My changes generate no new warnings
  • Any dependent changes have been merged and published

Screenshots (if applicable)

Additional Notes

myysy added 7 commits March 17, 2026 14:09
implement resource watch functionality that allows automatic monitoring and re-processing of resources at specified intervals. key features include:
- add watch_interval parameter to resource APIs
- create watch scheduler service for task execution
- handle conflict detection for active watch tasks
- provide watch status query capability
- include comprehensive tests and examples

the watch feature enables periodic automatic updates of resources without manual intervention, improving data freshness for frequently changing content
…ssing

- Implement get_watch_status API for tracking resource watch status
- Add immediate persistence for first-time resource additions
- Improve file change detection with size comparison
- Refactor watch scheduler with better concurrency control
- Add test coverage for watch status and resource processing
- Remove unused watch manager references and clean up code
- Simplify logging by removing redundant data copying
- Fix syntax errors in docstrings and string literals
- Add new fields to EmbeddingMsg class
- Improve line wrapping and formatting
- Update watch task storage URIs to use hidden files
…g unused fields

Remove media_uri, media_mime_type and id parameters as they are not used in the implementation
@github-actions
Copy link
Copy Markdown

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
🧪 PR contains tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Permission Check Bug

The _check_permission method ignores the require_owner flag for USER role. Both branches return task.user_id == user_id, so non-owner users cannot access tasks even when require_owner=False (e.g., for read operations).

if require_owner:
    return task.user_id == user_id

return task.user_id == user_id
Unnecessary Exception Reraise

The _load_tasks method catches FileNotFoundError and immediately re-raises it, which serves no purpose and can be removed.

except FileNotFoundError:
    raise

@github-actions
Copy link
Copy Markdown

PR Code Suggestions ✨

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Use public async viking_fs.mv instead of private sync call

Replace direct private viking_fs.agfs.mv with the public async viking_fs.mv method
to avoid race conditions and use the proper API. Await the move operation to ensure
it completes before proceeding. Remove usage of private _uri_to_path and direct agfs
attribute.

openviking/utils/resource_processor.py [220-246]

 root_uri = result.get("root_uri")
 temp_uri = result.get("temp_uri")  # temp_doc_uri
 
 if root_uri and temp_uri:
     viking_fs = get_viking_fs()
     target_exists = await viking_fs.exists(root_uri, ctx=ctx)
     if not target_exists:
-
-        dst_path = viking_fs._uri_to_path(root_uri, ctx=ctx)
-        parent_path = dst_path.rsplit("/", 1)[0] if "/" in dst_path else dst_path
-
-        # 确保父目录存在
-        parent_uri = "/".join(root_uri.rsplit("/", 1)[:-1]
+        # Ensure parent directory exists
+        parent_uri = "/".join(root_uri.rsplit("/", 1)[:-1])
         if parent_uri:
             await viking_fs.mkdir(parent_uri, exist_ok=True, ctx=ctx)
 
-        src_path = viking_fs._uri_to_path(temp_uri, ctx=ctx)
-        viking_fs.agfs.mv(src_path, dst_path)
+        # Move temp to final location using public async API
+        await viking_fs.mv(temp_uri, root_uri, ctx=ctx)
 
-        # 清理 temp 根目录
+        # Clean up temp root directory
         try:
             await viking_fs.delete_temp(parse_result.temp_dir_path, ctx=ctx)
         except Exception:
             pass
 
-        # 更新 temp_uri → DAG 直接在 final 上跑
+        # Update temp_uri → DAG runs directly on final location
         result["temp_uri"] = root_uri
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly replaces usage of private viking_fs._uri_to_path and synchronous viking_fs.agfs.mv with the public async viking_fs.mv API. This improves maintainability (avoids private API usage), ensures proper async/await semantics, and reduces risk of race conditions.

Medium

myysy added 2 commits March 18, 2026 11:29
Add test case for recovering tasks from backup storage when primary is missing
Remove require_owner parameter from _check_permission as it's redundant with the existing role-based checks
Add watch_interval parameter to enable periodic resource updates. When target is specified, watch_interval > 0 creates/updates a watch task, while <= 0 disables it. Also simplify resource moving logic in ResourceProcessor by using direct mv operation.
Copy link
Copy Markdown
Collaborator

@qin-ctx qin-ctx left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review Summary

The resource watch feature is well-structured with comprehensive test coverage (~2300 lines of tests including unit, integration, E2E, and recovery tests). The WatchManager's persistence design with atomic write rotation (tmp→bak→main) is solid.

However, there are blocking issues around silent failure on cross-user URI conflicts, use of private VikingFS APIs in Phase 3.5, and privilege escalation in the scheduler.

Additionally:

  • PR description has all "Type of Change" checkboxes unchecked (this is clearly a New feature)
  • No REST API endpoint for get_watch_status — only available through Python SDK, not via HTTP API or CLI

remove get_watch_status method and related tests, update examples to use direct task access
update watch manager to use ConflictError for URI conflicts and include original_role in tasks
add validation for watch_interval requiring target URI
Copy link
Copy Markdown
Collaborator

@qin-ctx qin-ctx left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review Summary

All 5 blocking issues from the previous review have been addressed:

  • URI conflict now properly raises ConflictError (not silently caught)
  • Phase 3.5 uses public VikingFS APIs (mv/exists/mkdir)
  • Scheduler preserves original user roles via task.original_role
  • watch_interval > 0 without to now raises InvalidArgumentError
  • Example uses public API only

One previously flagged non-blocking issue remains unaddressed: naive datetime.now() without timezone.

There is one new blocking issue: watch_interval validation runs after resource processing, leading to resource ingestion before the error is raised.

Additionally, the embedding_tracker.py changes (removal of get_status, remove, get_all_tracked methods) are unrelated to the watch feature. Consider moving these to a separate PR to keep change scope focused and simplify rollback if needed.

Minor: The PR's "Type of Change" checkboxes are still all unchecked — this should be marked as "New feature".

@qin-ctx qin-ctx merged commit 834b808 into main Mar 18, 2026
11 of 12 checks passed
@qin-ctx qin-ctx deleted the feature/add_resource_watch branch March 18, 2026 07:16
@github-project-automation github-project-automation bot moved this from Backlog to Done in OpenViking project Mar 18, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

Status: Done

Development

Successfully merging this pull request may close these issues.

2 participants