Hook recipes
This page collects working recipes for the most common hook patterns. Each recipe is grounded in the actual signature from src/bub/hookspecs.py. For the full hook contract — argument types, sync/async rules, and firstresult semantics — see the Hook reference.
Before you begin
- A plugin package wired to the bub entry-point group (see Plugins).
- The marker from bub import hookimpl available in your module.
A hook is a named extension point declared on BubHookSpecs; your plugin contributes implementations that Bub calls during a turn.
Several recipes below use plugin classes. Register those classes through a callable entry point such as pkg.plugin:MyPlugin, or create a module-level instance and point the entry point at that instance. A bare module target only registers module-level @hookimpl functions.
1. Override the prompt
Replace the text Bub sends to the model. build_prompt is firstresult, so the highest-priority implementation wins.
from bub import hookimpl

class PrefixPrompt:
    @hookimpl
    def build_prompt(self, message, session_id, state):
        content = message["content"] if isinstance(message, dict) else message.content
        return f"[prefix] {content}"
To observe this directly, pair it with an echo run_model hook that returns the prompt, or inspect the prompt in a test. A normal uv run bub run "hello" prints the final outbound text, not the raw prompt sent to the model.
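One way to inspect the prompt in a test is to call the hook method directly, without starting Bub. The sketch below is a minimal example of that idea. The fallback hookimpl decorator and the SimpleNamespace stand-in for a message object are assumptions added so the snippet runs on its own; check_prefix_prompt is a hypothetical helper name.

```python
from types import SimpleNamespace

try:
    from bub import hookimpl
except ImportError:
    # Assumption: stand-in decorator so the sketch runs without Bub installed.
    def hookimpl(func):
        return func


class PrefixPrompt:
    @hookimpl
    def build_prompt(self, message, session_id, state):
        content = message["content"] if isinstance(message, dict) else message.content
        return f"[prefix] {content}"


def check_prefix_prompt():
    """Call the hook by hand for both message shapes the recipe accepts."""
    plugin = PrefixPrompt()
    # Dict-shaped message.
    from_dict = plugin.build_prompt({"content": "hello"}, "s1", {})
    # Object-shaped message; SimpleNamespace is a hypothetical stand-in.
    from_obj = plugin.build_prompt(SimpleNamespace(content="hello"), "s1", {})
    return from_dict, from_obj
```

Both calls should produce the same prefixed string, which is the property a unit test for this plugin would assert.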
2. Replace the model
Implement run_model to swap the model stage with any callable that returns a string. The shape below mirrors bub-codex, which delegates to another agent CLI:
import asyncio

from bub import hookimpl
from bub.types import State

@hookimpl
async def run_model(prompt: str, session_id: str, state: State) -> str:
    proc = await asyncio.create_subprocess_exec(
        "codex", "e", prompt,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, _ = await proc.communicate()
    return stdout.decode()
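The recipe above discards stderr and ignores the exit status, so a failed CLI call would come back as an empty or partial string. A hedged sketch of a stricter helper follows. The name run_cli is hypothetical, and raising RuntimeError on a non-zero exit is an assumption about what the calling plugin wants, not something Bub requires.

```python
import asyncio
import sys


async def run_cli(*argv: str) -> str:
    """Run a command and return its stdout, raising if it exits non-zero."""
    proc = await asyncio.create_subprocess_exec(
        *argv,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    if proc.returncode != 0:
        detail = stderr.decode(errors="replace").strip()
        raise RuntimeError(f"{argv[0]} exited with {proc.returncode}: {detail}")
    return stdout.decode(errors="replace")


def demo() -> str:
    # The Python interpreter stands in for the agent CLI in this demo.
    return asyncio.run(run_cli(sys.executable, "-c", "print('ok')"))
```

Inside run_model, the body would then reduce to a single awaited call such as run_cli("codex", "e", prompt).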
3. Wrap the parent stream
To observe or transform the existing model stream without replacing it, implement run_model_stream and use pluggy’s subset_hook_caller to call the chain without your plugin:
from republic import AsyncStreamEvents

from bub import BubFramework, hookimpl

class StreamTap:
    def __init__(self, framework: BubFramework) -> None:
        self.framework = framework

    @hookimpl
    async def run_model_stream(self, prompt, session_id, state):
        parent = self.framework._plugin_manager.subset_hook_caller(
            "run_model_stream", remove_plugins=[self]
        )
        upstream = await parent(prompt=prompt, session_id=session_id, state=state)
        if upstream is None:
            raise RuntimeError("no parent run_model_stream implementation")

        async def iterator():
            async for event in upstream:
                if event.kind == "text":
                    print(event.data.get("delta", ""), end="")
                yield event

        return AsyncStreamEvents(iterator(), state=upstream._state)
Use this for logging, redaction, or telemetry — without reimplementing the underlying model call.
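For redaction, the inner iterator can rewrite text deltas before yielding them. The sketch below shows only that iterator logic, outside Bub. The Event dataclass is a hypothetical stand-in for the events yielded by the upstream stream (only the kind and data["delta"] fields used in the recipe are modelled), the "done" event kind is made up for the demo, and the regular expression is an illustrative pattern rather than a complete secret detector.

```python
import asyncio
import re
from dataclasses import dataclass, field

# Illustrative pattern: "sk-" followed by at least eight word characters.
SECRET = re.compile(r"sk-\w{8,}")


@dataclass
class Event:
    # Hypothetical stand-in for an upstream stream event.
    kind: str
    data: dict = field(default_factory=dict)


async def redact(upstream):
    """Yield events from upstream, masking secrets inside text deltas."""
    async for event in upstream:
        if event.kind == "text":
            delta = event.data.get("delta", "")
            masked = SECRET.sub("[redacted]", delta)
            event = Event(event.kind, {**event.data, "delta": masked})
        yield event


async def fake_upstream():
    yield Event("text", {"delta": "key is sk-abcdef123456"})
    yield Event("done", {})  # hypothetical non-text event, passed through unchanged


async def collect():
    return [event async for event in redact(fake_upstream())]


events = asyncio.run(collect())
```

Matching per delta can miss a secret that is split across two chunks, so a production redactor would likely need to buffer text before matching.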
4. Persist state across turns
load_state runs at the start of each turn; save_state runs in finally after the model output is produced. The two together let you carry per-session data forward and tear it down safely. Lifespan hand-off is the canonical case:
from bub import hookimpl

class SessionStorePlugin:
    def __init__(self) -> None:
        self._sessions: dict[str, list[dict[str, object]]] = {}

    @hookimpl
    def load_state(self, message, session_id):
        return {"history": self._sessions.get(session_id, [])}

    @hookimpl
    def save_state(self, session_id, state, message, model_output):
        history = state.get("history", [])
        history.append({"in": message, "out": model_output})
        self._sessions[session_id] = history
A state dict is the per-turn workspace passed through every stage. See Tape and context for how this relates to recorded tapes.
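To see what actually carries across turns, the two hooks can be driven by hand in the order the text describes: load_state first, save_state last. This is a rough simulation, not Bub's turn loop. The fallback hookimpl decorator and the simulate_turns helper are assumptions added so the snippet runs standalone.

```python
try:
    from bub import hookimpl
except ImportError:
    # Assumption: stand-in decorator so the sketch runs without Bub installed.
    def hookimpl(func):
        return func


class SessionStorePlugin:
    def __init__(self) -> None:
        self._sessions = {}

    @hookimpl
    def load_state(self, message, session_id):
        return {"history": self._sessions.get(session_id, [])}

    @hookimpl
    def save_state(self, session_id, state, message, model_output):
        history = state.get("history", [])
        history.append({"in": message, "out": model_output})
        self._sessions[session_id] = history


def simulate_turns():
    """Run two turns in session s1, then load state for s1 and for a fresh s2."""
    plugin = SessionStorePlugin()
    for message, output in [("hi", "hello"), ("bye", "goodbye")]:
        state = plugin.load_state(message, "s1")
        plugin.save_state("s1", state, message, output)
    return plugin.load_state("again", "s1"), plugin.load_state("hi", "s2")
```

The third load for s1 sees both earlier exchanges, while s2 starts with an empty history, which shows that the data is keyed per session.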
5. Provide a tape store
A tape is a recorded conversation thread. provide_tape_store is firstresult and resolved when the framework runtime scope opens. Return a store instance, or yield one for setup/teardown.
from bub import hookimpl

@hookimpl
def provide_tape_store():
    from bub_tapestore_sqlite.store import SQLiteTapeStore
    return SQLiteTapeStore(path="/var/lib/bub/tapes.sqlite3")
The full plugin lives at bub-tapestore-sqlite. For stores that need cleanup, return a generator instead — Bub treats it as a context manager.
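The generator form might look like the sketch below. DemoTapeStore is a hypothetical stand-in for a store that holds a resource, and the fallback hookimpl decorator exists only so the snippet runs without Bub. The demo drives the generator with contextlib.contextmanager, which is an assumption about how Bub treats it, based on the description above.

```python
import contextlib

try:
    from bub import hookimpl
except ImportError:
    # Assumption: stand-in decorator so the sketch runs without Bub installed.
    def hookimpl(func):
        return func


class DemoTapeStore:
    # Hypothetical stand-in for a real store that owns a connection or file.
    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


@hookimpl
def provide_tape_store():
    store = DemoTapeStore()  # setup
    try:
        yield store  # the store is in use while the scope is open
    finally:
        store.close()  # teardown, also reached if the scope exits with an error


def demo():
    # Approximates a framework entering and leaving the runtime scope.
    with contextlib.contextmanager(provide_tape_store)() as store:
        open_inside = not store.closed
    return open_inside, store.closed
```

Placing the cleanup in finally keeps teardown tied to the scope exit rather than to a successful run.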
6. Add a channel
A channel is an inbound/outbound surface: CLI, Telegram, WeChat, a scheduled trigger. provide_channels lets your plugin contribute one or more Channel subclasses.
The minimum is a class that sets name, implements start and stop, and optionally implements send or stream_events:
import asyncio

from bub import hookimpl
from bub.channels import Channel
from bub.types import MessageHandler

class WebhookChannel(Channel):
    name = "webhook"

    def __init__(self, message_handler: MessageHandler) -> None:
        self._handler = message_handler
        self._task: asyncio.Task | None = None

    async def start(self, stop_event: asyncio.Event) -> None:
        # bind your transport (HTTP server, message queue, etc.)
        await stop_event.wait()

    async def stop(self) -> None:
        if self._task:
            self._task.cancel()

class WebhookPlugin:
    @hookimpl
    def provide_channels(self, message_handler):
        return [WebhookChannel(message_handler)]
bub-wechat is the canonical custom channel example: it implements both inbound receive and outbound send, and exposes a wechat tool the model can call.
provide_channels is also the right hook for background services that are not chat surfaces. Visual-base’s EyeChannel uses it to host an FFmpeg supervisor and a screen-understanding loop — send is a no-op, but start/stop give the service a lifecycle tied to bub gateway.
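The start/stop lifecycle for a background service can be sketched with plain asyncio. TickerService below is a hypothetical plain class used for illustration; a real channel would subclass bub.channels.Channel and be returned from provide_channels. The demo coroutine only approximates a host that calls start, sets the stop event, and then calls stop.

```python
import asyncio
import contextlib


class TickerService:
    # Plain class for illustration; not a bub.channels.Channel subclass.
    name = "ticker"

    def __init__(self) -> None:
        self.ticks = 0
        self._task = None

    async def _loop(self) -> None:
        # Stand-in for the real background work (supervisor, polling loop, ...).
        while True:
            self.ticks += 1
            await asyncio.sleep(0.01)

    async def start(self, stop_event: asyncio.Event) -> None:
        self._task = asyncio.create_task(self._loop())
        await stop_event.wait()

    async def stop(self) -> None:
        if self._task:
            self._task.cancel()
            # Wait for the cancellation to finish so nothing is left running.
            with contextlib.suppress(asyncio.CancelledError):
                await self._task


async def demo():
    service = TickerService()
    stop_event = asyncio.Event()
    runner = asyncio.create_task(service.start(stop_event))
    await asyncio.sleep(0.05)
    stop_event.set()
    await runner
    await service.stop()
    return service.ticks, service._task.cancelled()
```

Awaiting the cancelled task in stop is a design choice: it ensures the background loop has fully exited before the host moves on.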
7. Register a CLI command
register_cli_commands is called once at CLI bootstrap with the root typer.Typer app. Add commands or sub-apps directly:
import typer

from bub import hookimpl

class HelloPlugin:
    @hookimpl
    def register_cli_commands(self, app: typer.Typer) -> None:
        @app.command()
        def hello(name: str = "world") -> None:
            """Say hello."""
            typer.echo(f"hello, {name}")
Run uv run bub hello --name bub to invoke it. This hook runs synchronously — async functions are skipped with a warning.
8. Participate in onboarding
onboard_config(current_config) lets your plugin contribute fields to the interactive bub onboard flow. Each implementation receives the dict accumulated so far and returns a fragment to merge.
from bub import hookimpl

class WeatherPlugin:
    @hookimpl
    def onboard_config(self, current_config):
        existing = current_config.get("weather", {})
        return {
            "weather": {
                "api_key": existing.get("api_key", ""),
                "enabled": True,
            }
        }
Return None to skip without changing accumulated config. Returning a non-dict value raises TypeError and aborts onboarding. Verify by running uv run bub onboard and inspecting the saved config file.
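The accumulation rules described here can be pictured with a small stand-alone loop. This is an illustration of the stated contract, not Bub's actual code: merge_onboard_fragments is a hypothetical function, and the shallow merge of top-level keys is an assumption, since the merge depth is not specified on this page.

```python
def merge_onboard_fragments(implementations, initial=None):
    """Illustrative accumulation loop for onboard_config-style callables."""
    config = dict(initial or {})
    for impl in implementations:
        fragment = impl(config)
        if fragment is None:
            continue  # skipped: accumulated config is unchanged
        if not isinstance(fragment, dict):
            raise TypeError(
                f"expected dict or None, got {type(fragment).__name__}"
            )
        config.update(fragment)  # Assumption: shallow merge of top-level keys.
    return config


def weather(current_config):
    existing = current_config.get("weather", {})
    return {"weather": {"api_key": existing.get("api_key", ""), "enabled": True}}


def no_opinion(current_config):
    return None


merged = merge_onboard_fragments(
    [no_opinion, weather], {"weather": {"api_key": "abc"}}
)
```

Because each implementation reads the accumulated dict before returning its fragment, the weather callable can preserve an api_key that was already present.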
Next steps
- Hook reference: full signature table, precedence, and sync/async rules
- Tools: register tools the model can call
- Distribution: bundle hooks into a product
- Turn pipeline: where each hook fits in one turn