跳转到内容

类型

本页列出插件作者会接触到的类型与辅助函数。源码位于 src/bub/types.pysrc/bub/envelope.pysrc/bub/channels/base.pysrc/bub/framework.pysrc/bub/__init__.py

type Envelope = Any

EnvelopeAny 的别名。Bub 不要求特定类型 —— dict、dataclasspydantic.BaseModel、任意带属性访问的对象都可以。框架通过下面的辅助函数读取字段;访问 envelope 时请使用它们,而不要直接索引。

def field_of(message: Envelope, key: str, default: Any = None) -> Any

messageMapping 时返回 message.get(key, default),否则返回 getattr(message, key, default)

def content_of(message: Envelope) -> str

返回 str(field_of(message, "content", ""))

def normalize_envelope(message: Envelope) -> dict[str, Any]

把任意 envelope 转换为可变 dict。Mappingdict(message);有 __dict__ 的对象 → dict(vars(message));其余 → {"content": str(message)}

def unpack_batch(batch: Any) -> list[Envelope]

把单个 render_outbound 返回值规范化为列表。None[];list / tuple → 列表副本;其他 → [batch]

type State = dict[str, Any]

per-turn 的 state dict。框架先以 _runtime_workspace 初始化,再合并所有 load_state 钩子的结果,然后才调用模型。同一个 dict 会被传给 build_promptrun_model[_stream]save_staterender_outboundsystem_prompt

type MessageHandler = Callable[[Envelope], Coroutine[Any, Any, None]]

Channel 在每次收到 inbound 消息时调用的异步可调用对象。ChannelManager 在调用 provide_channels 时会传入它。

type OutboundDispatcher = Callable[[Envelope], Coroutine[Any, Any, bool]]

异步可调用对象;outbound 已成功投递时返回 True。channel router 内部使用;插件从 dispatch_outbound 返回 True 来表示发送成功。

class OutboundChannelRouter(Protocol):
    async def dispatch_output(self, message: Envelope) -> bool: ...
    def wrap_stream(
        self, message: Envelope, stream: AsyncIterable[StreamEvent]
    ) -> AsyncIterable[StreamEvent]: ...
    async def quit(self, session_id: str) -> None: ...

ChannelManager 实现的结构化 protocol。通过 BubFramework.bind_outbound_router(router) 绑定到框架;用来发送 outbound,以及包装模型 stream 让 channel 增量渲染 token。

@dataclass(frozen=True)
class TurnResult:
    session_id: str
    prompt: str
    model_output: str
    outbounds: list[Envelope] = field(default_factory=list)

BubFramework.process_inbound 的返回值。prompt 是解析后的 prompt。源码标注当前仍是 str,但 build_prompt hook 可以返回多模态内容片段列表,运行时会保留这个 list。outbounds 是所有 render_outbound 实现结果的扁平拼接。

class Channel(ABC):
    name: ClassVar[str] = "base"

    @abstractmethod
    async def start(self, stop_event: asyncio.Event) -> None: ...

    @abstractmethod
    async def stop(self) -> None: ...

    @property
    def needs_debounce(self) -> bool: ...

    @property
    def enabled(self) -> bool: ...

    async def send(self, message: ChannelMessage) -> None: ...

    def stream_events(
        self, message: ChannelMessage, stream: AsyncIterable[StreamEvent]
    ) -> AsyncIterable[StreamEvent]: ...

inbound / outbound channel 的抽象基类(src/bub/channels/base.py)。

成员描述
name类变量;ChannelManager 据此查找,并在多个 provide_channels 实现间去重。
start(stop_event)开始监听;在 stop_event 被设置时退出。必须实现。
stop()释放资源。必须实现。
needs_debounceTrue 时,Manager 用 BufferedMessageHandler 包装 inbound。默认 False
enabledFalse 时,即使在 enabled_channels 中也会被 Manager 跳过。默认 True
send(message)outbound 投递。默认实现为 no-op。
stream_events(message, stream)OutboundChannelRouter.wrap_stream 调用的可选 stream 包装器。默认原样返回。

完整示例见 运维 › Channels构建 › 插件

src/bub/framework.py 暴露的构造函数与公共方法。

class BubFramework:
    workspace: Path
    config_file: Path

    def __init__(self, config_file: Path = DEFAULT_CONFIG_FILE) -> None: ...

    def load_hooks(self) -> None: ...
    def create_cli_app(self) -> typer.Typer: ...

    async def process_inbound(
        self, inbound: Envelope, stream_output: bool = False
    ) -> TurnResult: ...

    def get_channels(
        self, message_handler: MessageHandler
    ) -> dict[str, Channel]: ...

    def get_tape_store(self) -> TapeStore | AsyncTapeStore | None: ...
    def get_system_prompt(self, prompt: str | list[dict], state: dict[str, Any]) -> str: ...
    def hook_report(self) -> dict[str, list[str]]: ...

    @contextlib.asynccontextmanager
    async def running(self) -> AsyncGenerator[contextlib.AsyncExitStack, None]: ...

    def bind_outbound_router(self, router: OutboundChannelRouter | None) -> None: ...
    def build_tape_context(self) -> TapeContext: ...
    def collect_onboard_config(self) -> dict[str, Any]: ...
成员描述
workspace解析后的当前工作目录;被 --workspace 覆盖。
config_fileYAML 配置文件的解析路径(默认 ~/.bub/config.yml)。
load_hooks()注册 builtin 插件以及每个 bub entry-point 插件。结果记录在 _plugin_status
create_cli_app()构造根 typer.Typer,设置 ctx.obj,并同步调用 register_cli_commands
process_inbound(inbound, stream_output=False)把一个 inbound 跑完整条钩子流水线,返回 TurnResult
get_channels(message_handler)同步收集所有 provide_channels 实现产生的 channel,按 Channel.name 去重。
get_tape_store()返回 running() 中启用的 tape store;在作用域之外返回 None
get_system_prompt(prompt, state)同步调用 system_prompt 实现,反转后用 \n\n 拼接非空片段。
hook_report()返回 hook 名 → 已发现的 adapter 列表。bub hooks 的数据来源;不要只根据该输出顺序推断运行时优先级。
running()异步 context manager;一次性解析 provide_tape_store 并在作用域内绑定 tape store。
bind_outbound_router(router)绑定(或传 None 解绑)OutboundChannelRouterChannelManager 在启停时调用。
build_tape_context()同步调用 build_tape_context 并返回 TapeContext
collect_onboard_config()遍历 onboard_config 实现,通过 configure.merge 合并 dict 片段后再验证。

process_inbound 的逐步生命周期见 概念 › Turn 流水线。Tape 语义见 概念 › Tape 与 context

来自 src/bub/__init__.py:

名称类型描述
BubFrameworkclass框架运行时(见上)。
Settingsclass插件配置基类(从 bub.configure 重新导出)。
configdecorator@config(name="...") 注册一个用于 YAML/env 验证的配置类。
ensure_configfunctionensure_config(SettingsCls) —— 返回该类的单例实例。
homePath(惰性属性)BUB_HOME 已设置时为 Path(BUB_HOME),否则为 ~/.bub
hookimplmarkerpluggy.HookimplMarker("bub") —— 装饰插件的 hook 方法。
tooldecorator把内置 tool 注册到 agent(src/bub/tools.py)。
__version__str已安装的 package 版本。