Skip to content

Commit 46a6f39

Browse files
dlkakbsteknium1
authored andcommitted
feat(msgraph): add webhook listener platform
1 parent f209a35 commit 46a6f39

5 files changed

Lines changed: 530 additions & 1 deletion

File tree

gateway/config.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ class Platform(Enum):
101101
DINGTALK = "dingtalk"
102102
API_SERVER = "api_server"
103103
WEBHOOK = "webhook"
104+
MSGRAPH_WEBHOOK = "msgraph_webhook"
104105
FEISHU = "feishu"
105106
WECOM = "wecom"
106107
WECOM_CALLBACK = "wecom_callback"
@@ -376,6 +377,7 @@ def from_dict(cls, data: Dict[str, Any]) -> "StreamingConfig":
376377
Platform.SMS: lambda cfg: bool(os.getenv("TWILIO_ACCOUNT_SID")),
377378
Platform.API_SERVER: lambda cfg: True,
378379
Platform.WEBHOOK: lambda cfg: True,
380+
Platform.MSGRAPH_WEBHOOK: lambda cfg: True,
379381
Platform.FEISHU: lambda cfg: bool(cfg.extra.get("app_id")),
380382
Platform.WECOM: lambda cfg: bool(cfg.extra.get("bot_id")),
381383
Platform.WECOM_CALLBACK: lambda cfg: bool(
@@ -1407,6 +1409,48 @@ def _apply_env_overrides(config: GatewayConfig) -> None:
14071409
if webhook_secret:
14081410
config.platforms[Platform.WEBHOOK].extra["secret"] = webhook_secret
14091411

1412+
# Microsoft Graph webhook platform
1413+
msgraph_webhook_enabled = os.getenv("MSGRAPH_WEBHOOK_ENABLED", "").lower() in (
1414+
"true",
1415+
"1",
1416+
"yes",
1417+
)
1418+
msgraph_webhook_port = os.getenv("MSGRAPH_WEBHOOK_PORT")
1419+
msgraph_webhook_client_state = os.getenv("MSGRAPH_WEBHOOK_CLIENT_STATE", "")
1420+
msgraph_webhook_resources = os.getenv("MSGRAPH_WEBHOOK_ACCEPTED_RESOURCES", "")
1421+
if (
1422+
msgraph_webhook_enabled
1423+
or Platform.MSGRAPH_WEBHOOK in config.platforms
1424+
or msgraph_webhook_port
1425+
or msgraph_webhook_client_state
1426+
or msgraph_webhook_resources
1427+
):
1428+
if Platform.MSGRAPH_WEBHOOK not in config.platforms:
1429+
config.platforms[Platform.MSGRAPH_WEBHOOK] = PlatformConfig()
1430+
if msgraph_webhook_enabled:
1431+
config.platforms[Platform.MSGRAPH_WEBHOOK].enabled = True
1432+
if msgraph_webhook_port:
1433+
try:
1434+
config.platforms[Platform.MSGRAPH_WEBHOOK].extra["port"] = int(
1435+
msgraph_webhook_port
1436+
)
1437+
except ValueError:
1438+
pass
1439+
if msgraph_webhook_client_state:
1440+
config.platforms[Platform.MSGRAPH_WEBHOOK].extra["client_state"] = (
1441+
msgraph_webhook_client_state
1442+
)
1443+
if msgraph_webhook_resources:
1444+
resources = [
1445+
resource.strip()
1446+
for resource in msgraph_webhook_resources.split(",")
1447+
if resource.strip()
1448+
]
1449+
if resources:
1450+
config.platforms[Platform.MSGRAPH_WEBHOOK].extra[
1451+
"accepted_resources"
1452+
] = resources
1453+
14101454
# DingTalk
14111455
dingtalk_client_id = os.getenv("DINGTALK_CLIENT_ID")
14121456
dingtalk_client_secret = os.getenv("DINGTALK_CLIENT_SECRET")
Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
"""Microsoft Graph webhook adapter for change-notification ingress."""
2+
3+
from __future__ import annotations
4+
5+
import asyncio
6+
import json
7+
import logging
8+
from hashlib import sha1
9+
from typing import Any, Awaitable, Callable, Dict, Optional
10+
11+
try:
12+
from aiohttp import web
13+
14+
AIOHTTP_AVAILABLE = True
15+
except ImportError:
16+
AIOHTTP_AVAILABLE = False
17+
web = None # type: ignore[assignment]
18+
19+
from gateway.config import Platform, PlatformConfig
20+
from gateway.platforms.base import (
21+
BasePlatformAdapter,
22+
MessageEvent,
23+
MessageType,
24+
SendResult,
25+
)
26+
27+
logger = logging.getLogger(__name__)
28+
29+
DEFAULT_HOST = "0.0.0.0"
30+
DEFAULT_PORT = 8646
31+
DEFAULT_WEBHOOK_PATH = "/msgraph/webhook"
32+
NotificationScheduler = Callable[[Dict[str, Any], MessageEvent], Awaitable[None] | None]
33+
34+
35+
def check_msgraph_webhook_requirements() -> bool:
36+
"""Return whether required webhook dependencies are available."""
37+
return AIOHTTP_AVAILABLE
38+
39+
40+
class MSGraphWebhookAdapter(BasePlatformAdapter):
41+
"""Receive Microsoft Graph change notifications and surface them internally."""
42+
43+
def __init__(self, config: PlatformConfig):
44+
super().__init__(config, Platform.MSGRAPH_WEBHOOK)
45+
extra = config.extra or {}
46+
self._host: str = str(extra.get("host", DEFAULT_HOST))
47+
self._port: int = int(extra.get("port", DEFAULT_PORT))
48+
self._webhook_path: str = self._normalize_path(
49+
extra.get("webhook_path", DEFAULT_WEBHOOK_PATH)
50+
)
51+
self._health_path: str = self._normalize_path(extra.get("health_path", "/health"))
52+
self._accepted_resources: list[str] = [
53+
str(value).strip()
54+
for value in (extra.get("accepted_resources") or [])
55+
if str(value).strip()
56+
]
57+
self._client_state: Optional[str] = self._string_or_none(extra.get("client_state"))
58+
self._runner = None
59+
self._notification_scheduler: Optional[NotificationScheduler] = None
60+
self._seen_receipts: set[str] = set()
61+
self._accepted_count = 0
62+
self._duplicate_count = 0
63+
64+
@staticmethod
65+
def _string_or_none(value: Any) -> Optional[str]:
66+
if value is None:
67+
return None
68+
text = str(value).strip()
69+
return text or None
70+
71+
@staticmethod
72+
def _normalize_path(path: Any) -> str:
73+
raw = str(path or "").strip() or "/"
74+
return raw if raw.startswith("/") else f"/{raw}"
75+
76+
@staticmethod
77+
def _build_receipt_key(notification: Dict[str, Any]) -> str:
78+
explicit_id = str(notification.get("id") or "").strip()
79+
if explicit_id:
80+
return f"id:{explicit_id}"
81+
payload = "|".join(
82+
[
83+
str(notification.get("subscriptionId") or ""),
84+
str(notification.get("changeType") or ""),
85+
str(notification.get("resource") or ""),
86+
json.dumps(notification.get("resourceData") or {}, sort_keys=True),
87+
]
88+
)
89+
return f"sha1:{sha1(payload.encode('utf-8')).hexdigest()}"
90+
91+
def set_notification_scheduler(self, scheduler: Optional[NotificationScheduler]) -> None:
92+
self._notification_scheduler = scheduler
93+
94+
async def connect(self) -> bool:
95+
app = web.Application()
96+
app.router.add_get(self._health_path, self._handle_health)
97+
app.router.add_get(self._webhook_path, self._handle_notification)
98+
app.router.add_post(self._webhook_path, self._handle_notification)
99+
100+
self._runner = web.AppRunner(app)
101+
await self._runner.setup()
102+
site = web.TCPSite(self._runner, self._host, self._port)
103+
await site.start()
104+
self._mark_connected()
105+
logger.info(
106+
"[msgraph_webhook] Listening on %s:%d%s",
107+
self._host,
108+
self._port,
109+
self._webhook_path,
110+
)
111+
return True
112+
113+
async def disconnect(self) -> None:
114+
if self._runner is not None:
115+
await self._runner.cleanup()
116+
self._runner = None
117+
self._mark_disconnected()
118+
119+
async def send(
120+
self,
121+
chat_id: str,
122+
content: str,
123+
reply_to: Optional[str] = None,
124+
metadata: Optional[Dict[str, Any]] = None,
125+
) -> SendResult:
126+
logger.info("[msgraph_webhook] Response for %s: %s", chat_id, content[:200])
127+
return SendResult(success=True)
128+
129+
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
130+
return {"name": chat_id, "type": "webhook"}
131+
132+
async def _handle_health(self, request: "web.Request") -> "web.Response":
133+
return web.json_response(
134+
{
135+
"status": "ok",
136+
"platform": self.platform.value,
137+
"webhook_path": self._webhook_path,
138+
"accepted": self._accepted_count,
139+
"duplicates": self._duplicate_count,
140+
}
141+
)
142+
143+
async def _handle_notification(self, request: "web.Request") -> "web.Response":
144+
validation_token = request.query.get("validationToken", "")
145+
if validation_token:
146+
return web.Response(text=validation_token, content_type="text/plain")
147+
148+
try:
149+
body = await request.json()
150+
except Exception:
151+
return web.json_response({"error": "Invalid JSON body"}, status=400)
152+
153+
notifications = body.get("value")
154+
if not isinstance(notifications, list):
155+
return web.json_response({"error": "Missing notification batch"}, status=400)
156+
157+
accepted = 0
158+
duplicates = 0
159+
rejected = 0
160+
scheduled = 0
161+
162+
for raw_notification in notifications:
163+
if not isinstance(raw_notification, dict):
164+
rejected += 1
165+
continue
166+
notification = dict(raw_notification)
167+
if not self._resource_accepted(str(notification.get("resource") or "")):
168+
rejected += 1
169+
continue
170+
if not self._verify_client_state(notification):
171+
rejected += 1
172+
continue
173+
174+
receipt_key = self._build_receipt_key(notification)
175+
if receipt_key in self._seen_receipts:
176+
duplicates += 1
177+
continue
178+
self._seen_receipts.add(receipt_key)
179+
180+
accepted += 1
181+
scheduled += 1
182+
self._accepted_count += 1
183+
event = self._build_message_event(notification, receipt_key)
184+
self._schedule_notification(notification, event)
185+
186+
self._duplicate_count += duplicates
187+
status = 202 if accepted or duplicates else 403
188+
return web.json_response(
189+
{
190+
"status": "accepted" if accepted or duplicates else "rejected",
191+
"accepted": accepted,
192+
"duplicates": duplicates,
193+
"rejected": rejected,
194+
"scheduled": scheduled,
195+
},
196+
status=status,
197+
)
198+
199+
def _resource_accepted(self, resource: str) -> bool:
200+
if not self._accepted_resources:
201+
return True
202+
for pattern in self._accepted_resources:
203+
if pattern.endswith("*") and resource.startswith(pattern[:-1]):
204+
return True
205+
if resource == pattern or resource.startswith(f"{pattern}/"):
206+
return True
207+
return False
208+
209+
def _verify_client_state(self, notification: Dict[str, Any]) -> bool:
210+
expected = self._client_state
211+
if expected is None:
212+
return True
213+
provided = self._string_or_none(notification.get("clientState"))
214+
return provided == expected
215+
216+
def _build_message_event(
217+
self,
218+
notification: Dict[str, Any],
219+
receipt_key: str,
220+
) -> MessageEvent:
221+
source = self.build_source(
222+
chat_id=f"msgraph:{notification.get('subscriptionId', 'unknown')}",
223+
chat_name="msgraph/webhook",
224+
chat_type="webhook",
225+
user_id="msgraph",
226+
user_name="Microsoft Graph",
227+
)
228+
return MessageEvent(
229+
text=self._render_prompt(notification),
230+
message_type=MessageType.TEXT,
231+
source=source,
232+
raw_message=notification,
233+
message_id=receipt_key,
234+
internal=True,
235+
)
236+
237+
def _render_prompt(self, notification: Dict[str, Any]) -> str:
238+
template = self.config.extra.get("prompt", "")
239+
if template:
240+
payload = {
241+
"notification": notification,
242+
"resource": notification.get("resource", ""),
243+
"change_type": notification.get("changeType", ""),
244+
"subscription_id": notification.get("subscriptionId", ""),
245+
}
246+
return self._render_template(template, payload)
247+
rendered = json.dumps(notification, indent=2, sort_keys=True)[:4000]
248+
return f"Microsoft Graph change notification:\n\n```json\n{rendered}\n```"
249+
250+
def _render_template(self, template: str, payload: Dict[str, Any]) -> str:
251+
import re
252+
253+
def _resolve(match: "re.Match[str]") -> str:
254+
key = match.group(1)
255+
value: Any = payload
256+
for part in key.split("."):
257+
if isinstance(value, dict):
258+
value = value.get(part, f"{{{key}}}")
259+
else:
260+
return f"{{{key}}}"
261+
if isinstance(value, (dict, list)):
262+
return json.dumps(value, sort_keys=True)[:2000]
263+
return str(value)
264+
265+
return re.sub(r"\{([a-zA-Z0-9_.]+)\}", _resolve, template)
266+
267+
def _schedule_notification(
268+
self,
269+
notification: Dict[str, Any],
270+
event: MessageEvent,
271+
) -> None:
272+
scheduler = self._notification_scheduler
273+
if scheduler is not None:
274+
result = scheduler(notification, event)
275+
if asyncio.iscoroutine(result):
276+
task = asyncio.create_task(result)
277+
self._background_tasks.add(task)
278+
task.add_done_callback(self._background_tasks.discard)
279+
return
280+
281+
task = asyncio.create_task(self.handle_message(event))
282+
self._background_tasks.add(task)
283+
task.add_done_callback(self._background_tasks.discard)

gateway/run.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4600,6 +4600,16 @@ def _create_adapter(
46004600
adapter.gateway_runner = self # For cross-platform delivery
46014601
return adapter
46024602

4603+
elif platform == Platform.MSGRAPH_WEBHOOK:
4604+
from gateway.platforms.msgraph_webhook import (
4605+
MSGraphWebhookAdapter,
4606+
check_msgraph_webhook_requirements,
4607+
)
4608+
if not check_msgraph_webhook_requirements():
4609+
logger.warning("MSGraph webhook: aiohttp not installed")
4610+
return None
4611+
return MSGraphWebhookAdapter(config)
4612+
46034613
elif platform == Platform.BLUEBUBBLES:
46044614
from gateway.platforms.bluebubbles import BlueBubblesAdapter, check_bluebubbles_requirements
46054615
if not check_bluebubbles_requirements():

0 commit comments

Comments
 (0)