|
@@ -7,6 +7,8 @@ import os
|
|
|
import cnlunar
|
|
|
from typing import Dict, Any
|
|
|
from config.logger import setup_logging
|
|
|
+from jinjia2 import Template
|
|
|
+
|
|
|
|
|
|
TAG = __name__
|
|
|
|
|
@@ -53,11 +55,153 @@ class PromptManager:
|
|
|
self.last_update_time = 0
|
|
|
|
|
|
# 导入全局缓存管理器
|
|
|
- from core.utils.cathe.manager import cache_manager, CacheType
|
|
|
+ from core.utils.cache.manager import cache_manager, CacheType
|
|
|
|
|
|
self.cache_manager = cache_manager
|
|
|
self.CacheType = CacheType
|
|
|
|
|
|
self._load_base_template()
|
|
|
|
|
|
- def
|
|
|
+ def _load_base_template(self):
|
|
|
+ """加载基础提示词模版"""
|
|
|
+ try:
|
|
|
+ template_path = "agent-base-prompt.txt"
|
|
|
+ cache_key = f"prompt_template:{template_path}"
|
|
|
+
|
|
|
+ # 先从缓存获取
|
|
|
+ cached_template = self.cache_manager.get(self.CacheType.CONFIG, cache_key)
|
|
|
+ if cached_template is not None:
|
|
|
+ self.base_prompt_template = cached_template
|
|
|
+ self.logger.bind(tag=TAG).info(f"成功从缓存中加载基础提示词模板")
|
|
|
+ return
|
|
|
+
|
|
|
+ # 缓存未命中,从文件读取
|
|
|
+ if os.path.exists(template_path):
|
|
|
+ with open(template_path, "r", encoding="utf-8") as f:
|
|
|
+ template_content = f.read()
|
|
|
+
|
|
|
+ # 存入缓存(CONFIG类型默认不自动过期,需要手动失效)
|
|
|
+ self.cache_manager.set(self.CacheType.CONFIG,cache_key,template_content)
|
|
|
+ self.base_prompt_template = template_content
|
|
|
+ self.logger.bind(tag=TAG).info(f"成功从文件加载基础提示词模板")
|
|
|
+ else:
|
|
|
+ self.logger.bind(tag=TAG).error(f"未找到agent-base-prompt.txt文件")
|
|
|
+ except Exception as e:
|
|
|
+ self.logger.bind(tag=TAG).error(f"加载基础提示词模板失败: {str(e)}")
|
|
|
+
|
|
|
+ def get_quick_prompt(self, user_prompt: str, device_id: str = None) -> str:
|
|
|
+ """快速获取系统提示词(使用用户配置)"""
|
|
|
+ device_cache_key = f"device_prompt:{device_id}"
|
|
|
+ cached_device_prompt = self.cache_manager.get(self.CacheType.DEVICE_PROMPT, device_cache_key)
|
|
|
+ if cached_device_prompt is not None:
|
|
|
+ return cached_device_prompt
|
|
|
+ else:
|
|
|
+ self.logger.bind(tag=TAG).info(f"未找到设备{device_id}的提示词缓存,使用传入的提示词")
|
|
|
+
|
|
|
+ # 使用传入的提示词并缓存(如果有设备ID)
|
|
|
+ if device_id:
|
|
|
+ device_cache_key = f"device_prompt:{device_id}"
|
|
|
+ self.cache_manager.set(self.CacheType.CONFIG, device_cache_key, user_prompt)
|
|
|
+ self.logger.bind(tag=TAG).info(f"缓存设备{device_id}的提示词")
|
|
|
+
|
|
|
+ self.logger.bind(tag=TAG).info(f"使用传入的提示词")
|
|
|
+ return user_prompt
|
|
|
+
|
|
|
+ def _get_current_time_info(self) -> tuple:
|
|
|
+ """获取当前时间信息"""
|
|
|
+ from datetime import datetime
|
|
|
+
|
|
|
+ now = datetime.now()
|
|
|
+ current_time = now.strftime("%H:%M")
|
|
|
+ today_date = now.strftime("%Y-%m-%d")
|
|
|
+ today_weekday = WEEKDAY_MAP[now.strftime("%A")]
|
|
|
+ today_lunar = cnlunar.Lunar(now, godType="8char")
|
|
|
+ lunar_date = "%s年%s%s\n" % (
|
|
|
+ today_lunar.lunarYearCn,
|
|
|
+ today_lunar.lunarMonthCn[:-1],
|
|
|
+ today_lunar.lunarDayCn,
|
|
|
+ )
|
|
|
+
|
|
|
+ return current_time, today_date, today_weekday, lunar_date
|
|
|
+
|
|
|
+ def _get_location_info(self, client_ip: str) -> str:
|
|
|
+ """获取位置信息"""
|
|
|
+ try:
|
|
|
+ # 先从缓存中获取
|
|
|
+ cached_location = self.cache_manager.get(self.CacheType.LOCATION, client_ip)
|
|
|
+ if cached_location is not None:
|
|
|
+ return cached_location
|
|
|
+
|
|
|
+ from core.utils.util import get_ip_info
|
|
|
+
|
|
|
+ ip_info = get_ip_info(client_ip, self.logger)
|
|
|
+ city = ip_info.get("city", "未知城市")
|
|
|
+ location = f"{city}"
|
|
|
+
|
|
|
+ self.cache_manager.set(self.CacheType.LOCATION, client_ip, location)
|
|
|
+ return location
|
|
|
+ except Exception as e:
|
|
|
+ self.logger.bind(tag=TAG).error(f"获取位置信息失败: {str(e)}")
|
|
|
+ return "未知位置"
|
|
|
+
|
|
|
+ def _get_weather_info(self, conn, location: str) -> str:
|
|
|
+ """获取天气信息"""
|
|
|
+ try:
|
|
|
+ cached_weather = self.cache_manager.get(self.CacheType.WEATHER, location)
|
|
|
+ if cached_weather is not None:
|
|
|
+ return cached_weather
|
|
|
+
|
|
|
+ from plugins_func.functions.get_weather import get_weather
|
|
|
+ from plugins_func.register import ActionResponse
|
|
|
+
|
|
|
+ result = get_weather(conn, location=location, lang="zn_CN")
|
|
|
+ if isinstance(result, ActionResponse):
|
|
|
+ weather_report = result.result
|
|
|
+ self.cache_manager.set(self.CacheType.WEATHER, location, weather_report)
|
|
|
+ return weather_report
|
|
|
+ return "获取天气信息失败"
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ self.logger.bind(tag=TAG).error(f"获取天气信息失败: {str(e)}")
|
|
|
+ return "获取天气信息失败"
|
|
|
+
|
|
|
+ def update_context_info(self, conn, client_ip: str):
|
|
|
+ """同步更新上下文信息"""
|
|
|
+ try:
|
|
|
+ local_address = self._get_location_info(client_ip)
|
|
|
+ self._get_weather_info(conn, local_address)
|
|
|
+ self.logger.bind(tag=TAG).info(f"更新上下文信息成功")
|
|
|
+ except Exception as e:
|
|
|
+ self.logger.bind(tag=TAG).error(f"更新上下文信息失败: {str(e)}")
|
|
|
+
|
|
|
+ def build_enhanced_prompt(self, user_prompt: str, device_id: str = None, client_ip: str = None):
|
|
|
+ """构建增强提示词"""
|
|
|
+ if not self.base_prompt_template:
|
|
|
+ return user_prompt
|
|
|
+
|
|
|
+ try:
|
|
|
+ current_time, today_date, today_weekday, lunar_date = self._get_current_time_info()
|
|
|
+ local_address = ""
|
|
|
+ weather_info = ""
|
|
|
+ if client_ip:
|
|
|
+ local_address = self.cache_manager.get(self.CacheType.LOCATION, client_ip) or ""
|
|
|
+ if local_address:
|
|
|
+ weather_info = self.cache_manager.get(self.CacheType.WEATHER, local_address) or ""
|
|
|
+ template = Template(self.base_prompt_template)
|
|
|
+ enhance_prompt = template.render(
|
|
|
+ base_prompt=user_prompt,
|
|
|
+ current_time=current_time,
|
|
|
+ today_date=today_date,
|
|
|
+ today_weekday=today_weekday,
|
|
|
+ lunar_date=lunar_date,
|
|
|
+ local_address=local_address,
|
|
|
+ weather_info=weather_info,
|
|
|
+ emojilist=EMOJI_List,
|
|
|
+ )
|
|
|
+ device_cache_key = f"device_prompt:{device_id}"
|
|
|
+ self.cache_manager.set(self.CacheType.DEVICE_PROMPT, device_cache_key, enhance_prompt)
|
|
|
+ self.logger.bind(tag=TAG).info(f"构建增强提示词成功")
|
|
|
+ return enhance_prompt
|
|
|
+ except Exception as e:
|
|
|
+ self.logger.bind(tag=TAG).error(f"构建增强提示词失败: {str(e)}")
|
|
|
+ return user_prompt
|