Skip to content

Hook recipes

This page collects working recipes for the most common hook patterns. Each recipe is grounded in the actual signature from src/bub/hookspecs.py. For the full hook contract — argument types, sync/async rules, and firstresult semantics — see the Hook reference.

  • A plugin package wired to the bub entry-point group (see Plugins).
  • The marker from bub import hookimpl available in your module.

A hook is a named extension point declared on BubHookSpecs; your plugin contributes implementations that Bub calls during a turn.

Several recipes below use plugin classes. Register those classes through a callable entry point such as pkg.plugin:MyPlugin, or create a module-level instance and point the entry point at that instance. A bare module target only registers module-level @hookimpl functions.

Replace the text Bub sends to the model. build_prompt is firstresult, so the highest-priority implementation wins.

from bub import hookimpl


class PrefixPrompt:
    @hookimpl
    def build_prompt(self, message, session_id, state):
        content = message["content"] if isinstance(message, dict) else message.content
        return f"[prefix] {content}"

To observe this directly, pair it with an echo run_model hook that returns the prompt, or inspect the prompt in a test. A normal uv run bub run "hello" prints the final outbound text, not the raw prompt sent to the model.

Implement run_model to swap the model stage with any callable that returns a string. The shape below mirrors bub-codex, which delegates to another agent CLI:

import asyncio

from bub import hookimpl
from bub.types import State


@hookimpl
async def run_model(prompt: str, session_id: str, state: State) -> str:
    proc = await asyncio.create_subprocess_exec(
        "codex", "e", prompt,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, _ = await proc.communicate()
    return stdout.decode()

To observe or transform the existing model stream without replacing it, implement run_model_stream and use pluggy’s subset_hook_caller to call the chain without your plugin:

from republic import AsyncStreamEvents

from bub import BubFramework, hookimpl


class StreamTap:
    def __init__(self, framework: BubFramework) -> None:
        self.framework = framework

    @hookimpl
    async def run_model_stream(self, prompt, session_id, state):
        parent = self.framework._plugin_manager.subset_hook_caller(
            "run_model_stream", remove_plugins=[self]
        )
        upstream = await parent(prompt=prompt, session_id=session_id, state=state)
        if upstream is None:
            raise RuntimeError("no parent run_model_stream implementation")

        async def iterator():
            async for event in upstream:
                if event.kind == "text":
                    print(event.data.get("delta", ""), end="")
                yield event

        return AsyncStreamEvents(iterator(), state=upstream._state)

Use this for logging, redaction, or telemetry — without reimplementing the underlying model call.

load_state runs at the start of each turn; save_state runs in finally after the model output is produced. The two together let you carry per-session data forward and tear it down safely. Lifespan hand-off is the canonical case:

from bub import hookimpl


class SessionStorePlugin:
    def __init__(self) -> None:
        self._sessions: dict[str, list[dict[str, object]]] = {}

    @hookimpl
    def load_state(self, message, session_id):
        return {"history": self._sessions.get(session_id, [])}

    @hookimpl
    def save_state(self, session_id, state, message, model_output):
        history = state.get("history", [])
        history.append({"in": message, "out": model_output})
        self._sessions[session_id] = history

A state dict is the per-turn workspace passed through every stage. See Tape and context for how this relates to recorded tapes.

A tape is a recorded conversation thread. provide_tape_store is firstresult and resolved when the framework runtime scope opens. Return a store instance, or yield one for setup/teardown.

from bub import hookimpl


@hookimpl
def provide_tape_store():
    from bub_tapestore_sqlite.store import SQLiteTapeStore

    return SQLiteTapeStore(path="/var/lib/bub/tapes.sqlite3")

The full plugin lives at bub-tapestore-sqlite. For stores that need cleanup, return a generator instead — Bub treats it as a context manager.

A channel is an inbound/outbound surface — CLI, Telegram, WeChat, a scheduled trigger. provide_channels lets your plugin contribute one or more Channel subclasses.

The minimum is a class that sets name, implements start and stop, and optionally implements send or stream_events:

import asyncio

from bub import hookimpl
from bub.channels import Channel
from bub.types import MessageHandler


class WebhookChannel(Channel):
    name = "webhook"

    def __init__(self, message_handler: MessageHandler) -> None:
        self._handler = message_handler
        self._task: asyncio.Task | None = None

    async def start(self, stop_event: asyncio.Event) -> None:
        # bind your transport (HTTP server, message queue, etc.)
        await stop_event.wait()

    async def stop(self) -> None:
        if self._task:
            self._task.cancel()


class WebhookPlugin:
    @hookimpl
    def provide_channels(self, message_handler):
        return [WebhookChannel(message_handler)]

bub-wechat is the canonical custom channel example: it implements both inbound receive and outbound send, and exposes a wechat tool the model can call.

provide_channels is also the right hook for background services that are not chat surfaces. Visual-base’s EyeChannel uses it to host an FFmpeg supervisor and a screen-understanding loop — send is a no-op, but start/stop give the service a lifecycle tied to bub gateway.

register_cli_commands is called once at CLI bootstrap with the root typer.Typer app. Add commands or sub-apps directly:

import typer

from bub import hookimpl


class HelloPlugin:
    @hookimpl
    def register_cli_commands(self, app: typer.Typer) -> None:
        @app.command()
        def hello(name: str = "world") -> None:
            """Say hello."""
            typer.echo(f"hello, {name}")

Run uv run bub hello --name bub to invoke it. This hook runs synchronously — async functions are skipped with a warning.

onboard_config(current_config) lets your plugin contribute fields to the interactive bub onboard flow. Each implementation receives the dict accumulated so far and returns a fragment to merge.

from bub import hookimpl


class WeatherPlugin:
    @hookimpl
    def onboard_config(self, current_config):
        existing = current_config.get("weather", {})
        return {
            "weather": {
                "api_key": existing.get("api_key", ""),
                "enabled": True,
            }
        }

Return None to skip without changing accumulated config. Returning a non-dict value raises TypeError and aborts onboarding. Verify by running uv run bub onboard and inspecting the saved config file.