refactor: doc/code consistency, OMBRE_PORT, webhook push, host-vault dashboard

Doc-code consistency (per BEHAVIOR_SPEC.md ground truth):
- INTERNALS.md, dehydrator.py, README.md, config.example.yaml: drop the
  outdated "API 不可用自动降级到本地关键词提取" claims; align with the
  "RuntimeError on API outage, no silent fallback" design decision
- INTERNALS.md & BEHAVIOR_SPEC.md narrative: activation_count=1 → 0 (B-04)
- server.py header: 5 MCP tools → 6 (add dream)

OMBRE_PORT (T5/T6):
- Replace hardcoded 8000 in FastMCP / uvicorn / keepalive URL
  with int(os.environ.get("OMBRE_PORT", "8000"))

OMBRE_HOOK_URL / OMBRE_HOOK_SKIP webhook (T7):
- Implement _fire_webhook() helper: fire-and-forget POST with 5s timeout,
  failures logged at WARNING but never propagated
- Wired into breath / dream MCP tools and /breath-hook + /dream-hook routes
- Push payload: {event, timestamp, payload:{...}}; documented in ENV_VARS.md

Dashboard host-vault input (T12, per user request):
- New /api/host-vault GET/POST endpoints persist OMBRE_HOST_VAULT_DIR
  to project-root .env (idempotent upsert, preserves other entries,
  rejects quotes/newlines)
- Settings tab gains a "宿主机记忆桶目录 (Docker)" panel with
  load/save buttons and a clear "需要 docker compose down/up 生效" notice
This commit is contained in:
P0luz
2026-04-21 20:08:52 +08:00
parent 38be7610f4
commit 71154d905f
8 changed files with 290 additions and 27 deletions

183
server.py
View File

