- decay_engine: keep activation_count as float (B-03); refresh local meta after auto_resolve so resolved_factor applies in the same cycle (B-08) - server.hold(): user-supplied valence/arousal now takes priority over analyze() output (B-09) - utils.load_config: support OMBRE_DEHYDRATION_MODEL (with OMBRE_MODEL alias) and OMBRE_EMBEDDING_MODEL - ENV_VARS.md: document new model env vars - tests/conftest.py: align fixture with spec-correct weights (time_proximity=1.5, content_weight=1.0) and feel subdir layout
198 lines
6.3 KiB
Python
198 lines
6.3 KiB
Python
# ============================================================
|
|
# Shared test fixtures — isolated temp environment for all tests
|
|
# 共享测试 fixtures —— 为所有测试提供隔离的临时环境
|
|
#
|
|
# IMPORTANT: All tests run against a temp directory.
|
|
# Your real /data or local buckets are NEVER touched.
|
|
# 重要:所有测试在临时目录运行,绝不触碰真实记忆数据。
|
|
# ============================================================
|
|
|
|
import os
|
|
import sys
|
|
import math
|
|
import pytest
|
|
import asyncio
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
# Ensure project root importable
|
|
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
|
|
|
|
|
@pytest.fixture
|
|
def test_config(tmp_path):
|
|
"""
|
|
Minimal config pointing to a temp directory.
|
|
Uses spec-correct scoring weights (after B-05, B-06, B-07 fixes).
|
|
"""
|
|
buckets_dir = str(tmp_path / "buckets")
|
|
os.makedirs(os.path.join(buckets_dir, "permanent"), exist_ok=True)
|
|
os.makedirs(os.path.join(buckets_dir, "dynamic"), exist_ok=True)
|
|
os.makedirs(os.path.join(buckets_dir, "archive"), exist_ok=True)
|
|
os.makedirs(os.path.join(buckets_dir, "feel"), exist_ok=True)
|
|
|
|
return {
|
|
"buckets_dir": buckets_dir,
|
|
"merge_threshold": 75,
|
|
"matching": {"fuzzy_threshold": 50, "max_results": 10},
|
|
"wikilink": {"enabled": False},
|
|
# Spec-correct weights (post B-05/B-06/B-07 fix)
|
|
"scoring_weights": {
|
|
"topic_relevance": 4.0,
|
|
"emotion_resonance": 2.0,
|
|
"time_proximity": 1.5, # spec: 1.5 (was 2.5 in buggy code)
|
|
"importance": 1.0,
|
|
"content_weight": 1.0, # spec: 1.0 (was 3.0 in buggy code)
|
|
},
|
|
"decay": {
|
|
"lambda": 0.05,
|
|
"threshold": 0.3,
|
|
"check_interval_hours": 24,
|
|
"emotion_weights": {"base": 1.0, "arousal_boost": 0.8},
|
|
},
|
|
"dehydration": {
|
|
"api_key": os.environ.get("OMBRE_API_KEY", "test-key"),
|
|
"base_url": "https://generativelanguage.googleapis.com/v1beta/openai",
|
|
"model": "gemini-2.5-flash-lite",
|
|
},
|
|
"embedding": {
|
|
"api_key": os.environ.get("OMBRE_API_KEY", ""),
|
|
"base_url": "https://generativelanguage.googleapis.com/v1beta/openai",
|
|
"model": "gemini-embedding-001",
|
|
"enabled": False,
|
|
},
|
|
}
|
|
|
|
|
|
@pytest.fixture
|
|
def buggy_config(tmp_path):
|
|
"""
|
|
Config using the PRE-FIX (buggy) scoring weights.
|
|
Used in regression tests to document the old broken behaviour.
|
|
"""
|
|
buckets_dir = str(tmp_path / "buckets")
|
|
for d in ["permanent", "dynamic", "archive", "feel"]:
|
|
os.makedirs(os.path.join(buckets_dir, d), exist_ok=True)
|
|
|
|
return {
|
|
"buckets_dir": buckets_dir,
|
|
"merge_threshold": 75,
|
|
"matching": {"fuzzy_threshold": 50, "max_results": 10},
|
|
"wikilink": {"enabled": False},
|
|
# Buggy weights (before B-05/B-06/B-07 fixes)
|
|
"scoring_weights": {
|
|
"topic_relevance": 4.0,
|
|
"emotion_resonance": 2.0,
|
|
"time_proximity": 2.5, # B-06: was too high
|
|
"importance": 1.0,
|
|
"content_weight": 3.0, # B-07: was too high
|
|
},
|
|
"decay": {
|
|
"lambda": 0.05,
|
|
"threshold": 0.3,
|
|
"check_interval_hours": 24,
|
|
"emotion_weights": {"base": 1.0, "arousal_boost": 0.8},
|
|
},
|
|
"dehydration": {
|
|
"api_key": "",
|
|
"base_url": "https://example.com",
|
|
"model": "test-model",
|
|
},
|
|
"embedding": {"enabled": False, "api_key": ""},
|
|
}
|
|
|
|
|
|
@pytest.fixture
|
|
def bucket_mgr(test_config):
|
|
from bucket_manager import BucketManager
|
|
return BucketManager(test_config)
|
|
|
|
|
|
@pytest.fixture
|
|
def decay_eng(test_config, bucket_mgr):
|
|
from decay_engine import DecayEngine
|
|
return DecayEngine(test_config, bucket_mgr)
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_dehydrator():
|
|
"""
|
|
Mock Dehydrator that returns deterministic results without any API calls.
|
|
Suitable for integration tests that do not test LLM behaviour.
|
|
"""
|
|
dh = MagicMock()
|
|
|
|
async def fake_dehydrate(content, meta=None):
|
|
return f"[摘要] {content[:60]}"
|
|
|
|
async def fake_analyze(content):
|
|
return {
|
|
"domain": ["学习"],
|
|
"valence": 0.7,
|
|
"arousal": 0.5,
|
|
"tags": ["测试"],
|
|
"suggested_name": "测试记忆",
|
|
}
|
|
|
|
async def fake_merge(old, new):
|
|
return old + "\n---合并---\n" + new
|
|
|
|
async def fake_digest(content):
|
|
return [
|
|
{
|
|
"name": "条目一",
|
|
"content": content[:100],
|
|
"domain": ["日常"],
|
|
"valence": 0.6,
|
|
"arousal": 0.4,
|
|
"tags": ["测试"],
|
|
"importance": 5,
|
|
}
|
|
]
|
|
|
|
dh.dehydrate = AsyncMock(side_effect=fake_dehydrate)
|
|
dh.analyze = AsyncMock(side_effect=fake_analyze)
|
|
dh.merge = AsyncMock(side_effect=fake_merge)
|
|
dh.digest = AsyncMock(side_effect=fake_digest)
|
|
dh.api_available = True
|
|
return dh
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_embedding_engine():
|
|
"""Mock EmbeddingEngine that returns empty results — no network calls."""
|
|
ee = MagicMock()
|
|
ee.enabled = False
|
|
ee.generate_and_store = AsyncMock(return_value=None)
|
|
ee.search_similar = AsyncMock(return_value=[])
|
|
ee.delete_embedding = AsyncMock(return_value=True)
|
|
ee.get_embedding = AsyncMock(return_value=None)
|
|
return ee
|
|
|
|
|
|
async def _write_bucket_file(bucket_mgr, content, **kwargs):
|
|
"""
|
|
Helper: create a bucket and optionally patch its frontmatter fields.
|
|
Accepts extra kwargs like created/last_active/resolved/digested/pinned.
|
|
Returns bucket_id.
|
|
"""
|
|
import frontmatter as fm
|
|
|
|
direct_fields = {
|
|
k: kwargs.pop(k) for k in list(kwargs.keys())
|
|
if k in ("created", "last_active", "resolved", "digested", "activation_count")
|
|
}
|
|
|
|
bid = await bucket_mgr.create(content=content, **kwargs)
|
|
|
|
if direct_fields:
|
|
fpath = bucket_mgr._find_bucket_file(bid)
|
|
post = fm.load(fpath)
|
|
for k, v in direct_fields.items():
|
|
post[k] = v
|
|
with open(fpath, "w", encoding="utf-8") as f:
|
|
f.write(fm.dumps(post))
|
|
|
|
return bid
|