spec: add BEHAVIOR_SPEC and fix B-01~B-10 (resolved/decay/scoring)

- Add BEHAVIOR_SPEC.md as full system behaviour reference
- B-01: stop auto-archiving resolved buckets in update()
- B-03: keep activation_count as float in calculate_score
- B-04: initialise activation_count=0 on create
- B-05: time score coefficient 0.1 -> 0.02
- B-06: w_time default 2.5 -> 1.5
- B-07: content_weight default 3.0 -> 1.0
- B-08: refresh local meta after auto_resolve
- B-09: user-supplied valence/arousal takes priority over analyze()
- B-10: allow empty domain for feel buckets
- Refresh INTERNALS/README/dashboard accordingly
This commit is contained in:
P0luz
2026-04-21 18:45:52 +08:00
parent c7ddfd46ad
commit ccdffdb626
10 changed files with 1186 additions and 36 deletions

305
server.py
View File

@@ -35,6 +35,11 @@ import sys
import random
import logging
import asyncio
import hashlib
import hmac
import secrets
import time
import json as _json_lib
import httpx
@@ -57,10 +62,10 @@ setup_logging(config.get("log_level", "INFO"))
logger = logging.getLogger("ombre_brain")
# --- Initialize core components / 初始化核心组件 ---
bucket_mgr = BucketManager(config) # Bucket manager / 记忆桶管理器
embedding_engine = EmbeddingEngine(config) # Embedding engine first (BucketManager depends on it)
bucket_mgr = BucketManager(config, embedding_engine=embedding_engine) # Bucket manager / 记忆桶管理器
dehydrator = Dehydrator(config) # Dehydrator / 脱水器
decay_engine = DecayEngine(config, bucket_mgr) # Decay engine / 衰减引擎
embedding_engine = EmbeddingEngine(config) # Embedding engine / 向量化引擎
import_engine = ImportEngine(config, bucket_mgr, dehydrator, embedding_engine) # Import engine / 导入引擎
# --- Create MCP server instance / 创建 MCP 服务器实例 ---
@@ -73,6 +78,183 @@ mcp = FastMCP(
)
# =============================================================
# Dashboard Auth — simple cookie-based session auth
# Dashboard 认证 —— 基于 Cookie 的会话认证
#
# Env var OMBRE_DASHBOARD_PASSWORD overrides file-stored password.
# First visit with no password set → forced setup wizard.
# Sessions stored in memory (lost on restart, 7-day expiry).
# =============================================================
_sessions: dict[str, float] = {} # {token: expiry_timestamp}
def _get_auth_file() -> str:
    """Return the path of the JSON file that stores the dashboard password hash."""
    base_dir = config["buckets_dir"]
    return os.path.join(base_dir, ".dashboard_auth.json")
def _load_password_hash() -> str | None:
    """Read the stored ``salt:hash`` string, or None when absent/unreadable."""
    try:
        auth_file = _get_auth_file()
        if not os.path.exists(auth_file):
            return None
        with open(auth_file, "r", encoding="utf-8") as fp:
            record = _json_lib.load(fp)
    except Exception:
        # Best-effort: a missing, corrupt or unreadable auth file simply
        # means "no password configured".
        return None
    return record.get("password_hash")
def _save_password_hash(password: str) -> None:
    """Hash *password* with a fresh random salt and persist it as ``salt:hash``."""
    salt = secrets.token_hex(16)
    digest = hashlib.sha256(f"{salt}:{password}".encode()).hexdigest()
    target = _get_auth_file()
    # The buckets directory may not exist yet on first setup.
    os.makedirs(os.path.dirname(target), exist_ok=True)
    with open(target, "w", encoding="utf-8") as fp:
        _json_lib.dump({"password_hash": f"{salt}:{digest}"}, fp)
def _verify_password_hash(password: str, stored: str) -> bool:
    """Constant-time check of *password* against a stored ``salt:hash`` string."""
    salt, sep, expected = stored.partition(":")
    if not sep:
        # Malformed record: no salt separator present.
        return False
    candidate = hashlib.sha256(f"{salt}:{password}".encode()).hexdigest()
    return hmac.compare_digest(expected, candidate)
