钩子
本页列出 bub/hookspecs.py (BubHookSpecs) 中声明的每一个钩子。类型 列使用三种标签:
- firstresult — 对应
pluggy的firstresult=True;HookRuntime.call_first按优先级返回第一个非None的结果。 - broadcast — 所有实现都会执行,结果聚合到列表(
call_many)。 - sync-only consumer — 通过
call_first_sync或call_many_sync调用;返回 awaitable 的实现会被跳过并写入告警。
每个阶段的语义与典型用法见 Turn 流水线 与 构建 › 钩子配方。
| 钩子 | 类型 | 签名 | 返回 | 调用方 | 备注 |
|---|---|---|---|---|---|
resolve_session | firstresult | (message: Envelope) -> str | session id | BubFramework.process_inbound | 没有任何实现返回值时,框架回退为 "{channel}:{chat_id}"。 |
load_state | broadcast | (message: Envelope, session_id: str) -> State | dict to merge | BubFramework.process_inbound | 框架先反转结果列表再依次合并,使高优先级的键覆盖低优先级。 |
build_prompt | firstresult | (message: Envelope, session_id: str, state: State) -> str | list[dict] | prompt or content parts | BubFramework.process_inbound | 没有实现返回非 None 值,或选中的 firstresult 为 falsy 时,回退到 content_of(message)。已选中的 falsy 值不会触发继续尝试低优先级实现。 |
run_model | firstresult | (prompt, session_id, state) -> str | model text | HookRuntime.run_model | 旧版纯文本路径。run_model 与 run_model_stream 选其一实现,不要同时实现。 |
run_model_stream | firstresult | (prompt, session_id, state) -> AsyncStreamEvents | async stream | HookRuntime.run_model_stream | 推荐路径。无流式实现时框架将 run_model 结果包装为单个 chunk 的流。 |
save_state | broadcast | (session_id, state, message, model_output) -> None | none | process_inbound model-stage finally block | prompt 解析完成后,在模型阶段成功或失败时执行;若失败发生在 prompt/model 执行前则跳过。 |
render_outbound | broadcast | (message, session_id, state, model_output) -> list[Envelope] | outbound batch | BubFramework._collect_outbounds | 所有批次通过 unpack_batch 拼接;结果为空时使用默认回声 envelope。 |
dispatch_outbound | broadcast | (message: Envelope) -> bool | sent flag | process_inbound per outbound | 每个 outbound 都会广播给所有实现。 |
register_cli_commands | sync-only consumer | (app: typer.Typer) -> None | none | BubFramework.create_cli_app (call_many_sync) | 仅用于启动期;async 实现会被跳过并产生告警。 |
onboard_config | sync-only consumer (custom merge) | (current_config: dict) -> dict | None | config fragment | BubFramework.collect_onboard_config | 按优先级遍历;每个返回值通过 configure.merge 合并;非 dict 返回会抛出 TypeError。 |
on_error | observer | (stage: str, error: Exception, message: Envelope | None) -> None | none | HookRuntime.notify_error / notify_error_sync | on_error 实现内部抛出的异常会被吞掉并写日志,确保其他观察者继续运行。 |
system_prompt | broadcast (joined) | (prompt, state) -> str | prompt fragment | BubFramework.get_system_prompt (call_many_sync) | 结果先反转再用 \n\n 拼接,只保留真值片段。 |
provide_tape_store | firstresult | () -> TapeStore | AsyncTapeStore | tape store | BubFramework.running() | 仅在 runtime 作用域开启时解析一次;返回同步或异步迭代器时会被作为 context manager 进入。 |
provide_channels | sync-only consumer (deduped) | (message_handler: MessageHandler) -> list[Channel] | channels | BubFramework.get_channels (call_many_sync) | 按 Channel.name 去重;在钩子优先级顺序中最先出现的 channel 胜出。 |
build_tape_context | firstresult | () -> TapeContext | tape context | BubFramework.build_tape_context (call_first_sync) | 仅同步;awaitable 返回会被跳过。 |
钩子如何被调用
Section titled “钩子如何被调用”HookRuntime(位于 src/bub/hook_runtime.py)在 pluggy.PluginManager 之上提供以下语义。
def _iter_hookimpls(self, hook_name: str) -> list[Any]:
hook = getattr(self._plugin_manager.hook, hook_name, None)
if hook is None or not hasattr(hook, "get_hookimpls"):
return []
return list(reversed(hook.get_hookimpls()))
pluggy 按注册顺序返回实现。HookRuntime 将列表反转,因此 最近注册的插件最先运行。Builtin 早于所有 entry-point 插件注册,所以用户插件在 firstresult 钩子上始终优先。
bub hooks 是发现报告。当前实现打印的是 pluggy 的原始 hook implementation 顺序,因此判断运行时优先级时应以本节说明为准,而不是只看输出顺序。
call_first vs call_many
Section titled “call_first vs call_many”| 方法 | 行为 |
|---|---|
call_first(name, **kwargs) | 按反转后的顺序遍历实现,返回第一个非 None 的值。 |
call_many(name, **kwargs) | 遍历全部实现,把每个未跳过的返回值收集到列表里。 |
call_first_sync / call_many_sync | 同样的遍历;awaitable 返回值会被丢弃并写入告警。 |
异步调用 (call_first、call_many) 会 await 任何 awaitable 返回值。同步调用 (*_sync) 通过 inspect.isawaitable(value) 判断,若为真则发出 hook.async_not_supported hook=<name> adapter=<plugin> 告警并跳过该值。
启动期钩子 必须 同步:register_cli_commands、onboard_config、provide_channels、provide_tape_store、build_tape_context,以及 system_prompt。
@staticmethod
def _kwargs_for_impl(impl: Any, kwargs: dict[str, Any]) -> dict[str, Any]:
return {name: kwargs[name] for name in impl.argnames if name in kwargs}
每个实现只会收到自己声明的关键字参数。函数签名中可以省略不需要的参数,不会影响分发。
Tape store 生命周期
Section titled “Tape store 生命周期”provide_tape_store 在每个 BubFramework.running() 作用域内 只解析一次,而不是每个 turn 一次:
bub run在单次 inbound turn 周围打开作用域,然后关闭。bub chat与bub gateway让作用域持续到 listener 退出。- 实现返回同步或异步迭代器时,Bub 会把它当作
contextmanager/asynccontextmanager;yield出的值是作用域结束前生效的 store。
BubFramework.get_tape_store() 在作用域之外返回 None。
on_error 观察者安全性
Section titled “on_error 观察者安全性”notify_error 与 notify_error_sync 把每个实现包在 try/except 中;观察者失败会写入日志 (hook.on_error_failed stage=… adapter=…) 但不会向上传递。这样可以保证某个观察者出错不会阻塞其他观察者,也不会让 on_error 掩盖原始异常。
每个签名的运行时契约见 src/bub/hookspecs.py。要核对当前环境注册的实现,运行 bub hooks(详见 CLI › hooks)。