Przeglądaj źródła

8.1-3缓存设置

Zzcoded 2 dni temu
rodzic
commit
9da664b418

+ 52 - 0
main/server/core/utils/cathe/config.py

@@ -1,3 +1,55 @@
 """
 缓存配置管理
 """
+
+from enum import Enum
+from typing import Dict, Any, Optional
+from dataclasses import dataclass
+from .strategies import CacheStrategy
+
+class CacheType(Enum):
+    """缓存类型枚举"""
+    LOCATION = "location"
+    WEATHER = "weather"
+    LUNAR = "lunar"
+    INTENT = "intent"
+    CONFIG = "config"
+    DEVICE_PROMPT = "device_prompt"
+
+
+@dataclass
+class CacheConfig:
+    """缓存配置类"""
+
+    strategy: CacheStrategy = CacheStrategy.TTL
+    ttl: Optional[float] = 300  # 默认5分钟
+    max_size: Optional[int] = 1000  # 默认最大1000条
+    cleanup_interval: float = 60  # 清理间隔(秒)
+
+    @classmethod
+    def for_type(cls, cache_type: CacheType) -> "CacheConfig":
+        """根据缓存类型返回预设配置"""
+        configs = {
+            CacheType.LOCATION: cls(
+                strategy=CacheStrategy.TTL, ttl=None, max_size=1000  # 手动失效
+            ),
+            CacheType.IP_INFO: cls(
+                strategy=CacheStrategy.TTL, ttl=86400, max_size=1000  # 24小时
+            ),
+            CacheType.WEATHER: cls(
+                strategy=CacheStrategy.TTL, ttl=28800, max_size=1000  # 8小时
+            ),
+            CacheType.LUNAR: cls(
+                strategy=CacheStrategy.TTL, ttl=2592000, max_size=365  # 30天过期
+            ),
+            CacheType.INTENT: cls(
+                strategy=CacheStrategy.TTL_LRU, ttl=600, max_size=1000  # 10分钟
+            ),
+            CacheType.CONFIG: cls(
+                strategy=CacheStrategy.FIXED_SIZE, ttl=None, max_size=20  # 手动失效
+            ),
+            CacheType.DEVICE_PROMPT: cls(
+                strategy=CacheStrategy.TTL, ttl=None, max_size=1000  # 手动失效
+            ),
+        }
+        return configs.get(cache_type, cls())

+ 212 - 0
main/server/core/utils/cathe/manager.py

