-
Notifications
You must be signed in to change notification settings - Fork 140
Expand file tree
/
Copy pathhookspecs.py
More file actions
123 lines (97 loc) · 4.19 KB
/
hookspecs.py
File metadata and controls
123 lines (97 loc) · 4.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
"""Pluggy hook namespace and framework hook specifications."""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
import pluggy
from republic import AsyncStreamEvents, AsyncTapeStore, TapeContext
from republic.tape import TapeStore
from bub.turn_admission import AdmitDecision, TurnSnapshot
from bub.types import Envelope, MessageHandler, State
if TYPE_CHECKING:
from bub.channels.base import Channel
# Pluggy project name shared by the specification and implementation markers
# below; both markers are constructed from this same value.
BUB_HOOK_NAMESPACE = "bub"
# Decorator used in this module to declare hook specifications.
hookspec = pluggy.HookspecMarker(BUB_HOOK_NAMESPACE)
# Decorator counterpart for marking hook implementations in the same namespace.
hookimpl = pluggy.HookimplMarker(BUB_HOOK_NAMESPACE)
class BubHookSpecs:
    """Hook specifications that Bub framework extensions may implement.

    Every method below is a pluggy hook specification: it fixes the hook's
    name and argument names. The bodies are placeholders only; they either
    raise ``NotImplementedError`` or do nothing.

    Specs declared with ``firstresult=True`` resolve to the first
    non-``None`` value returned by an implementation, while the remaining
    specs gather the results of every registered implementation (standard
    pluggy semantics).
    """

    @hookspec(firstresult=True)
    def resolve_session(self, message: Envelope) -> str:
        """Return the session id that one inbound message belongs to."""
        raise NotImplementedError

    @hookspec(firstresult=True)
    def build_prompt(
        self,
        message: Envelope,
        session_id: str,
        state: State,
    ) -> str | list[dict]:
        """Build the model prompt for this turn.

        The result is either a plain text string or, when media attachments
        are present, a list of content parts in OpenAI multimodal format.
        """
        raise NotImplementedError

    @hookspec(firstresult=True)
    def run_model(
        self,
        prompt: str | list[dict],
        session_id: str,
        state: State,
    ) -> str:
        """Run the model for one turn and return its plain text output.

        Should not be implemented if `run_model_stream` is implemented.
        """
        raise NotImplementedError

    @hookspec(firstresult=True)
    def run_model_stream(
        self,
        prompt: str | list[dict],
        session_id: str,
        state: State,
    ) -> AsyncStreamEvents:
        """Run the model for one turn and return a stream of events.

        Should not be implemented if `run_model` is implemented.
        """
        raise NotImplementedError

    @hookspec
    def load_state(self, message: Envelope, session_id: str) -> State:
        """Load the state snapshot for one session."""
        raise NotImplementedError

    @hookspec
    def save_state(self, session_id: str, state: State, message: Envelope, model_output: str) -> None:
        """Persist state updates once a model turn has finished."""

    @hookspec
    def render_outbound(self, message: Envelope, session_id: str, state: State, model_output: str) -> list[Envelope]:
        """Turn the model output into outbound messages."""
        raise NotImplementedError

    @hookspec
    def dispatch_outbound(self, message: Envelope) -> bool:
        """Dispatch a single outbound message to external channel(s)."""
        raise NotImplementedError

    @hookspec
    def register_cli_commands(self, app: Any) -> None:
        """Register CLI commands on the root Typer application ``app``."""

    @hookspec
    def onboard_config(
        self,
        current_config: dict[str, Any],
    ) -> dict[str, Any] | None:
        """Collect a plugin config fragment for the interactive onboarding command."""

    @hookspec
    def on_error(
        self,
        stage: str,
        error: Exception,
        message: Envelope | None,
    ) -> None:
        """Observe a framework error raised from any stage."""

    @hookspec
    def system_prompt(self, prompt: str | list[dict], state: State) -> str:
        """Provide a system prompt to be prepended to all model prompts."""
        raise NotImplementedError

    @hookspec(firstresult=True)
    def provide_tape_store(self) -> TapeStore | AsyncTapeStore:
        """Provide the tape store used by Bub's conversation recording feature."""
        ...

    @hookspec
    def provide_channels(self, message_handler: MessageHandler) -> list[Channel]:
        """Provide the channels through which messages are received."""
        raise NotImplementedError

    @hookspec(firstresult=True)
    def build_tape_context(self) -> TapeContext:
        """Build the tape context for the current session.

        The context is used to build context messages.
        """
        raise NotImplementedError

    @hookspec(firstresult=True)
    def admit_message(self, session_id: str, message: Envelope, turn: TurnSnapshot) -> AdmitDecision | None:
        """Decide how an inbound channel message is handled for a session.

        Return ``None`` to keep Bub's default concurrent scheduling behavior.
        """
        raise NotImplementedError