@@ -10,18 +10,20 @@
# 核心职责:
# - Initialize config, bucket manager, dehydrator, decay engine
# 初始化配置、记忆桶管理器、脱水器、衰减引擎
# - Expose 5 MCP tools:
# 暴露 5 个 MCP 工具:
# - Expose 6 MCP tools:
# 暴露 6 个 MCP 工具:
# breath — Surface unresolved memories or search by keyword
# 浮现未解决记忆 或 按关键词检索
# hold — Store a single memory
# 存储单条记忆
# hold — Store a single memory (or write a `feel` reflection)
# 存储单条记忆(或写 feel 反思)
# grow — Diary digest, auto-split into multiple buckets
# 日记归档,自动拆分多桶
# trace — Modify metadata / resolved / delete
# 修改元数据 / resolved 标记 / 删除
# pulse — System status + bucket listing
# 系统状态 + 所有桶列表
# dream — Surface recent dynamic buckets for self-digestion
# 返回最近桶 供模型自省/写 feel
#
# Startup:
# 启动方式:
@@ -61,6 +63,39 @@ config = load_config()
setup_logging(config.get("log_level", "INFO"))
logger = logging.getLogger("ombre_brain")
# --- Runtime env vars (port + webhook) / 运行时环境变量 ---
# OMBRE_PORT: HTTP/SSE 监听端口,默认 8000
try:
OMBRE_PORT = int(os.environ.get("OMBRE_PORT", "8000") or "8000")
except ValueError:
logger.warning("OMBRE_PORT 不是合法整数,回退到 8000")
OMBRE_PORT = 8000
# OMBRE_HOOK_URL: 在 breath/dream 被调用后推送事件到该 URLPOST JSON
# OMBRE_HOOK_SKIP: 设为 true/1/yes 跳过推送。
# 详见 ENV_VARS.md。
OMBRE_HOOK_URL = os.environ.get("OMBRE_HOOK_URL", "").strip()
OMBRE_HOOK_SKIP = os.environ.get("OMBRE_HOOK_SKIP", "").strip().lower() in ("1", "true", "yes", "on")
async def _fire_webhook(event: str, payload: dict) -> None:
"""
Fire-and-forget POST to OMBRE_HOOK_URL with the given event payload.
Failures are logged at WARNING level only — never propagated to the caller.
"""
if OMBRE_HOOK_SKIP or not OMBRE_HOOK_URL:
return
try:
body = {
"event": event,
"timestamp": time.time(),
"payload": payload,
}
async with httpx.AsyncClient(timeout=5.0) as client:
await client.post(OMBRE_HOOK_URL, json=body)
except Exception as e:
logger.warning(f"Webhook push failed ({event}{OMBRE_HOOK_URL}): {e}")
# --- Initialize core components / 初始化核心组件 ---
embedding_engine = EmbeddingEngine(config) # Embedding engine first (BucketManager depends on it)
bucket_mgr = BucketManager(config, embedding_engine=embedding_engine) # Bucket manager / 记忆桶管理器
@@ -74,7 +109,7 @@ import_engine = ImportEngine(config, bucket_mgr, dehydrator, embedding_engine)
mcp = FastMCP(
"Ombre Brain",
host="0.0.0.0",
port=8000,
port=OMBRE_PORT,
)
@@ -322,8 +357,11 @@ async def breath_hook(request):
token_budget -= summary_tokens
if not parts:
await _fire_webhook("breath_hook", {"surfaced": 0})
return PlainTextResponse("")
return PlainTextResponse("[Ombre Brain - 记忆浮现]\n" + "\n---\n".join(parts))
body_text = "[Ombre Brain - 记忆浮现]\n" + "\n---\n".join(parts)
await _fire_webhook("breath_hook", {"surfaced": len(parts), "chars": len(body_text)})
return PlainTextResponse(body_text)
except Exception as e:
logger.warning(f"Breath hook failed: {e}")
return PlainTextResponse("")
@@ -360,7 +398,9 @@ async def dream_hook(request):
f"{strip_wikilinks(b['content'][:200])}"
)
return PlainTextResponse("[Ombre Brain - Dreaming]\n" + "\n---\n".join(parts))
body_text = "[Ombre Brain - Dreaming]\n" + "\n---\n".join(parts)
await _fire_webhook("dream_hook", {"surfaced": len(parts), "chars": len(body_text)})
return PlainTextResponse(body_text)
except Exception as e:
logger.warning(f"Dream hook failed: {e}")
return PlainTextResponse("")
@@ -718,9 +758,12 @@ async def breath(
logger.warning(f"Random surfacing failed / 随机浮现失败: {e}")
if not results:
await _fire_webhook("breath", {"mode": "empty", "matches": 0})
return "未找到相关记忆。"
return "\n---\n".join(results)
final_text = "\n---\n".join(results)
await _fire_webhook("breath", {"mode": "ok", "matches": len(matches), "chars": len(final_text)})
return final_text
# =============================================================
@@ -1205,7 +1248,9 @@ async def dream() -> str:
except Exception as e:
logger.warning(f"Dream crystallization hint failed: {e}")
return header + "\n---\n".join(parts) + connection_hint + crystal_hint
final_text = header + "\n---\n".join(parts) + connection_hint + crystal_hint
await _fire_webhook("dream", {"recent": len(recent), "chars": len(final_text)})
return final_text
# =============================================================
@@ -1549,6 +1594,122 @@ async def api_config_update(request):
return JSONResponse({"updated": updated, "ok": True})
# =============================================================
# /api/host-vault — read/write the host-side OMBRE_HOST_VAULT_DIR
# 用于在 Dashboard 设置 docker-compose 挂载的宿主机记忆桶目录。
# 写入项目根目录的 .env 文件,需 docker compose down/up 才能生效。
# =============================================================
def _project_env_path() -> str:
return os.path.join(os.path.dirname(os.path.abspath(__file__)), ".env")
def _read_env_var(name: str) -> str:
"""Return current value of `name` from process env first, then .env file (best-effort)."""
val = os.environ.get(name, "").strip()
if val:
return val
env_path = _project_env_path()
if not os.path.exists(env_path):
return ""
try:
with open(env_path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
k, _, v = line.partition("=")
if k.strip() == name:
return v.strip().strip('"').strip("'")
except Exception:
pass
return ""
def _write_env_var(name: str, value: str) -> None:
"""
Idempotent upsert of `NAME=value` in project .env. Creates the file if missing.
Preserves other entries verbatim. Quotes values containing spaces.
"""
env_path = _project_env_path()
quoted = f'"{value}"' if value and (" " in value or "#" in value) else value
new_line = f"{name}={quoted}\n"
lines: list[str] = []
if os.path.exists(env_path):
with open(env_path, "r", encoding="utf-8") as f:
lines = f.readlines()
replaced = False
for i, raw in enumerate(lines):
stripped = raw.strip()
if not stripped or stripped.startswith("#") or "=" not in stripped:
continue
k, _, _v = stripped.partition("=")
if k.strip() == name:
lines[i] = new_line
replaced = True
break
if not replaced:
if lines and not lines[-1].endswith("\n"):
lines[-1] += "\n"
lines.append(new_line)
with open(env_path, "w", encoding="utf-8") as f:
f.writelines(lines)
@mcp.custom_route("/api/host-vault", methods=["GET"])
async def api_host_vault_get(request):
"""Read the current OMBRE_HOST_VAULT_DIR (process env > project .env)."""
from starlette.responses import JSONResponse
err = _require_auth(request)
if err: return err
value = _read_env_var("OMBRE_HOST_VAULT_DIR")
return JSONResponse({
"value": value,
"source": "env" if os.environ.get("OMBRE_HOST_VAULT_DIR", "").strip() else ("file" if value else ""),
"env_file": _project_env_path(),
})
@mcp.custom_route("/api/host-vault", methods=["POST"])
async def api_host_vault_set(request):
"""
Persist OMBRE_HOST_VAULT_DIR to the project .env file.
Body: {"value": "/path/to/vault"} (empty string clears the entry)
Note: container restart is required for docker-compose to pick up the new mount.
"""
from starlette.responses import JSONResponse
err = _require_auth(request)
if err: return err
try:
body = await request.json()
except Exception:
return JSONResponse({"error": "invalid JSON"}, status_code=400)
raw = body.get("value", "")
if not isinstance(raw, str):
return JSONResponse({"error": "value must be a string"}, status_code=400)
value = raw.strip()
# Reject characters that would break .env / shell parsing
if "\n" in value or "\r" in value or '"' in value or "'" in value:
return JSONResponse({"error": "value must not contain quotes or newlines"}, status_code=400)
try:
_write_env_var("OMBRE_HOST_VAULT_DIR", value)
except Exception as e:
return JSONResponse({"error": f"failed to write .env: {e}"}, status_code=500)
return JSONResponse({
"ok": True,
"value": value,
"env_file": _project_env_path(),
"note": "已写入 .env需在宿主机执行 `docker compose down && docker compose up -d` 让新挂载生效。",
})
# =============================================================
# Import API — conversation history import
# 导入 API — 对话历史导入
@@ -1755,7 +1916,7 @@ if __name__ == "__main__":
async with httpx.AsyncClient() as client:
while True:
try:
await client.get("http://localhost:8000/health", timeout=5)
await client.get(f"http://localhost:{OMBRE_PORT}/health", timeout=5)
logger.debug("Keepalive ping OK / 保活 ping 成功")
except Exception as e:
logger.warning(f"Keepalive ping failed / 保活 ping 失败: {e}")
@@ -1782,6 +1943,6 @@ if __name__ == "__main__":
expose_headers=["*"],
)
logger.info("CORS middleware enabled for remote transport / 已启用 CORS 中间件")
uvicorn.run(_app, host="0.0.0.0", port=8000)
uvicorn.run(_app, host="0.0.0.0", port=OMBRE_PORT)
else:
mcp.run(transport=transport)