Skip to content

Commit 397f750

Browse files
dlkakbsteknium1
authored andcommitted
feat(teams): add pipeline outbound delivery via existing adapter
1 parent a995477 commit 397f750

4 files changed

Lines changed: 378 additions & 1 deletion

File tree

plugins/platforms/teams/adapter.py

Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,14 @@
2323
from __future__ import annotations
2424

2525
import asyncio
26+
import html
2627
import json
2728
import logging
2829
import os
2930
from typing import Any, Dict, Optional
31+
from urllib.parse import quote
32+
33+
import httpx
3034

3135
try:
3236
from aiohttp import web
@@ -93,6 +97,241 @@
9397
_WEBHOOK_PATH = "/api/messages"
9498

9599

100+
def _parse_bool(value: Any, *, default: bool = False) -> bool:
101+
if isinstance(value, bool):
102+
return value
103+
if isinstance(value, str):
104+
normalized = value.strip().lower()
105+
if normalized in {"1", "true", "yes", "on"}:
106+
return True
107+
if normalized in {"0", "false", "no", "off"}:
108+
return False
109+
return default
110+
111+
112+
class _StaticAccessTokenProvider:
113+
"""Minimal token-provider shim so outbound Graph delivery can reuse the shared client."""
114+
115+
def __init__(self, access_token: str):
116+
self._access_token = str(access_token or "").strip()
117+
118+
async def get_access_token(self, *, force_refresh: bool = False) -> str:
119+
del force_refresh
120+
if not self._access_token:
121+
raise ValueError("TEAMS_GRAPH_ACCESS_TOKEN is required for graph delivery mode.")
122+
return self._access_token
123+
124+
def clear_cache(self) -> None:
125+
return None
126+
127+
128+
class TeamsSummaryWriter:
129+
"""Pipeline-facing Teams outbound delivery surface.
130+
131+
This stays inside the existing Teams platform plugin so the meeting-pipeline
132+
PR can reuse one Teams integration surface instead of introducing a second
133+
adapter elsewhere in the gateway core.
134+
"""
135+
136+
def __init__(
137+
self,
138+
platform_config: PlatformConfig | None = None,
139+
*,
140+
graph_client: Any | None = None,
141+
transport: httpx.AsyncBaseTransport | None = None,
142+
) -> None:
143+
self._platform_config = platform_config
144+
self._graph_client = graph_client
145+
self._transport = transport
146+
147+
async def write_summary(
148+
self,
149+
payload: Any,
150+
config: dict[str, Any] | None,
151+
existing_record: Optional[dict[str, Any]] = None,
152+
) -> dict[str, Any]:
153+
merged = self._resolve_delivery_config(config)
154+
if existing_record and not _parse_bool(merged.get("force_resend"), default=False):
155+
return dict(existing_record)
156+
157+
mode = str(merged.get("delivery_mode") or merged.get("mode") or "").strip().lower()
158+
if not mode:
159+
if merged.get("incoming_webhook_url"):
160+
mode = "incoming_webhook"
161+
elif merged.get("chat_id") or (
162+
merged.get("team_id") and (merged.get("channel_id") or merged.get("chat_id"))
163+
):
164+
mode = "graph"
165+
if mode == "incoming_webhook":
166+
return await self._write_summary_via_incoming_webhook(payload, merged)
167+
if mode == "graph":
168+
return await self._write_summary_via_graph(payload, merged)
169+
raise ValueError(
170+
"Teams delivery_mode must be 'incoming_webhook' or 'graph'."
171+
)
172+
173+
def _resolve_delivery_config(self, config: dict[str, Any] | None) -> dict[str, Any]:
174+
merged: dict[str, Any] = {}
175+
platform_cfg = self._platform_config
176+
if platform_cfg is not None:
177+
merged.update(dict(platform_cfg.extra or {}))
178+
if platform_cfg.token and "access_token" not in merged:
179+
merged["access_token"] = platform_cfg.token
180+
if platform_cfg.home_channel:
181+
merged.setdefault("channel_id", platform_cfg.home_channel.chat_id)
182+
merged.update(dict(config or {}))
183+
184+
env_defaults = {
185+
"delivery_mode": os.getenv("TEAMS_DELIVERY_MODE", ""),
186+
"incoming_webhook_url": os.getenv("TEAMS_INCOMING_WEBHOOK_URL", ""),
187+
"access_token": os.getenv("TEAMS_GRAPH_ACCESS_TOKEN", ""),
188+
"team_id": os.getenv("TEAMS_TEAM_ID", ""),
189+
"channel_id": os.getenv("TEAMS_CHANNEL_ID", ""),
190+
"chat_id": os.getenv("TEAMS_CHAT_ID", ""),
191+
}
192+
for key, value in env_defaults.items():
193+
if value and not merged.get(key):
194+
merged[key] = value
195+
return merged
196+
197+
async def _write_summary_via_incoming_webhook(
198+
self,
199+
payload: Any,
200+
config: dict[str, Any],
201+
) -> dict[str, Any]:
202+
webhook_url = str(config.get("incoming_webhook_url") or "").strip()
203+
if not webhook_url:
204+
raise ValueError("TEAMS_INCOMING_WEBHOOK_URL is required for incoming_webhook mode.")
205+
body = {"text": self._render_summary_markdown(payload)}
206+
async with httpx.AsyncClient(timeout=20.0, transport=self._transport) as client:
207+
response = await client.post(webhook_url, json=body)
208+
response.raise_for_status()
209+
return {
210+
"delivery_mode": "incoming_webhook",
211+
"webhook_url": webhook_url,
212+
"status_code": response.status_code,
213+
"delivered": True,
214+
}
215+
216+
async def _write_summary_via_graph(
217+
self,
218+
payload: Any,
219+
config: dict[str, Any],
220+
) -> dict[str, Any]:
221+
graph_client = self._build_graph_client(config)
222+
chat_id = str(config.get("chat_id") or "").strip()
223+
if chat_id:
224+
path = f"/chats/{quote(chat_id, safe='')}/messages"
225+
response = await graph_client.post_json(
226+
path,
227+
json_body={"body": {"contentType": "html", "content": self._render_summary_html(payload)}},
228+
)
229+
return {
230+
"delivery_mode": "graph",
231+
"target_type": "chat",
232+
"chat_id": chat_id,
233+
"message_id": (response or {}).get("id"),
234+
"web_url": (response or {}).get("webUrl"),
235+
}
236+
237+
team_id = str(config.get("team_id") or "").strip()
238+
channel_id = str(config.get("channel_id") or "").strip()
239+
if not team_id or not channel_id:
240+
raise ValueError(
241+
"Graph delivery mode requires chat_id, or both team_id and channel_id."
242+
)
243+
path = (
244+
f"/teams/{quote(team_id, safe='')}/channels/"
245+
f"{quote(channel_id, safe='')}/messages"
246+
)
247+
response = await graph_client.post_json(
248+
path,
249+
json_body={"body": {"contentType": "html", "content": self._render_summary_html(payload)}},
250+
)
251+
return {
252+
"delivery_mode": "graph",
253+
"target_type": "channel",
254+
"team_id": team_id,
255+
"channel_id": channel_id,
256+
"message_id": (response or {}).get("id"),
257+
"web_url": (response or {}).get("webUrl"),
258+
}
259+
260+
def _build_graph_client(self, config: dict[str, Any]) -> Any:
261+
if self._graph_client is not None:
262+
return self._graph_client
263+
264+
from tools.microsoft_graph_auth import MicrosoftGraphTokenProvider
265+
from tools.microsoft_graph_client import MicrosoftGraphClient
266+
267+
access_token = str(config.get("access_token") or "").strip()
268+
if access_token:
269+
return MicrosoftGraphClient(
270+
_StaticAccessTokenProvider(access_token),
271+
transport=self._transport,
272+
)
273+
return MicrosoftGraphClient(
274+
MicrosoftGraphTokenProvider.from_env(),
275+
transport=self._transport,
276+
)
277+
278+
def _render_summary_markdown(self, payload: Any) -> str:
279+
lines = [
280+
f"**{self._title(payload)}**",
281+
"",
282+
f"Summary: {self._text(getattr(payload, 'summary', None), 'No summary available.')}",
283+
"",
284+
"Key decisions:",
285+
*self._bullet_lines(getattr(payload, "key_decisions", None)),
286+
"",
287+
"Action items:",
288+
*self._bullet_lines(getattr(payload, "action_items", None)),
289+
"",
290+
"Risks:",
291+
*self._bullet_lines(getattr(payload, "risks", None)),
292+
]
293+
return "\n".join(lines)
294+
295+
def _render_summary_html(self, payload: Any) -> str:
296+
sections = [
297+
("Summary", [self._text(getattr(payload, "summary", None), "No summary available.")]),
298+
("Key decisions", list(getattr(payload, "key_decisions", None) or [])),
299+
("Action items", list(getattr(payload, "action_items", None) or [])),
300+
("Risks", list(getattr(payload, "risks", None) or [])),
301+
]
302+
blocks = [f"<h2>{html.escape(self._title(payload))}</h2>"]
303+
for heading, items in sections:
304+
blocks.append(f"<h3>{html.escape(heading)}</h3>")
305+
if len(items) == 1 and heading == "Summary":
306+
blocks.append(f"<p>{html.escape(str(items[0]))}</p>")
307+
continue
308+
if items:
309+
rendered = "".join(f"<li>{html.escape(str(item))}</li>" for item in items if str(item).strip())
310+
blocks.append(rendered and f"<ul>{rendered}</ul>" or "<p>None</p>")
311+
else:
312+
blocks.append("<p>None</p>")
313+
return "".join(blocks)
314+
315+
@staticmethod
316+
def _title(payload: Any) -> str:
317+
title = getattr(payload, "title", None)
318+
if title:
319+
return str(title)
320+
meeting_ref = getattr(payload, "meeting_ref", None)
321+
meeting_id = getattr(meeting_ref, "meeting_id", None) if meeting_ref else None
322+
return f"Meeting {meeting_id or 'summary'}"
323+
324+
@staticmethod
325+
def _text(value: Any, default: str) -> str:
326+
text = str(value or "").strip()
327+
return text or default
328+
329+
@classmethod
330+
def _bullet_lines(cls, values: Any) -> list[str]:
331+
items = [str(item).strip() for item in (values or []) if str(item).strip()]
332+
return [f"- {item}" for item in items] or ["- None"]
333+
334+
96335
class _AiohttpBridgeAdapter:
97336
"""HttpServerAdapter that bridges the Teams SDK into an aiohttp server.
98337

plugins/teams_pipeline/pipeline.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -550,7 +550,10 @@ async def _generate_summary_payload(
550550
confidence_notes=parsed.get("confidence_notes"),
551551
notion_target=(self.config.notion or {}).get("database_id"),
552552
linear_target=(self.config.linear or {}).get("team_id"),
553-
teams_target=(self.config.teams_delivery or {}).get("channel_id"),
553+
teams_target=(
554+
(self.config.teams_delivery or {}).get("channel_id")
555+
or (self.config.teams_delivery or {}).get("chat_id")
556+
),
554557
)
555558

556559
async def _write_sinks(self, job: TeamsMeetingPipelineJob, payload: TeamsMeetingSummaryPayload) -> None:

0 commit comments

Comments
 (0)