钩子配方
本页汇总最常用钩子模式的可运行配方。每个配方都基于 src/bub/hookspecs.py 中的真实签名。完整钩子契约 —— 参数类型、同步/异步规则、firstresult 语义 —— 见钩子参考。
- 一个已经接入
bub入口点组的插件包(见插件)。 - 模块中可用的
from bub import hookimpl标记。
钩子是声明在 BubHookSpecs 上的具名扩展点;插件贡献的实现会在轮次中被 Bub 调用。
下面多个 recipe 使用插件类。请通过 pkg.plugin:MyPlugin 这样的 callable entry point 注册类,或者创建模块级实例并让 entry point 指向该实例。裸模块目标只会注册模块级 @hookimpl 函数。
1. 覆盖提示
Section titled “1. 覆盖提示”替换 Bub 发给模型的文本。build_prompt 是 firstresult,优先级最高的实现胜出。
from bub import hookimpl
class PrefixPrompt:
@hookimpl
def build_prompt(self, message, session_id, state):
content = message["content"] if isinstance(message, dict) else message.content
return f"[prefix] {content}"
若要直接观察该行为,请配套一个返回 prompt 的 echo run_model hook,或在测试中检查 prompt。普通 uv run bub run "hello" 打印的是最终 outbound 文本,不是发给模型的原始 prompt。
2. 替换模型
Section titled “2. 替换模型”实现 run_model 把模型阶段换成任意返回字符串的可调用对象。下面的形态镜像 bub-codex,它把模型委派给另一个代理 CLI:
import asyncio
from bub import hookimpl
from bub.types import State
@hookimpl
async def run_model(prompt: str, session_id: str, state: State) -> str:
proc = await asyncio.create_subprocess_exec(
"codex", "e", prompt,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, _ = await proc.communicate()
return stdout.decode()
3. 包裹父级流
Section titled “3. 包裹父级流”如果你想观察或转换现有模型流而不替换它,实现 run_model_stream,并用 pluggy 的 subset_hook_caller 在剔除自身之后调用钩子链:
from republic import AsyncStreamEvents
from bub import BubFramework, hookimpl
class StreamTap:
def __init__(self, framework: BubFramework) -> None:
self.framework = framework
@hookimpl
async def run_model_stream(self, prompt, session_id, state):
parent = self.framework._plugin_manager.subset_hook_caller(
"run_model_stream", remove_plugins=[self]
)
upstream = await parent(prompt=prompt, session_id=session_id, state=state)
if upstream is None:
raise RuntimeError("no parent run_model_stream implementation")
async def iterator():
async for event in upstream:
if event.kind == "text":
print(event.data.get("delta", ""), end="")
yield event
return AsyncStreamEvents(iterator(), state=upstream._state)
适用于日志、脱敏或遥测,而不必重新实现底层模型调用。
4. 跨轮次持久化状态
Section titled “4. 跨轮次持久化状态”load_state 在每轮开始运行;save_state 在产生模型输出之后的 finally 中运行。两者配合可以让你把每会话数据带到下一轮,并安全地拆解。lifespan 交接是经典案例:
from bub import hookimpl
class SessionStorePlugin:
def __init__(self) -> None:
self._sessions: dict[str, list[dict[str, object]]] = {}
@hookimpl
def load_state(self, message, session_id):
return {"history": self._sessions.get(session_id, [])}
@hookimpl
def save_state(self, session_id, state, message, model_output):
history = state.get("history", [])
history.append({"in": message, "out": model_output})
self._sessions[session_id] = history
state 字典是贯穿每个阶段的每轮工作区。它与录制的磁带的关系见磁带与上下文。
5. 提供磁带库
Section titled “5. 提供磁带库”磁带是被记录的对话线索。provide_tape_store 是 firstresult,在框架运行时作用域开启时解析。返回一个库实例,或 yield 一个用于 setup/teardown。
from bub import hookimpl
@hookimpl
def provide_tape_store():
from bub_tapestore_sqlite.store import SQLiteTapeStore
return SQLiteTapeStore(path="/var/lib/bub/tapes.sqlite3")
完整插件位于 bub-tapestore-sqlite。需要清理的库可改为返回生成器 —— Bub 会以上下文管理器对待它。
6. 新增通道
Section titled “6. 新增通道”通道是一个收发端 —— CLI、Telegram、微信、定时触发器。provide_channels 让插件贡献一个或多个 Channel 子类。
最小要求:设置 name、实现 start 与 stop,可选实现 send 或 stream_events:
import asyncio
from bub import hookimpl
from bub.channels import Channel
from bub.types import MessageHandler
class WebhookChannel(Channel):
name = "webhook"
def __init__(self, message_handler: MessageHandler) -> None:
self._handler = message_handler
self._task: asyncio.Task | None = None
async def start(self, stop_event: asyncio.Event) -> None:
# 绑定你的传输(HTTP 服务器、消息队列等)
await stop_event.wait()
async def stop(self) -> None:
if self._task:
self._task.cancel()
class WebhookPlugin:
@hookimpl
def provide_channels(self, message_handler):
return [WebhookChannel(message_handler)]
bub-wechat 是自定义通道的范本:实现入站接收和出站 send,并暴露一个模型可调用的 wechat 工具。
provide_channels 也是承载非聊天背景服务的合适钩子。visual-base 的 EyeChannel 用它来托管一个 FFmpeg 监督进程和屏幕理解循环 —— send 是空操作,但 start/stop 给了服务一条与 bub gateway 绑定的生命周期。
7. 注册 CLI 命令
Section titled “7. 注册 CLI 命令”register_cli_commands 在 CLI 启动时被调用一次,参数是根 typer.Typer 应用。直接在上面追加命令或子应用:
import typer
from bub import hookimpl
class HelloPlugin:
@hookimpl
def register_cli_commands(self, app: typer.Typer) -> None:
@app.command()
def hello(name: str = "world") -> None:
"""Say hello."""
typer.echo(f"hello, {name}")
运行 uv run bub hello --name bub 触发它。该钩子是同步运行的 —— 异步函数会被跳过并发出告警。
8. 参与引导
Section titled “8. 参与引导”onboard_config(current_config) 让插件向交互式 bub onboard 流程贡献字段。每个实现都会拿到累积到目前的字典,并返回要合入的片段。
from bub import hookimpl
class WeatherPlugin:
@hookimpl
def onboard_config(self, current_config):
existing = current_config.get("weather", {})
return {
"weather": {
"api_key": existing.get("api_key", ""),
"enabled": True,
}
}
返回 None 表示跳过且不修改累积配置。返回非 dict 会抛 TypeError 并中止引导。运行 uv run bub onboard 并查看保存的配置文件来验证。