Skip to content

Commit c49599c

Browse files
committed
Fix dynamic NIXL store secondary-lookup byte overcount race
1 parent 4cb34c4 commit c49599c

2 files changed

Lines changed: 48 additions & 3 deletions

File tree

lmcache/v1/distributed/l2_adapters/nixl_store_dynamic_l2_adapter.py

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -571,9 +571,12 @@ async def _execute_store_in_the_loop(
571 571
pin_count=1,
572 572
)
573 573
with self._lock:
574-
self._memory_objects[key] = store_obj
575-
self._total_bytes += store_obj.size
576-
store_obj.decrease_pin_count()
574+
# Re-check in case lookup/recovery populated this key
575+
# while we awaited dynamic_store_file.
576+
if key not in self._memory_objects:
577+
self._memory_objects[key] = store_obj
578+
self._total_bytes += store_obj.size
579+
store_obj.decrease_pin_count()
577 580
stored_keys.append(key)
578 581

579 582
except Exception:

tests/v1/distributed/test_nixl_store_dynamic_l2_adapter.py

Lines changed: 42 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@
7 7
"""
8 8

9 9
# Standard
10+
import asyncio
10 11
import os
11 12
import select
12 13
import shutil
@@ -561,6 +562,47 @@ def test_get_usage_decreases_after_delete(self, adapter):
561 562

562 563
assert usage_after < usage_before
563 564

565+
def test_store_secondary_lookup_race_does_not_overcount_usage(
566+
self, adapter_with_persist
567+
):
568+
"""Store+recover race should not leave stale bytes after delete."""
569+
adpt, buf, _, _, _ = adapter_with_persist
570+
key = create_object_key(1001)
571+
obj = create_memory_obj(buf, page_index=0)
572+
lookup_tasks: list[int] = []
573+
574+
async def _dynamic_store_with_lookup(mem_indices, file_path, page_size):
575+
file_size = len(mem_indices) * page_size
576+
with open(file_path, "wb") as f:
577+
f.truncate(file_size)
578+
lookup_tasks.append(adpt.submit_lookup_and_lock_task([key]))
579+
await asyncio.sleep(0)
580+
581+
try:
582+
adpt.nixl_agent.dynamic_store_file = _dynamic_store_with_lookup
583+
adpt.submit_store_task([key], [obj])
584+
wait_for_event_fd(adpt.get_store_event_fd())
585+
wait_for_event_fd(adpt.get_lookup_and_lock_event_fd())
586+
adpt.pop_completed_store_tasks()
587+
588+
bitmap = adpt.query_lookup_and_lock_result(lookup_tasks[0])
589+
assert bitmap is not None
590+
assert bitmap.test(0)
591+
592+
adpt.submit_unlock([key])
593+
sync_task = adpt.submit_lookup_and_lock_task([])
594+
wait_for_event_fd(adpt.get_lookup_and_lock_event_fd())
595+
adpt.query_lookup_and_lock_result(sync_task)
596+
597+
usage_before_delete, _ = adpt.get_usage()
598+
adpt.delete([key])
599+
usage_after_delete, _ = adpt.get_usage()
600+
601+
assert usage_before_delete > 0.0
602+
assert usage_after_delete == 0.0
603+
finally:
604+
adpt.close()
605+
564 606
def test_store_rejected_when_capacity_exceeded(self):
565 607
"""Store should stop when max capacity is reached."""
566 608
tmp_dir = tempfile.mkdtemp(prefix="nixl_dyn_cap_test_")

0 commit comments

Comments
 (0)