Files
Ombre_Brain/dehydrator.py
P0luz d4740f0d1f
Some checks failed
Build & Push Docker Image / build-and-push (push) Has been cancelled
Tests / test (push) Has been cancelled
fix: dehydrate优先走API + SQLite缓存, breath/pulse显示bucket_id
- dehydrate() 现在仅通过 API 脱水,不再有本地 fallback
- 新增 SQLite 持久缓存 (dehydration_cache.db),避免重复 API 调用
- 删除 _local_dehydrate() 和 _extract_keywords(),移除 jieba 依赖
- breath 三种模式 (surfacing/search/feel) 输出添加 [bucket_id:xxx]
- pulse 输出每行添加 bucket_id:xxx
2026-04-19 13:12:44 +08:00

597 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# ============================================================
# Module: Dehydration & Auto-tagging (dehydrator.py)
# 模块:数据脱水压缩 + 自动打标
#
# Capabilities:
# 能力:
# 1. Dehydrate: compress memory content into high-density summaries (save tokens)
# 脱水:将记忆桶的原始内容压缩为高密度摘要,省 token
# 2. Merge: blend old and new content, keeping bucket size constant
# 合并:揉合新旧内容,控制桶体积恒定
# 3. Analyze: auto-analyze content for domain/emotion/tags
# 打标:自动分析内容,输出主题域/情感坐标/标签
#
# Operating modes:
# 工作模式:
# - API only: OpenAI-compatible API (DeepSeek/Ollama/LM Studio/vLLM/Gemini etc.)
# 仅 API:通过 OpenAI 兼容客户端调用 LLM API
# - Dehydration cache: SQLite persistent cache to avoid redundant API calls
# 脱水缓存:SQLite 持久缓存,避免重复调用 API
#
# Depended on by: server.py
# 被谁依赖:server.py
# ============================================================
import os
import re
import json
import hashlib
import sqlite3
import logging
from openai import AsyncOpenAI
from utils import count_tokens_approx
logger = logging.getLogger("ombre_brain.dehydrator")
# --- Dehydration prompt: instructs cheap LLM to compress information ---
# --- 脱水提示词:指导廉价 LLM 压缩信息 ---
DEHYDRATE_PROMPT = """你是一个信息压缩专家。请将以下内容脱水为紧凑摘要。
压缩规则:
1. 提取所有核心事实,去除冗余修饰和重复
2. 保留最新的情绪状态和态度
3. 保留所有待办/未完成事项
4. 关键数字、日期、名称必须保留
5. 目标压缩率 > 70%
输出格式(纯 JSON无其他内容
{
"core_facts": ["事实1", "事实2"],
"emotion_state": "当前情绪关键词",
"todos": ["待办1", "待办2"],
"keywords": ["关键词1", "关键词2"],
"summary": "50字以内的核心总结"
}"""
# --- Diary digest prompt: split daily notes into independent memory entries ---
# --- 日记整理提示词:把一大段日常拆分成多个独立记忆条目 ---
DIGEST_PROMPT = """你是一个日记整理专家。用户会发送一段包含今天各种事情的文本(可能很杂乱),请你将其拆分成多个独立的记忆条目。
整理规则:
1. 每个条目应该是一个独立的主题/事件(不要混在一起)
2. 为每个条目自动分析元数据
3. 去除无意义的口水话和重复信息,保留核心内容
4. 同一主题的零散信息应合并为一个条目
5. 如果有待办事项,单独提取为一个条目
6. 单个条目内容不少于50字过短的零碎信息合并到最相关的条目中
7. 总条目数控制在 2~6 个,避免过度碎片化
8. 在 content 中对人名、地名、专有名词用 [[双链]] 标记(如 [[婷易]]、[[Obsidian]]),普通词汇不要加
输出格式(纯 JSON 数组,无其他内容):
[
{
"name": "条目标题10字以内",
"content": "整理后的内容",
"domain": ["主题域1"],
"valence": 0.7,
"arousal": 0.4,
"tags": ["核心词1", "核心词2", "扩展词1", "扩展词2"],
"importance": 5
}
]
tags 生成规则:先从原文精准提取 3~5 个核心词,再引申扩展 5~8 个语义相关词(近义词、上位词、关联场景词),合并为一个数组。
主题域可选(选最精确的 1~2 个,只选真正相关的):
日常: ["饮食", "穿搭", "出行", "居家", "购物"]
人际: ["家庭", "恋爱", "友谊", "社交"]
成长: ["工作", "学习", "考试", "求职"]
身心: ["健康", "心理", "睡眠", "运动"]
兴趣: ["游戏", "影视", "音乐", "阅读", "创作", "手工"]
数字: ["编程", "AI", "硬件", "网络"]
事务: ["财务", "计划", "待办"]
内心: ["情绪", "回忆", "梦境", "自省"]
importance: 1-10根据内容重要程度判断
valence: 0~10=消极, 0.5=中性, 1=积极)
arousal: 0~10=平静, 0.5=普通, 1=激动)"""
# --- Merge prompt: instruct LLM to blend old and new memories ---
# --- 合并提示词:指导 LLM 揉合新旧记忆 ---
MERGE_PROMPT = """你是一个信息合并专家。请将旧记忆与新内容合并为一份统一的简洁记录。
合并规则:
1. 新内容与旧记忆冲突时,以新内容为准
2. 去除重复信息
3. 保留所有重要事实
4. 总长度尽量不超过旧记忆的 120%
5. 对出现的人名、地名、专有名词用 [[双链]] 标记(如 [[婷易]]、[[Obsidian]]),普通词汇不要加
直接输出合并后的文本,不要加额外说明。"""
# --- Auto-tagging prompt: analyze content for domain and emotion coords ---
# --- 自动打标提示词:分析内容的主题域和情感坐标 ---
ANALYZE_PROMPT = """你是一个内容分析器。请分析以下文本,输出结构化的元数据。
分析规则:
1. domain主题域选最精确的 1~2 个,只选真正相关的
日常: ["饮食", "穿搭", "出行", "居家", "购物"]
人际: ["家庭", "恋爱", "友谊", "社交"]
成长: ["工作", "学习", "考试", "求职"]
身心: ["健康", "心理", "睡眠", "运动"]
兴趣: ["游戏", "影视", "音乐", "阅读", "创作", "手工"]
数字: ["编程", "AI", "硬件", "网络"]
事务: ["财务", "计划", "待办"]
内心: ["情绪", "回忆", "梦境", "自省"]
2. valence情感效价0.0~1.00=极度消极 → 0.5=中性 → 1.0=极度积极
3. arousal情感唤醒度0.0~1.00=非常平静 → 0.5=普通 → 1.0=非常激动
4. tags关键词标签分两步生成合并为一个数组
第一步—精准提取:从原文抽取 3~5 个真正的核心词,不泛化、不遗漏
第二步—引申扩展:自动补充 8~10 个与当前场景语义相关的词,包括近义词、上位词、关联场景词、用户可能用不同措辞搜索的词
两步合并为一个 tags 数组,总计 10~15 个
5. suggested_name建议桶名10字以内的简短标题
6. 在 tags 和 suggested_name 中不要使用 [[]] 双链标记
输出格式(纯 JSON无其他内容
{
"domain": ["主题域1", "主题域2"],
"valence": 0.7,
"arousal": 0.4,
"tags": ["核心词1", "核心词2", "扩展词1", "扩展词2", "..."],
"suggested_name": "简短标题"
}"""
class Dehydrator:
"""
Data dehydrator + content analyzer.
Three capabilities: dehydration / merge / auto-tagging (domain + emotion).
Prefers API (better quality); auto-degrades to local (guaranteed availability).
数据脱水器 + 内容分析器。
三大能力:脱水压缩 / 新旧合并 / 自动打标。
优先走 APIAPI 挂了自动降级到本地。
"""
def __init__(self, config: dict):
# --- Read dehydration API config / 读取脱水 API 配置 ---
dehy_cfg = config.get("dehydration", {})
self.api_key = dehy_cfg.get("api_key", "")
self.model = dehy_cfg.get("model", "deepseek-chat")
self.base_url = dehy_cfg.get("base_url", "https://api.deepseek.com/v1")
self.max_tokens = dehy_cfg.get("max_tokens", 1024)
self.temperature = dehy_cfg.get("temperature", 0.1)
# --- API availability / 是否有可用的 API ---
self.api_available = bool(self.api_key)
# --- Initialize OpenAI-compatible client ---
# --- 初始化 OpenAI 兼容客户端 ---
if self.api_available:
self.client = AsyncOpenAI(
api_key=self.api_key,
base_url=self.base_url,
timeout=60.0,
)
else:
self.client = None
# --- SQLite dehydration cache ---
# --- SQLite 脱水缓存content hash → summary ---
db_path = os.path.join(config["buckets_dir"], "dehydration_cache.db")
self.cache_db_path = db_path
self._init_cache_db()
def _init_cache_db(self):
"""Create dehydration cache table if not exists."""
os.makedirs(os.path.dirname(self.cache_db_path), exist_ok=True)
conn = sqlite3.connect(self.cache_db_path)
conn.execute("""
CREATE TABLE IF NOT EXISTS dehydration_cache (
content_hash TEXT PRIMARY KEY,
summary TEXT NOT NULL,
model TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
)
""")
conn.commit()
conn.close()
def _get_cached_summary(self, content: str) -> str | None:
"""Look up cached dehydration result by content hash."""
content_hash = hashlib.sha256(content.encode()).hexdigest()
conn = sqlite3.connect(self.cache_db_path)
row = conn.execute(
"SELECT summary FROM dehydration_cache WHERE content_hash = ?",
(content_hash,)
).fetchone()
conn.close()
return row[0] if row else None
def _set_cached_summary(self, content: str, summary: str):
"""Store dehydration result in cache."""
content_hash = hashlib.sha256(content.encode()).hexdigest()
conn = sqlite3.connect(self.cache_db_path)
conn.execute(
"INSERT OR REPLACE INTO dehydration_cache (content_hash, summary, model) VALUES (?, ?, ?)",
(content_hash, summary, self.model)
)
conn.commit()
conn.close()
def invalidate_cache(self, content: str):
"""Remove cached summary for specific content (call when bucket content changes)."""
content_hash = hashlib.sha256(content.encode()).hexdigest()
conn = sqlite3.connect(self.cache_db_path)
conn.execute("DELETE FROM dehydration_cache WHERE content_hash = ?", (content_hash,))
conn.commit()
conn.close()
# ---------------------------------------------------------
# Dehydrate: compress raw content into concise summary
# 脱水:将原始内容压缩为精简摘要
# Try API first, fallback to local
# 先尝试 API失败则回退本地
# ---------------------------------------------------------
async def dehydrate(self, content: str, metadata: dict = None) -> str:
"""
Dehydrate/compress memory content.
Returns formatted summary string ready for Claude context injection.
Uses SQLite cache to avoid redundant API calls.
对记忆内容做脱水压缩。
返回格式化的摘要字符串,可直接注入 Claude 上下文。
使用 SQLite 缓存避免重复调用 API。
"""
if not content or not content.strip():
return "(空记忆 / empty memory"
# --- Content is short enough, no compression needed ---
# --- 内容已经很短,不需要压缩 ---
if count_tokens_approx(content) < 100:
return self._format_output(content, metadata)
# --- Check cache first ---
# --- 先查缓存 ---
cached = self._get_cached_summary(content)
if cached:
return self._format_output(cached, metadata)
# --- API dehydration (no local fallback) ---
# --- API 脱水(无本地降级)---
if not self.api_available:
raise RuntimeError("脱水 API 不可用,请配置 OMBRE_API_KEY")
result = await self._api_dehydrate(content)
# --- Cache the result ---
self._set_cached_summary(content, result)
return self._format_output(result, metadata)
# ---------------------------------------------------------
# Merge: blend new content into existing bucket
# 合并:将新内容揉入已有桶,保持体积恒定
# ---------------------------------------------------------
async def merge(self, old_content: str, new_content: str) -> str:
"""
Merge new content with old memory, preventing infinite bucket growth.
将新内容与旧记忆合并,避免桶无限膨胀。
"""
if not old_content and not new_content:
return ""
if not old_content:
return new_content or ""
if not new_content:
return old_content
# --- API merge (no local fallback) ---
if not self.api_available:
raise RuntimeError("脱水 API 不可用,请检查 config.yaml 中的 dehydration 配置")
try:
result = await self._api_merge(old_content, new_content)
if result:
return result
raise RuntimeError("API 合并返回空结果")
except RuntimeError:
raise
except Exception as e:
raise RuntimeError(f"API 合并失败,请检查 API 连接: {e}") from e
# ---------------------------------------------------------
# API call: dehydration
# API 调用:脱水压缩
# ---------------------------------------------------------
async def _api_dehydrate(self, content: str) -> str:
"""
Call LLM API for intelligent dehydration (via OpenAI-compatible client).
调用 LLM API 执行智能脱水。
"""
response = await self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": DEHYDRATE_PROMPT},
{"role": "user", "content": content[:3000]},
],
max_tokens=self.max_tokens,
temperature=self.temperature,
)
if not response.choices:
return ""
return response.choices[0].message.content or ""
# ---------------------------------------------------------
# API call: merge
# API 调用:合并
# ---------------------------------------------------------
async def _api_merge(self, old_content: str, new_content: str) -> str:
"""
Call LLM API for intelligent merge (via OpenAI-compatible client).
调用 LLM API 执行智能合并。
"""
user_msg = f"旧记忆:\n{old_content[:2000]}\n\n新内容:\n{new_content[:2000]}"
response = await self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": MERGE_PROMPT},
{"role": "user", "content": user_msg},
],
max_tokens=self.max_tokens,
temperature=self.temperature,
)
if not response.choices:
return ""
return response.choices[0].message.content or ""
# ---------------------------------------------------------
# Output formatting
# 输出格式化
# Wraps dehydrated result with bucket name, tags, emotion coords
# 把脱水结果包装成带桶名、标签、情感坐标的可读文本
# ---------------------------------------------------------
def _format_output(self, content: str, metadata: dict = None) -> str:
"""
Format dehydrated result into context-injectable text.
将脱水结果格式化为可注入上下文的文本。
"""
header = ""
if metadata and isinstance(metadata, dict):
name = metadata.get("name", "未命名")
domains = ", ".join(metadata.get("domain", []))
try:
valence = float(metadata.get("valence", 0.5))
arousal = float(metadata.get("arousal", 0.3))
except (ValueError, TypeError):
valence, arousal = 0.5, 0.3
header = f"📌 记忆桶: {name}"
if domains:
header += f" [主题:{domains}]"
header += f" [情感:V{valence:.1f}/A{arousal:.1f}]"
# Show model's perspective if available (valence drift)
model_v = metadata.get("model_valence")
if model_v is not None:
try:
header += f" [我的视角:V{float(model_v):.1f}]"
except (ValueError, TypeError):
pass
if metadata.get("digested"):
header += " [已消化]"
header += "\n"
content = re.sub(r'\[\[([^\]]+)\]\]', r'\1', content)
return f"{header}{content}"
# ---------------------------------------------------------
# Auto-tagging: analyze content for domain + emotion + tags
# 自动打标:分析内容,输出主题域 + 情感坐标 + 标签
# Called by server.py when storing new memories
# 存新记忆时由 server.py 调用
# ---------------------------------------------------------
async def analyze(self, content: str) -> dict:
"""
Analyze content and return structured metadata.
分析内容,返回结构化元数据。
Returns: {"domain", "valence", "arousal", "tags", "suggested_name"}
"""
if not content or not content.strip():
return self._default_analysis()
# --- API analyze (no local fallback) ---
if not self.api_available:
raise RuntimeError("脱水 API 不可用,请检查 config.yaml 中的 dehydration 配置")
try:
result = await self._api_analyze(content)
if result:
return result
raise RuntimeError("API 打标返回空结果")
except RuntimeError:
raise
except Exception as e:
raise RuntimeError(f"API 打标失败,请检查 API 连接: {e}") from e
# ---------------------------------------------------------
# API call: auto-tagging
# API 调用:自动打标
# ---------------------------------------------------------
async def _api_analyze(self, content: str) -> dict:
"""
Call LLM API for content analysis / tagging.
调用 LLM API 执行内容分析打标。
"""
response = await self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": ANALYZE_PROMPT},
{"role": "user", "content": content[:2000]},
],
max_tokens=256,
temperature=0.1,
)
if not response.choices:
return self._default_analysis()
raw = response.choices[0].message.content or ""
if not raw.strip():
return self._default_analysis()
return self._parse_analysis(raw)
# ---------------------------------------------------------
# Parse API JSON response with safety checks
# 解析 API 返回的 JSON做安全校验
# Ensure valence/arousal in 0~1, domain/tags valid
# ---------------------------------------------------------
def _parse_analysis(self, raw: str) -> dict:
"""
Parse and validate API tagging result.
解析并校验 API 返回的打标结果。
"""
try:
# Handle potential markdown code block wrapping
# 处理可能的 markdown 代码块包裹
cleaned = raw.strip()
if cleaned.startswith("```"):
cleaned = cleaned.split("\n", 1)[-1].rsplit("```", 1)[0]
result = json.loads(cleaned)
except (json.JSONDecodeError, IndexError, ValueError):
logger.warning(f"API tagging JSON parse failed / JSON 解析失败: {raw[:200]}")
return self._default_analysis()
if not isinstance(result, dict):
return self._default_analysis()
# --- Validate and clamp value ranges / 校验并钳制数值范围 ---
try:
valence = max(0.0, min(1.0, float(result.get("valence", 0.5))))
arousal = max(0.0, min(1.0, float(result.get("arousal", 0.3))))
except (ValueError, TypeError):
valence, arousal = 0.5, 0.3
return {
"domain": result.get("domain", ["未分类"])[:3],
"valence": valence,
"arousal": arousal,
"tags": result.get("tags", [])[:15],
"suggested_name": str(result.get("suggested_name", ""))[:20],
}
# ---------------------------------------------------------
# Default analysis result (empty content or total failure)
# 默认分析结果(内容为空或完全失败时用)
# ---------------------------------------------------------
def _default_analysis(self) -> dict:
"""
Return default neutral analysis result.
返回默认的中性分析结果。
"""
return {
"domain": ["未分类"],
"valence": 0.5,
"arousal": 0.3,
"tags": [],
"suggested_name": "",
}
# ---------------------------------------------------------
# Diary digest: split daily notes into independent memory entries
# 日记整理:把一大段日常拆分成多个独立记忆条目
# For the "grow" tool — "dump a day's content and it gets organized"
# 给 grow 工具用,"一天结束发一坨内容"靠这个
# ---------------------------------------------------------
async def digest(self, content: str) -> list[dict]:
"""
Split a large chunk of daily content into independent memory entries.
将一大段日常内容拆分成多个独立记忆条目。
Returns: [{"name", "content", "domain", "valence", "arousal", "tags", "importance"}, ...]
"""
if not content or not content.strip():
return []
# --- API digest (no local fallback) ---
if not self.api_available:
raise RuntimeError("脱水 API 不可用,请检查 config.yaml 中的 dehydration 配置")
try:
result = await self._api_digest(content)
if result:
return result
raise RuntimeError("API 日记整理返回空结果")
except RuntimeError:
raise
except Exception as e:
raise RuntimeError(f"API 日记整理失败,请检查 API 连接: {e}") from e
# ---------------------------------------------------------
# API call: diary digest
# API 调用:日记整理
# ---------------------------------------------------------
async def _api_digest(self, content: str) -> list[dict]:
"""
Call LLM API for diary organization.
调用 LLM API 执行日记整理。
"""
response = await self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": DIGEST_PROMPT},
{"role": "user", "content": content[:5000]},
],
max_tokens=2048,
temperature=0.0,
)
if not response.choices:
return []
raw = response.choices[0].message.content or ""
if not raw.strip():
return []
return self._parse_digest(raw)
# ---------------------------------------------------------
# Parse diary digest result with safety checks
# 解析日记整理结果,做安全校验
# ---------------------------------------------------------
def _parse_digest(self, raw: str) -> list[dict]:
"""
Parse and validate API diary digest result.
解析并校验 API 返回的日记整理结果。
"""
try:
cleaned = raw.strip()
if cleaned.startswith("```"):
cleaned = cleaned.split("\n", 1)[-1].rsplit("```", 1)[0]
items = json.loads(cleaned)
except (json.JSONDecodeError, IndexError, ValueError):
logger.warning(f"Diary digest JSON parse failed / JSON 解析失败: {raw[:200]}")
return []
if not isinstance(items, list):
return []
validated = []
for item in items:
if not isinstance(item, dict) or not item.get("content"):
continue
try:
importance = max(1, min(10, int(item.get("importance", 5))))
except (ValueError, TypeError):
importance = 5
try:
valence = max(0.0, min(1.0, float(item.get("valence", 0.5))))
arousal = max(0.0, min(1.0, float(item.get("arousal", 0.3))))
except (ValueError, TypeError):
valence, arousal = 0.5, 0.3
validated.append({
"name": str(item.get("name", ""))[:20],
"content": str(item.get("content", "")),
"domain": item.get("domain", ["未分类"])[:3],
"valence": valence,
"arousal": arousal,
"tags": item.get("tags", [])[:15],
"importance": importance,
})
return validated