跳转到内容

Hooks

HookRuntime 驱动大部分框架 hook:

  • call_first(...):按优先级执行,返回第一个非 None
  • call_many(...):执行所有实现,收集所有返回值(包括 None
  • call_first_sync(...) / call_many_sync(...):仅限同步的引导路径

当前 process_inbound() 的 hook 使用情况:

  1. resolve_sessioncall_first
  2. load_statecall_many,然后由框架合并)
  3. build_promptcall_first
  4. run_model_streamcall_first
  5. save_statecall_many,始终在 finally 中执行)
  6. render_outboundcall_many
  7. dispatch_outboundcall_many,按每条出站执行)

兼容性说明:

  • run_model_stream 是主要的模型 hook。
  • 如果没有插件实现 run_model_stream,Bub 会回退到 run_model
  • run_model 的返回值会被包装为仅含一个文本块的流。
  • 插件应只实现其中一个 hook,不要同时实现两个。

其他 hook 消费者:

  • register_cli_commands:由 call_many_sync 调用
  • provide_channels:在 BubFramework.get_channels() 中由 call_many_sync 调用
  • system_promptprovide_tape_storebuild_tape_context:由 BubFramework 和内置 Agent 消费
  • 内置插件最先注册。
  • 后注册的插件具有更高的运行时优先级。
  • HookRuntime 反转 pluggy 的实现顺序,因此后注册的先执行。
  • 对于 load_state,框架在合并前再次反转,使高优先级的值覆盖低优先级的值。
  • 异步 hook 调用可以运行同步和异步两种实现。
  • 同步 hook 调用会跳过可等待的返回值并记录警告。
  • 因此,引导阶段的 hook 应保持同步:
    • register_cli_commands
    • provide_channels
    • provide_tape_store
    • build_tape_context

HookRuntime 只传递你函数签名中声明的参数。 你可以安全地省略未使用的 hook 参数。

示例:

from bub import hookimpl


class SessionPlugin:
    @hookimpl
    def resolve_session(self, message):
        return "my-session"
from __future__ import annotations

from republic import AsyncStreamEvents, StreamEvent

from bub import hookimpl


class EchoPlugin:
    @hookimpl
    def build_prompt(self, message, session_id, state):
        return f"[echo] {message['content']}"

    @hookimpl
    async def run_model_stream(self, prompt, session_id, state):
        async def iterator():
            yield StreamEvent("text", {"delta": prompt})

        return AsyncStreamEvents(iterator())

运行并验证:

uv run bub hooks
uv run bub run "hello"

检查你的插件是否出现在 build_prompt / run_model_stream 的绑定列表中,输出是否反映了你的覆盖。 如果你有意使用旧版兼容 hook,请检查 run_model

如果你想观察或转换父级流而非完全替换它,请实现 run_model_stream 并包装父级 hook 的异步迭代器。

此模式使用 subset_hook_caller(...) 调用去除当前插件的同一 hook 链,然后返回一个新的 AsyncStreamEvents 包装器。

from __future__ import annotations

from republic import AsyncStreamEvents, StreamEvent

from bub import hookimpl


class StreamTapPlugin:
    def __init__(self, framework) -> None:
        self.framework = framework

    @hookimpl
    async def run_model_stream(self, prompt, session_id, state):
        parent_hook = self.framework._plugin_manager.subset_hook_caller(
            "run_model_stream",
            remove_plugins=[self],
        )
        parent_stream = await parent_hook(
            prompt=prompt,
            session_id=session_id,
            state=state,
        )
        if parent_stream is None:
            raise RuntimeError("no parent run_model_stream implementation found")

        async def iterator():
            async for event in parent_stream:
                if event.kind == "text":
                    delta = str(event.data.get("delta", ""))
                    print(delta, end="")
                yield event

        return AsyncStreamEvents(iterator(), state=parent_stream._state)

当你需要记录块、脱敏文本、注入额外事件或测量流时序,同时又不想重新实现底层模型调用时,可使用此模式。

如果你还需要支持仅实现了旧版 run_model 的父级,请添加你自己的回退路径,并将文本结果包装为单块流。

  • 定义了 @tool 函数但未在插件中导入该模块,意味着工具永远不会被注册。
  • 从通过同步路径(call_many_sync / call_first_sync)调用的 hook 返回可等待对象会导致被跳过。
  • 假设 hook 失败是隔离的:非 on_error 的 hook 异常会传播,可能导致整个 turn 失败。
  • 使用过时的 hook 名称:请始终对照 src/bub/hookspecs.py 确认。