""" EzVibe Agent Brain ================== 设计文档对应章节:核心模块结构 - AgentBrain(LLM 推理引擎 + 行为决策中心) 核心职责 • 整合记忆上下文(Memory) + 情绪状态(Emotion) + 用户输入 • 调用 LLM(Ollama / OpenAI)生成自然语言回复 • 决策是否触发主动行为(结合情绪 + 活跃度) • 管理会话历史(短期上下文窗口) 与设计文档对照 • 感知层触发 → Agent Brain(think) → 返回 Text / Emotion / Animation • 主动行为触发(异步推送) → 满足条件时由 decide_action() 返回行为指令 """ from __future__ import annotations import json import logging import time from typing import Any, Optional logger = logging.getLogger(__name__) # ================================================================ # 1. 系统提示词模板(可外部注入 / 覆盖) # ================================================================ DEFAULT_SYSTEM_PROMPT = """你是「EzVibe」,一个运行在用户桌面上的 AI 桌宠。 你住在一个可爱的小窗口里,有着自己的情绪和性格。 【当前情绪状态】 {emotion_display} 【性格设定】 - 友善、活泼,偶尔会犯懒或者闹小脾气 - 会主动关心用户的健康(久坐提醒、喝水提醒) - 有记忆能力,会记住用户告诉你的偏好和习惯 - 用中文交流,语气自然生动,偶尔带点 emoji 【情绪驱动行为规则】 - 当你「开心 (happy)」时:更愿意主动搭话、夸奖用户 - 当你「专注 (focused)」时:减少主动打扰 - 当你「烦躁 (annoyed)」时:语气带点情绪,可能会吐槽 - 当你「困倦 (sleepy)」时:话变少、回复简短 - 当你「空闲 (idle)」时:最自然的状态,可以主动闲聊 【主动行为能力】 当你认为时机合适时(结合情绪状态),你可以决定是否: - 发起一个闲聊话题 - 提醒用户喝水 / 休息 - 做出一个可爱的小动作描述 请用自然的对话风格回复用户。如果你想触发主动行为,请在回复末尾加上: [ACTION: :] 例如:[ACTION: remind:喝水时间到了,记得喝杯水!]""" # ================================================================ # 2. LLM 后端适配器(策略模式) # ================================================================ class LLMBackend: """LLM 后端基类(策略接口)。""" def __init__(self, model: str = "qwen2.5", **kwargs: Any) -> None: self.model = model async def generate(self, prompt: str, system_prompt: str = "", **kwargs: Any) -> str: raise NotImplementedError class OllamaBackend(LLMBackend): """ Ollama 本地 LLM 后端。 依赖:本地运行 Ollama 服务(默认 http://localhost:11434)。 推荐模型:qwen2.5, llama3.2, deepseek-r1 等。 """ DEFAULT_URL = "http://localhost:11434" def __init__( self, model: str = "qwen2.5", base_url: str = DEFAULT_URL, timeout: float = 60.0, **kwargs: Any, ) -> None: super().__init__(model=model, **kwargs) self.base_url = base_url.rstrip("/") self.timeout = timeout async def generate( self, prompt: str, system_prompt: str = "", **kwargs: Any, ) -> str: """调用 Ollama /api/generate 接口。""" import asyncio, aiohttp full_prompt = f"{system_prompt}\n\n{prompt}" if system_prompt else prompt payload = { "model": self.model, "prompt": full_prompt, "stream": False, **kwargs, } try: async with asyncio.timeout(self.timeout): async with aiohttp.ClientSession() as session: async with session.post( f"{self.base_url}/api/generate", json=payload, timeout=aiohttp.ClientTimeout(total=self.timeout), ) as resp: if resp.status != 200: text = await resp.text() raise RuntimeError(f"Ollama 返回错误 {resp.status}: {text}") data = await resp.json() return data.get("response", "").strip() except aiohttp.ClientConnectorError: raise RuntimeError( f"无法连接 Ollama({self.base_url})。" "请确保 Ollama 服务已启动:ollama serve" ) except asyncio.TimeoutError: raise TimeoutError(f"Ollama 生成超时(>{self.timeout}s)") class OpenAIBackend(LLMBackend): """ OpenAI API 后端(也兼容兼容 API 的第三方服务如 Groq、VLLM)。 依赖:pip install openai 环境变量:OPENAI_API_KEY(也可在初始化时传入) """ def __init__( self, model: str = "gpt-4o-mini", api_key: str | None = None, base_url: str | None = None, timeout: float = 60.0, **kwargs: Any, ) -> None: super().__init__(model=model, **kwargs) self._api_key = api_key or _env("OPENAI_API_KEY", "") self._base_url = base_url self._timeout = timeout async def generate( self, prompt: str, system_prompt: str = "", **kwargs: Any, ) -> str: """调用 OpenAI Chat Completions 接口。""" import asyncio, os try: from openai import AsyncOpenAI except ImportError as exc: raise ImportError( "OpenAI SDK 未安装。运行: pip install openai" ) from exc client_kwargs: dict[str, Any] = {"api_key": self._api_key} if self._base_url: client_kwargs["base_url"] = self._base_url client = AsyncOpenAI(**client_kwargs) messages: list[dict[str, str]] = [] if system_prompt: messages.append({"role": "system", "content": system_prompt}) messages.append({"role": "user", "content": prompt}) try: async with asyncio.timeout(self._timeout): completion = await client.chat.completions.create( model=self.model, messages=messages, **kwargs, ) return completion.choices[0].message.content or "" except asyncio.TimeoutError: raise TimeoutError(f"OpenAI 生成超时(>{self._timeout}s)") def _env(key: str, default: str) -> str: import os return os.environ.get(key, default) # ================================================================ # 3. Agent Brain 主类 # ================================================================ class AgentBrain: """ LLM 推理引擎 + 行为决策中心。 设计文档定位 智能层(Agent Core)的核心模块, 负责将「感知输入 + 记忆上下文 + 情绪状态」 整合后交给 LLM 推理,并决策主动行为。 参数 ---- llm_backend : str LLM 后端类型:ollama | openai | dummy(仅返回固定回复) llm_config : dict 透传给后端的配置(如 model, base_url, api_key 等) emotion_engine : EmotionEngine 情绪引擎引用(用于状态注入和行为决策) memory : VectorMemory | None 记忆系统引用(用于 RAG 上下文注入) session_history : int 保留最近 N 轮对话作为上下文(默认 10 轮) system_prompt : str | None 自定义系统提示词(None 使用 DEFAULT_SYSTEM_PROMPT) activity_threshold : float 触发主动行为的最低用户活跃度(0.0~1.0),默认 0.3 示例 ---- >>> brain = AgentBrain( ... llm_backend="ollama", ... llm_config={"model": "qwen2.5"}, ... emotion_engine=emotion_engine, ... memory=memory, ... ) >>> result = await brain.think("今天天气真好!") >>> # result = {"text": "...", "emotion_trigger": None, "action": None} """ def __init__( self, llm_backend: str = "ollama", llm_config: dict | None = None, emotion_engine: Any = None, memory: Any = None, session_history: int = 10, system_prompt: str | None = None, activity_threshold: float = 0.3, ) -> None: self._backend_type = llm_backend.lower() self._llm_config = llm_config or {} self._emotion = emotion_engine self._memory = memory self._session_history_limit = session_history self._activity_threshold = activity_threshold # 构建系统提示词 self._system_prompt = system_prompt or DEFAULT_SYSTEM_PROMPT # 初始化 LLM 后端 self._llm = self._make_backend(llm_backend, llm_config) # 短期会话历史 self._history: list[dict[str, str]] = [] # 主动行为冷却记录(行为名 → 上次触发时间戳) self._action_cooldown: dict[str, float] = {} # 默认配置 self._default_llm_kwargs: dict[str, Any] = { "temperature": 0.8, "max_tokens": 512, } logger.info( "AgentBrain initialized | backend=%s | model=%s | emotion=%s | memory=%s", llm_backend, self._llm_config.get("model", "unknown"), "linked" if emotion_engine else "none", "linked" if memory else "none", ) # ---------------------------------------------------------------- # LLM 后端工厂 # ---------------------------------------------------------------- @staticmethod def _make_backend(backend: str, config: dict | None) -> LLMBackend: """根据后端类型创建 LLM 实例。""" cfg = config or {} if backend == "ollama": return OllamaBackend( model=cfg.get("model", "qwen2.5"), base_url=cfg.get("base_url", OllamaBackend.DEFAULT_URL), timeout=cfg.get("timeout", 60.0), ) elif backend in ("openai", "openai-compatible"): return OpenAIBackend( model=cfg.get("model", "gpt-4o-mini"), api_key=cfg.get("api_key"), base_url=cfg.get("base_url"), timeout=cfg.get("timeout", 60.0), ) elif backend == "dummy": return DummyLLMBackend(model=cfg.get("model", "dummy")) else: raise ValueError( f"Unknown LLM backend: {backend!r}. " "Supported: ollama, openai, dummy" ) # ---------------------------------------------------------------- # 对话入口 # ---------------------------------------------------------------- async def think( self, user_input: str, emotion_state: str | None = None, context: dict | None = None, ) -> dict[str, Any]: """ 接收用户输入,生成回复。 这是 Agent 的主入口,会: 1. 注入情绪状态到 system prompt 2. 从记忆中检索相关上下文 3. 追加到对话历史 4. 调用 LLM 生成回复 5. 解析回复中的 [ACTION: ...] 标签 参数 ---- user_input : str 用户输入文本。 emotion_state : str | None 当前情绪状态字符串(None 时从 emotion_engine 读取)。 context : dict | None 额外上下文(如用户活跃度 `activity_level`)。 返回 ---- dict 包含: - text: str — LLM 回复文本 - emotion_trigger: str | None — 是否触发情绪转移(如 "user_praise") - action: dict | None — 主动行为指令 - emotion_state: str — 回复后的情绪状态 - memory_id: str | None — 记忆入库后的 ID """ # 1. 获取情绪状态 emotion = emotion_state or ( self._emotion.get_state() if self._emotion else "idle" ) emotion_display = self._emotion.get_display_name() if self._emotion else emotion # 2. 构建系统提示词(注入情绪) system_prompt = self._system_prompt.format( emotion_display=emotion_display, emotion_state=emotion, ) # 3. 检索记忆上下文(RAG) memory_context = "" memory_id: str | None = None if self._memory and user_input.strip(): try: # 先存储当前输入为记忆 memory_id = await self._memory.add( text=user_input, tags=["对话", "用户输入"], metadata={"source": "user", "channel": "chat"}, ) # 检索相关记忆(top-3) results = await self._memory._search_async( user_input, top_k=3, min_similarity=0.1 ) if results: ctx_lines = [ f"- {r['text']} (相关度 {r['similarity']:.2f})" for r in results ] memory_context = "\n\n【相关记忆】\n" + "\n".join(ctx_lines) except Exception as exc: logger.warning("[Brain] 记忆检索失败: %s", exc) # 4. 追加到历史 self._history.append({"role": "user", "content": user_input}) # 5. 构建 prompt(含记忆上下文) prompt = self._build_prompt(user_input, memory_context, context) # 6. 调用 LLM try: response_text = await self._llm.generate( prompt=prompt, system_prompt=system_prompt, **self._default_llm_kwargs, ) except Exception as exc: logger.error("[Brain] LLM 调用失败: %s", exc) response_text = f"(EzVibe 走神了... {exc})" # 7. 追加回复到历史 self._history.append({"role": "assistant", "content": response_text}) # 8. 截断历史 if len(self._history) > self._session_history_limit * 2: self._history = self._history[-(self._session_history_limit * 2):] # 9. 解析 [ACTION: ...] 标签 action = self._parse_action(response_text) # 10. 检查是否触发主动行为(基于活跃度 + 情绪) proactive = self._decide_proactive_action(emotion, context, action) return { "text": response_text, "emotion_trigger": None, # 可由调用方在 think 后触发 emotion.update() "action": proactive or action, "emotion_state": emotion, "memory_id": memory_id, } def _build_prompt( self, user_input: str, memory_context: str, context: dict | None, ) -> str: """构建发送给 LLM 的 prompt(不含 system prompt)。""" ctx_parts = [f"【用户说】{user_input}"] if memory_context: ctx_parts.append(memory_context) if context: if "activity_level" in context: level = context["activity_level"] activity_desc = ( "【用户当前状态】用户非常忙碌" if level < 0.2 else "【用户当前状态】用户比较空闲" if level > 0.7 else "【用户当前状态】用户适度活跃" ) ctx_parts.append(activity_desc) if "recent_topics" in context: topics = ", ".join(context["recent_topics"]) ctx_parts.append(f"【近期话题】{topics}") # 历史对话摘要 if self._history: history_lines = self._format_history() ctx_parts.append(f"【最近对话】\n{history_lines}") return "\n\n".join(ctx_parts) def _format_history(self, max_turns: int = 5) -> str: """将最近 N 轮对话格式化为字符串。""" lines: list[str] = [] # 跳过第一条 user(已在 user_input 中) for msg in self._history[-max_turns * 2 - 1:-1]: role = "用户" if msg["role"] == "user" else "EzVibe" lines.append(f"{role}:{msg['content'][:200]}") return "\n".join(lines) # ---------------------------------------------------------------- # 主动行为决策 # ---------------------------------------------------------------- def decide_action( self, emotion: str | None = None, user_context: dict | None = None, ) -> dict | None: """ 决策是否触发主动行为。 设计文档优先级: P0 > P1 > P2 > P3 P0: 健康/高危提醒(打断当前动作) P1: 用户主动输入(已有 think() 处理) P2: 系统主动闲聊/行为(最低优先级) 参数 ---- emotion : str | None 当前情绪状态。 user_context : dict | None 包含 activity_level (0.0~1.0) 等。 返回 ---- dict | None 行为指令字典,包含 type, message 等字段。 无可触发行为时返回 None。 """ return self._decide_proactive_action( emotion_state=emotion, context=user_context, current_action=None, ) def _decide_proactive_action( self, emotion_state: str | None, context: dict | None, current_action: dict | None, ) -> dict | None: """ 内部决策:基于情绪 + 活跃度 + 冷却时间决定是否主动行为。 行为类型定义: - remind_water : 喝水提醒 - remind_stretch : 起身伸展提醒 - nudge_continue : 轻拍用户继续对话 - nudge_idle : 用户空闲时的闲聊触发 - mood_reaction : 基于情绪的反应动画描述 """ if context is None: context = {} activity = context.get("activity_level", 0.5) emotion = emotion_state or "idle" # P0 规则:高频工作 + 非烦躁状态 → 强制健康提醒 if activity < 0.15 and emotion != "annoyed": if self._check_cooldown("remind_health"): return {"type": "remind_stretch", "message": "你坐了好久啦,要不要站起来伸个懒腰?", "priority": 0} # 喝水提醒(更低优先级) if activity < 0.4 and self._check_cooldown("remind_water"): return {"type": "remind_water", "message": "记得喝水哦~", "priority": 1} # 情绪驱动的闲聊触发 if activity > self._activity_threshold: trigger_prob = self._get_emotion_trigger_prob(emotion) import random if random.random() < trigger_prob: nudge = self._emotion_nudge_message(emotion) if nudge and self._check_cooldown(f"nudge_{emotion}"): return {"type": "nudge", "message": nudge, "priority": 2} return None def _get_emotion_trigger_prob(self, emotion: str) -> float: """ 基于情绪状态返回主动行为的触发概率。 设计文档:概率触发 = 结合情绪状态,采用非确定性概率触发主动行为。 """ prob_map = { "happy": 0.25, # 开心时更爱搭话 "idle": 0.20, # 空闲时中等概率 "focused": 0.05, # 专注时极少打扰 "annoyed": 0.10, # 烦躁时不确定 "sleepy": 0.08, # 困倦时话少 } return prob_map.get(emotion, 0.10) def _emotion_nudge_message(self, emotion: str) -> str | None: """根据情绪返回闲聊触发消息。""" messages = { "happy": [ "嘿,今天心情不错吧~有什么事想聊吗?", "看到你开心我也好开心!", ], "idle": [ "发呆中...要不我们聊聊天?", "你好像有点无聊?要不要我给你讲个笑话?", ], "annoyed": [ "怎么看起来不太高兴的样子?", "遇到什么烦心事了吗?", ], "sleepy": [ "(打了个小哈欠)我也困了...", ], "focused": None, # 专注时不主动打扰 } import random opts = messages.get(emotion) if opts: return random.choice(opts) return None def _check_cooldown(self, action_type: str, cooldown: float = 120.0) -> bool: """ 检查行为是否在冷却中。 参数 ---- action_type : str 行为类型。 cooldown : float 冷却时间(秒),默认 120s(2分钟)。 返回 ---- bool True = 可以触发(不在冷却中);False = 冷却中。 """ now = time.time() last = self._action_cooldown.get(action_type, 0.0) if now - last < cooldown: return False self._action_cooldown[action_type] = now return True # ---------------------------------------------------------------- # Action 解析 # ---------------------------------------------------------------- def _parse_action(self, response_text: str) -> dict | None: """ 从 LLM 回复中解析 [ACTION: type:description] 标签。 参数 ---- response_text : str LLM 原始回复。 返回 ---- dict | None 包含 type, message。None 表示无 ACTION 标签。 """ import re # 使用 [^\]]+ 匹配任意非 ] 字符,避免非贪婪匹配在中文后的空格处提前停止 match = re.search( r"\[ACTION:\s*(\w+)\s*:\s*([^\]]+)", response_text, re.DOTALL, ) if not match: return None action_type = match.group(1).strip() message = match.group(2).strip() return { "type": action_type, "message": message, "priority": 3, # LLM 触发的行为优先级最低 } # ---------------------------------------------------------------- # 辅助方法 # ---------------------------------------------------------------- def get_history(self, last_n: int = 10) -> list[dict[str, str]]: """返回最近 N 轮对话历史。""" return self._history[-last_n * 2:] def clear_history(self) -> None: """清空会话历史。""" self._history.clear() logger.debug("[Brain] 对话历史已清空") def get_status(self) -> dict[str, Any]: """返回 Brain 运行状态(用于调试/监控)。""" return { "backend": self._backend_type, "model": self._llm_config.get("model"), "history_turns": len(self._history) // 2, "cooldowns": { k: round(time.time() - v, 1) for k, v in self._action_cooldown.items() }, "emotion": self._emotion.get_state() if self._emotion else None, } # ================================================================ # 4. Dummy LLM(测试用) # ================================================================ class DummyLLMBackend(LLMBackend): """ 测试用 Dummy LLM。 返回固定的预设回复,不调用任何远程服务。 用于无 LLM 环境的开发/测试。 """ RESPONSES = [ "好呀~有什么想聊的吗?", "嗯嗯,我听着呢!", "(歪头)不太明白你的意思,能再说一遍吗?", "你知道吗,我今天心情特别好!", "(打了个小哈欠)有点困了...", ] def __init__(self, model: str = "dummy", **kwargs: Any) -> None: super().__init__(model=model, **kwargs) self._counter = 0 async def generate(self, prompt: str, system_prompt: str = "", **kwargs: Any) -> str: import random, asyncio await asyncio.sleep(0.05) # 模拟延迟 resp = self.RESPONSES[self._counter % len(self.RESPONSES)] self._counter += 1 return resp