def _is_setup_needed() -> bool:
    """True if no password is configured (neither env var nor stored hash)."""
    env_pwd = os.environ.get("OMBRE_DASHBOARD_PASSWORD", "")
    return not env_pwd and _load_password_hash() is None
def _verify_any_password(password: str) -> bool:
    """Check *password* against the env var (which takes priority) or the stored hash."""
    env_pwd = os.environ.get("OMBRE_DASHBOARD_PASSWORD", "")
    if env_pwd:
        return hmac.compare_digest(password, env_pwd)
    stored = _load_password_hash()
    return bool(stored) and _verify_password_hash(password, stored)
def _create_session() -> str:
    """Create and register a new session token valid for 7 days.

    Also drops already-expired tokens from the in-memory session store so
    the dict cannot grow without bound: previously an expired entry was
    only removed when that exact token was next presented, so abandoned
    sessions accumulated forever in a long-running server.

    Returns:
        The new URL-safe session token.
    """
    now = time.time()
    # Opportunistic cleanup on every login/setup/change-password.
    for stale in [t for t, expiry in _sessions.items() if expiry <= now]:
        _sessions.pop(stale, None)
    token = secrets.token_urlsafe(32)
    _sessions[token] = now + 86400 * 7  # 7-day expiry
    return token
def _is_authenticated(request) -> bool:
    """True when the request carries a valid, unexpired session cookie."""
    token = request.cookies.get("ombre_session")
    if not token:
        return False
    expiry = _sessions.get(token)
    if expiry is not None and time.time() <= expiry:
        return True
    # Unknown or expired token: forget it and deny.
    _sessions.pop(token, None)
    return False
def _require_auth(request):
    """Gatekeeper for dashboard API routes.

    Returns a 401 JSONResponse (with a ``setup_needed`` flag for the
    frontend) when the request is not authenticated, otherwise None.
    """
    from starlette.responses import JSONResponse
    if _is_authenticated(request):
        return None
    payload = {"error": "Unauthorized", "setup_needed": _is_setup_needed()}
    return JSONResponse(payload, status_code=401)
# --- Auth endpoints ---
@mcp.custom_route("/auth/status", methods=["GET"])
async def auth_status(request):
    """Report whether the caller is authenticated and whether first-run setup is pending."""
    from starlette.responses import JSONResponse
    state = {
        "authenticated": _is_authenticated(request),
        "setup_needed": _is_setup_needed(),
    }
    return JSONResponse(state)
@mcp.custom_route("/auth/setup", methods=["POST"])
async def auth_setup_endpoint(request):
    """Initial password setup; only allowed while no password is configured yet."""
    from starlette.responses import JSONResponse
    if not _is_setup_needed():
        return JSONResponse({"error": "Already configured"}, status_code=400)
    try:
        body = await request.json()
    except Exception:
        return JSONResponse({"error": "Invalid JSON"}, status_code=400)
    password = body.get("password", "").strip()
    if len(password) < 6:
        # The frontend shows this message verbatim (Chinese UI string).
        return JSONResponse({"error": "密码不能少于6位"}, status_code=400)
    _save_password_hash(password)
    # Log the user straight in after setup.
    resp = JSONResponse({"ok": True})
    resp.set_cookie("ombre_session", _create_session(), httponly=True, samesite="lax", max_age=86400 * 7)
    return resp
@mcp.custom_route("/auth/login", methods=["POST"])
async def auth_login(request):
    """Authenticate with the dashboard password and set the session cookie."""
    from starlette.responses import JSONResponse
    try:
        body = await request.json()
    except Exception:
        return JSONResponse({"error": "Invalid JSON"}, status_code=400)
    if not _verify_any_password(body.get("password", "")):
        # Chinese UI string: "wrong password".
        return JSONResponse({"error": "密码错误"}, status_code=401)
    resp = JSONResponse({"ok": True})
    resp.set_cookie("ombre_session", _create_session(), httponly=True, samesite="lax", max_age=86400 * 7)
    return resp
