662 lines
26 KiB
Python
662 lines
26 KiB
Python
# ============================================================
|
||
# Module: MCP Server Entry Point (server.py)
|
||
# 模块:MCP 服务器主入口
|
||
#
|
||
# Starts the Ombre Brain MCP service and registers memory
|
||
# operation tools for Claude to call.
|
||
# 启动 Ombre Brain MCP 服务,注册记忆操作工具供 Claude 调用。
|
||
#
|
||
# Core responsibilities:
|
||
# 核心职责:
|
||
# - Initialize config, bucket manager, dehydrator, decay engine
|
||
# 初始化配置、记忆桶管理器、脱水器、衰减引擎
|
||
# - Expose 5 MCP tools:
|
||
# 暴露 5 个 MCP 工具:
|
||
# breath — Surface unresolved memories or search by keyword
|
||
# 浮现未解决记忆 或 按关键词检索
|
||
# hold — Store a single memory
|
||
# 存储单条记忆
|
||
# grow — Diary digest, auto-split into multiple buckets
|
||
# 日记归档,自动拆分多桶
|
||
# trace — Modify metadata / resolved / delete
|
||
# 修改元数据 / resolved 标记 / 删除
|
||
# pulse — System status + bucket listing
|
||
# 系统状态 + 所有桶列表
|
||
#
|
||
# Startup:
|
||
# 启动方式:
|
||
# Local: python server.py
|
||
# Remote: OMBRE_TRANSPORT=streamable-http python server.py
|
||
# Docker: docker-compose up
|
||
# ============================================================
|
||
|
||
import os
|
||
import sys
|
||
import random
|
||
import logging
|
||
import asyncio
|
||
import httpx
|
||
from typing import Optional
|
||
|
||
# --- Ensure same-directory modules can be imported ---
|
||
# --- 确保同目录下的模块能被正确导入 ---
|
||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||
|
||
from mcp.server.fastmcp import FastMCP
|
||
|
||
from bucket_manager import BucketManager
|
||
from dehydrator import Dehydrator
|
||
from decay_engine import DecayEngine
|
||
from utils import load_config, setup_logging
|
||
|
||
# --- Load config & init logging / 加载配置 & 初始化日志 ---
|
||
config = load_config()
|
||
setup_logging(config.get("log_level", "INFO"))
|
||
logger = logging.getLogger("ombre_brain")
|
||
|
||
# --- Initialize three core components / 初始化三大核心组件 ---
|
||
bucket_mgr = BucketManager(config) # Bucket manager / 记忆桶管理器
|
||
dehydrator = Dehydrator(config) # Dehydrator / 脱水器
|
||
decay_engine = DecayEngine(config, bucket_mgr) # Decay engine / 衰减引擎
|
||
|
||
# --- Create MCP server instance / 创建 MCP 服务器实例 ---
|
||
# host="0.0.0.0" so Docker container's SSE is externally reachable
|
||
# stdio mode ignores host (no network)
|
||
mcp = FastMCP(
|
||
"Ombre Brain",
|
||
host="0.0.0.0",
|
||
port=8000,
|
||
)
|
||
|
||
|
||
# =============================================================
|
||
# /health endpoint: lightweight keepalive
|
||
# 轻量保活接口
|
||
# For Cloudflare Tunnel or reverse proxy to ping, preventing idle timeout
|
||
# 供 Cloudflare Tunnel 或反代定期 ping,防止空闲超时断连
|
||
# =============================================================
|
||
@mcp.custom_route("/health", methods=["GET"])
|
||
async def health_check(request):
|
||
from starlette.responses import JSONResponse
|
||
try:
|
||
stats = await bucket_mgr.get_stats()
|
||
return JSONResponse({
|
||
"status": "ok",
|
||
"buckets": stats["permanent_count"] + stats["dynamic_count"],
|
||
"decay_engine": "running" if decay_engine.is_running else "stopped",
|
||
})
|
||
except Exception as e:
|
||
return JSONResponse({"status": "error", "detail": str(e)}, status_code=500)
|
||
|
||
|
||
# =============================================================
|
||
# /breath-hook endpoint: Dedicated hook for SessionStart
|
||
# 会话启动专用挂载点
|
||
# =============================================================
|
||
@mcp.custom_route("/breath-hook", methods=["GET"])
|
||
async def breath_hook(request):
|
||
from starlette.responses import PlainTextResponse
|
||
try:
|
||
all_buckets = await bucket_mgr.list_all(include_archive=False)
|
||
# pinned
|
||
pinned = [b for b in all_buckets if b["metadata"].get("pinned") or b["metadata"].get("protected")]
|
||
# top 2 unresolved by score
|
||
unresolved = [b for b in all_buckets
|
||
if not b["metadata"].get("resolved", False)
|
||
and b["metadata"].get("type") != "permanent"
|
||
and not b["metadata"].get("pinned")
|
||
and not b["metadata"].get("protected")]
|
||
scored = sorted(unresolved, key=lambda b: decay_engine.calculate_score(b["metadata"]), reverse=True)
|
||
top = scored[:2]
|
||
|
||
parts = []
|
||
for b in pinned:
|
||
summary = await dehydrator.dehydrate(b["content"], {k: v for k, v in b["metadata"].items() if k != "tags"})
|
||
parts.append(f"📌 [核心准则] {summary}")
|
||
for b in top:
|
||
summary = await dehydrator.dehydrate(b["content"], {k: v for k, v in b["metadata"].items() if k != "tags"})
|
||
await bucket_mgr.touch(b["id"])
|
||
parts.append(summary)
|
||
|
||
if not parts:
|
||
return PlainTextResponse("")
|
||
return PlainTextResponse("[Ombre Brain - 记忆浮现]\n" + "\n---\n".join(parts))
|
||
except Exception as e:
|
||
logger.warning(f"Breath hook failed: {e}")
|
||
return PlainTextResponse("")
|
||
|
||
|
||
# =============================================================
|
||
# Internal helper: merge-or-create
|
||
# 内部辅助:检查是否可合并,可以则合并,否则新建
|
||
# Shared by hold and grow to avoid duplicate logic
|
||
# hold 和 grow 共用,避免重复逻辑
|
||
# =============================================================
|
||
async def _merge_or_create(
|
||
content: str,
|
||
tags: list,
|
||
importance: int,
|
||
domain: list,
|
||
valence: float,
|
||
arousal: float,
|
||
name: str = "",
|
||
) -> tuple[str, bool]:
|
||
"""
|
||
Check if a similar bucket exists for merging; merge if so, create if not.
|
||
Returns (bucket_id_or_name, is_merged).
|
||
检查是否有相似桶可合并,有则合并,无则新建。
|
||
返回 (桶ID或名称, 是否合并)。
|
||
"""
|
||
try:
|
||
existing = await bucket_mgr.search(content, limit=1, domain_filter=domain or None)
|
||
except Exception as e:
|
||
logger.warning(f"Search for merge failed, creating new / 合并搜索失败,新建: {e}")
|
||
existing = []
|
||
|
||
if existing and existing[0].get("score", 0) > config.get("merge_threshold", 75):
|
||
bucket = existing[0]
|
||
# --- Never merge into pinned/protected buckets ---
|
||
# --- 不合并到钉选/保护桶 ---
|
||
if not (bucket["metadata"].get("pinned") or bucket["metadata"].get("protected")):
|
||
try:
|
||
merged = await dehydrator.merge(bucket["content"], content)
|
||
await bucket_mgr.update(
|
||
bucket["id"],
|
||
content=merged,
|
||
tags=list(set(bucket["metadata"].get("tags", []) + tags)),
|
||
importance=max(bucket["metadata"].get("importance", 5), importance),
|
||
domain=list(set(bucket["metadata"].get("domain", []) + domain)),
|
||
valence=valence,
|
||
arousal=arousal,
|
||
)
|
||
return bucket["metadata"].get("name", bucket["id"]), True
|
||
except Exception as e:
|
||
logger.warning(f"Merge failed, creating new / 合并失败,新建: {e}")
|
||
|
||
bucket_id = await bucket_mgr.create(
|
||
content=content,
|
||
tags=tags,
|
||
importance=importance,
|
||
domain=domain,
|
||
valence=valence,
|
||
arousal=arousal,
|
||
name=name or None,
|
||
)
|
||
return bucket_id, False
|
||
|
||
|
||
# =============================================================
|
||
# Tool 1: breath — Breathe
|
||
# 工具 1:breath — 呼吸
|
||
#
|
||
# No args: surface highest-weight unresolved memories (active push)
|
||
# 无参数:浮现权重最高的未解决记忆
|
||
# With args: search by keyword + emotion coordinates
|
||
# 有参数:按关键词+情感坐标检索记忆
|
||
# =============================================================
|
||
@mcp.tool()
|
||
async def breath(
|
||
query: Optional[str] = None,
|
||
max_results: int = 3,
|
||
domain: str = "",
|
||
valence: float = -1,
|
||
arousal: float = -1,
|
||
) -> str:
|
||
"""检索/浮现记忆。不传query或传空=自动浮现,有query=关键词检索。domain逗号分隔,valence/arousal 0~1(-1忽略)。"""
|
||
await decay_engine.ensure_started()
|
||
|
||
# --- No args or empty query: surfacing mode (weight pool active push) ---
|
||
# --- 无参数或空query:浮现模式(权重池主动推送)---
|
||
if not query or not query.strip():
|
||
try:
|
||
all_buckets = await bucket_mgr.list_all(include_archive=False)
|
||
except Exception as e:
|
||
logger.error(f"Failed to list buckets for surfacing / 浮现列桶失败: {e}")
|
||
return "记忆系统暂时无法访问。"
|
||
|
||
# --- Pinned/protected buckets: always surface as core principles ---
|
||
# --- 钉选桶:作为核心准则,始终浮现 ---
|
||
pinned_buckets = [
|
||
b for b in all_buckets
|
||
if b["metadata"].get("pinned") or b["metadata"].get("protected")
|
||
]
|
||
pinned_results = []
|
||
for b in pinned_buckets:
|
||
try:
|
||
clean_meta = {k: v for k, v in b["metadata"].items() if k != "tags"}
|
||
summary = await dehydrator.dehydrate(b["content"], clean_meta)
|
||
pinned_results.append(f"📌 [核心准则] {summary}")
|
||
except Exception as e:
|
||
logger.warning(f"Failed to dehydrate pinned bucket / 钉选桶脱水失败: {e}")
|
||
continue
|
||
|
||
# --- Unresolved buckets: surface top 2 by weight ---
|
||
# --- 未解决桶:按权重浮现前 2 条 ---
|
||
unresolved = [
|
||
b for b in all_buckets
|
||
if not b["metadata"].get("resolved", False)
|
||
and b["metadata"].get("type") != "permanent"
|
||
and not b["metadata"].get("pinned", False)
|
||
and not b["metadata"].get("protected", False)
|
||
]
|
||
|
||
scored = sorted(
|
||
unresolved,
|
||
key=lambda b: decay_engine.calculate_score(b["metadata"]),
|
||
reverse=True,
|
||
)
|
||
top = scored[:2]
|
||
dynamic_results = []
|
||
for b in top:
|
||
try:
|
||
clean_meta = {k: v for k, v in b["metadata"].items() if k != "tags"}
|
||
summary = await dehydrator.dehydrate(b["content"], clean_meta)
|
||
await bucket_mgr.touch(b["id"])
|
||
score = decay_engine.calculate_score(b["metadata"])
|
||
dynamic_results.append(f"[权重:{score:.2f}] {summary}")
|
||
except Exception as e:
|
||
logger.warning(f"Failed to dehydrate surfaced bucket / 浮现脱水失败: {e}")
|
||
continue
|
||
|
||
if not pinned_results and not dynamic_results:
|
||
return "权重池平静,没有需要处理的记忆。"
|
||
|
||
parts = []
|
||
if pinned_results:
|
||
parts.append("=== 核心准则 ===\n" + "\n---\n".join(pinned_results))
|
||
if dynamic_results:
|
||
parts.append("=== 浮现记忆 ===\n" + "\n---\n".join(dynamic_results))
|
||
return "\n\n".join(parts)
|
||
|
||
# --- With args: search mode / 有参数:检索模式 ---
|
||
domain_filter = [d.strip() for d in domain.split(",") if d.strip()] or None
|
||
q_valence = valence if 0 <= valence <= 1 else None
|
||
q_arousal = arousal if 0 <= arousal <= 1 else None
|
||
|
||
try:
|
||
matches = await bucket_mgr.search(
|
||
query,
|
||
limit=max_results,
|
||
domain_filter=domain_filter,
|
||
query_valence=q_valence,
|
||
query_arousal=q_arousal,
|
||
)
|
||
except Exception as e:
|
||
logger.error(f"Search failed / 检索失败: {e}")
|
||
return "检索过程出错,请稍后重试。"
|
||
|
||
results = []
|
||
for bucket in matches:
|
||
try:
|
||
clean_meta = {k: v for k, v in bucket["metadata"].items() if k != "tags"}
|
||
summary = await dehydrator.dehydrate(bucket["content"], clean_meta)
|
||
await bucket_mgr.touch(bucket["id"])
|
||
results.append(summary)
|
||
except Exception as e:
|
||
logger.warning(f"Failed to dehydrate search result / 检索结果脱水失败: {e}")
|
||
continue
|
||
|
||
# --- Random surfacing: when search returns < 3, 40% chance to float old memories ---
|
||
# --- 随机浮现:检索结果不足 3 条时,40% 概率从低权重旧桶里漂上来 ---
|
||
if len(matches) < 3 and random.random() < 0.4:
|
||
try:
|
||
all_buckets = await bucket_mgr.list_all(include_archive=False)
|
||
matched_ids = {b["id"] for b in matches}
|
||
low_weight = [
|
||
b for b in all_buckets
|
||
if b["id"] not in matched_ids
|
||
and decay_engine.calculate_score(b["metadata"]) < 2.0
|
||
]
|
||
if low_weight:
|
||
drifted = random.sample(low_weight, min(random.randint(1, 3), len(low_weight)))
|
||
drift_results = []
|
||
for b in drifted:
|
||
clean_meta = {k: v for k, v in b["metadata"].items() if k != "tags"}
|
||
summary = await dehydrator.dehydrate(b["content"], clean_meta)
|
||
drift_results.append(f"[surface_type: random]\n{summary}")
|
||
results.append("--- 忽然想起来 ---\n" + "\n---\n".join(drift_results))
|
||
except Exception as e:
|
||
logger.warning(f"Random surfacing failed / 随机浮现失败: {e}")
|
||
|
||
if not results:
|
||
return "未找到相关记忆。"
|
||
|
||
return "\n---\n".join(results)
|
||
|
||
|
||
# =============================================================
|
||
# Tool 2: hold — Hold on to this
|
||
# 工具 2:hold — 握住,留下来
|
||
# =============================================================
|
||
@mcp.tool()
|
||
async def hold(
|
||
content: str,
|
||
tags: str = "",
|
||
importance: int = 5,
|
||
pinned: bool = False,
|
||
) -> str:
|
||
"""存储单条记忆,自动打标+合并。tags逗号分隔,importance 1-10。pinned=True创建永久钉选桶。"""
|
||
await decay_engine.ensure_started()
|
||
|
||
# --- Input validation / 输入校验 ---
|
||
if not content or not content.strip():
|
||
return "内容为空,无法存储。"
|
||
|
||
importance = max(1, min(10, importance))
|
||
extra_tags = [t.strip() for t in tags.split(",") if t.strip()]
|
||
|
||
# --- Step 1: auto-tagging / 自动打标 ---
|
||
try:
|
||
analysis = await dehydrator.analyze(content)
|
||
except Exception as e:
|
||
logger.warning(f"Auto-tagging failed, using defaults / 自动打标失败: {e}")
|
||
analysis = {
|
||
"domain": ["未分类"], "valence": 0.5, "arousal": 0.3,
|
||
"tags": [], "suggested_name": "",
|
||
}
|
||
|
||
domain = analysis["domain"]
|
||
valence = analysis["valence"]
|
||
arousal = analysis["arousal"]
|
||
auto_tags = analysis["tags"]
|
||
suggested_name = analysis.get("suggested_name", "")
|
||
|
||
all_tags = list(dict.fromkeys(auto_tags + extra_tags))
|
||
|
||
# --- Pinned buckets bypass merge and are created directly in permanent dir ---
|
||
# --- 钉选桶跳过合并,直接新建到 permanent 目录 ---
|
||
if pinned:
|
||
bucket_id = await bucket_mgr.create(
|
||
content=content,
|
||
tags=all_tags,
|
||
importance=10,
|
||
domain=domain,
|
||
valence=valence,
|
||
arousal=arousal,
|
||
name=suggested_name or None,
|
||
bucket_type="permanent",
|
||
pinned=True,
|
||
)
|
||
return f"📌钉选→{bucket_id} {','.join(domain)}"
|
||
|
||
# --- Step 2: merge or create / 合并或新建 ---
|
||
result_name, is_merged = await _merge_or_create(
|
||
content=content,
|
||
tags=all_tags,
|
||
importance=importance,
|
||
domain=domain,
|
||
valence=valence,
|
||
arousal=arousal,
|
||
name=suggested_name,
|
||
)
|
||
|
||
action = "合并→" if is_merged else "新建→"
|
||
return f"{action}{result_name} {','.join(domain)}"
|
||
|
||
|
||
# =============================================================
|
||
# Tool 3: grow — Grow, fragments become memories
|
||
# 工具 3:grow — 生长,一天的碎片长成记忆
|
||
# =============================================================
|
||
@mcp.tool()
|
||
async def grow(content: str) -> str:
|
||
"""日记归档,自动拆分为多桶。短内容(<30字)走快速路径。"""
|
||
await decay_engine.ensure_started()
|
||
|
||
if not content or not content.strip():
|
||
return "内容为空,无法整理。"
|
||
|
||
# --- Short content fast path: skip digest, use hold logic directly ---
|
||
# --- 短内容快速路径:跳过 digest 拆分,直接走 hold 逻辑省一次 API ---
|
||
# For very short inputs (like "1"), calling digest is wasteful:
|
||
# it sends the full DIGEST_PROMPT (~800 tokens) to DeepSeek for nothing.
|
||
# Instead, run analyze + create directly.
|
||
if len(content.strip()) < 30:
|
||
logger.info(f"grow short-content fast path: {len(content.strip())} chars")
|
||
try:
|
||
analysis = await dehydrator.analyze(content)
|
||
except Exception as e:
|
||
logger.warning(f"Fast-path analyze failed / 快速路径打标失败: {e}")
|
||
analysis = {
|
||
"domain": ["未分类"], "valence": 0.5, "arousal": 0.3,
|
||
"tags": [], "suggested_name": "",
|
||
}
|
||
result_name, is_merged = await _merge_or_create(
|
||
content=content.strip(),
|
||
tags=analysis.get("tags", []),
|
||
importance=analysis.get("importance", 5) if isinstance(analysis.get("importance"), int) else 5,
|
||
domain=analysis.get("domain", ["未分类"]),
|
||
valence=analysis.get("valence", 0.5),
|
||
arousal=analysis.get("arousal", 0.3),
|
||
name=analysis.get("suggested_name", ""),
|
||
)
|
||
action = "合并" if is_merged else "新建"
|
||
return f"{action} → {result_name} | {','.join(analysis.get('domain', []))} V{analysis.get('valence', 0.5):.1f}/A{analysis.get('arousal', 0.3):.1f}"
|
||
|
||
# --- Step 1: let API split and organize / 让 API 拆分整理 ---
|
||
try:
|
||
items = await dehydrator.digest(content)
|
||
except Exception as e:
|
||
logger.error(f"Diary digest failed / 日记整理失败: {e}")
|
||
return f"日记整理失败: {e}"
|
||
|
||
if not items:
|
||
return "内容为空或整理失败。"
|
||
|
||
results = []
|
||
created = 0
|
||
merged = 0
|
||
|
||
# --- Step 2: merge or create each item (with per-item error handling) ---
|
||
# --- 逐条合并或新建(单条失败不影响其他)---
|
||
for item in items:
|
||
try:
|
||
result_name, is_merged = await _merge_or_create(
|
||
content=item["content"],
|
||
tags=item.get("tags", []),
|
||
importance=item.get("importance", 5),
|
||
domain=item.get("domain", ["未分类"]),
|
||
valence=item.get("valence", 0.5),
|
||
arousal=item.get("arousal", 0.3),
|
||
name=item.get("name", ""),
|
||
)
|
||
|
||
if is_merged:
|
||
results.append(f"📎{result_name}")
|
||
merged += 1
|
||
else:
|
||
results.append(f"📝{item.get('name', result_name)}")
|
||
created += 1
|
||
except Exception as e:
|
||
logger.warning(
|
||
f"Failed to process diary item / 日记条目处理失败: "
|
||
f"{item.get('name', '?')}: {e}"
|
||
)
|
||
results.append(f"⚠️{item.get('name', '?')}")
|
||
|
||
return f"{len(items)}条|新{created}合{merged}\n" + "\n".join(results)
|
||
|
||
|
||
# =============================================================
|
||
# Tool 4: trace — Trace, redraw the outline of a memory
|
||
# 工具 4:trace — 描摹,重新勾勒记忆的轮廓
|
||
# Also handles deletion (delete=True)
|
||
# 同时承接删除功能
|
||
# =============================================================
|
||
@mcp.tool()
|
||
async def trace(
|
||
bucket_id: str,
|
||
name: str = "",
|
||
domain: str = "",
|
||
valence: float = -1,
|
||
arousal: float = -1,
|
||
importance: int = -1,
|
||
tags: str = "",
|
||
resolved: int = -1,
|
||
pinned: int = -1,
|
||
delete: bool = False,
|
||
) -> str:
|
||
"""修改记忆元数据。resolved=1沉底/0激活,pinned=1钉选/0取消,delete=True删除。只传需改的,-1或空=不改。"""
|
||
|
||
if not bucket_id or not bucket_id.strip():
|
||
return "请提供有效的 bucket_id。"
|
||
|
||
# --- Delete mode / 删除模式 ---
|
||
if delete:
|
||
success = await bucket_mgr.delete(bucket_id)
|
||
return f"已遗忘记忆桶: {bucket_id}" if success else f"未找到记忆桶: {bucket_id}"
|
||
|
||
bucket = await bucket_mgr.get(bucket_id)
|
||
if not bucket:
|
||
return f"未找到记忆桶: {bucket_id}"
|
||
|
||
# --- Collect only fields actually passed / 只收集用户实际传入的字段 ---
|
||
updates = {}
|
||
if name:
|
||
updates["name"] = name
|
||
if domain:
|
||
updates["domain"] = [d.strip() for d in domain.split(",") if d.strip()]
|
||
if 0 <= valence <= 1:
|
||
updates["valence"] = valence
|
||
if 0 <= arousal <= 1:
|
||
updates["arousal"] = arousal
|
||
if 1 <= importance <= 10:
|
||
updates["importance"] = importance
|
||
if tags:
|
||
updates["tags"] = [t.strip() for t in tags.split(",") if t.strip()]
|
||
if resolved in (0, 1):
|
||
updates["resolved"] = bool(resolved)
|
||
if pinned in (0, 1):
|
||
updates["pinned"] = bool(pinned)
|
||
if pinned == 1:
|
||
updates["importance"] = 10 # pinned → lock importance
|
||
|
||
if not updates:
|
||
return "没有任何字段需要修改。"
|
||
|
||
success = await bucket_mgr.update(bucket_id, **updates)
|
||
if not success:
|
||
return f"修改失败: {bucket_id}"
|
||
|
||
changed = ", ".join(f"{k}={v}" for k, v in updates.items())
|
||
# Explicit hint about resolved state change semantics
|
||
# 特别提示 resolved 状态变化的语义
|
||
if "resolved" in updates:
|
||
if updates["resolved"]:
|
||
changed += " → 已沉底,只在关键词触发时重新浮现"
|
||
else:
|
||
changed += " → 已重新激活,将参与浮现排序"
|
||
return f"已修改记忆桶 {bucket_id}: {changed}"
|
||
|
||
|
||
# =============================================================
|
||
# Tool 5: pulse — Heartbeat, system status + memory listing
|
||
# 工具 5:pulse — 脉搏,系统状态 + 记忆列表
|
||
# =============================================================
|
||
@mcp.tool()
|
||
async def pulse(include_archive: bool = False) -> str:
|
||
"""系统状态+记忆桶列表。include_archive=True含归档。"""
|
||
try:
|
||
stats = await bucket_mgr.get_stats()
|
||
except Exception as e:
|
||
return f"获取系统状态失败: {e}"
|
||
|
||
status = (
|
||
f"=== Ombre Brain 记忆系统 ===\n"
|
||
f"固化记忆桶: {stats['permanent_count']} 个\n"
|
||
f"动态记忆桶: {stats['dynamic_count']} 个\n"
|
||
f"归档记忆桶: {stats['archive_count']} 个\n"
|
||
f"总存储大小: {stats['total_size_kb']:.1f} KB\n"
|
||
f"衰减引擎: {'运行中' if decay_engine.is_running else '已停止'}\n"
|
||
)
|
||
|
||
# --- List all bucket summaries / 列出所有桶摘要 ---
|
||
try:
|
||
buckets = await bucket_mgr.list_all(include_archive=include_archive)
|
||
except Exception as e:
|
||
return status + f"\n列出记忆桶失败: {e}"
|
||
|
||
if not buckets:
|
||
return status + "\n记忆库为空。"
|
||
|
||
lines = []
|
||
for b in buckets:
|
||
meta = b.get("metadata", {})
|
||
if meta.get("pinned") or meta.get("protected"):
|
||
icon = "📌"
|
||
elif meta.get("type") == "permanent":
|
||
icon = "📦"
|
||
elif meta.get("type") == "archived":
|
||
icon = "🗄️"
|
||
elif meta.get("resolved", False):
|
||
icon = "✅"
|
||
else:
|
||
icon = "💭"
|
||
try:
|
||
score = decay_engine.calculate_score(meta)
|
||
except Exception:
|
||
score = 0.0
|
||
domains = ",".join(meta.get("domain", []))
|
||
val = meta.get("valence", 0.5)
|
||
aro = meta.get("arousal", 0.3)
|
||
resolved_tag = " [已解决]" if meta.get("resolved", False) else ""
|
||
lines.append(
|
||
f"{icon} [{meta.get('name', b['id'])}]{resolved_tag} "
|
||
f"主题:{domains} "
|
||
f"情感:V{val:.1f}/A{aro:.1f} "
|
||
f"重要:{meta.get('importance', '?')} "
|
||
f"权重:{score:.2f} "
|
||
f"标签:{','.join(meta.get('tags', []))}"
|
||
)
|
||
|
||
return status + "\n=== 记忆列表 ===\n" + "\n".join(lines)
|
||
|
||
|
||
# --- Entry point / 启动入口 ---
|
||
if __name__ == "__main__":
|
||
transport = config.get("transport", "stdio")
|
||
logger.info(f"Ombre Brain starting | transport: {transport}")
|
||
|
||
if transport in ("sse", "streamable-http"):
|
||
import threading
|
||
import uvicorn
|
||
from starlette.middleware.cors import CORSMiddleware
|
||
|
||
# --- Application-level keepalive: ping /health every 60s ---
|
||
# --- 应用层保活:每 60 秒 ping 一次 /health,防止 Cloudflare Tunnel 空闲断连 ---
|
||
async def _keepalive_loop():
|
||
await asyncio.sleep(10) # Wait for server to fully start
|
||
async with httpx.AsyncClient() as client:
|
||
while True:
|
||
try:
|
||
await client.get("http://localhost:8000/health", timeout=5)
|
||
logger.debug("Keepalive ping OK / 保活 ping 成功")
|
||
except Exception as e:
|
||
logger.warning(f"Keepalive ping failed / 保活 ping 失败: {e}")
|
||
await asyncio.sleep(60)
|
||
|
||
def _start_keepalive():
|
||
loop = asyncio.new_event_loop()
|
||
loop.run_until_complete(_keepalive_loop())
|
||
|
||
t = threading.Thread(target=_start_keepalive, daemon=True)
|
||
t.start()
|
||
|
||
# --- Add CORS middleware so remote clients (Cloudflare Tunnel / ngrok) can connect ---
|
||
# --- 添加 CORS 中间件,让远程客户端(Cloudflare Tunnel / ngrok)能正常连接 ---
|
||
if transport == "streamable-http":
|
||
_app = mcp.streamable_http_app()
|
||
else:
|
||
_app = mcp.sse_app()
|
||
_app.add_middleware(
|
||
CORSMiddleware,
|
||
allow_origins=["*"],
|
||
allow_methods=["*"],
|
||
allow_headers=["*"],
|
||
expose_headers=["*"],
|
||
)
|
||
logger.info("CORS middleware enabled for remote transport / 已启用 CORS 中间件")
|
||
uvicorn.run(_app, host="0.0.0.0", port=8000)
|
||
else:
|
||
mcp.run(transport=transport)
|