跳转到内容

钩子配方

本页汇总最常用钩子模式的可运行配方。每个配方都基于 src/bub/hookspecs.py 中的真实签名。完整钩子契约 —— 参数类型、同步/异步规则、firstresult 语义 —— 见钩子参考

  • 一个已经接入 bub 入口点组的插件包(见插件)。
  • 模块中可用的 from bub import hookimpl 标记。

钩子是声明在 BubHookSpecs 上的具名扩展点;插件贡献的实现会在轮次中被 Bub 调用。

下面多个 recipe 使用插件类。请通过 pkg.plugin:MyPlugin 这样的 callable entry point 注册类,或者创建模块级实例并让 entry point 指向该实例。裸模块目标只会注册模块级 @hookimpl 函数。

替换 Bub 发给模型的文本。build_promptfirstresult,优先级最高的实现胜出。

from bub import hookimpl


class PrefixPrompt:
    @hookimpl
    def build_prompt(self, message, session_id, state):
        content = message["content"] if isinstance(message, dict) else message.content
        return f"[prefix] {content}"

若要直接观察该行为,请配套一个返回 prompt 的 echo run_model hook,或在测试中检查 prompt。普通 uv run bub run "hello" 打印的是最终 outbound 文本,不是发给模型的原始 prompt。

实现 run_model 把模型阶段换成任意返回字符串的可调用对象。下面的形态镜像 bub-codex,它把模型委派给另一个代理 CLI:

import asyncio

from bub import hookimpl
from bub.types import State


@hookimpl
async def run_model(prompt: str, session_id: str, state: State) -> str:
    proc = await asyncio.create_subprocess_exec(
        "codex", "e", prompt,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, _ = await proc.communicate()
    return stdout.decode()

如果你想观察或转换现有模型流而不替换它,实现 run_model_stream,并用 pluggy 的 subset_hook_caller剔除自身之后调用钩子链:

from republic import AsyncStreamEvents

from bub import BubFramework, hookimpl


class StreamTap:
    def __init__(self, framework: BubFramework) -> None:
        self.framework = framework

    @hookimpl
    async def run_model_stream(self, prompt, session_id, state):
        parent = self.framework._plugin_manager.subset_hook_caller(
            "run_model_stream", remove_plugins=[self]
        )
        upstream = await parent(prompt=prompt, session_id=session_id, state=state)
        if upstream is None:
            raise RuntimeError("no parent run_model_stream implementation")

        async def iterator():
            async for event in upstream:
                if event.kind == "text":
                    print(event.data.get("delta", ""), end="")
                yield event

        return AsyncStreamEvents(iterator(), state=upstream._state)

适用于日志、脱敏或遥测,而不必重新实现底层模型调用。

load_state 在每轮开始运行;save_state 在产生模型输出之后的 finally 中运行。两者配合可以让你把每会话数据带到下一轮,并安全地拆解。lifespan 交接是经典案例:

from bub import hookimpl


class SessionStorePlugin:
    def __init__(self) -> None:
        self._sessions: dict[str, list[dict[str, object]]] = {}

    @hookimpl
    def load_state(self, message, session_id):
        return {"history": self._sessions.get(session_id, [])}

    @hookimpl
    def save_state(self, session_id, state, message, model_output):
        history = state.get("history", [])
        history.append({"in": message, "out": model_output})
        self._sessions[session_id] = history

state 字典是贯穿每个阶段的每轮工作区。它与录制的磁带的关系见磁带与上下文

磁带是被记录的对话线索。provide_tape_storefirstresult,在框架运行时作用域开启时解析。返回一个库实例,或 yield 一个用于 setup/teardown。

from bub import hookimpl


@hookimpl
def provide_tape_store():
    from bub_tapestore_sqlite.store import SQLiteTapeStore

    return SQLiteTapeStore(path="/var/lib/bub/tapes.sqlite3")

完整插件位于 bub-tapestore-sqlite。需要清理的库可改为返回生成器 —— Bub 会以上下文管理器对待它。

通道是一个收发端 —— CLI、Telegram、微信、定时触发器。provide_channels 让插件贡献一个或多个 Channel 子类。

最小要求:设置 name、实现 startstop,可选实现 sendstream_events

import asyncio

from bub import hookimpl
from bub.channels import Channel
from bub.types import MessageHandler


class WebhookChannel(Channel):
    name = "webhook"

    def __init__(self, message_handler: MessageHandler) -> None:
        self._handler = message_handler
        self._task: asyncio.Task | None = None

    async def start(self, stop_event: asyncio.Event) -> None:
        # 绑定你的传输(HTTP 服务器、消息队列等)
        await stop_event.wait()

    async def stop(self) -> None:
        if self._task:
            self._task.cancel()


class WebhookPlugin:
    @hookimpl
    def provide_channels(self, message_handler):
        return [WebhookChannel(message_handler)]

bub-wechat 是自定义通道的范本:实现入站接收和出站 send,并暴露一个模型可调用的 wechat 工具。

provide_channels 也是承载非聊天背景服务的合适钩子。visual-base 的 EyeChannel 用它来托管一个 FFmpeg 监督进程和屏幕理解循环 —— send 是空操作,但 start/stop 给了服务一条与 bub gateway 绑定的生命周期。

register_cli_commands 在 CLI 启动时被调用一次,参数是根 typer.Typer 应用。直接在上面追加命令或子应用:

import typer

from bub import hookimpl


class HelloPlugin:
    @hookimpl
    def register_cli_commands(self, app: typer.Typer) -> None:
        @app.command()
        def hello(name: str = "world") -> None:
            """Say hello."""
            typer.echo(f"hello, {name}")

运行 uv run bub hello --name bub 触发它。该钩子是同步运行的 —— 异步函数会被跳过并发出告警。

onboard_config(current_config) 让插件向交互式 bub onboard 流程贡献字段。每个实现都会拿到累积到目前的字典,并返回要合入的片段。

from bub import hookimpl


class WeatherPlugin:
    @hookimpl
    def onboard_config(self, current_config):
        existing = current_config.get("weather", {})
        return {
            "weather": {
                "api_key": existing.get("api_key", ""),
                "enabled": True,
            }
        }

返回 None 表示跳过且不修改累积配置。返回非 dict 会抛 TypeError 并中止引导。运行 uv run bub onboard 并查看保存的配置文件来验证。

  • 钩子参考 —— 完整签名表、优先级与同步/异步规则
  • 工具 —— 注册可被模型调用的工具
  • 发行版 —— 把钩子打包为产品
  • 轮次管线 —— 每个钩子在一次轮次中的位置