From 529c9cc172517095c8e0ba3771cd04090cdf0254 Mon Sep 17 00:00:00 2001 From: P0luz Date: Tue, 21 Apr 2026 20:11:47 +0800 Subject: [PATCH] chore: dead code cleanup (pyflakes-clean) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove unused imports: time/asyncio/Path/Optional/re/Counter/jieba across backfill_embeddings, bucket_manager, embedding_engine, import_memory - Drop unused local var `old_primary` in reclassify_domains - Fix 3 placeholder-less f-strings in backfill_embeddings, migrate_to_domains, reclassify_domains - Delete legacy quick-check scripts test_smoke.py / test_tools.py (replaced by tests/ pytest suite, never collected by pytest) - Delete backup_20260405_2124/ (stale local snapshot, no code references) Verified: pyflakes *.py → 0 warnings; pytest tests/ → 237 passed, 7 skipped. --- backfill_embeddings.py | 3 +- backup_20260405_2124/README.md | 205 ------- backup_20260405_2124/bucket_manager.py | 755 ------------------------- backup_20260405_2124/decay_engine.py | 242 -------- backup_20260405_2124/server.py | 536 ------------------ bucket_manager.py | 3 - embedding_engine.py | 2 - import_memory.py | 2 - migrate_to_domains.py | 2 +- reclassify_domains.py | 3 +- test_smoke.py | 126 ----- test_tools.py | 159 ------ 12 files changed, 3 insertions(+), 2035 deletions(-) delete mode 100644 backup_20260405_2124/README.md delete mode 100644 backup_20260405_2124/bucket_manager.py delete mode 100644 backup_20260405_2124/decay_engine.py delete mode 100644 backup_20260405_2124/server.py delete mode 100644 test_smoke.py delete mode 100644 test_tools.py diff --git a/backfill_embeddings.py b/backfill_embeddings.py index 4a06d5f..bb3db75 100644 --- a/backfill_embeddings.py +++ b/backfill_embeddings.py @@ -13,7 +13,6 @@ Free tier: 1500 requests/day, so ~75 batches of 20. import asyncio import argparse import sys -import time sys.path.insert(0, ".") from utils import load_config @@ -79,7 +78,7 @@ async def backfill(batch_size: int = 20, dry_run: bool = False): print(f" ERROR: {b['id'][:12]} ({name[:30]}): {e}") if i + batch_size < total: - print(f" Waiting 2s before next batch...") + print(" Waiting 2s before next batch...") await asyncio.sleep(2) print(f"\n=== Done: {success} success, {failed} failed, {total - success - failed} skipped ===") diff --git a/backup_20260405_2124/README.md b/backup_20260405_2124/README.md deleted file mode 100644 index b6932b0..0000000 --- a/backup_20260405_2124/README.md +++ /dev/null @@ -1,205 +0,0 @@ -# Ombre Brain - -一个给 Claude 用的长期情绪记忆系统。基于 Russell 效价/唤醒度坐标打标,Obsidian 做存储层,MCP 接入,带遗忘曲线。 - -A long-term emotional memory system for Claude. Tags memories using Russell's valence/arousal coordinates, stores them as Obsidian-compatible Markdown, connects via MCP, and has a forgetting curve. - ---- - -## 它是什么 / What is this - -Claude 没有跨对话记忆。每次对话结束,之前聊过的所有东西都会消失。 - -Ombre Brain 给了它一套持久记忆——不是那种冷冰冰的键值存储,而是带情感坐标的、会自然衰减的、像人类记忆一样会遗忘和浮现的系统。 - -Claude has no cross-conversation memory. Everything from a previous chat vanishes once it ends. - -Ombre Brain gives it persistent memory — not cold key-value storage, but a system with emotional coordinates, natural decay, and forgetting/surfacing mechanics that loosely mimic how human memory works. - -核心特点 / Key features: - -- **情感坐标打标 / Emotional tagging**: 每条记忆用 Russell 环形情感模型的 valence(效价)和 arousal(唤醒度)两个连续维度标记。不是"开心/难过"这种离散标签。 - Each memory is tagged with two continuous dimensions from Russell's circumplex model: valence and arousal. Not discrete labels like "happy/sad". - -- **自然遗忘 / Natural forgetting**: 改进版艾宾浩斯遗忘曲线。不活跃的记忆自动衰减归档,高情绪强度的记忆衰减更慢。 - Modified Ebbinghaus forgetting curve. Inactive memories naturally decay and archive. High-arousal memories decay slower. - -- **权重池浮现 / Weight pool surfacing**: 记忆不是被动检索的,它们会主动浮现——未解决的、情绪强烈的记忆权重更高,会在对话开头自动推送。 - Memories aren't just passively retrieved — they actively surface. Unresolved, emotionally intense memories carry higher weight and get pushed at conversation start. - -- **Obsidian 原生 / Obsidian-native**: 每个记忆桶就是一个 Markdown 文件,YAML frontmatter 存元数据。可以直接在 Obsidian 里浏览、编辑、搜索。自动注入 `[[双链]]`。 - Each memory bucket is a Markdown file with YAML frontmatter. Browse, edit, and search directly in Obsidian. Wikilinks are auto-injected. - -- **API 降级 / API degradation**: 脱水压缩和自动打标优先用廉价 LLM API(DeepSeek 等),API 不可用时自动降级到本地关键词分析——始终可用。 - Dehydration and auto-tagging prefer a cheap LLM API (DeepSeek etc.). When the API is unavailable, it degrades to local keyword analysis — always functional. - -## 边界说明 / Design boundaries - -官方记忆功能已经在做身份层的事了——你是谁,你有什么偏好,你们的关系是什么。那一层交给它,Ombre Brain不打算造重复的轮子。 - -Ombre Brain 的边界是时间里发生的事,不是你是谁。它记住的是:你们聊过什么,经历了什么,哪些事情还悬在那里没有解决。两层配合用,才是完整的。 - -每次新对话,Claude 从零开始——但它能从 Ombre Brain 里找回跟你有关的一切。不是重建,是接续。 - ---- - -Official memory already handles the identity layer — who you are, what you prefer, what your relationship is. That layer belongs there. Ombre Brain isn't trying to duplicate it. - -Ombre Brain's boundary is *what happened in time*, not *who you are*. It holds conversations, experiences, unresolved things. The two layers together are what make it feel complete. - -Each new conversation starts fresh — but Claude can reach back through Ombre Brain and find everything that happened between you. Not a rebuild. A continuation. - -## 架构 / Architecture - -``` -Claude ←→ MCP Protocol ←→ server.py - │ - ┌───────────────┼───────────────┐ - │ │ │ - bucket_manager dehydrator decay_engine - (CRUD + 搜索) (压缩 + 打标) (遗忘曲线) - │ - Obsidian Vault (Markdown files) -``` - -5 个 MCP 工具 / 5 MCP tools: - -| 工具 Tool | 作用 Purpose | -|-----------|-------------| -| `breath` | 浮现或检索记忆。无参数=推送未解决记忆;有参数=关键词+情感检索 / Surface or search memories | -| `hold` | 存储单条记忆,自动打标+合并相似桶 / Store a single memory with auto-tagging | -| `grow` | 日记归档,自动拆分长内容为多个记忆桶 / Diary digest, auto-split into multiple buckets | -| `trace` | 修改元数据、标记已解决、删除 / Modify metadata, mark resolved, delete | -| `pulse` | 系统状态 + 所有记忆桶列表 / System status + bucket listing | - -## 安装 / Setup - -### 环境要求 / Requirements - -- Python 3.11+ -- 一个 Obsidian Vault(可选,不用也行,会在项目目录下自建 `buckets/`) - An Obsidian vault (optional — without one, it uses a local `buckets/` directory) - -### 步骤 / Steps - -```bash -git clone https://github.com/P0lar1zzZ/Ombre-Brain.git -cd Ombre-Brain - -python -m venv .venv -source .venv/bin/activate # Windows: .venv\Scripts\activate - -pip install -r requirements.txt -``` - -复制配置文件并按需修改 / Copy config and edit as needed: - -```bash -cp config.example.yaml config.yaml -``` - -如果你要用 API 做脱水压缩和自动打标(推荐,效果好很多),设置环境变量: -If you want API-powered dehydration and tagging (recommended, much better quality): - -```bash -export OMBRE_API_KEY="your-api-key" -``` - -支持任何 OpenAI 兼容 API。在 `config.yaml` 里改 `base_url` 和 `model` 就行。 -Supports any OpenAI-compatible API. Just change `base_url` and `model` in `config.yaml`. - -### 接入 Claude Desktop / Connect to Claude Desktop - -在 Claude Desktop 配置文件中添加(macOS: `~/Library/Application Support/Claude/claude_desktop_config.json`): - -Add to your Claude Desktop config: - -```json -{ - "mcpServers": { - "ombre-brain": { - "command": "python", - "args": ["/path/to/Ombre-Brain/server.py"], - "env": { - "OMBRE_API_KEY": "your-api-key" - } - } - } -} -``` - -### 接入 Claude.ai (远程) / Connect to Claude.ai (remote) - -需要 HTTP 传输 + 隧道。可以用 Docker: -Requires HTTP transport + tunnel. Docker setup: - -```bash -echo "OMBRE_API_KEY=your-api-key" > .env -docker-compose up -d -``` - -`docker-compose.yml` 里配好了 Cloudflare Tunnel。你需要自己在 `~/.cloudflared/` 下放凭证和路由配置。 -The `docker-compose.yml` includes Cloudflare Tunnel. You'll need your own credentials under `~/.cloudflared/`. - -### 指向 Obsidian / Point to Obsidian - -在 `config.yaml` 里设置 `buckets_dir`: -Set `buckets_dir` in `config.yaml`: - -```yaml -buckets_dir: "/path/to/your/Obsidian Vault/Ombre Brain" -``` - -不设的话,默认用项目目录下的 `buckets/`。 -If not set, defaults to `buckets/` in the project directory. - -## 配置 / Configuration - -所有参数在 `config.yaml`(从 `config.example.yaml` 复制)。关键的几个: -All parameters in `config.yaml` (copy from `config.example.yaml`). Key ones: - -| 参数 Parameter | 说明 Description | 默认 Default | -|---|---|---| -| `transport` | `stdio`(本地)/ `streamable-http`(远程)| `stdio` | -| `buckets_dir` | 记忆桶存储路径 / Bucket storage path | `./buckets/` | -| `dehydration.model` | 脱水用的 LLM 模型 / LLM model for dehydration | `deepseek-chat` | -| `dehydration.base_url` | API 地址 / API endpoint | `https://api.deepseek.com/v1` | -| `decay.lambda` | 衰减速率,越大越快忘 / Decay rate | `0.05` | -| `decay.threshold` | 归档阈值 / Archive threshold | `0.3` | -| `merge_threshold` | 合并相似度阈值 (0-100) / Merge similarity | `75` | - -敏感配置用环境变量: -Sensitive config via env vars: -- `OMBRE_API_KEY` — LLM API 密钥 -- `OMBRE_TRANSPORT` — 覆盖传输方式 -- `OMBRE_BUCKETS_DIR` — 覆盖存储路径 - -## 衰减公式 / Decay Formula - -$$Score = Importance \times activation\_count^{0.3} \times e^{-\lambda \times days} \times (base + arousal \times boost)$$ - -- `importance`: 1-10,记忆重要性 / memory importance -- `activation_count`: 被检索的次数,越常被想起衰减越慢 / retrieval count; more recalls = slower decay -- `days`: 距上次激活的天数 / days since last activation -- `arousal`: 唤醒度,越强烈的记忆越难忘 / arousal; intense memories are harder to forget -- 已解决的记忆权重降到 5%,沉底等被关键词唤醒 / resolved memories drop to 5%, sink until keyword-triggered - -## 给 Claude 的使用指南 / Usage Guide for Claude - -`CLAUDE_PROMPT.md` 是写给 Claude 看的使用说明。放到你的 system prompt 或 custom instructions 里就行。 - -`CLAUDE_PROMPT.md` is the usage guide written for Claude. Put it in your system prompt or custom instructions. - -## 工具脚本 / Utility Scripts - -| 脚本 Script | 用途 Purpose | -|---|---| -| `write_memory.py` | 手动写入记忆,绕过 MCP / Manually write memories, bypass MCP | -| `migrate_to_domains.py` | 迁移平铺文件到域子目录 / Migrate flat files to domain subdirs | -| `reclassify_domains.py` | 基于关键词重分类 / Reclassify by keywords | -| `reclassify_api.py` | 用 API 重打标未分类桶 / Re-tag uncategorized buckets via API | -| `test_smoke.py` | 冒烟测试 / Smoke test | - -## License - -MIT diff --git a/backup_20260405_2124/bucket_manager.py b/backup_20260405_2124/bucket_manager.py deleted file mode 100644 index 806583d..0000000 --- a/backup_20260405_2124/bucket_manager.py +++ /dev/null @@ -1,755 +0,0 @@ -# ============================================================ -# Module: Memory Bucket Manager (bucket_manager.py) -# 模块:记忆桶管理器 -# -# CRUD operations, multi-dimensional index search, activation updates -# for memory buckets. -# 记忆桶的增删改查、多维索引搜索、激活更新。 -# -# Core design: -# 核心逻辑: -# - Each bucket = one Markdown file (YAML frontmatter + body) -# 每个记忆桶 = 一个 Markdown 文件 -# - Storage by type: permanent / dynamic / archive -# 存储按类型分目录 -# - Multi-dimensional soft index: domain + valence/arousal + fuzzy text -# 多维软索引:主题域 + 情感坐标 + 文本模糊匹配 -# - Search strategy: domain pre-filter → weighted multi-dim ranking -# 搜索策略:主题域预筛 → 多维加权精排 -# - Emotion coordinates based on Russell circumplex model: -# 情感坐标基于环形情感模型(Russell circumplex): -# valence (0~1): 0=negative → 1=positive -# arousal (0~1): 0=calm → 1=excited -# -# Depended on by: server.py, decay_engine.py -# 被谁依赖:server.py, decay_engine.py -# ============================================================ - -import os -import math -import logging -import re -import shutil -from collections import Counter -from datetime import datetime -from pathlib import Path -from typing import Optional - -import frontmatter -import jieba -from rapidfuzz import fuzz - -from utils import generate_bucket_id, sanitize_name, safe_path, now_iso - -logger = logging.getLogger("ombre_brain.bucket") - - -class BucketManager: - """ - Memory bucket manager — entry point for all bucket CRUD operations. - Buckets are stored as Markdown files with YAML frontmatter for metadata - and body for content. Natively compatible with Obsidian browsing/editing. - 记忆桶管理器 —— 所有桶的 CRUD 操作入口。 - 桶以 Markdown 文件存储,YAML frontmatter 存元数据,正文存内容。 - 天然兼容 Obsidian 直接浏览和编辑。 - """ - - def __init__(self, config: dict): - # --- Read storage paths from config / 从配置中读取存储路径 --- - self.base_dir = config["buckets_dir"] - self.permanent_dir = os.path.join(self.base_dir, "permanent") - self.dynamic_dir = os.path.join(self.base_dir, "dynamic") - self.archive_dir = os.path.join(self.base_dir, "archive") - self.fuzzy_threshold = config.get("matching", {}).get("fuzzy_threshold", 50) - self.max_results = config.get("matching", {}).get("max_results", 5) - - # --- Wikilink config / 双链配置 --- - wikilink_cfg = config.get("wikilink", {}) - self.wikilink_enabled = wikilink_cfg.get("enabled", True) - self.wikilink_use_tags = wikilink_cfg.get("use_tags", False) - self.wikilink_use_domain = wikilink_cfg.get("use_domain", True) - self.wikilink_use_auto_keywords = wikilink_cfg.get("use_auto_keywords", True) - self.wikilink_auto_top_k = wikilink_cfg.get("auto_top_k", 8) - self.wikilink_min_len = wikilink_cfg.get("min_keyword_len", 2) - self.wikilink_exclude_keywords = set(wikilink_cfg.get("exclude_keywords", [])) - self.wikilink_stopwords = { - "的", "了", "在", "是", "我", "有", "和", "就", "不", "人", - "都", "一个", "上", "也", "很", "到", "说", "要", "去", - "你", "会", "着", "没有", "看", "好", "自己", "这", "他", "她", - "我们", "你们", "他们", "然后", "今天", "昨天", "明天", "一下", - "the", "and", "for", "are", "but", "not", "you", "all", "can", - "had", "her", "was", "one", "our", "out", "has", "have", "with", - "this", "that", "from", "they", "been", "said", "will", "each", - } - self.wikilink_stopwords |= {w.lower() for w in self.wikilink_exclude_keywords} - - # --- Search scoring weights / 检索权重配置 --- - scoring = config.get("scoring_weights", {}) - self.w_topic = scoring.get("topic_relevance", 4.0) - self.w_emotion = scoring.get("emotion_resonance", 2.0) - self.w_time = scoring.get("time_proximity", 1.5) - self.w_importance = scoring.get("importance", 1.0) - - # --------------------------------------------------------- - # Create a new bucket - # 创建新桶 - # Write content and metadata into a .md file - # 将内容和元数据写入一个 .md 文件 - # --------------------------------------------------------- - async def create( - self, - content: str, - tags: list[str] = None, - importance: int = 5, - domain: list[str] = None, - valence: float = 0.5, - arousal: float = 0.3, - bucket_type: str = "dynamic", - name: str = None, - ) -> str: - """ - Create a new memory bucket, return bucket ID. - 创建一个新的记忆桶,返回桶 ID。 - """ - bucket_id = generate_bucket_id() - bucket_name = sanitize_name(name) if name else bucket_id - domain = domain or ["未分类"] - tags = tags or [] - linked_content = self._apply_wikilinks(content, tags, domain, bucket_name) - - # --- Build YAML frontmatter metadata / 构建元数据 --- - metadata = { - "id": bucket_id, - "name": bucket_name, - "tags": tags, - "domain": domain, - "valence": max(0.0, min(1.0, valence)), - "arousal": max(0.0, min(1.0, arousal)), - "importance": max(1, min(10, importance)), - "type": bucket_type, - "created": now_iso(), - "last_active": now_iso(), - "activation_count": 1, - } - - # --- Assemble Markdown file (frontmatter + body) --- - # --- 组装 Markdown 文件 --- - post = frontmatter.Post(linked_content, **metadata) - - # --- Choose directory by type + primary domain --- - # --- 按类型 + 主题域选择存储目录 --- - type_dir = self.permanent_dir if bucket_type == "permanent" else self.dynamic_dir - primary_domain = sanitize_name(domain[0]) if domain else "未分类" - target_dir = os.path.join(type_dir, primary_domain) - os.makedirs(target_dir, exist_ok=True) - - # --- Filename: readable_name_bucketID.md (Obsidian friendly) --- - # --- 文件名:可读名称_桶ID.md --- - if bucket_name and bucket_name != bucket_id: - filename = f"{bucket_name}_{bucket_id}.md" - else: - filename = f"{bucket_id}.md" - file_path = safe_path(target_dir, filename) - - try: - with open(file_path, "w", encoding="utf-8") as f: - f.write(frontmatter.dumps(post)) - except OSError as e: - logger.error(f"Failed to write bucket file / 写入桶文件失败: {file_path}: {e}") - raise - - logger.info( - f"Created bucket / 创建记忆桶: {bucket_id} ({bucket_name}) → {primary_domain}/" - ) - return bucket_id - - # --------------------------------------------------------- - # Read bucket content - # 读取桶内容 - # Returns {"id", "metadata", "content", "path"} or None - # --------------------------------------------------------- - async def get(self, bucket_id: str) -> Optional[dict]: - """ - Read a single bucket by ID. - 根据 ID 读取单个桶。 - """ - if not bucket_id or not isinstance(bucket_id, str): - return None - file_path = self._find_bucket_file(bucket_id) - if not file_path: - return None - return self._load_bucket(file_path) - - # --------------------------------------------------------- - # Update bucket - # 更新桶 - # Supports: content, tags, importance, valence, arousal, name, resolved - # --------------------------------------------------------- - async def update(self, bucket_id: str, **kwargs) -> bool: - """ - Update bucket content or metadata fields. - 更新桶的内容或元数据字段。 - """ - file_path = self._find_bucket_file(bucket_id) - if not file_path: - return False - - try: - post = frontmatter.load(file_path) - except Exception as e: - logger.warning(f"Failed to load bucket for update / 加载桶失败: {file_path}: {e}") - return False - - # --- Update only fields that were passed in / 只改传入的字段 --- - if "content" in kwargs: - next_tags = kwargs.get("tags", post.get("tags", [])) - next_domain = kwargs.get("domain", post.get("domain", [])) - next_name = kwargs.get("name", post.get("name", "")) - post.content = self._apply_wikilinks( - kwargs["content"], - next_tags, - next_domain, - next_name, - ) - if "tags" in kwargs: - post["tags"] = kwargs["tags"] - if "importance" in kwargs: - post["importance"] = max(1, min(10, int(kwargs["importance"]))) - if "domain" in kwargs: - post["domain"] = kwargs["domain"] - if "valence" in kwargs: - post["valence"] = max(0.0, min(1.0, float(kwargs["valence"]))) - if "arousal" in kwargs: - post["arousal"] = max(0.0, min(1.0, float(kwargs["arousal"]))) - if "name" in kwargs: - post["name"] = sanitize_name(kwargs["name"]) - if "resolved" in kwargs: - post["resolved"] = bool(kwargs["resolved"]) - - # --- Auto-refresh activation time / 自动刷新激活时间 --- - post["last_active"] = now_iso() - - try: - with open(file_path, "w", encoding="utf-8") as f: - f.write(frontmatter.dumps(post)) - except OSError as e: - logger.error(f"Failed to write bucket update / 写入桶更新失败: {file_path}: {e}") - return False - - logger.info(f"Updated bucket / 更新记忆桶: {bucket_id}") - return True - - # --------------------------------------------------------- - # Wikilink injection - # 自动添加 Obsidian 双链 - # --------------------------------------------------------- - def _apply_wikilinks( - self, - content: str, - tags: list[str], - domain: list[str], - name: str, - ) -> str: - """ - Auto-inject Obsidian wikilinks, avoiding double-wrapping existing [[...]]. - 自动添加 Obsidian 双链,避免重复包裹已有 [[...]]。 - """ - if not self.wikilink_enabled or not content: - return content - - keywords = self._collect_wikilink_keywords(content, tags, domain, name) - if not keywords: - return content - - # Split on existing wikilinks to avoid wrapping them again - # 按已有双链切分,避免重复包裹 - segments = re.split(r"(\[\[[^\]]+\]\])", content) - pattern = re.compile("|".join(re.escape(kw) for kw in keywords)) - for i, segment in enumerate(segments): - if segment.startswith("[[") and segment.endswith("]]"): - continue - updated = pattern.sub(lambda m: f"[[{m.group(0)}]]", segment) - segments[i] = updated - return "".join(segments) - - def _collect_wikilink_keywords( - self, - content: str, - tags: list[str], - domain: list[str], - name: str, - ) -> list[str]: - """ - Collect candidate keywords from tags/domain/auto-extraction. - 汇总候选关键词:可选 tags/domain + 自动提词。 - """ - candidates = [] - - if self.wikilink_use_tags: - candidates.extend(tags or []) - if self.wikilink_use_domain: - candidates.extend(domain or []) - if name: - candidates.append(name) - if self.wikilink_use_auto_keywords: - candidates.extend(self._extract_auto_keywords(content)) - - return self._normalize_keywords(candidates) - - def _normalize_keywords(self, keywords: list[str]) -> list[str]: - """ - Deduplicate and sort by length (longer first to avoid short words - breaking long ones during replacement). - 去重并按长度排序,优先替换长词。 - """ - if not keywords: - return [] - - seen = set() - cleaned = [] - for keyword in keywords: - if not isinstance(keyword, str): - continue - kw = keyword.strip() - if len(kw) < self.wikilink_min_len: - continue - if kw in self.wikilink_exclude_keywords: - continue - if kw.lower() in self.wikilink_stopwords: - continue - if kw in seen: - continue - seen.add(kw) - cleaned.append(kw) - - return sorted(cleaned, key=len, reverse=True) - - def _extract_auto_keywords(self, content: str) -> list[str]: - """ - Auto-extract keywords from body text, prioritizing high-frequency words. - 从正文自动提词,优先高频词。 - """ - if not content: - return [] - - try: - zh_words = [w.strip() for w in jieba.lcut(content) if w.strip()] - except Exception: - zh_words = [] - en_words = re.findall(r"[A-Za-z][A-Za-z0-9_-]{2,20}", content) - - # Chinese bigrams / 中文双词组合 - zh_bigrams = [] - for i in range(len(zh_words) - 1): - left = zh_words[i] - right = zh_words[i + 1] - if len(left) < self.wikilink_min_len or len(right) < self.wikilink_min_len: - continue - if not re.fullmatch(r"[\u4e00-\u9fff]+", left + right): - continue - if len(left + right) > 8: - continue - zh_bigrams.append(left + right) - - merged = [] - for word in zh_words + zh_bigrams + en_words: - if len(word) < self.wikilink_min_len: - continue - if re.fullmatch(r"\d+", word): - continue - if word.lower() in self.wikilink_stopwords: - continue - merged.append(word) - - if not merged: - return [] - - counter = Counter(merged) - return [w for w, _ in counter.most_common(self.wikilink_auto_top_k)] - - # --------------------------------------------------------- - # Delete bucket - # 删除桶 - # --------------------------------------------------------- - async def delete(self, bucket_id: str) -> bool: - """ - Delete a memory bucket file. - 删除指定的记忆桶文件。 - """ - file_path = self._find_bucket_file(bucket_id) - if not file_path: - return False - - try: - os.remove(file_path) - except OSError as e: - logger.error(f"Failed to delete bucket file / 删除桶文件失败: {file_path}: {e}") - return False - - logger.info(f"Deleted bucket / 删除记忆桶: {bucket_id}") - return True - - # --------------------------------------------------------- - # Touch bucket (refresh activation time + increment count) - # 触碰桶(刷新激活时间 + 累加激活次数) - # Called on every recall hit; affects decay score. - # 每次检索命中时调用,影响衰减得分。 - # --------------------------------------------------------- - async def touch(self, bucket_id: str) -> None: - """ - Update a bucket's last activation time and count. - 更新桶的最后激活时间和激活次数。 - """ - file_path = self._find_bucket_file(bucket_id) - if not file_path: - return - - try: - post = frontmatter.load(file_path) - post["last_active"] = now_iso() - post["activation_count"] = post.get("activation_count", 0) + 1 - - with open(file_path, "w", encoding="utf-8") as f: - f.write(frontmatter.dumps(post)) - except Exception as e: - logger.warning(f"Failed to touch bucket / 触碰桶失败: {bucket_id}: {e}") - - # --------------------------------------------------------- - # Multi-dimensional search (core feature) - # 多维搜索(核心功能) - # - # Strategy: domain pre-filter → weighted multi-dim ranking - # 策略:主题域预筛 → 多维加权精排 - # - # Ranking formula: - # total = topic(×w_topic) + emotion(×w_emotion) - # + time(×w_time) + importance(×w_importance) - # - # Per-dimension scores (normalized to 0~1): - # topic = rapidfuzz weighted match (name/tags/domain/body) - # emotion = 1 - Euclidean distance (query v/a vs bucket v/a) - # time = e^(-0.02 × days) (recent memories first) - # importance = importance / 10 - # --------------------------------------------------------- - async def search( - self, - query: str, - limit: int = None, - domain_filter: list[str] = None, - query_valence: float = None, - query_arousal: float = None, - ) -> list[dict]: - """ - Multi-dimensional indexed search for memory buckets. - 多维索引搜索记忆桶。 - - domain_filter: pre-filter by domain (None = search all) - query_valence/arousal: emotion coordinates for resonance scoring - """ - if not query or not query.strip(): - return [] - - limit = limit or self.max_results - all_buckets = await self.list_all(include_archive=False) - - if not all_buckets: - return [] - - # --- Layer 1: domain pre-filter (fast scope reduction) --- - # --- 第一层:主题域预筛(快速缩小范围)--- - if domain_filter: - filter_set = {d.lower() for d in domain_filter} - candidates = [ - b for b in all_buckets - if {d.lower() for d in b["metadata"].get("domain", [])} & filter_set - ] - # Fall back to full search if pre-filter yields nothing - # 预筛为空则回退全量搜索 - if not candidates: - candidates = all_buckets - else: - candidates = all_buckets - - # --- Layer 2: weighted multi-dim ranking --- - # --- 第二层:多维加权精排 --- - scored = [] - for bucket in candidates: - meta = bucket.get("metadata", {}) - - try: - # Dim 1: topic relevance (fuzzy text, 0~1) - topic_score = self._calc_topic_score(query, bucket) - - # Dim 2: emotion resonance (coordinate distance, 0~1) - emotion_score = self._calc_emotion_score( - query_valence, query_arousal, meta - ) - - # Dim 3: time proximity (exponential decay, 0~1) - time_score = self._calc_time_score(meta) - - # Dim 4: importance (direct normalization) - importance_score = max(1, min(10, int(meta.get("importance", 5)))) / 10.0 - - # --- Weighted sum / 加权求和 --- - total = ( - topic_score * self.w_topic - + emotion_score * self.w_emotion - + time_score * self.w_time - + importance_score * self.w_importance - ) - # Normalize to 0~100 for readability - weight_sum = self.w_topic + self.w_emotion + self.w_time + self.w_importance - normalized = (total / weight_sum) * 100 if weight_sum > 0 else 0 - - # Resolved buckets get ranking penalty (but still reachable by keyword) - # 已解决的桶降权排序(但仍可被关键词激活) - if meta.get("resolved", False): - normalized *= 0.3 - - if normalized >= self.fuzzy_threshold: - bucket["score"] = round(normalized, 2) - scored.append(bucket) - except Exception as e: - logger.warning( - f"Scoring failed for bucket {bucket.get('id', '?')} / " - f"桶评分失败: {e}" - ) - continue - - scored.sort(key=lambda x: x["score"], reverse=True) - return scored[:limit] - - # --------------------------------------------------------- - # Topic relevance sub-score: - # name(×3) + domain(×2.5) + tags(×2) + body(×1) - # 文本相关性子分:桶名(×3) + 主题域(×2.5) + 标签(×2) + 正文(×1) - # --------------------------------------------------------- - def _calc_topic_score(self, query: str, bucket: dict) -> float: - """ - Calculate text dimension relevance score (0~1). - 计算文本维度的相关性得分。 - """ - meta = bucket.get("metadata", {}) - - name_score = fuzz.partial_ratio(query, meta.get("name", "")) * 3 - domain_score = ( - max( - (fuzz.partial_ratio(query, d) for d in meta.get("domain", [])), - default=0, - ) - * 2.5 - ) - tag_score = ( - max( - (fuzz.partial_ratio(query, tag) for tag in meta.get("tags", [])), - default=0, - ) - * 2 - ) - content_score = fuzz.partial_ratio(query, bucket.get("content", "")[:500]) * 1 - - return (name_score + domain_score + tag_score + content_score) / (100 * 8.5) - - # --------------------------------------------------------- - # Emotion resonance sub-score: - # Based on Russell circumplex Euclidean distance - # 情感共鸣子分:基于环形情感模型的欧氏距离 - # No emotion in query → neutral 0.5 (doesn't affect ranking) - # --------------------------------------------------------- - def _calc_emotion_score( - self, q_valence: float, q_arousal: float, meta: dict - ) -> float: - """ - Calculate emotion resonance score (0~1, closer = higher). - 计算情感共鸣度(0~1,越近越高)。 - """ - if q_valence is None or q_arousal is None: - return 0.5 # No emotion coordinates → neutral / 无情感坐标时给中性分 - - try: - b_valence = float(meta.get("valence", 0.5)) - b_arousal = float(meta.get("arousal", 0.3)) - except (ValueError, TypeError): - return 0.5 - - # Euclidean distance, max sqrt(2) ≈ 1.414 - dist = math.sqrt((q_valence - b_valence) ** 2 + (q_arousal - b_arousal) ** 2) - return max(0.0, 1.0 - dist / 1.414) - - # --------------------------------------------------------- - # Time proximity sub-score: - # More recent activation → higher score - # 时间亲近子分:距上次激活越近分越高 - # --------------------------------------------------------- - def _calc_time_score(self, meta: dict) -> float: - """ - Calculate time proximity score (0~1, more recent = higher). - 计算时间亲近度。 - """ - last_active_str = meta.get("last_active", meta.get("created", "")) - try: - last_active = datetime.fromisoformat(str(last_active_str)) - days = max(0.0, (datetime.now() - last_active).total_seconds() / 86400) - except (ValueError, TypeError): - days = 30 - return math.exp(-0.02 * days) - - # --------------------------------------------------------- - # List all buckets - # 列出所有桶 - # --------------------------------------------------------- - async def list_all(self, include_archive: bool = False) -> list[dict]: - """ - Recursively walk directories (including domain subdirs), list all buckets. - 递归遍历目录(含域子目录),列出所有记忆桶。 - """ - buckets = [] - - dirs = [self.permanent_dir, self.dynamic_dir] - if include_archive: - dirs.append(self.archive_dir) - - for dir_path in dirs: - if not os.path.exists(dir_path): - continue - for root, _, files in os.walk(dir_path): - for filename in files: - if not filename.endswith(".md"): - continue - file_path = os.path.join(root, filename) - bucket = self._load_bucket(file_path) - if bucket: - buckets.append(bucket) - - return buckets - - # --------------------------------------------------------- - # Statistics (counts per category + total size) - # 统计信息(各分类桶数量 + 总体积) - # --------------------------------------------------------- - async def get_stats(self) -> dict: - """ - Return memory bucket statistics (including domain subdirs). - 返回记忆桶的统计数据。 - """ - stats = { - "permanent_count": 0, - "dynamic_count": 0, - "archive_count": 0, - "total_size_kb": 0.0, - "domains": {}, - } - - for subdir, key in [ - (self.permanent_dir, "permanent_count"), - (self.dynamic_dir, "dynamic_count"), - (self.archive_dir, "archive_count"), - ]: - if not os.path.exists(subdir): - continue - for root, _, files in os.walk(subdir): - for f in files: - if f.endswith(".md"): - stats[key] += 1 - fpath = os.path.join(root, f) - try: - stats["total_size_kb"] += os.path.getsize(fpath) / 1024 - except OSError: - pass - # Per-domain counts / 每个域的桶数量 - domain_name = os.path.basename(root) - if domain_name != os.path.basename(subdir): - stats["domains"][domain_name] = stats["domains"].get(domain_name, 0) + 1 - - return stats - - # --------------------------------------------------------- - # Archive bucket (move from permanent/dynamic into archive) - # 归档桶(从 permanent/dynamic 移入 archive) - # Called by decay engine to simulate "forgetting" - # 由衰减引擎调用,模拟"遗忘" - # --------------------------------------------------------- - async def archive(self, bucket_id: str) -> bool: - """ - Move a bucket into the archive directory (preserving domain subdirs). - 将指定桶移入归档目录(保留域子目录结构)。 - """ - file_path = self._find_bucket_file(bucket_id) - if not file_path: - return False - - try: - # Read once, get domain info and update type / 一次性读取 - post = frontmatter.load(file_path) - domain = post.get("domain", ["未分类"]) - primary_domain = sanitize_name(domain[0]) if domain else "未分类" - archive_subdir = os.path.join(self.archive_dir, primary_domain) - os.makedirs(archive_subdir, exist_ok=True) - - dest = safe_path(archive_subdir, os.path.basename(file_path)) - - # Update type marker then move file / 更新类型标记后移动文件 - post["type"] = "archived" - with open(file_path, "w", encoding="utf-8") as f: - f.write(frontmatter.dumps(post)) - - # Use shutil.move for cross-filesystem safety - # 使用 shutil.move 保证跨文件系统安全 - shutil.move(file_path, str(dest)) - except Exception as e: - logger.error( - f"Failed to archive bucket / 归档桶失败: {bucket_id}: {e}" - ) - return False - - logger.info(f"Archived bucket / 归档记忆桶: {bucket_id} → archive/{primary_domain}/") - return True - - # --------------------------------------------------------- - # Internal: find bucket file across all three directories - # 内部:在三个目录中查找桶文件 - # --------------------------------------------------------- - def _find_bucket_file(self, bucket_id: str) -> Optional[str]: - """ - Recursively search permanent/dynamic/archive for a bucket file - matching the given ID. - 在 permanent/dynamic/archive 中递归查找指定 ID 的桶文件。 - """ - if not bucket_id: - return None - for dir_path in [self.permanent_dir, self.dynamic_dir, self.archive_dir]: - if not os.path.exists(dir_path): - continue - for root, _, files in os.walk(dir_path): - for fname in files: - if not fname.endswith(".md"): - continue - # Match by exact ID segment in filename - # 通过文件名中的 ID 片段精确匹配 - if bucket_id in fname: - return os.path.join(root, fname) - return None - - # --------------------------------------------------------- - # Internal: load bucket data from .md file - # 内部:从 .md 文件加载桶数据 - # --------------------------------------------------------- - def _load_bucket(self, file_path: str) -> Optional[dict]: - """ - Parse a Markdown file and return structured bucket data. - 解析 Markdown 文件,返回桶的结构化数据。 - """ - try: - post = frontmatter.load(file_path) - return { - "id": post.get("id", Path(file_path).stem), - "metadata": dict(post.metadata), - "content": post.content, - "path": file_path, - } - except Exception as e: - logger.warning( - f"Failed to load bucket file / 加载桶文件失败: {file_path}: {e}" - ) - return None diff --git a/backup_20260405_2124/decay_engine.py b/backup_20260405_2124/decay_engine.py deleted file mode 100644 index 93185d0..0000000 --- a/backup_20260405_2124/decay_engine.py +++ /dev/null @@ -1,242 +0,0 @@ -# ============================================================ -# Module: Memory Decay Engine (decay_engine.py) -# 模块:记忆衰减引擎 -# -# Simulates human forgetting curve; auto-decays inactive memories and archives them. -# 模拟人类遗忘曲线,自动衰减不活跃记忆并归档。 -# -# Core formula (improved Ebbinghaus + emotion coordinates): -# 核心公式(改进版艾宾浩斯遗忘曲线 + 情感坐标): -# Score = Importance × (activation_count^0.3) × e^(-λ×days) × emotion_weight -# -# Emotion weight (continuous coordinate, not discrete labels): -# 情感权重(基于连续坐标而非离散列举): -# emotion_weight = base + (arousal × arousal_boost) -# Higher arousal → higher emotion weight → slower decay -# 唤醒度越高 → 情感权重越大 → 记忆衰减越慢 -# -# Depended on by: server.py -# 被谁依赖:server.py -# ============================================================ - -import math -import asyncio -import logging -from datetime import datetime - -logger = logging.getLogger("ombre_brain.decay") - - -class DecayEngine: - """ - Memory decay engine — periodically scans all dynamic buckets, - calculates decay scores, auto-archives low-activity buckets - to simulate natural forgetting. - 记忆衰减引擎 —— 定期扫描所有动态桶, - 计算衰减得分,将低活跃桶自动归档,模拟自然遗忘。 - """ - - def __init__(self, config: dict, bucket_mgr): - # --- Load decay parameters / 加载衰减参数 --- - decay_cfg = config.get("decay", {}) - self.decay_lambda = decay_cfg.get("lambda", 0.05) - self.threshold = decay_cfg.get("threshold", 0.3) - self.check_interval = decay_cfg.get("check_interval_hours", 24) - - # --- Emotion weight params (continuous arousal coordinate) --- - # --- 情感权重参数(基于连续 arousal 坐标)--- - emotion_cfg = decay_cfg.get("emotion_weights", {}) - self.emotion_base = emotion_cfg.get("base", 1.0) - self.arousal_boost = emotion_cfg.get("arousal_boost", 0.8) - - self.bucket_mgr = bucket_mgr - - # --- Background task control / 后台任务控制 --- - self._task: asyncio.Task | None = None - self._running = False - - @property - def is_running(self) -> bool: - """Whether the decay engine is running in the background. - 衰减引擎是否正在后台运行。""" - return self._running - - # --------------------------------------------------------- - # Core: calculate decay score for a single bucket - # 核心:计算单个桶的衰减得分 - # - # Higher score = more vivid memory; below threshold → archive - # 得分越高 = 记忆越鲜活,低于阈值则归档 - # Permanent buckets never decay / 固化桶永远不衰减 - # --------------------------------------------------------- - def calculate_score(self, metadata: dict) -> float: - """ - Calculate current activity score for a memory bucket. - 计算一个记忆桶的当前活跃度得分。 - - Formula: Score = Importance × (act_count^0.3) × e^(-λ×days) × (base + arousal×boost) - """ - if not isinstance(metadata, dict): - return 0.0 - - # --- Permanent buckets never decay / 固化桶永不衰减 --- - if metadata.get("type") == "permanent": - return 999.0 - - importance = max(1, min(10, int(metadata.get("importance", 5)))) - activation_count = max(1, int(metadata.get("activation_count", 1))) - - # --- Days since last activation / 距离上次激活过了多少天 --- - last_active_str = metadata.get("last_active", metadata.get("created", "")) - try: - last_active = datetime.fromisoformat(str(last_active_str)) - days_since = max(0.0, (datetime.now() - last_active).total_seconds() / 86400) - except (ValueError, TypeError): - days_since = 30 # Parse failure → assume 30 days / 解析失败假设已过 30 天 - - # --- Emotion weight: continuous arousal coordinate --- - # --- 情感权重:基于连续 arousal 坐标计算 --- - # Higher arousal → stronger emotion → higher weight → slower decay - # arousal 越高 → 情感越强烈 → 权重越大 → 衰减越慢 - try: - arousal = max(0.0, min(1.0, float(metadata.get("arousal", 0.3)))) - except (ValueError, TypeError): - arousal = 0.3 - emotion_weight = self.emotion_base + arousal * self.arousal_boost - - # --- Apply decay formula / 套入衰减公式 --- - score = ( - importance - * (activation_count ** 0.3) - * math.exp(-self.decay_lambda * days_since) - * emotion_weight - ) - - # --- Weight pool modifiers / 权重池修正因子 --- - # Resolved events drop to 5%, sink to bottom awaiting keyword reactivation - # 已解决的事件权重骤降到 5%,沉底等待关键词激活 - resolved_factor = 0.05 if metadata.get("resolved", False) else 1.0 - # High-arousal unresolved buckets get urgency boost for priority surfacing - # 高唤醒未解决桶额外加成,优先浮现 - urgency_boost = 1.5 if (arousal > 0.7 and not metadata.get("resolved", False)) else 1.0 - - return round(score * resolved_factor * urgency_boost, 4) - - # --------------------------------------------------------- - # Execute one decay cycle - # 执行一轮衰减周期 - # Scan all dynamic buckets → score → archive those below threshold - # 扫描所有动态桶 → 算分 → 低于阈值的归档 - # --------------------------------------------------------- - async def run_decay_cycle(self) -> dict: - """ - Execute one decay cycle: iterate dynamic buckets, archive those - scoring below threshold. - 执行一轮衰减:遍历动态桶,归档得分低于阈值的桶。 - - Returns stats: {"checked": N, "archived": N, "lowest_score": X} - """ - try: - buckets = await self.bucket_mgr.list_all(include_archive=False) - except Exception as e: - logger.error(f"Failed to list buckets for decay / 衰减周期列桶失败: {e}") - return {"checked": 0, "archived": 0, "lowest_score": 0, "error": str(e)} - - checked = 0 - archived = 0 - lowest_score = float("inf") - - for bucket in buckets: - meta = bucket.get("metadata", {}) - - # Skip permanent buckets / 跳过固化桶 - if meta.get("type") == "permanent": - continue - - checked += 1 - try: - score = self.calculate_score(meta) - except Exception as e: - logger.warning( - f"Score calculation failed for {bucket.get('id', '?')} / " - f"计算得分失败: {e}" - ) - continue - - lowest_score = min(lowest_score, score) - - # --- Below threshold → archive (simulate forgetting) --- - # --- 低于阈值 → 归档(模拟遗忘)--- - if score < self.threshold: - try: - success = await self.bucket_mgr.archive(bucket["id"]) - if success: - archived += 1 - logger.info( - f"Decay archived / 衰减归档: " - f"{meta.get('name', bucket['id'])} " - f"(score={score:.4f}, threshold={self.threshold})" - ) - except Exception as e: - logger.warning( - f"Archive failed for {bucket.get('id', '?')} / " - f"归档失败: {e}" - ) - - result = { - "checked": checked, - "archived": archived, - "lowest_score": lowest_score if checked > 0 else 0, - } - logger.info(f"Decay cycle complete / 衰减周期完成: {result}") - return result - - # --------------------------------------------------------- - # Background decay task management - # 后台衰减任务管理 - # --------------------------------------------------------- - async def ensure_started(self) -> None: - """ - Ensure the decay engine is started (lazy init on first call). - 确保衰减引擎已启动(懒加载,首次调用时启动)。 - """ - if not self._running: - await self.start() - - async def start(self) -> None: - """Start the background decay loop. - 启动后台衰减循环。""" - if self._running: - return - self._running = True - self._task = asyncio.create_task(self._background_loop()) - logger.info( - f"Decay engine started, interval: {self.check_interval}h / " - f"衰减引擎已启动,检查间隔: {self.check_interval} 小时" - ) - - async def stop(self) -> None: - """Stop the background decay loop. - 停止后台衰减循环。""" - self._running = False - if self._task: - self._task.cancel() - try: - await self._task - except asyncio.CancelledError: - pass - logger.info("Decay engine stopped / 衰减引擎已停止") - - async def _background_loop(self) -> None: - """Background loop: run decay → sleep → repeat. - 后台循环体:执行衰减 → 睡眠 → 重复。""" - while self._running: - try: - await self.run_decay_cycle() - except Exception as e: - logger.error(f"Decay cycle error / 衰减周期出错: {e}") - # --- Wait for next cycle / 等待下一个周期 --- - try: - await asyncio.sleep(self.check_interval * 3600) - except asyncio.CancelledError: - break diff --git a/backup_20260405_2124/server.py b/backup_20260405_2124/server.py deleted file mode 100644 index 1333322..0000000 --- a/backup_20260405_2124/server.py +++ /dev/null @@ -1,536 +0,0 @@ -# ============================================================ -# Module: MCP Server Entry Point (server.py) -# 模块:MCP 服务器主入口 -# -# Starts the Ombre Brain MCP service and registers memory -# operation tools for Claude to call. -# 启动 Ombre Brain MCP 服务,注册记忆操作工具供 Claude 调用。 -# -# Core responsibilities: -# 核心职责: -# - Initialize config, bucket manager, dehydrator, decay engine -# 初始化配置、记忆桶管理器、脱水器、衰减引擎 -# - Expose 5 MCP tools: -# 暴露 5 个 MCP 工具: -# breath — Surface unresolved memories or search by keyword -# 浮现未解决记忆 或 按关键词检索 -# hold — Store a single memory -# 存储单条记忆 -# grow — Diary digest, auto-split into multiple buckets -# 日记归档,自动拆分多桶 -# trace — Modify metadata / resolved / delete -# 修改元数据 / resolved 标记 / 删除 -# pulse — System status + bucket listing -# 系统状态 + 所有桶列表 -# -# Startup: -# 启动方式: -# Local: python server.py -# Remote: OMBRE_TRANSPORT=streamable-http python server.py -# Docker: docker-compose up -# ============================================================ - -import os -import sys -import random -import logging -import asyncio -import httpx - -# --- Ensure same-directory modules can be imported --- -# --- 确保同目录下的模块能被正确导入 --- -sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) - -from mcp.server.fastmcp import FastMCP - -from bucket_manager import BucketManager -from dehydrator import Dehydrator -from decay_engine import DecayEngine -from utils import load_config, setup_logging - -# --- Load config & init logging / 加载配置 & 初始化日志 --- -config = load_config() -setup_logging(config.get("log_level", "INFO")) -logger = logging.getLogger("ombre_brain") - -# --- Initialize three core components / 初始化三大核心组件 --- -bucket_mgr = BucketManager(config) # Bucket manager / 记忆桶管理器 -dehydrator = Dehydrator(config) # Dehydrator / 脱水器 -decay_engine = DecayEngine(config, bucket_mgr) # Decay engine / 衰减引擎 - -# --- Create MCP server instance / 创建 MCP 服务器实例 --- -# host="0.0.0.0" so Docker container's SSE is externally reachable -# stdio mode ignores host (no network) -mcp = FastMCP( - "Ombre Brain", - host="0.0.0.0", - port=8000, -) - - -# ============================================================= -# /health endpoint: lightweight keepalive -# 轻量保活接口 -# For Cloudflare Tunnel or reverse proxy to ping, preventing idle timeout -# 供 Cloudflare Tunnel 或反代定期 ping,防止空闲超时断连 -# ============================================================= -@mcp.custom_route("/health", methods=["GET"]) -async def health_check(request): - from starlette.responses import JSONResponse - try: - stats = await bucket_mgr.get_stats() - return JSONResponse({ - "status": "ok", - "buckets": stats["permanent_count"] + stats["dynamic_count"], - "decay_engine": "running" if decay_engine.is_running else "stopped", - }) - except Exception as e: - return JSONResponse({"status": "error", "detail": str(e)}, status_code=500) - - -# ============================================================= -# Internal helper: merge-or-create -# 内部辅助:检查是否可合并,可以则合并,否则新建 -# Shared by hold and grow to avoid duplicate logic -# hold 和 grow 共用,避免重复逻辑 -# ============================================================= -async def _merge_or_create( - content: str, - tags: list, - importance: int, - domain: list, - valence: float, - arousal: float, - name: str = "", -) -> tuple[str, bool]: - """ - Check if a similar bucket exists for merging; merge if so, create if not. - Returns (bucket_id_or_name, is_merged). - 检查是否有相似桶可合并,有则合并,无则新建。 - 返回 (桶ID或名称, 是否合并)。 - """ - try: - existing = await bucket_mgr.search(content, limit=1) - except Exception as e: - logger.warning(f"Search for merge failed, creating new / 合并搜索失败,新建: {e}") - existing = [] - - if existing and existing[0].get("score", 0) > config.get("merge_threshold", 75): - bucket = existing[0] - try: - merged = await dehydrator.merge(bucket["content"], content) - await bucket_mgr.update( - bucket["id"], - content=merged, - tags=list(set(bucket["metadata"].get("tags", []) + tags)), - importance=max(bucket["metadata"].get("importance", 5), importance), - domain=list(set(bucket["metadata"].get("domain", []) + domain)), - valence=valence, - arousal=arousal, - ) - return bucket["metadata"].get("name", bucket["id"]), True - except Exception as e: - logger.warning(f"Merge failed, creating new / 合并失败,新建: {e}") - - bucket_id = await bucket_mgr.create( - content=content, - tags=tags, - importance=importance, - domain=domain, - valence=valence, - arousal=arousal, - name=name or None, - ) - return bucket_id, False - - -# ============================================================= -# Tool 1: breath — Breathe -# 工具 1:breath — 呼吸 -# -# No args: surface highest-weight unresolved memories (active push) -# 无参数:浮现权重最高的未解决记忆 -# With args: search by keyword + emotion coordinates -# 有参数:按关键词+情感坐标检索记忆 -# ============================================================= -@mcp.tool() -async def breath( - query: str = "", - max_results: int = 3, - domain: str = "", - valence: float = -1, - arousal: float = -1, -) -> str: - """检索记忆或浮现未解决记忆。query 为空时自动推送权重最高的未解决桶;有 query 时按关键词+情感检索。domain 逗号分隔,valence/arousal 传 0~1 启用情感共鸣,-1 忽略。""" - await decay_engine.ensure_started() - - # --- No args: surfacing mode (weight pool active push) --- - # --- 无参数:浮现模式(权重池主动推送)--- - if not query.strip(): - try: - all_buckets = await bucket_mgr.list_all(include_archive=False) - except Exception as e: - logger.error(f"Failed to list buckets for surfacing / 浮现列桶失败: {e}") - return "记忆系统暂时无法访问。" - - unresolved = [ - b for b in all_buckets - if not b["metadata"].get("resolved", False) - and b["metadata"].get("type") != "permanent" - ] - if not unresolved: - return "权重池平静,没有需要处理的记忆。" - - scored = sorted( - unresolved, - key=lambda b: decay_engine.calculate_score(b["metadata"]), - reverse=True, - ) - top = scored[:2] - results = [] - for b in top: - try: - summary = await dehydrator.dehydrate(b["content"], b["metadata"]) - await bucket_mgr.touch(b["id"]) - score = decay_engine.calculate_score(b["metadata"]) - results.append(f"[权重:{score:.2f}] {summary}") - except Exception as e: - logger.warning(f"Failed to dehydrate surfaced bucket / 浮现脱水失败: {e}") - continue - if not results: - return "权重池平静,没有需要处理的记忆。" - return "=== 浮现记忆 ===\n" + "\n---\n".join(results) - - # --- With args: search mode / 有参数:检索模式 --- - domain_filter = [d.strip() for d in domain.split(",") if d.strip()] or None - q_valence = valence if 0 <= valence <= 1 else None - q_arousal = arousal if 0 <= arousal <= 1 else None - - try: - matches = await bucket_mgr.search( - query, - limit=max_results, - domain_filter=domain_filter, - query_valence=q_valence, - query_arousal=q_arousal, - ) - except Exception as e: - logger.error(f"Search failed / 检索失败: {e}") - return "检索过程出错,请稍后重试。" - - results = [] - for bucket in matches: - try: - summary = await dehydrator.dehydrate(bucket["content"], bucket["metadata"]) - await bucket_mgr.touch(bucket["id"]) - results.append(summary) - except Exception as e: - logger.warning(f"Failed to dehydrate search result / 检索结果脱水失败: {e}") - continue - - # --- Random surfacing: when search returns < 3, 40% chance to float old memories --- - # --- 随机浮现:检索结果不足 3 条时,40% 概率从低权重旧桶里漂上来 --- - if len(matches) < 3 and random.random() < 0.4: - try: - all_buckets = await bucket_mgr.list_all(include_archive=False) - matched_ids = {b["id"] for b in matches} - low_weight = [ - b for b in all_buckets - if b["id"] not in matched_ids - and decay_engine.calculate_score(b["metadata"]) < 2.0 - ] - if low_weight: - drifted = random.sample(low_weight, min(random.randint(1, 3), len(low_weight))) - drift_results = [] - for b in drifted: - summary = await dehydrator.dehydrate(b["content"], b["metadata"]) - drift_results.append(f"[surface_type: random]\n{summary}") - results.append("--- 忽然想起来 ---\n" + "\n---\n".join(drift_results)) - except Exception as e: - logger.warning(f"Random surfacing failed / 随机浮现失败: {e}") - - if not results: - return "未找到相关记忆。" - - return "\n---\n".join(results) - - -# ============================================================= -# Tool 2: hold — Hold on to this -# 工具 2:hold — 握住,留下来 -# ============================================================= -@mcp.tool() -async def hold( - content: str, - tags: str = "", - importance: int = 5, -) -> str: - """存储单条记忆。自动打标+合并相似桶。tags 逗号分隔,importance 1-10。""" - await decay_engine.ensure_started() - - # --- Input validation / 输入校验 --- - if not content or not content.strip(): - return "内容为空,无法存储。" - - importance = max(1, min(10, importance)) - extra_tags = [t.strip() for t in tags.split(",") if t.strip()] - - # --- Step 1: auto-tagging / 自动打标 --- - try: - analysis = await dehydrator.analyze(content) - except Exception as e: - logger.warning(f"Auto-tagging failed, using defaults / 自动打标失败: {e}") - analysis = { - "domain": ["未分类"], "valence": 0.5, "arousal": 0.3, - "tags": [], "suggested_name": "", - } - - domain = analysis["domain"] - valence = analysis["valence"] - arousal = analysis["arousal"] - auto_tags = analysis["tags"] - suggested_name = analysis.get("suggested_name", "") - - all_tags = list(dict.fromkeys(auto_tags + extra_tags)) - - # --- Step 2: merge or create / 合并或新建 --- - result_name, is_merged = await _merge_or_create( - content=content, - tags=all_tags, - importance=importance, - domain=domain, - valence=valence, - arousal=arousal, - name=suggested_name, - ) - - if is_merged: - return ( - f"已合并到现有记忆桶: {result_name}\n" - f"主题域: {', '.join(domain)} | 情感: V{valence:.1f}/A{arousal:.1f}" - ) - return ( - f"已创建新记忆桶: {result_name}\n" - f"主题域: {', '.join(domain)} | 情感: V{valence:.1f}/A{arousal:.1f} | 标签: {', '.join(all_tags)}" - ) - - -# ============================================================= -# Tool 3: grow — Grow, fragments become memories -# 工具 3:grow — 生长,一天的碎片长成记忆 -# ============================================================= -@mcp.tool() -async def grow(content: str) -> str: - """日记归档。自动拆分长内容为多个记忆桶。""" - await decay_engine.ensure_started() - - if not content or not content.strip(): - return "内容为空,无法整理。" - - # --- Step 1: let API split and organize / 让 API 拆分整理 --- - try: - items = await dehydrator.digest(content) - except Exception as e: - logger.error(f"Diary digest failed / 日记整理失败: {e}") - return f"日记整理失败: {e}" - - if not items: - return "内容为空或整理失败。" - - results = [] - created = 0 - merged = 0 - - # --- Step 2: merge or create each item (with per-item error handling) --- - # --- 逐条合并或新建(单条失败不影响其他)--- - for item in items: - try: - result_name, is_merged = await _merge_or_create( - content=item["content"], - tags=item.get("tags", []), - importance=item.get("importance", 5), - domain=item.get("domain", ["未分类"]), - valence=item.get("valence", 0.5), - arousal=item.get("arousal", 0.3), - name=item.get("name", ""), - ) - - if is_merged: - results.append(f" 📎 合并 → {result_name}") - merged += 1 - else: - domains_str = ",".join(item.get("domain", [])) - results.append( - f" 📝 新建 [{item.get('name', result_name)}] " - f"主题:{domains_str} V{item.get('valence', 0.5):.1f}/A{item.get('arousal', 0.3):.1f}" - ) - created += 1 - except Exception as e: - logger.warning( - f"Failed to process diary item / 日记条目处理失败: " - f"{item.get('name', '?')}: {e}" - ) - results.append(f" ⚠️ 失败: {item.get('name', '未知条目')}") - - summary = f"=== 日记整理完成 ===\n拆分为 {len(items)} 条 | 新建 {created} 桶 | 合并 {merged} 桶\n" - return summary + "\n".join(results) - - -# ============================================================= -# Tool 4: trace — Trace, redraw the outline of a memory -# 工具 4:trace — 描摹,重新勾勒记忆的轮廓 -# Also handles deletion (delete=True) -# 同时承接删除功能 -# ============================================================= -@mcp.tool() -async def trace( - bucket_id: str, - name: str = "", - domain: str = "", - valence: float = -1, - arousal: float = -1, - importance: int = -1, - tags: str = "", - resolved: int = -1, - delete: bool = False, -) -> str: - """修改记忆元数据。resolved=1 标记已解决(桶权重骤降沉底),resolved=0 重新激活,delete=True 删除桶。其余字段只传需改的,-1 或空串表示不改。""" - - if not bucket_id or not bucket_id.strip(): - return "请提供有效的 bucket_id。" - - # --- Delete mode / 删除模式 --- - if delete: - success = await bucket_mgr.delete(bucket_id) - return f"已遗忘记忆桶: {bucket_id}" if success else f"未找到记忆桶: {bucket_id}" - - bucket = await bucket_mgr.get(bucket_id) - if not bucket: - return f"未找到记忆桶: {bucket_id}" - - # --- Collect only fields actually passed / 只收集用户实际传入的字段 --- - updates = {} - if name: - updates["name"] = name - if domain: - updates["domain"] = [d.strip() for d in domain.split(",") if d.strip()] - if 0 <= valence <= 1: - updates["valence"] = valence - if 0 <= arousal <= 1: - updates["arousal"] = arousal - if 1 <= importance <= 10: - updates["importance"] = importance - if tags: - updates["tags"] = [t.strip() for t in tags.split(",") if t.strip()] - if resolved in (0, 1): - updates["resolved"] = bool(resolved) - - if not updates: - return "没有任何字段需要修改。" - - success = await bucket_mgr.update(bucket_id, **updates) - if not success: - return f"修改失败: {bucket_id}" - - changed = ", ".join(f"{k}={v}" for k, v in updates.items()) - # Explicit hint about resolved state change semantics - # 特别提示 resolved 状态变化的语义 - if "resolved" in updates: - if updates["resolved"]: - changed += " → 已沉底,只在关键词触发时重新浮现" - else: - changed += " → 已重新激活,将参与浮现排序" - return f"已修改记忆桶 {bucket_id}: {changed}" - - -# ============================================================= -# Tool 5: pulse — Heartbeat, system status + memory listing -# 工具 5:pulse — 脉搏,系统状态 + 记忆列表 -# ============================================================= -@mcp.tool() -async def pulse(include_archive: bool = False) -> str: - """系统状态和所有记忆桶摘要。include_archive=True 时包含归档桶。""" - try: - stats = await bucket_mgr.get_stats() - except Exception as e: - return f"获取系统状态失败: {e}" - - status = ( - f"=== Ombre Brain 记忆系统 ===\n" - f"固化记忆桶: {stats['permanent_count']} 个\n" - f"动态记忆桶: {stats['dynamic_count']} 个\n" - f"归档记忆桶: {stats['archive_count']} 个\n" - f"总存储大小: {stats['total_size_kb']:.1f} KB\n" - f"衰减引擎: {'运行中' if decay_engine.is_running else '已停止'}\n" - ) - - # --- List all bucket summaries / 列出所有桶摘要 --- - try: - buckets = await bucket_mgr.list_all(include_archive=include_archive) - except Exception as e: - return status + f"\n列出记忆桶失败: {e}" - - if not buckets: - return status + "\n记忆库为空。" - - lines = [] - for b in buckets: - meta = b.get("metadata", {}) - if meta.get("type") == "permanent": - icon = "📦" - elif meta.get("type") == "archived": - icon = "🗄️" - elif meta.get("resolved", False): - icon = "✅" - else: - icon = "💭" - try: - score = decay_engine.calculate_score(meta) - except Exception: - score = 0.0 - domains = ",".join(meta.get("domain", [])) - val = meta.get("valence", 0.5) - aro = meta.get("arousal", 0.3) - resolved_tag = " [已解决]" if meta.get("resolved", False) else "" - lines.append( - f"{icon} [{meta.get('name', b['id'])}]{resolved_tag} " - f"主题:{domains} " - f"情感:V{val:.1f}/A{aro:.1f} " - f"重要:{meta.get('importance', '?')} " - f"权重:{score:.2f} " - f"标签:{','.join(meta.get('tags', []))}" - ) - - return status + "\n=== 记忆列表 ===\n" + "\n".join(lines) - - -# --- Entry point / 启动入口 --- -if __name__ == "__main__": - transport = config.get("transport", "stdio") - logger.info(f"Ombre Brain starting | transport: {transport}") - - # --- Application-level keepalive: remote mode only, ping /health every 60s --- - # --- 应用层保活:仅远程模式下启动,每 60 秒 ping 一次 /health --- - # Prevents Cloudflare Tunnel from dropping idle connections - if transport in ("sse", "streamable-http"): - async def _keepalive_loop(): - await asyncio.sleep(10) # Wait for server to fully start - async with httpx.AsyncClient() as client: - while True: - try: - await client.get("http://localhost:8000/health", timeout=5) - logger.debug("Keepalive ping OK / 保活 ping 成功") - except Exception as e: - logger.warning(f"Keepalive ping failed / 保活 ping 失败: {e}") - await asyncio.sleep(60) - - import threading - - def _start_keepalive(): - loop = asyncio.new_event_loop() - loop.run_until_complete(_keepalive_loop()) - - t = threading.Thread(target=_start_keepalive, daemon=True) - t.start() - - mcp.run(transport=transport) diff --git a/bucket_manager.py b/bucket_manager.py index 296dc73..4557c41 100644 --- a/bucket_manager.py +++ b/bucket_manager.py @@ -28,15 +28,12 @@ import os import math import logging -import re import shutil -from collections import Counter from datetime import datetime from pathlib import Path from typing import Optional import frontmatter -import jieba from rapidfuzz import fuzz from utils import generate_bucket_id, sanitize_name, safe_path, now_iso diff --git a/embedding_engine.py b/embedding_engine.py index 3c42a96..14523a5 100644 --- a/embedding_engine.py +++ b/embedding_engine.py @@ -16,8 +16,6 @@ import json import math import sqlite3 import logging -import asyncio -from pathlib import Path from openai import AsyncOpenAI diff --git a/import_memory.py b/import_memory.py index e55d8bd..a4aa47e 100644 --- a/import_memory.py +++ b/import_memory.py @@ -19,10 +19,8 @@ import os import json import hashlib import logging -import asyncio from datetime import datetime from pathlib import Path -from typing import Optional from utils import count_tokens_approx, now_iso diff --git a/migrate_to_domains.py b/migrate_to_domains.py index 52591a1..3befdd5 100644 --- a/migrate_to_domains.py +++ b/migrate_to_domains.py @@ -117,7 +117,7 @@ def migrate(): print(f" ✓ {filename}") print(f" → {primary_domain}/{new_filename}") - print(f"\n迁移完成。") + print("\n迁移完成。") # 展示新结构 print("\n=== 新目录结构 ===") diff --git a/reclassify_domains.py b/reclassify_domains.py index 20bb230..78a5d5a 100644 --- a/reclassify_domains.py +++ b/reclassify_domains.py @@ -165,7 +165,6 @@ def reclassify(): new_domains = classify(body, old_domains) primary = sanitize_name(new_domains[0]) - old_primary = sanitize_name(old_domains[0]) if old_domains else "未分类" if name and name != bucket_id: new_filename = f"{sanitize_name(name)}_{bucket_id}.md" @@ -197,7 +196,7 @@ def reclassify(): os.rmdir(dp) print(f"\n 🗑 删除空目录: {d}/") - print(f"\n重分类完成。\n") + print("\n重分类完成。\n") # 展示新结构 print("=== 新目录结构 ===") diff --git a/test_smoke.py b/test_smoke.py deleted file mode 100644 index e20f071..0000000 --- a/test_smoke.py +++ /dev/null @@ -1,126 +0,0 @@ -"""Ombre Brain 冒烟测试:验证核心功能链路""" -import asyncio -import os - -# 确保模块路径 -import sys -sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) - -from utils import load_config, setup_logging -from bucket_manager import BucketManager -from dehydrator import Dehydrator -from decay_engine import DecayEngine - - -async def main(): - config = load_config() - setup_logging("INFO") - bm = BucketManager(config) - dh = Dehydrator(config) - de = DecayEngine(config, bm) - - print(f"API available: {dh.api_available}") - print(f"base_url: {dh.base_url}") - print() - - # ===== 1. 自动打标 ===== - print("=== 1. analyze (自动打标) ===") - try: - result = await dh.analyze("今天学了 Python 的 asyncio,感觉收获很大,心情不错") - print(f" domain: {result['domain']}") - print(f" valence: {result['valence']}, arousal: {result['arousal']}") - print(f" tags: {result['tags']}") - print(" [OK]") - except Exception as e: - print(f" [FAIL] {e}") - print() - - # ===== 2. 建桶 ===== - print("=== 2. create (建桶) ===") - try: - bid = await bm.create( - content="P酱喜欢猫,家里养了一只橘猫叫小橘", - tags=["猫", "宠物"], - importance=7, - domain=["生活"], - valence=0.8, - arousal=0.4, - ) - print(f" bucket_id: {bid}") - print(" [OK]") - except Exception as e: - print(f" [FAIL] {e}") - return - print() - - # ===== 3. 搜索 ===== - print("=== 3. search (检索) ===") - try: - hits = await bm.search("猫", limit=3) - print(f" found {len(hits)} results") - for h in hits: - name = h["metadata"].get("name", h["id"]) - print(f" - {name} (score={h['score']:.1f})") - print(" [OK]") - except Exception as e: - print(f" [FAIL] {e}") - print() - - # ===== 4. 脱水压缩 ===== - print("=== 4. dehydrate (脱水压缩) ===") - try: - text = ( - "这是一段很长的内容用来测试脱水功能。" - "P酱今天去了咖啡厅,点了一杯拿铁,然后坐在窗边看书看了两个小时。" - "期间遇到了一个朋友,聊了聊最近的工作情况。回家之后写了会代码。" - ) - summary = await dh.dehydrate(text, {}) - print(f" summary: {summary[:120]}...") - print(" [OK]") - except Exception as e: - print(f" [FAIL] {e}") - print() - - # ===== 5. 衰减评分 ===== - print("=== 5. decay score (衰减评分) ===") - try: - bucket = await bm.get(bid) - score = de.calculate_score(bucket["metadata"]) - print(f" score: {score:.3f}") - print(" [OK]") - except Exception as e: - print(f" [FAIL] {e}") - print() - - # ===== 6. 日记整理 ===== - print("=== 6. digest (日记整理) ===") - try: - diary = ( - "今天上午写了个 Python 脚本处理数据,下午和朋友去吃了火锅很开心," - "晚上失眠了有点焦虑,想了想明天的面试。" - ) - items = await dh.digest(diary) - print(f" 拆分出 {len(items)} 条记忆:") - for it in items: - print(f" - [{it.get('name','')}] domain={it['domain']} V{it['valence']:.1f}/A{it['arousal']:.1f}") - print(" [OK]") - except Exception as e: - print(f" [FAIL] {e}") - print() - - # ===== 7. 清理测试数据 ===== - print("=== 7. cleanup (删除测试桶) ===") - try: - ok = await bm.delete(bid) - print(f" deleted: {ok}") - print(" [OK]") - except Exception as e: - print(f" [FAIL] {e}") - print() - - print("=" * 40) - print("冒烟测试完成!") - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/test_tools.py b/test_tools.py deleted file mode 100644 index 6d6b453..0000000 --- a/test_tools.py +++ /dev/null @@ -1,159 +0,0 @@ -"""Ombre Brain MCP tool-level end-to-end test: direct calls to @mcp.tool() functions - Ombre Brain MCP 工具层端到端测试:直接调用 @mcp.tool() 函数""" -import asyncio -import os -import sys - -sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) - -from utils import load_config, setup_logging - -config = load_config() -setup_logging("INFO") - -# Must import after config is set, since server.py does module-level init -# 必须在配置好后导入,因为 server.py 有模块级初始化 -from server import breath, hold, trace, pulse, grow - - -async def main(): - passed = 0 - failed = 0 - - # ===== pulse ===== - print("=== [1/6] pulse ===") - try: - r = await pulse() - assert "Ombre Brain" in r - print(f" {r.splitlines()[0]}") - print(" [OK]") - passed += 1 - except Exception as e: - print(f" [FAIL] {e}") - failed += 1 - print() - - # ===== hold ===== - print("=== [2/6] hold ===") - try: - r = await hold(content="P酱最喜欢的编程语言是 Python,喜欢用 FastAPI 写后端", tags="编程,偏好", importance=8) - print(f" {r.splitlines()[0]}") - assert any(kw in r for kw in ["新建", "合并", "📌"]) - print(" [OK]") - passed += 1 - except Exception as e: - print(f" [FAIL] {e}") - failed += 1 - print() - - # ===== hold (merge test / 合并测试) ===== - print("=== [2b/6] hold (合并测试) ===") - try: - r = await hold(content="P酱也喜欢用 Python 写爬虫和数据分析", tags="编程", importance=6) - print(f" {r.splitlines()[0]}") - print(" [OK]") - passed += 1 - except Exception as e: - print(f" [FAIL] {e}") - failed += 1 - print() - - # ===== breath ===== - print("=== [3/6] breath ===") - try: - r = await breath(query="Python 编程", max_results=3) - print(f" 结果前80字: {r[:80]}...") - assert "未找到" not in r - print(" [OK]") - passed += 1 - except Exception as e: - print(f" [FAIL] {e}") - failed += 1 - print() - - # ===== breath (emotion resonance / 情感共鸣) ===== - print("=== [3b/6] breath (情感共鸣检索) ===") - try: - r = await breath(query="编程", domain="编程", valence=0.8, arousal=0.5) - print(f" 结果前80字: {r[:80]}...") - print(" [OK]") - passed += 1 - except Exception as e: - print(f" [FAIL] {e}") - failed += 1 - print() - - # --- Get a bucket ID for subsequent tests / 取一个桶 ID 用于后续测试 --- - bucket_id = None - from bucket_manager import BucketManager - bm = BucketManager(config) - all_buckets = await bm.list_all() - if all_buckets: - bucket_id = all_buckets[0]["id"] - - # ===== trace ===== - print("=== [4/6] trace ===") - if bucket_id: - try: - r = await trace(bucket_id=bucket_id, domain="编程,创作", importance=9) - print(f" {r}") - assert "已修改" in r - print(" [OK]") - passed += 1 - except Exception as e: - print(f" [FAIL] {e}") - failed += 1 - else: - print(" [SKIP] 没有可编辑的桶") - print() - - # ===== grow ===== - print("=== [5/6] grow ===") - try: - diary = ( - "今天早上复习了线性代数,搞懂了特征值分解。" - "中午和室友去吃了拉面,聊了聊暑假实习的事。" - "下午写了一个 Flask 项目的 API 接口。" - "晚上看了部电影叫《星际穿越》,被结尾感动哭了。" - ) - r = await grow(content=diary) - print(f" {r.splitlines()[0]}") - for line in r.splitlines()[1:]: - if line.strip(): - print(f" {line}") - assert "条|新" in r or "整理" in r - print(" [OK]") - passed += 1 - except Exception as e: - print(f" [FAIL] {e}") - failed += 1 - print() - - # ===== cleanup via trace(delete=True) / 清理测试数据 ===== - print("=== [6/6] cleanup (清理全部测试数据) ===") - try: - all_buckets = await bm.list_all() - for b in all_buckets: - r = await trace(bucket_id=b["id"], delete=True) - print(f" {r}") - print(" [OK]") - passed += 1 - except Exception as e: - print(f" [FAIL] {e}") - failed += 1 - print() - - # ===== Confirm cleanup / 确认清理干净 ===== - final = await pulse() - print(f"清理后: {final.splitlines()[0]}") - print() - print("=" * 50) - print(f"MCP tool test complete / 工具测试完成: {passed} passed / {failed} failed") - if failed == 0: - print("All passed ✓") - else: - print(f"{failed} failed ✗") - - -if __name__ == "__main__": - asyncio.run(main())