@mcp.custom_route("/auth/logout", methods=["POST"])
async def auth_logout(request):
    """Drop the server-side session (if any) and clear the session cookie."""
    from starlette.responses import JSONResponse
    token = request.cookies.get("ombre_session")
    if token:
        _sessions.pop(token, None)  # tolerate already-missing tokens
    resp = JSONResponse({"ok": True})
    resp.delete_cookie("ombre_session")
    return resp
@mcp.custom_route("/auth/change-password", methods=["POST"])
async def auth_change_password(request):
    """Change the dashboard password; requires auth plus the current password."""
    from starlette.responses import JSONResponse
    err = _require_auth(request)
    if err:
        return err
    if os.environ.get("OMBRE_DASHBOARD_PASSWORD", ""):
        # Password is managed via env var and cannot be changed through the API.
        return JSONResponse({"error": "当前使用环境变量密码,请直接修改 OMBRE_DASHBOARD_PASSWORD"}, status_code=400)
    try:
        body = await request.json()
    except Exception:
        return JSONResponse({"error": "Invalid JSON"}, status_code=400)
    current = body.get("current", "")
    new_pwd = body.get("new", "").strip()
    if not _verify_any_password(current):
        return JSONResponse({"error": "当前密码错误"}, status_code=401)
    if len(new_pwd) < 6:
        return JSONResponse({"error": "新密码不能少于6位"}, status_code=400)
    _save_password_hash(new_pwd)
    # Invalidate every existing session, then issue a fresh one for the caller.
    _sessions.clear()
    resp = JSONResponse({"ok": True})
    resp.set_cookie("ombre_session", _create_session(), httponly=True, samesite="lax", max_age=86400 * 7)
    return resp