@@ -0,0 +1,212 @@
+"""
+全局缓存管理器
+"""
+
+import time
+import threading
+from typing import Any, Optional, Dict
+from collections import OrderedDict
+from .strategies import CacheStrategy, CacheEntry
+from .config import CacheType, CacheConfig
+
+class GlobalCacheManager:
+    """全局缓存管理器"""
+
+    def __init__(self):
+        self._logger = None
+        self._caches: Dict[str, Dict[str, CacheEntry]] = {}
+        self._configs: Dict[str, CacheConfig] = {}
+        self._locks = Dict[str, threading.Lock] = {}
+        self._global_lock = threading.RLock()
+        self._last_cleanup = time.time()
+        self._stats = {"hits": 0, "misses": 0, "evictions": 0, "cleanups": 0}
+
+    @property
+    def logger(self):
+        """延迟初始化 logger 以避免循环导入"""
+        if self._logger is None:
+            from config.logger import setup_logger
+            self._logger = setup_logger()
+        return self._logger
+    
+    def _get_cache_name(self, cache_type: CacheType, namespace: str) -> str:
+        """生成缓存名称"""
+        if namespace:
+            return f"{cache_type.value}:{namespace}"
+        return cache_type.value
+    
+    def _get_or_create_cache(self, cache_name: str, config: CacheConfig) -> Dict[str, CacheEntry]:
+        """获取或创建缓存"""
+        with self._global_lock:
+            if cache_name not in self._caches:
+                self._caches[cache_name] = {
+                    OrderedDict()
+                    if config.strategy in [CacheStrategy.LRU, CacheStrategy.TTL_LRU]
+                    else {}
+                }
+                self._configs[cache_name] = config
+                self._locks[cache_name] = threading.RLock()
+            return self._caches[cache_name]
+        
+    def set(
+        self,
+        cache_type: CacheType,
+        key: str,
+        value: Any,
+        ttl: Optional[float] = None,
+        namespace: str = "",
+    ) -> None:
+        """设置缓存值"""
+        cache_name = self._get_cache_name(cache_type, namespace)
+        config = self._configs.get(cache_name) or CacheConfig.for_type(cache_type)
+        cache = self._get_or_create_cache(cache_name, config)
+
+        # 使用配置的TTL或传入的TTL
+        effective_ttl = ttl if ttl is not None else config.ttl
+
+        with self._locks[cache_name]:
+            # 创建缓存条目
+            entry = CacheEntry(value=value, timestamp=time.time(), ttl=effective_ttl)
+
+            # 处理不同策略
+            if config.strategy in [CacheStrategy.LRU, CacheStrategy.TTL_LRU]:
+                # LRU策略:如果已存在则移动到末尾
+                if key in cache:
+                    del cache[key]
+                cache[key] = entry
+
+                # 检查大小限制
+                if config.max_size and len(cache) > config.max_size:
+                    # 移除最旧的条目
+                    oldest_key = next(iter(cache))
+                    del cache[oldest_key]
+                    self._stats["evictions"] += 1
+
+            else:
+                cache[key] = entry
+
+                # 检查大小限制
+                if config.max_size and len(cache) > config.max_size:
+                    # 简单策略:随机移除一个条目
+                    victim_key = next(iter(cache))
+                    del cache[victim_key]
+                    self._stats["evictions"] += 1
+
+        # 定期清理过期条目
+        self._maybe_cleanup(cache_name)
+
+    def get(
+        self, cache_type: CacheType, key: str, namespace: str = ""
+    ) -> Optional[Any]:
+        """获取缓存值"""
+        cache_name = self._get_cache_name(cache_type, namespace)
+
+        if cache_name not in self._caches:
+            self._stats["misses"] += 1
+            return None
+
+        cache = self._caches[cache_name]
+        config = self._configs[cache_name]
+
+        with self._locks[cache_name]:
+            if key not in cache:
+                self._stats["misses"] += 1
+                return None
+
+            entry = cache[key]
+
+            # 检查过期
+            if entry.is_expired():
+                del cache[key]
+                self._stats["misses"] += 1
+                return None
+
+            # 更新访问信息
+            entry.touch()
+
+            # LRU策略:移动到末尾
+            if config.strategy in [CacheStrategy.LRU, CacheStrategy.TTL_LRU]:
+                del cache[key]
+                cache[key] = entry
+
+            self._stats["hits"] += 1
+            return entry.value
+
+    def delete(self, cache_type: CacheType, key: str, namespace: str = "") -> bool:
+        """删除缓存条目"""
+        cache_name = self._get_cache_name(cache_type, namespace)
+
+        if cache_name not in self._caches:
+            return False
+
+        cache = self._caches[cache_name]
+
+        with self._locks[cache_name]:
+            if key in cache:
+                del cache[key]
+                return True
+            return False
+
+    def clear(self, cache_type: CacheType, namespace: str = "") -> None:
+        """清空指定缓存"""
+        cache_name = self._get_cache_name(cache_type, namespace)
+
+        if cache_name not in self._caches:
+            return
+
+        with self._locks[cache_name]:
+            self._caches[cache_name].clear()
+
+    def invalidate_pattern(
+        self, cache_type: CacheType, pattern: str, namespace: str = ""
+    ) -> int:
+        """按模式失效缓存条目"""
+        cache_name = self._get_cache_name(cache_type, namespace)
+
+        if cache_name not in self._caches:
+            return 0
+
+        cache = self._caches[cache_name]
+        deleted_count = 0
+
+        with self._locks[cache_name]:
+            keys_to_delete = [key for key in cache.keys() if pattern in key]
+            for key in keys_to_delete:
+                del cache[key]
+                deleted_count += 1
+
+        return deleted_count
+
+    def _cleanup_expired(self, cache_name: str) -> int:
+        """清理过期条目"""
+        if cache_name not in self._caches:
+            return 0
+
+        cache = self._caches[cache_name]
+        deleted_count = 0
+
+        with self._locks[cache_name]:
+            expired_keys = [key for key, entry in cache.items() if entry.is_expired()]
+            for key in expired_keys:
+                del cache[key]
+                deleted_count += 1
+
+        return deleted_count
+
+    def _maybe_cleanup(self, cache_name: str):
+        """定期清理检查"""
+        config = self._configs.get(cache_name)
+        if not config:
+            return
+
+        now = time.time()
+        if now - self._last_cleanup > config.cleanup_interval:
+            self._last_cleanup = now
+            deleted = self._cleanup_expired(cache_name)
+            if deleted > 0:
+                self._stats["cleanups"] += 1
+                self.logger.debug(f"清理缓存 {cache_name}: 删除 {deleted} 个过期条目")
+
+
+# 创建全局缓存管理器实例
+cache_manager = GlobalCacheManager()

+ 41 - 0
main/server/core/utils/cathe/strategies.py

@@ -0,0 +1,41 @@
+"""
+缓存策略和数据结构定义
+"""
+
+import time
+from enum import Enum
+from typing import Any, Optional
+from dataclasses import dataclass
+
+class CacheStrategy(Enum):
+    """缓存策略枚举"""
+
+    TTL = "ttl"  # 基于时间过期
+    LRU = "lru"  # 最近最少使用
+    FIXED_SIZE = "fixed_size"  # 固定大小
+    TTL_LRU = "ttl_lru"  # TTL+LRU混合策略
+
+
+@dataclass
+class CacheEntry:
+    """缓存项数据结构"""
+
+    value: Any
+    timestamp: float
+    ttl: Optional[float] = None   # 生存时间(秒)
+    last_access: float = None
+    access_count: int = 0
+    
+    def __post_init__(self):
+        if self.last_access is None:
+            self.last_access = self.timestamp
+
+    def is_expired(self) -> bool:
+        """检查是否过期"""
+        if self.ttl is None:
+            return False
+        return time.time() - self.timestamp > self.ttl
+    def touch(self):
+        """更新访问时间和次数"""
+        self.last_access = time.time()
+        self.access_count += 1