From d4740f0d1fca0e746e76e2be9cfd3868f7ffcd6a Mon Sep 17 00:00:00 2001 From: P0luz Date: Sun, 19 Apr 2026 13:12:44 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20dehydrate=E4=BC=98=E5=85=88=E8=B5=B0API?= =?UTF-8?q?=20+=20SQLite=E7=BC=93=E5=AD=98,=20breath/pulse=E6=98=BE?= =?UTF-8?q?=E7=A4=BAbucket=5Fid?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - dehydrate() 现在仅通过 API 脱水,不再有本地 fallback - 新增 SQLite 持久缓存 (dehydration_cache.db),避免重复 API 调用 - 删除 _local_dehydrate() 和 _extract_keywords(),移除 jieba 依赖 - breath 三种模式 (surfacing/search/feel) 输出添加 [bucket_id:xxx] - pulse 输出每行添加 bucket_id:xxx --- dehydrator.py | 158 +++++++++++++++++++++++--------------------------- server.py | 11 ++-- 2 files changed, 81 insertions(+), 88 deletions(-) diff --git a/dehydrator.py b/dehydrator.py index faae140..98c8b77 100644 --- a/dehydrator.py +++ b/dehydrator.py @@ -13,21 +13,22 @@ # # Operating modes: # 工作模式: -# - Primary: OpenAI-compatible API (DeepSeek/Ollama/LM Studio/vLLM/Gemini etc.) -# 主路径:通过 OpenAI 兼容客户端调用 LLM API -# - Fallback: local keyword extraction when API is unavailable -# 备用路径:API 不可用时用本地关键词提取 +# - API only: OpenAI-compatible API (DeepSeek/Ollama/LM Studio/vLLM/Gemini etc.) +# 仅 API:通过 OpenAI 兼容客户端调用 LLM API +# - Dehydration cache: SQLite persistent cache to avoid redundant API calls +# 脱水缓存:SQLite 持久缓存,避免重复调用 API # # Depended on by: server.py # 被谁依赖:server.py # ============================================================ +import os import re import json +import hashlib +import sqlite3 import logging -from collections import Counter -import jieba from openai import AsyncOpenAI @@ -171,8 +172,6 @@ class Dehydrator: # --- Initialize OpenAI-compatible client --- # --- 初始化 OpenAI 兼容客户端 --- - # Supports any OpenAI-format API: DeepSeek / Ollama / LM Studio / vLLM / Gemini etc. - # User only needs to set base_url in config.yaml if self.api_available: self.client = AsyncOpenAI( api_key=self.api_key, @@ -182,6 +181,57 @@ class Dehydrator: else: self.client = None + # --- SQLite dehydration cache --- + # --- SQLite 脱水缓存:content hash → summary --- + db_path = os.path.join(config["buckets_dir"], "dehydration_cache.db") + self.cache_db_path = db_path + self._init_cache_db() + + def _init_cache_db(self): + """Create dehydration cache table if not exists.""" + os.makedirs(os.path.dirname(self.cache_db_path), exist_ok=True) + conn = sqlite3.connect(self.cache_db_path) + conn.execute(""" + CREATE TABLE IF NOT EXISTS dehydration_cache ( + content_hash TEXT PRIMARY KEY, + summary TEXT NOT NULL, + model TEXT NOT NULL, + created_at TEXT NOT NULL DEFAULT (datetime('now')) + ) + """) + conn.commit() + conn.close() + + def _get_cached_summary(self, content: str) -> str | None: + """Look up cached dehydration result by content hash.""" + content_hash = hashlib.sha256(content.encode()).hexdigest() + conn = sqlite3.connect(self.cache_db_path) + row = conn.execute( + "SELECT summary FROM dehydration_cache WHERE content_hash = ?", + (content_hash,) + ).fetchone() + conn.close() + return row[0] if row else None + + def _set_cached_summary(self, content: str, summary: str): + """Store dehydration result in cache.""" + content_hash = hashlib.sha256(content.encode()).hexdigest() + conn = sqlite3.connect(self.cache_db_path) + conn.execute( + "INSERT OR REPLACE INTO dehydration_cache (content_hash, summary, model) VALUES (?, ?, ?)", + (content_hash, summary, self.model) + ) + conn.commit() + conn.close() + + def invalidate_cache(self, content: str): + """Remove cached summary for specific content (call when bucket content changes).""" + content_hash = hashlib.sha256(content.encode()).hexdigest() + conn = sqlite3.connect(self.cache_db_path) + conn.execute("DELETE FROM dehydration_cache WHERE content_hash = ?", (content_hash,)) + conn.commit() + conn.close() + # --------------------------------------------------------- # Dehydrate: compress raw content into concise summary # 脱水:将原始内容压缩为精简摘要 @@ -192,8 +242,10 @@ class Dehydrator: """ Dehydrate/compress memory content. Returns formatted summary string ready for Claude context injection. + Uses SQLite cache to avoid redundant API calls. 对记忆内容做脱水压缩。 返回格式化的摘要字符串,可直接注入 Claude 上下文。 + 使用 SQLite 缓存避免重复调用 API。 """ if not content or not content.strip(): return "(空记忆 / empty memory)" @@ -203,9 +255,20 @@ class Dehydrator: if count_tokens_approx(content) < 100: return self._format_output(content, metadata) - # --- Local compression (Always used as requested) --- - # --- 本地压缩 --- - result = self._local_dehydrate(content) + # --- Check cache first --- + # --- 先查缓存 --- + cached = self._get_cached_summary(content) + if cached: + return self._format_output(cached, metadata) + + # --- API dehydration (no local fallback) --- + # --- API 脱水(无本地降级)--- + if not self.api_available: + raise RuntimeError("脱水 API 不可用,请配置 OMBRE_API_KEY") + + result = await self._api_dehydrate(content) + # --- Cache the result --- + self._set_cached_summary(content, result) return self._format_output(result, metadata) # --------------------------------------------------------- @@ -282,80 +345,7 @@ class Dehydrator: return "" return response.choices[0].message.content or "" - # --------------------------------------------------------- - # Local dehydration (fallback when API is unavailable) - # 本地脱水(无 API 时的兜底方案) - # Keyword frequency + sentence position weighting - # 基于关键词频率 + 句子位置权重 - # --------------------------------------------------------- - def _local_dehydrate(self, content: str) -> str: - """ - Local keyword extraction + position-weighted simple compression. - 本地关键词提取 + 位置加权的简单压缩。 - """ - # --- Split into sentences / 分句 --- - sentences = re.split(r"[。!?\n.!?]+", content) - sentences = [s.strip() for s in sentences if len(s.strip()) > 5] - if not sentences: - return content[:200] - - # --- Extract high-frequency keywords / 提取高频关键词 --- - keywords = self._extract_keywords(content) - - # --- Score sentences: position weight + keyword hits --- - # --- 句子评分:开头结尾权重高 + 关键词命中加分 --- - scored = [] - for i, sent in enumerate(sentences): - position_weight = 1.5 if i < 3 else (1.2 if i > len(sentences) - 3 else 1.0) - keyword_hits = sum(1 for kw in keywords if kw in sent) - score = position_weight * (1 + keyword_hits) - scored.append((score, sent)) - - scored.sort(key=lambda x: x[0], reverse=True) - - # --- Top-8 sentences + keyword list / 取高分句 + 关键词列表 --- - selected = [s for _, s in scored[:8]] - summary = "。".join(selected) - - if len(summary) > 1000: - summary = summary[:1000] + "…" - return summary - - # --------------------------------------------------------- - # Keyword extraction - # 关键词提取 - # Chinese + English tokenization → stopword filter → frequency sort - # 中英文分词 + 停用词过滤 + 词频排序 - # --------------------------------------------------------- - def _extract_keywords(self, text: str) -> list[str]: - """ - Extract high-frequency keywords using jieba (Chinese + English mixed). - 用 jieba 分词提取高频关键词。 - """ - try: - words = jieba.lcut(text) - except Exception: - words = [] - # English words / 英文单词 - english_words = re.findall(r"[a-zA-Z]{3,}", text.lower()) - words += english_words - - # Stopwords / 停用词 - stopwords = { - "的", "了", "在", "是", "我", "有", "和", "就", "不", "人", - "都", "一个", "上", "也", "很", "到", "说", "要", "去", - "你", "会", "着", "没有", "看", "好", "自己", "这", "他", "她", - "the", "and", "for", "are", "but", "not", "you", "all", "can", - "had", "her", "was", "one", "our", "out", "has", "have", "with", - "this", "that", "from", "they", "been", "said", "will", "each", - } - filtered = [ - w for w in words - if w not in stopwords and len(w.strip()) > 1 and not re.match(r"^[0-9]+$", w) - ] - counter = Counter(filtered) - return [word for word, _ in counter.most_common(15)] # --------------------------------------------------------- # Output formatting diff --git a/server.py b/server.py index 5d993bc..a55bcca 100644 --- a/server.py +++ b/server.py @@ -300,7 +300,7 @@ async def breath( try: clean_meta = {k: v for k, v in b["metadata"].items() if k != "tags"} summary = await dehydrator.dehydrate(strip_wikilinks(b["content"]), clean_meta) - pinned_results.append(f"📌 [核心准则] {summary}") + pinned_results.append(f"📌 [核心准则] [bucket_id:{b['id']}] {summary}") except Exception as e: logger.warning(f"Failed to dehydrate pinned bucket / 钉选桶脱水失败: {e}") continue @@ -359,7 +359,7 @@ async def breath( break # NOTE: no touch() here — surfacing should NOT reset decay timer score = decay_engine.calculate_score(b["metadata"]) - dynamic_results.append(f"[权重:{score:.2f}] {summary}") + dynamic_results.append(f"[权重:{score:.2f}] [bucket_id:{b['id']}] {summary}") token_budget -= summary_tokens except Exception as e: logger.warning(f"Failed to dehydrate surfaced bucket / 浮现脱水失败: {e}") @@ -387,7 +387,7 @@ async def breath( results = [] for f in feels: created = f["metadata"].get("created", "") - entry = f"[{created}]\n{strip_wikilinks(f['content'])}" + entry = f"[{created}] [bucket_id:{f['id']}]\n{strip_wikilinks(f['content'])}" results.append(entry) if count_tokens_approx("\n---\n".join(results)) > max_tokens: break @@ -453,7 +453,9 @@ async def breath( break await bucket_mgr.touch(bucket["id"]) if bucket.get("vector_match"): - summary = f"[语义关联] {summary}" + summary = f"[语义关联] [bucket_id:{bucket['id']}] {summary}" + else: + summary = f"[bucket_id:{bucket['id']}] {summary}" results.append(summary) token_used += summary_tokens except Exception as e: @@ -829,6 +831,7 @@ async def pulse(include_archive: bool = False) -> str: resolved_tag = " [已解决]" if meta.get("resolved", False) else "" lines.append( f"{icon} [{meta.get('name', b['id'])}]{resolved_tag} " + f"bucket_id:{b['id']} " f"主题:{domains} " f"情感:V{val:.1f}/A{aro:.1f} " f"重要:{meta.get('importance', '?')} "