Skip to content

Commit 2a215de

Browse files
dlkakbsteknium1
authored andcommitted
fix(msgraph): bound webhook receipt dedupe cache
1 parent 46a6f39 commit 2a215de

2 files changed

Lines changed: 56 additions & 2 deletions

File tree

gateway/platforms/msgraph_webhook.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import asyncio
66
import json
77
import logging
8+
from collections import deque
89
from hashlib import sha1
910
from typing import Any, Awaitable, Callable, Dict, Optional
1011

@@ -29,6 +30,7 @@
2930
DEFAULT_HOST = "0.0.0.0"
3031
DEFAULT_PORT = 8646
3132
DEFAULT_WEBHOOK_PATH = "/msgraph/webhook"
33+
DEFAULT_MAX_SEEN_RECEIPTS = 5000
3234
NotificationScheduler = Callable[[Dict[str, Any], MessageEvent], Awaitable[None] | None]
3335

3436

@@ -55,9 +57,13 @@ def __init__(self, config: PlatformConfig):
5557
if str(value).strip()
5658
]
5759
self._client_state: Optional[str] = self._string_or_none(extra.get("client_state"))
60+
self._max_seen_receipts = max(
61+
1, int(extra.get("max_seen_receipts", DEFAULT_MAX_SEEN_RECEIPTS))
62+
)
5863
self._runner = None
5964
self._notification_scheduler: Optional[NotificationScheduler] = None
6065
self._seen_receipts: set[str] = set()
66+
self._seen_receipt_order: deque[str] = deque()
6167
self._accepted_count = 0
6268
self._duplicate_count = 0
6369

@@ -172,10 +178,10 @@ async def _handle_notification(self, request: "web.Request") -> "web.Response":
172178
continue
173179

174180
receipt_key = self._build_receipt_key(notification)
175-
if receipt_key in self._seen_receipts:
181+
if self._has_seen_receipt(receipt_key):
176182
duplicates += 1
177183
continue
178-
self._seen_receipts.add(receipt_key)
184+
self._remember_receipt(receipt_key)
179185

180186
accepted += 1
181187
scheduled += 1
@@ -213,6 +219,16 @@ def _verify_client_state(self, notification: Dict[str, Any]) -> bool:
213219
provided = self._string_or_none(notification.get("clientState"))
214220
return provided == expected
215221

222+
def _has_seen_receipt(self, receipt_key: str) -> bool:
223+
return receipt_key in self._seen_receipts
224+
225+
def _remember_receipt(self, receipt_key: str) -> None:
226+
self._seen_receipts.add(receipt_key)
227+
self._seen_receipt_order.append(receipt_key)
228+
while len(self._seen_receipt_order) > self._max_seen_receipts:
229+
oldest = self._seen_receipt_order.popleft()
230+
self._seen_receipts.discard(oldest)
231+
216232
def _build_message_event(
217233
self,
218234
notification: Dict[str, Any],

tests/gateway/test_msgraph_webhook.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,3 +185,41 @@ async def _capture(notification, event):
185185
await asyncio.sleep(0.05)
186186

187187
assert len(scheduled) == 1
188+
189+
@pytest.mark.anyio
190+
async def test_seen_receipts_are_bounded(self):
191+
adapter = _make_adapter(max_seen_receipts=2)
192+
193+
async def _capture(notification, event):
194+
return None
195+
196+
adapter.set_notification_scheduler(_capture)
197+
198+
async def _post(notification_id: str):
199+
payload = {
200+
"value": [
201+
{
202+
"id": notification_id,
203+
"subscriptionId": "sub-1",
204+
"changeType": "updated",
205+
"resource": "communications/onlineMeetings/meeting-3",
206+
"clientState": "expected-client-state",
207+
}
208+
]
209+
}
210+
return await adapter._handle_notification(_FakeRequest(json_payload=payload))
211+
212+
first = await _post("notif-a")
213+
second = await _post("notif-b")
214+
third = await _post("notif-c")
215+
216+
assert first.status == 202
217+
assert second.status == 202
218+
assert third.status == 202
219+
assert len(adapter._seen_receipts) == 2
220+
assert list(adapter._seen_receipt_order) == ["id:notif-b", "id:notif-c"]
221+
222+
replay = await _post("notif-a")
223+
replay_data = json.loads(replay.text)
224+
assert replay_data["accepted"] == 1
225+
assert replay_data["duplicates"] == 0

0 commit comments

Comments
 (0)