# =============================================================
# /health endpoint: lightweight keepalive
# 轻量保活接口
@@ -274,12 +456,47 @@ async def breath(
valence: float = -1,
arousal: float = -1,
max_results: int = 20,
importance_min: int = -1,
) -> str:
"""检索/浮现记忆。不传query或传空=自动浮现,有query=关键词检索。max_tokens控制返回总token上限(默认10000)。domain逗号分隔,valence/arousal 0~1(-1忽略)。max_results控制返回数量上限(默认20,最大50)。"""
"""检索/浮现记忆。不传query或传空=自动浮现,有query=关键词检索。max_tokens控制返回总token上限(默认10000)。domain逗号分隔,valence/arousal 0~1(-1忽略)。max_results控制返回数量上限(默认20,最大50)。importance_min>=1时按重要度批量拉取(不走语义搜索,按importance降序返回最多20条)。"""
await decay_engine.ensure_started()
max_results = min(max_results, 50)
max_tokens = min(max_tokens, 20000)
# --- importance_min mode: bulk fetch by importance threshold ---
# --- 重要度批量拉取模式:跳过语义搜索,按 importance 降序返回 ---
if importance_min >= 1:
try:
all_buckets = await bucket_mgr.list_all(include_archive=False)
except Exception as e:
return f"记忆系统暂时无法访问: {e}"
filtered = [
b for b in all_buckets
if int(b["metadata"].get("importance", 0)) >= importance_min
and b["metadata"].get("type") not in ("feel",)
]
filtered.sort(key=lambda b: int(b["metadata"].get("importance", 0)), reverse=True)
filtered = filtered[:20]
if not filtered:
return f"没有重要度 >= {importance_min} 的记忆。"
results = []
token_used = 0
for b in filtered:
if token_used >= max_tokens:
break
try:
clean_meta = {k: v for k, v in b["metadata"].items() if k != "tags"}
summary = await dehydrator.dehydrate(strip_wikilinks(b["content"]), clean_meta)
t = count_tokens_approx(summary)
if token_used + t > max_tokens:
break
imp = b["metadata"].get("importance", 0)
results.append(f"[importance:{imp}] [bucket_id:{b['id']}] {summary}")
token_used += t
except Exception as e:
logger.warning(f"importance_min dehydrate failed: {e}")
return "\n---\n".join(results) if results else "没有可以展示的记忆。"
# --- No args or empty query: surfacing mode (weight pool active push) ---
# --- 无参数或空query浮现模式权重池主动推送---
if not query or not query.strip():
@@ -330,6 +547,18 @@ async def breath(
top_scores = [(b["metadata"].get("name", b["id"]), decay_engine.calculate_score(b["metadata"])) for b in scored[:5]]
logger.info(f"Top unresolved scores: {top_scores}")
# --- Cold-start detection: never-seen important buckets surface first ---
# --- 冷启动检测:从未被访问过且重要度>=8的桶优先插入最前面最多2个---
cold_start = [
b for b in unresolved
if int(b["metadata"].get("activation_count", 0)) == 0
and int(b["metadata"].get("importance", 0)) >= 8
][:2]
cold_start_ids = {b["id"] for b in cold_start}
# Merge: cold_start first, then scored (excluding duplicates)
scored_deduped = [b for b in scored if b["id"] not in cold_start_ids]
scored_with_cold = cold_start + scored_deduped
# --- Token-budgeted surfacing with diversity + hard cap ---
# --- 按 token 预算浮现,带多样性 + 硬上限 ---
# Top-1 always surfaces; rest sampled from top-20 for diversity
@@ -337,13 +566,17 @@ async def breath(
for r in pinned_results:
token_budget -= count_tokens_approx(r)
candidates = list(scored)
candidates = list(scored_with_cold)
if len(candidates) > 1:
# Ensure highest-score bucket is first, shuffle rest from top-20
top1 = [candidates[0]]
pool = candidates[1:min(20, len(candidates))]
random.shuffle(pool)
candidates = top1 + pool + candidates[min(20, len(candidates)):]
# Cold-start buckets stay at front; shuffle rest from top-20
n_cold = len(cold_start)
non_cold = candidates[n_cold:]
if len(non_cold) > 1:
top1 = [non_cold[0]]
pool = non_cold[1:min(20, len(non_cold))]
random.shuffle(pool)
non_cold = top1 + pool + non_cold[min(20, len(non_cold)):]
candidates = cold_start + non_cold
# Hard cap: never surface more than max_results buckets
candidates = candidates[:max_results]
@@ -978,6 +1211,8 @@ async def dream() -> str:
async def api_buckets(request):
"""List all buckets with metadata (no content for efficiency)."""
from starlette.responses import JSONResponse
err = _require_auth(request)
if err: return err
try:
all_buckets = await bucket_mgr.list_all(include_archive=True)
result = []
@@ -1012,6 +1247,8 @@ async def api_buckets(request):
async def api_bucket_detail(request):
"""Get full bucket content by ID."""
from starlette.responses import JSONResponse
err = _require_auth(request)
if err: return err
bucket_id = request.path_params["bucket_id"]
bucket = await bucket_mgr.get(bucket_id)
if not bucket:
@@ -1029,6 +1266,8 @@ async def api_bucket_detail(request):
async def api_search(request):
"""Search buckets by query."""
from starlette.responses import JSONResponse
err = _require_auth(request)
if err: return err
query = request.query_params.get("q", "")
if not query:
return JSONResponse({"error": "missing q parameter"}, status_code=400)
@@ -1055,6 +1294,8 @@ async def api_search(request):
async def api_network(request):
"""Get embedding similarity network for visualization."""
from starlette.responses import JSONResponse
err = _require_auth(request)
if err: return err
try:
all_buckets = await bucket_mgr.list_all(include_archive=False)
nodes = []
@@ -1098,6 +1339,8 @@ async def api_network(request):
async def api_breath_debug(request):
"""Debug endpoint: simulate breath scoring and return per-bucket breakdown."""
from starlette.responses import JSONResponse
err = _require_auth(request)
if err: return err
query = request.query_params.get("q", "")
q_valence = request.query_params.get("valence")
q_arousal = request.query_params.get("arousal")
@@ -1189,6 +1432,8 @@ async def dashboard(request):
async def api_config_get(request):
"""Get current runtime config (safe fields only, API key masked)."""
from starlette.responses import JSONResponse
err = _require_auth(request)
if err: return err
dehy = config.get("dehydration", {})
emb = config.get("embedding", {})
api_key = dehy.get("api_key", "")
@@ -1216,6 +1461,8 @@ async def api_config_update(request):
"""Hot-update runtime config. Optionally persist to config.yaml."""
from starlette.responses import JSONResponse
import yaml
err = _require_auth(request)
if err: return err
try:
body = await request.json()
except Exception:
@@ -1306,6 +1553,8 @@ async def api_config_update(request):
async def api_import_upload(request):
"""Upload a conversation file and start import."""
from starlette.responses import JSONResponse
err = _require_auth(request)
if err: return err
if import_engine.is_running:
return JSONResponse({"error": "Import already running"}, status_code=409)
@@ -1357,6 +1606,8 @@ async def api_import_upload(request):
async def api_import_status(request):
"""Get current import progress."""
from starlette.responses import JSONResponse
err = _require_auth(request)
if err: return err
return JSONResponse(import_engine.get_status())
@@ -1364,6 +1615,8 @@ async def api_import_status(request):
async def api_import_pause(request):
"""Pause the running import."""
from starlette.responses import JSONResponse
err = _require_auth(request)
if err: return err
if not import_engine.is_running:
return JSONResponse({"error": "No import running"}, status_code=400)
import_engine.pause()
@@ -1374,6 +1627,8 @@ async def api_import_pause(request):
async def api_import_patterns(request):
"""Detect high-frequency patterns after import."""
from starlette.responses import JSONResponse
err = _require_auth(request)
if err: return err
try:
patterns = await import_engine.detect_patterns()
return JSONResponse({"patterns": patterns})
@@ -1385,6 +1640,8 @@ async def api_import_patterns(request):
async def api_import_results(request):
"""List recently imported/created buckets for review."""
from starlette.responses import JSONResponse
err = _require_auth(request)
if err: return err
try:
limit = int(request.query_params.get("limit", "50"))
all_buckets = await bucket_mgr.list_all(include_archive=False)
@@ -1411,6 +1668,8 @@ async def api_import_results(request):
async def api_import_review(request):
"""Apply review decisions: mark buckets as important/noise/pinned."""
from starlette.responses import JSONResponse
err = _require_auth(request)
if err: return err
try:
body = await request.json()
except Exception:
@@ -1446,6 +1705,34 @@ async def api_import_review(request):
return JSONResponse({"applied": applied, "errors": errors})
# =============================================================
# /api/status — system status for Dashboard settings tab
# /api/status — Dashboard 设置页用系统状态
# =============================================================
@mcp.custom_route("/api/status", methods=["GET"])
async def api_system_status(request):
    """Return detailed system status for the Dashboard settings panel."""
    from starlette.responses import JSONResponse
    err = _require_auth(request)
    if err:
        return err
    try:
        stats = await bucket_mgr.get_stats()
        permanent = stats.get("permanent_count", 0)
        dynamic = stats.get("dynamic_count", 0)
        payload = {
            "decay_engine": "running" if decay_engine.is_running else "stopped",
            "embedding_enabled": embedding_engine.enabled,
            "buckets": {
                "permanent": permanent,
                "dynamic": dynamic,
                "archive": stats.get("archive_count", 0),
                # Archived buckets are excluded from the live total.
                "total": permanent + dynamic,
            },
            "using_env_password": bool(os.environ.get("OMBRE_DASHBOARD_PASSWORD", "")),
            "version": "1.3.0",
        }
        return JSONResponse(payload)
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)
# --- Entry point / 启动入口 ---
if __name__ == "__main__":
transport = config.get("transport", "stdio")