Skip to content

Commit b3f0206

Browse files
committed
feat(kg-age): add_mention + backfill_age + edge-type-union walk fix
Builds out Phase 4 (backfill script) and tightens Phases 1-3: KnowledgeGraphAGE.add_mention(): New method that connects Drawer nodes (palace structure) to Entity nodes via :MENTIONS edges, distinct from add_triple's Entity-RELATION-Entity pattern. Lets the write-through layer (Phase 2) populate the kg in the same shape the read-side walk (walk_wing, MCP tools) expects. AGE 1.6.0 dialect gaps documented + worked around: - No SET on edge properties inline (errors at '=') - No ON CREATE SET - No coalesce() in SET - No edge-type union [:A|B] in MATCH CREATE-always edge semantics matches the SQLite KG's triples-table behavior (no upsert on triples; multiple add_triple calls create parallel rows). Aggregate at read time via count(r). palace_graph_age.walk_wing(): Fixed to use :RELATION + property filter instead of [:MENTIONS|RELATION] union — AGE doesn't parse the union syntax. kg_writethrough.make_age_writethrough(): Switched from add_triple to add_mention so write-through populates the Drawer-MENTIONS-Entity graph the read side expects. backfill_age.py (Phase 4): CLI + library entry point for one-shot AGE-graph population from existing drawer table. Restartable via mempalace_kg_backfill_state checkpoint table. Stream-based via psycopg2 named cursor (bounded memory). Configurable scope (--wing, --skip-palace, --skip-entities, --extractor). Default extractor is SME's regex extractor (with builtin fallback when SME isn't importable). Smoke test on sme_lme_bench: - populate_from_postgres(skip_drawers=True) → 2 wings, 237 rooms, 238 CONTAINS, 1 SHARED_VIA tunnel - backfill (docs wing only, max 20 entities/drawer) → 1181 drawers processed → 6015 entities + 13721 mentions in 5.85 min (~3.4 drawers/s; projects to ~22h for the prod 274K palace) - add_mention parallel-edge test: 3 calls (1 duplicate) → 2 edges for Atakan, 1 for adaptmem; aggregate count(r) recovers true count
1 parent ff583c0 commit b3f0206

4 files changed

Lines changed: 410 additions & 7 deletions

File tree

mempalace/backfill_age.py

Lines changed: 317 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,317 @@
1+
"""Backfill the AGE graph from an existing drawer table.
2+
3+
Phase 4 of the AGE-integration goal. Builds the full palace-graph + KG
4+
state from drawer rows that were written BEFORE the write-through
5+
middleware was registered. Companion to ``migrate_to_postgres``: that
6+
script copies chroma → postgres, this one copies postgres-drawers →
7+
postgres-AGE.
8+
9+
Design goals:
10+
11+
1. **Restartable.** A checkpoint table (`mempalace_kg_backfill_state`)
12+
tracks completed work as (phase, key) rows: one for the palace pass and
13+
one per drawer id. Re-running is safe and skips checkpointed drawers.
14+
2. **Idempotent.** Drawer nodes and CONTAINS edges use MERGE. MENTIONS
15+
edges are created on every ``add_mention`` call (no upsert), so the per-drawer checkpoint is what prevents duplicates on a re-run.
16+
3. **Bounded memory.** Streams drawer rows via server-side cursor
17+
(psycopg2's named cursor), processes one at a time, never holds
18+
the full result set.
19+
4. **Configurable scope.** Can target one wing at a time, or all wings,
20+
or only do the high-level palace map (skip per-drawer entity
21+
extraction) for a fast first pass.
22+
23+
Three layers populated:
24+
25+
- Palace structure (Wing → Room → Drawer): from
26+
``palace_graph_age.populate_from_postgres``. Idempotent re-MERGE.
27+
- Entity extraction + MENTIONS edges: per-drawer regex extractor by
28+
default (configurable via ``--extractor``).
29+
- Optional: KG triples seeded from ENTITY_FACTS if present.
30+
31+
CLI entry point (registered as ``mempalace-backfill-age``):
32+
33+
mempalace-backfill-age \\
34+
--dsn "$MEMPALACE_POSTGRES_DSN" \\
35+
--table mempalace_drawers \\
36+
[--wing <name>] \\
37+
[--skip-palace] \\
38+
[--skip-entities] \\
39+
[--restart]
40+
"""
41+
42+
from __future__ import annotations
43+
44+
import argparse
45+
import json
46+
import logging
47+
import sys
48+
import time
49+
from typing import Optional
50+
51+
from .knowledge_graph_age import KnowledgeGraphAGE
52+
from .palace_graph_age import populate_from_postgres
53+
54+
# Module-level logger; the name is fixed explicitly rather than derived from
# ``__name__`` so it stays the same when the module is run as a script.
logger = logging.getLogger("mempalace.backfill_age")
55+
56+
# Postgres table holding one (phase, key) row per completed unit of backfill
# work; interpolated directly into the checkpoint helpers' SQL.
CHECKPOINT_TABLE = "mempalace_kg_backfill_state"
57+
58+
59+
def _ensure_checkpoint_table(conn) -> None:
    """Create the backfill checkpoint table when it is missing, then commit.

    The statement is ``CREATE TABLE IF NOT EXISTS``, so calling this on a
    database that already has the table changes nothing.

    Args:
        conn: Open DB-API connection used for the DDL and the commit.
    """
    ddl = f"""
        CREATE TABLE IF NOT EXISTS {CHECKPOINT_TABLE} (
            phase TEXT NOT NULL,
            key TEXT NOT NULL,
            completed_at TIMESTAMPTZ DEFAULT NOW(),
            PRIMARY KEY (phase, key)
        )
        """
    with conn.cursor() as cursor:
        cursor.execute(ddl)
    conn.commit()
73+
74+
75+
def _checkpoint_done(conn, phase: str, key: str) -> bool:
    """Report whether a checkpoint row exists for ``(phase, key)``.

    Args:
        conn: Open DB-API connection to query.
        phase: Checkpoint phase column value.
        key: Checkpoint key column value.

    Returns:
        True when a matching row was found, False otherwise.
    """
    lookup = f"SELECT 1 FROM {CHECKPOINT_TABLE} WHERE phase = %s AND key = %s"
    with conn.cursor() as cursor:
        cursor.execute(lookup, (phase, key))
        hit = cursor.fetchone()
    return hit is not None
82+
83+
84+
def _checkpoint_mark(conn, phase: str, key: str) -> None:
    """Record ``(phase, key)`` as completed and commit.

    The insert uses ``ON CONFLICT ... DO NOTHING``, so marking the same pair
    twice leaves the existing row untouched.

    Args:
        conn: Open DB-API connection used for the insert and the commit.
        phase: Checkpoint phase column value.
        key: Checkpoint key column value.
    """
    insert_sql = (
        f"INSERT INTO {CHECKPOINT_TABLE} (phase, key) VALUES (%s, %s) "
        f"ON CONFLICT (phase, key) DO NOTHING"
    )
    with conn.cursor() as cursor:
        cursor.execute(insert_sql, (phase, key))
    conn.commit()
92+
93+
94+
def _checkpoint_clear(conn) -> int:
    """Delete every checkpoint row, commit, and return the cursor's rowcount.

    Args:
        conn: Open DB-API connection used for the delete and the commit.

    Returns:
        The ``rowcount`` reported by the cursor for the DELETE statement.
    """
    with conn.cursor() as cursor:
        cursor.execute(f"DELETE FROM {CHECKPOINT_TABLE}")
        removed = cursor.rowcount
    conn.commit()
    return removed
99+
100+
101+
def _get_extractor(name: str = "regex"):
102+
"""Return an extractor callable matching (text) -> list[Entity]."""
103+
if name == "regex":
104+
try:
105+
from sme.extractors.regex import extract as sme_extract # type: ignore
106+
return sme_extract
107+
except ImportError:
108+
from .kg_writethrough import _builtin_regex_extractor
109+
return _builtin_regex_extractor
110+
raise ValueError(
111+
f"unknown extractor={name!r}; supported: regex (spacy/llm pending)"
112+
)
113+
114+
115+
def _backfill_one_drawer(
    kg,
    extractor,
    counters: dict,
    *,
    drawer_id,
    document,
    room,
    max_entities_per_drawer: int,
    confidence: float,
) -> bool:
    """Write one drawer's node, CONTAINS edge and MENTIONS edges into the graph.

    Failures are counted in ``counters["errors"]`` and logged at DEBUG level
    rather than raised, so one bad drawer or entity does not stop the run.

    Returns:
        True when the Drawer node step succeeded (the drawer may be
        checkpointed), False when it failed and the drawer should be retried
        on a later run.
    """
    # Ensure the Drawer node exists + CONTAINS edge from Room.
    try:
        kg._run_cypher(
            "MERGE (d:Drawer {id: $id})",
            {"id": drawer_id},
        )
        if room and room.strip():
            # NOTE(review): the Room is matched by name only; if room names
            # can repeat across wings this links the drawer to each of them.
            # Confirm against the Room node schema.
            kg._run_cypher(
                """
                MATCH (r:Room {name: $room}), (d:Drawer {id: $id})
                MERGE (r)-[:CONTAINS]->(d)
                """,
                {"room": room, "id": drawer_id},
            )
    except Exception as e:  # noqa: BLE001
        counters["errors"] += 1
        logger.debug("drawer-node failed for %s: %s", drawer_id, e)
        return False

    # Extract + add MENTIONS edges (Drawer)-[:MENTIONS]->(Entity).
    try:
        ents = extractor(document)
    except Exception as e:  # noqa: BLE001
        counters["errors"] += 1
        logger.debug("extractor failed for %s: %s", drawer_id, e)
        ents = []

    # list() lets extractors return any iterable, not just a sliceable list.
    for ent in list(ents or ())[:max_entities_per_drawer]:
        try:
            kg.add_mention(
                drawer_id=drawer_id,
                entity_name=ent.name,
                entity_type=getattr(ent, "type", "unknown"),
                count=getattr(ent, "count", 1),
                confidence=confidence,
            )
            counters["entities_added"] += 1
        except Exception as e:  # noqa: BLE001
            counters["errors"] += 1
            # getattr: an entity without ``name`` is what may have raised
            # above, so the log call must not touch ``ent.name`` directly.
            logger.debug(
                "add_mention failed (%s, %s): %s",
                drawer_id, getattr(ent, "name", None), e,
            )
    return True


def backfill(
    *,
    dsn: str,
    table_name: str = "mempalace_drawers",
    wing_filter: Optional[str] = None,
    skip_palace: bool = False,
    skip_entities: bool = False,
    extractor_name: str = "regex",
    max_entities_per_drawer: int = 50,
    relation_type: str = "mentions",
    confidence: float = 0.5,
    restart: bool = False,
    log_every: int = 500,
) -> dict:
    """Backfill AGE graph from an existing drawer table.

    Args:
        dsn: Postgres DSN — must point at a database where AGE is loaded.
        table_name: Source drawer table.
        wing_filter: If set, only process drawers in this wing.
        skip_palace: Skip Wing/Room/SHARED_VIA population. Useful if
            you've already run it once and just want a fresh entity pass.
        skip_entities: Skip MENTIONS extraction. Useful for first-pass
            "just give me the palace map" on huge palaces.
        extractor_name: regex (default), spacy, llm — only regex
            implemented today. Validated before any connection is opened.
        max_entities_per_drawer: Cap on entities per drawer write; same
            knob as ``kg_writethrough.make_age_writethrough``.
        relation_type: Accepted for interface compatibility; this function
            does not currently forward it to ``add_mention``.
        confidence: Default confidence for extracted mentions.
        restart: Clear the checkpoint table before starting (forces a
            full re-backfill).
        log_every: Emit a progress log every this many drawers; values
            <= 0 disable progress logs.

    Returns:
        Counters dict tracking what was processed (palace counters, drawers
        seen/skipped, entities added, errors, timing).

    Raises:
        ValueError: If ``extractor_name`` is unknown and entity extraction
            is enabled.
    """
    from .backends.postgres import _load_psycopg2

    psycopg2, _ = _load_psycopg2()

    # Resolve the extractor up front so a bad name fails before any
    # connection is opened or the palace pass has run.
    extractor = None if skip_entities else _get_extractor(extractor_name)

    counters = {
        "palace": {},
        "drawers_seen": 0,
        "drawers_skipped_checkpoint": 0,
        "entities_added": 0,
        "errors": 0,
        "started_at": time.time(),
    }

    kg = None
    checkpoint_conn = None
    try:
        # Connect KG + checkpoint connection separately so a long-running
        # entity extraction phase doesn't tie up the checkpoint cursor.
        kg = KnowledgeGraphAGE(dsn)
        checkpoint_conn = psycopg2.connect(dsn)
        checkpoint_conn.autocommit = False
        _ensure_checkpoint_table(checkpoint_conn)
        if restart:
            cleared = _checkpoint_clear(checkpoint_conn)
            logger.info("backfill: --restart cleared %d checkpoint rows", cleared)

        # Phase A: palace structure
        if not skip_palace:
            palace_key = f"palace:{wing_filter or 'ALL'}"
            if not _checkpoint_done(checkpoint_conn, "palace", palace_key):
                logger.info("backfill: phase=palace key=%s", palace_key)
                # NOTE(review): wing_filter only shapes the checkpoint key
                # here; it is not passed to populate_from_postgres.
                counters["palace"] = populate_from_postgres(
                    kg, dsn=dsn, table_name=table_name,
                    # We rebuild drawers in phase B with entity extraction;
                    # palace pass only needs the structural Wing/Room/SHARED_VIA.
                    skip_drawers=True,
                    skip_tunnels=False,
                )
                _checkpoint_mark(checkpoint_conn, "palace", palace_key)
            else:
                logger.info(
                    "backfill: phase=palace key=%s already done; skipping", palace_key
                )

        # Phase B: drawer + entity extraction
        if not skip_entities:
            # Double any embedded quote so the name stays one quoted identifier.
            quoted_table = '"' + table_name.replace('"', '""') + '"'
            sql_drawers = f"""
                SELECT id, document, wing, room FROM {quoted_table}
                WHERE document IS NOT NULL
            """
            params: list = []
            if wing_filter:
                sql_drawers += " AND wing = %s"
                params.append(wing_filter)
            sql_drawers += " ORDER BY id"

            # Stream via named cursor so we don't load all drawers at once.
            scan_conn = psycopg2.connect(dsn)
            scan_conn.autocommit = False
            try:
                with scan_conn.cursor(name="drawer_scan_cur") as cur:
                    cur.itersize = 1000
                    cur.execute(sql_drawers, params)
                    t0 = time.time()
                    for drawer_id, document, _wing, room in cur:
                        counters["drawers_seen"] += 1
                        # The checkpoint key column is TEXT; normalise so a
                        # non-string id is not sent as a mismatched type.
                        checkpoint_key = str(drawer_id)
                        if _checkpoint_done(checkpoint_conn, "drawer", checkpoint_key):
                            counters["drawers_skipped_checkpoint"] += 1
                            continue

                        drawer_ok = _backfill_one_drawer(
                            kg,
                            extractor,
                            counters,
                            drawer_id=drawer_id,
                            document=document,
                            room=room,
                            max_entities_per_drawer=max_entities_per_drawer,
                            confidence=confidence,
                        )
                        if not drawer_ok:
                            # Leave it un-checkpointed so a later run retries it.
                            continue

                        _checkpoint_mark(checkpoint_conn, "drawer", checkpoint_key)

                        if log_every > 0 and counters["drawers_seen"] % log_every == 0:
                            elapsed = time.time() - t0
                            rate = counters["drawers_seen"] / max(elapsed, 0.001)
                            logger.info(
                                "backfill: drawers_seen=%d entities_added=%d skipped=%d errors=%d rate=%.1f/s",
                                counters["drawers_seen"],
                                counters["entities_added"],
                                counters["drawers_skipped_checkpoint"],
                                counters["errors"],
                                rate,
                            )
            finally:
                scan_conn.close()

        counters["finished_at"] = time.time()
        counters["wall_clock_s"] = round(
            counters["finished_at"] - counters["started_at"], 1
        )
    finally:
        # Release both handles even when a phase raised part-way through.
        try:
            if kg is not None:
                kg.close()
        finally:
            if checkpoint_conn is not None:
                checkpoint_conn.close()

    if counters["errors"]:
        # Per-item failures are only logged at DEBUG; surface the total.
        logger.warning(
            "backfill: finished with %d errors (enable DEBUG logging for details)",
            counters["errors"],
        )
    return counters
282+
283+
284+
def main(argv: Optional[list[str]] = None) -> int:
    """Command-line entry point: parse flags, run ``backfill``, print counters.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv``.

    Returns:
        Process exit code; always 0 when ``backfill`` returns normally.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
    )

    cli = argparse.ArgumentParser(description="Backfill AGE graph from drawer table")
    cli.add_argument("--dsn", required=True, help="Postgres DSN")
    cli.add_argument("--table", default="mempalace_drawers", help="Drawer table name")
    cli.add_argument("--wing", default=None, help="Restrict to a single wing")
    cli.add_argument(
        "--skip-palace",
        action="store_true",
        help="Skip Wing/Room/SHARED_VIA structure",
    )
    cli.add_argument(
        "--skip-entities",
        action="store_true",
        help="Skip per-drawer entity extraction",
    )
    cli.add_argument("--extractor", default="regex", help="Entity extractor (regex)")
    cli.add_argument("--max-entities", type=int, default=50)
    cli.add_argument(
        "--restart",
        action="store_true",
        help="Clear checkpoint table before starting",
    )
    cli.add_argument("--log-every", type=int, default=500)
    opts = cli.parse_args(argv)

    summary = backfill(
        dsn=opts.dsn,
        table_name=opts.table,
        wing_filter=opts.wing,
        skip_palace=opts.skip_palace,
        skip_entities=opts.skip_entities,
        extractor_name=opts.extractor,
        max_entities_per_drawer=opts.max_entities,
        restart=opts.restart,
        log_every=opts.log_every,
    )
    # Machine-readable run summary on stdout.
    print(json.dumps(summary, indent=2))
    return 0
314+
315+
316+
# Script entry point: exit the process with main()'s return code.
if __name__ == "__main__":
    raise SystemExit(main())

mempalace/kg_writethrough.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -103,18 +103,23 @@ def hook(*, drawer_id: str, document: str, metadata: dict) -> None:
103103
if not entities:
104104
return
105105
# Cap to bound per-drawer write cost.
106+
# Use add_mention (Drawer)-[:MENTIONS]->(Entity) rather than
107+
# add_triple (Entity-RELATION-Entity) so the drawer keeps its
108+
# :Drawer label and the palace-structure layer (Wing→Room→Drawer)
109+
# connects cleanly to the entity layer.
106110
for ent in entities[:max_entities_per_drawer]:
107111
try:
108-
kg.add_triple(
109-
subject=drawer_id,
110-
relation_type=relation_type,
111-
object_=ent.name,
112+
kg.add_mention(
113+
drawer_id=drawer_id,
114+
entity_name=ent.name,
115+
entity_type=getattr(ent, "type", "unknown"),
116+
count=getattr(ent, "count", 1),
112117
confidence=confidence,
113118
)
114119
except Exception as e: # noqa: BLE001
115120
logger.debug(
116-
"add_triple failed for (%s, %s, %s): %s",
117-
drawer_id, relation_type, ent.name, e,
121+
"add_mention failed for (%s, %s): %s",
122+
drawer_id, ent.name, e,
118123
)
119124

120125
return hook

0 commit comments

Comments
 (0)