Skip to content

Commit b4016c6

Browse files
jpheinclaude
andcommitted
feat: add /backfill-age endpoint for remote AGE graph population
Runs mempalace.backfill_age as a background subprocess, triggered via HTTP POST. Returns immediately; poll /backfill-age/status for progress. This enables populating the AGE entity graph from existing drawers without SSH access to the daemon host. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 20bc494 commit b4016c6

1 file changed

Lines changed: 95 additions & 0 deletions

File tree

main.py

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2501,6 +2501,101 @@ async def mine(request: Request, x_api_key: str | None = Header(default=None)):
25012501
return result
25022502

25032503

2504+
_backfill_state: dict[str, Any] = {"in_progress": False}
2505+
_backfill_lock = asyncio.Lock()
2506+
2507+
@app.post("/backfill-age")
2508+
async def backfill_age(request: Request, x_api_key: str | None = Header(default=None)):
2509+
"""Trigger AGE graph backfill from existing drawer rows.
2510+
2511+
Runs `mempalace-backfill-age` (or `python -m mempalace.backfill_age`)
2512+
as a background subprocess. Returns immediately with status; poll
2513+
/backfill-age/status for progress.
2514+
2515+
Body (all optional)::
2516+
2517+
{
2518+
"wing": null, // restrict to one wing
2519+
"skip_palace": false, // skip Wing/Room/Drawer structure
2520+
"skip_entities": false, // skip per-drawer entity extraction
2521+
"restart": false // clear checkpoint, start fresh
2522+
}
2523+
2524+
Requires MEMPALACE_BACKEND=postgres.
2525+
"""
2526+
_check_auth(x_api_key)
2527+
if _mp._config.backend != "postgres":
2528+
raise HTTPException(status_code=503, detail="backfill-age requires postgres backend")
2529+
2530+
async with _backfill_lock:
2531+
if _backfill_state["in_progress"]:
2532+
return {"status": "already_running", "started_at": _backfill_state.get("started_at")}
2533+
2534+
dsn = os.environ.get("MEMPALACE_POSTGRES_DSN")
2535+
if not dsn:
2536+
cfg = _mp.MempalaceConfig()
2537+
dsn = cfg.postgres_dsn
2538+
if not dsn:
2539+
raise HTTPException(status_code=500, detail="no postgres DSN available")
2540+
2541+
body = await request.json() if request.headers.get("content-type") == "application/json" else {}
2542+
cmd = [sys.executable, "-m", "mempalace.backfill_age", "--dsn", dsn]
2543+
if body.get("wing"):
2544+
cmd += ["--wing", body["wing"]]
2545+
if body.get("skip_palace"):
2546+
cmd.append("--skip-palace")
2547+
if body.get("skip_entities"):
2548+
cmd.append("--skip-entities")
2549+
if body.get("restart"):
2550+
cmd.append("--restart")
2551+
2552+
_backfill_state["in_progress"] = True
2553+
_backfill_state["started_at"] = _time.monotonic()
2554+
_backfill_state["output_lines"] = []
2555+
2556+
async def _run_backfill():
2557+
try:
2558+
proc = await asyncio.create_subprocess_exec(
2559+
*cmd,
2560+
stdout=asyncio.subprocess.PIPE,
2561+
stderr=asyncio.subprocess.STDOUT,
2562+
)
2563+
async for line in proc.stdout:
2564+
decoded = line.decode().rstrip()
2565+
_backfill_state.setdefault("output_lines", []).append(decoded)
2566+
if len(_backfill_state["output_lines"]) > 200:
2567+
_backfill_state["output_lines"] = _backfill_state["output_lines"][-100:]
2568+
await proc.wait()
2569+
_backfill_state["returncode"] = proc.returncode
2570+
except Exception as e:
2571+
_backfill_state["error"] = str(e)
2572+
finally:
2573+
_backfill_state["in_progress"] = False
2574+
_backfill_state["finished_at"] = _time.monotonic()
2575+
2576+
asyncio.create_task(_run_backfill())
2577+
return {"status": "started", "command": " ".join(cmd[:4]) + " ..."}
2578+
2579+
2580+
@app.get("/backfill-age/status")
2581+
async def backfill_age_status(x_api_key: str | None = Header(default=None)):
2582+
"""Poll backfill-age progress."""
2583+
_check_auth(x_api_key)
2584+
result = {
2585+
"in_progress": _backfill_state["in_progress"],
2586+
}
2587+
if _backfill_state.get("started_at"):
2588+
elapsed = _time.monotonic() - _backfill_state["started_at"]
2589+
result["elapsed_seconds"] = round(elapsed, 1)
2590+
if _backfill_state.get("output_lines"):
2591+
result["recent_output"] = _backfill_state["output_lines"][-10:]
2592+
if _backfill_state.get("returncode") is not None:
2593+
result["returncode"] = _backfill_state["returncode"]
2594+
if _backfill_state.get("error"):
2595+
result["error"] = _backfill_state["error"]
2596+
return result
2597+
2598+
25042599
@app.get("/watch")
25052600
async def watch_list(x_api_key: str | None = Header(default=None)):
25062601
"""List the directories the file-watcher is currently monitoring.

0 commit comments

Comments
 (0)