Hooks
Hook 执行语义
Section titled “Hook 执行语义”HookRuntime 驱动大部分框架 hook:
call_first(...):按优先级执行,返回第一个非None值call_many(...):执行所有实现,收集所有返回值(包括None)call_first_sync(...)/call_many_sync(...):仅限同步的引导路径
当前 process_inbound() 的 hook 使用情况:
resolve_session(call_first)load_state(call_many,然后由框架合并)build_prompt(call_first)run_model_stream(call_first)save_state(call_many,始终在finally中执行)render_outbound(call_many)dispatch_outbound(call_many,按每条出站执行)
兼容性说明:
run_model_stream是主要的模型 hook。- 如果没有插件实现
run_model_stream,Bub 会回退到run_model。 run_model的返回值会被包装为仅含一个文本块的流。- 插件应只实现其中一个 hook,不要同时实现两个。
其他 hook 消费者:
register_cli_commands:由call_many_sync调用provide_channels:在BubFramework.get_channels()中由call_many_sync调用system_prompt、provide_tape_store、build_tape_context:由BubFramework和内置Agent消费
优先级与覆盖规则
Section titled “优先级与覆盖规则”- 内置插件最先注册。
- 后注册的插件具有更高的运行时优先级。
HookRuntime反转 pluggy 的实现顺序,因此后注册的先执行。- 对于
load_state,框架在合并前再次反转,使高优先级的值覆盖低优先级的值。
同步与异步规则
Section titled “同步与异步规则”- 异步 hook 调用可以运行同步和异步两种实现。
- 同步 hook 调用会跳过可等待的返回值并记录警告。
- 因此,引导阶段的 hook 应保持同步:
register_cli_commandsprovide_channelsprovide_tape_storebuild_tape_context
HookRuntime 只传递你函数签名中声明的参数。
你可以安全地省略未使用的 hook 参数。
示例:
from bub import hookimpl
class SessionPlugin:
@hookimpl
def resolve_session(self, message):
return "my-session"
最小端到端示例
Section titled “最小端到端示例”from __future__ import annotations
from republic import AsyncStreamEvents, StreamEvent
from bub import hookimpl
class EchoPlugin:
@hookimpl
def build_prompt(self, message, session_id, state):
return f"[echo] {message['content']}"
@hookimpl
async def run_model_stream(self, prompt, session_id, state):
async def iterator():
yield StreamEvent("text", {"delta": prompt})
return AsyncStreamEvents(iterator())
运行并验证:
uv run bub hooks
uv run bub run "hello"
检查你的插件是否出现在 build_prompt / run_model_stream 的绑定列表中,输出是否反映了你的覆盖。
如果你有意使用旧版兼容 hook,请检查 run_model。
如果你想观察或转换父级流而非完全替换它,请实现 run_model_stream 并包装父级 hook 的异步迭代器。
此模式使用 subset_hook_caller(...) 调用去除当前插件的同一 hook 链,然后返回一个新的 AsyncStreamEvents 包装器。
from __future__ import annotations
from republic import AsyncStreamEvents, StreamEvent
from bub import hookimpl
class StreamTapPlugin:
def __init__(self, framework) -> None:
self.framework = framework
@hookimpl
async def run_model_stream(self, prompt, session_id, state):
parent_hook = self.framework._plugin_manager.subset_hook_caller(
"run_model_stream",
remove_plugins=[self],
)
parent_stream = await parent_hook(
prompt=prompt,
session_id=session_id,
state=state,
)
if parent_stream is None:
raise RuntimeError("no parent run_model_stream implementation found")
async def iterator():
async for event in parent_stream:
if event.kind == "text":
delta = str(event.data.get("delta", ""))
print(delta, end="")
yield event
return AsyncStreamEvents(iterator(), state=parent_stream._state)
当你需要记录块、脱敏文本、注入额外事件或测量流时序,同时又不想重新实现底层模型调用时,可使用此模式。
如果你还需要支持仅实现了旧版 run_model 的父级,请添加你自己的回退路径,并将文本结果包装为单块流。
- 定义了
@tool函数但未在插件中导入该模块,意味着工具永远不会被注册。 - 从通过同步路径(
call_many_sync/call_first_sync)调用的 hook 返回可等待对象会导致被跳过。 - 假设 hook 失败是隔离的:非
on_error的 hook 异常会传播,可能导致整个 turn 失败。 - 使用过时的 hook 名称:请始终对照
src/bub/hookspecs.py确认。