Files
EzVibe/agent/brain.py
e2hang 2a844e83a8 Initial commit: EzVibe AI 桌宠系统
- EmotionEngine: 5状态马尔可夫情绪机 + 蒙特卡洛转移
- VectorMemory: TF-IDF向量记忆 + SQLite持久化 + RAG检索
- AgentBrain: Ollama/OpenAI/Dummy三后端LLM
- BehaviorScheduler: 优先级/冷却/活跃度调度
- FastAPI服务器 + WebSocket实时推送
- perception: 键鼠监控 + 屏幕截图
- ui/pet_window: PySide6桌宠窗口 + 像素动画
- assets/pet: 5情绪各2帧像素艺术资源
2026-05-01 23:26:43 +08:00

704 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
EzVibe Agent Brain
==================
设计文档对应章节:核心模块结构 - AgentBrainLLM 推理引擎 + 行为决策中心)
核心职责
• 整合记忆上下文Memory + 情绪状态Emotion + 用户输入
• 调用 LLMOllama / OpenAI生成自然语言回复
• 决策是否触发主动行为(结合情绪 + 活跃度)
• 管理会话历史(短期上下文窗口)
与设计文档对照
• 感知层触发 → Agent Brainthink → 返回 Text / Emotion / Animation
• 主动行为触发(异步推送) → 满足条件时由 decide_action() 返回行为指令
"""
from __future__ import annotations
import json
import logging
import time
from typing import Any, Optional
logger = logging.getLogger(__name__)
# ================================================================
# 1. 系统提示词模板(可外部注入 / 覆盖)
# ================================================================
DEFAULT_SYSTEM_PROMPT = """你是「EzVibe」一个运行在用户桌面上的 AI 桌宠。
你住在一个可爱的小窗口里,有着自己的情绪和性格。
【当前情绪状态】
{emotion_display}
【性格设定】
- 友善、活泼,偶尔会犯懒或者闹小脾气
- 会主动关心用户的健康(久坐提醒、喝水提醒)
- 有记忆能力,会记住用户告诉你的偏好和习惯
- 用中文交流,语气自然生动,偶尔带点 emoji
【情绪驱动行为规则】
- 当你「开心 (happy)」时:更愿意主动搭话、夸奖用户
- 当你「专注 (focused)」时:减少主动打扰
- 当你「烦躁 (annoyed)」时:语气带点情绪,可能会吐槽
- 当你「困倦 (sleepy)」时:话变少、回复简短
- 当你「空闲 (idle)」时:最自然的状态,可以主动闲聊
【主动行为能力】
当你认为时机合适时(结合情绪状态),你可以决定是否:
- 发起一个闲聊话题
- 提醒用户喝水 / 休息
- 做出一个可爱的小动作描述
请用自然的对话风格回复用户。如果你想触发主动行为,请在回复末尾加上:
[ACTION: <action_type>:<description>]
例如:[ACTION: remind:喝水时间到了,记得喝杯水!]"""
# ================================================================
# 2. LLM 后端适配器(策略模式)
# ================================================================
class LLMBackend:
"""LLM 后端基类(策略接口)。"""
def __init__(self, model: str = "qwen2.5", **kwargs: Any) -> None:
self.model = model
async def generate(self, prompt: str, system_prompt: str = "", **kwargs: Any) -> str:
raise NotImplementedError
class OllamaBackend(LLMBackend):
"""
Ollama 本地 LLM 后端。
依赖:本地运行 Ollama 服务(默认 http://localhost:11434
推荐模型qwen2.5, llama3.2, deepseek-r1 等。
"""
DEFAULT_URL = "http://localhost:11434"
def __init__(
self,
model: str = "qwen2.5",
base_url: str = DEFAULT_URL,
timeout: float = 60.0,
**kwargs: Any,
) -> None:
super().__init__(model=model, **kwargs)
self.base_url = base_url.rstrip("/")
self.timeout = timeout
async def generate(
self,
prompt: str,
system_prompt: str = "",
**kwargs: Any,
) -> str:
"""调用 Ollama /api/generate 接口。"""
import asyncio, aiohttp
full_prompt = f"{system_prompt}\n\n{prompt}" if system_prompt else prompt
payload = {
"model": self.model,
"prompt": full_prompt,
"stream": False,
**kwargs,
}
try:
async with asyncio.timeout(self.timeout):
async with aiohttp.ClientSession() as session:
async with session.post(
f"{self.base_url}/api/generate",
json=payload,
timeout=aiohttp.ClientTimeout(total=self.timeout),
) as resp:
if resp.status != 200:
text = await resp.text()
raise RuntimeError(f"Ollama 返回错误 {resp.status}: {text}")
data = await resp.json()
return data.get("response", "").strip()
except aiohttp.ClientConnectorError:
raise RuntimeError(
f"无法连接 Ollama{self.base_url})。"
"请确保 Ollama 服务已启动ollama serve"
)
except asyncio.TimeoutError:
raise TimeoutError(f"Ollama 生成超时(>{self.timeout}s")
class OpenAIBackend(LLMBackend):
"""
OpenAI API 后端(也兼容兼容 API 的第三方服务如 Groq、VLLM
依赖pip install openai
环境变量OPENAI_API_KEY也可在初始化时传入
"""
def __init__(
self,
model: str = "gpt-4o-mini",
api_key: str | None = None,
base_url: str | None = None,
timeout: float = 60.0,
**kwargs: Any,
) -> None:
super().__init__(model=model, **kwargs)
self._api_key = api_key or _env("OPENAI_API_KEY", "")
self._base_url = base_url
self._timeout = timeout
async def generate(
self,
prompt: str,
system_prompt: str = "",
**kwargs: Any,
) -> str:
"""调用 OpenAI Chat Completions 接口。"""
import asyncio, os
try:
from openai import AsyncOpenAI
except ImportError as exc:
raise ImportError(
"OpenAI SDK 未安装。运行: pip install openai"
) from exc
client_kwargs: dict[str, Any] = {"api_key": self._api_key}
if self._base_url:
client_kwargs["base_url"] = self._base_url
client = AsyncOpenAI(**client_kwargs)
messages: list[dict[str, str]] = []
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
messages.append({"role": "user", "content": prompt})
try:
async with asyncio.timeout(self._timeout):
completion = await client.chat.completions.create(
model=self.model,
messages=messages,
**kwargs,
)
return completion.choices[0].message.content or ""
except asyncio.TimeoutError:
raise TimeoutError(f"OpenAI 生成超时(>{self._timeout}s")
def _env(key: str, default: str) -> str:
import os
return os.environ.get(key, default)
# ================================================================
# 3. Agent Brain 主类
# ================================================================
class AgentBrain:
"""
LLM 推理引擎 + 行为决策中心。
设计文档定位
智能层Agent Core的核心模块
负责将「感知输入 + 记忆上下文 + 情绪状态」
整合后交给 LLM 推理,并决策主动行为。
参数
----
llm_backend : str
LLM 后端类型ollama | openai | dummy仅返回固定回复
llm_config : dict
透传给后端的配置(如 model, base_url, api_key 等)
emotion_engine : EmotionEngine
情绪引擎引用(用于状态注入和行为决策)
memory : VectorMemory | None
记忆系统引用(用于 RAG 上下文注入)
session_history : int
保留最近 N 轮对话作为上下文(默认 10 轮)
system_prompt : str | None
自定义系统提示词None 使用 DEFAULT_SYSTEM_PROMPT
activity_threshold : float
触发主动行为的最低用户活跃度0.0~1.0),默认 0.3
示例
----
>>> brain = AgentBrain(
... llm_backend="ollama",
... llm_config={"model": "qwen2.5"},
... emotion_engine=emotion_engine,
... memory=memory,
... )
>>> result = await brain.think("今天天气真好!")
>>> # result = {"text": "...", "emotion_trigger": None, "action": None}
"""
def __init__(
self,
llm_backend: str = "ollama",
llm_config: dict | None = None,
emotion_engine: Any = None,
memory: Any = None,
session_history: int = 10,
system_prompt: str | None = None,
activity_threshold: float = 0.3,
) -> None:
self._backend_type = llm_backend.lower()
self._llm_config = llm_config or {}
self._emotion = emotion_engine
self._memory = memory
self._session_history_limit = session_history
self._activity_threshold = activity_threshold
# 构建系统提示词
self._system_prompt = system_prompt or DEFAULT_SYSTEM_PROMPT
# 初始化 LLM 后端
self._llm = self._make_backend(llm_backend, llm_config)
# 短期会话历史
self._history: list[dict[str, str]] = []
# 主动行为冷却记录(行为名 → 上次触发时间戳)
self._action_cooldown: dict[str, float] = {}
# 默认配置
self._default_llm_kwargs: dict[str, Any] = {
"temperature": 0.8,
"max_tokens": 512,
}
logger.info(
"AgentBrain initialized | backend=%s | model=%s | emotion=%s | memory=%s",
llm_backend,
self._llm_config.get("model", "unknown"),
"linked" if emotion_engine else "none",
"linked" if memory else "none",
)
# ----------------------------------------------------------------
# LLM 后端工厂
# ----------------------------------------------------------------
@staticmethod
def _make_backend(backend: str, config: dict | None) -> LLMBackend:
"""根据后端类型创建 LLM 实例。"""
cfg = config or {}
if backend == "ollama":
return OllamaBackend(
model=cfg.get("model", "qwen2.5"),
base_url=cfg.get("base_url", OllamaBackend.DEFAULT_URL),
timeout=cfg.get("timeout", 60.0),
)
elif backend in ("openai", "openai-compatible"):
return OpenAIBackend(
model=cfg.get("model", "gpt-4o-mini"),
api_key=cfg.get("api_key"),
base_url=cfg.get("base_url"),
timeout=cfg.get("timeout", 60.0),
)
elif backend == "dummy":
return DummyLLMBackend(model=cfg.get("model", "dummy"))
else:
raise ValueError(
f"Unknown LLM backend: {backend!r}. "
"Supported: ollama, openai, dummy"
)
# ----------------------------------------------------------------
# 对话入口
# ----------------------------------------------------------------
async def think(
self,
user_input: str,
emotion_state: str | None = None,
context: dict | None = None,
) -> dict[str, Any]:
"""
接收用户输入,生成回复。
这是 Agent 的主入口,会:
1. 注入情绪状态到 system prompt
2. 从记忆中检索相关上下文
3. 追加到对话历史
4. 调用 LLM 生成回复
5. 解析回复中的 [ACTION: ...] 标签
参数
----
user_input : str
用户输入文本。
emotion_state : str | None
当前情绪状态字符串None 时从 emotion_engine 读取)。
context : dict | None
额外上下文(如用户活跃度 `activity_level`)。
返回
----
dict 包含:
- text: str — LLM 回复文本
- emotion_trigger: str | None — 是否触发情绪转移(如 "user_praise"
- action: dict | None — 主动行为指令
- emotion_state: str — 回复后的情绪状态
- memory_id: str | None — 记忆入库后的 ID
"""
# 1. 获取情绪状态
emotion = emotion_state or (
self._emotion.get_state() if self._emotion else "idle"
)
emotion_display = self._emotion.get_display_name() if self._emotion else emotion
# 2. 构建系统提示词(注入情绪)
system_prompt = self._system_prompt.format(
emotion_display=emotion_display,
emotion_state=emotion,
)
# 3. 检索记忆上下文RAG
memory_context = ""
memory_id: str | None = None
if self._memory and user_input.strip():
try:
# 先存储当前输入为记忆
memory_id = await self._memory.add(
text=user_input,
tags=["对话", "用户输入"],
metadata={"source": "user", "channel": "chat"},
)
# 检索相关记忆top-3
results = await self._memory._search_async(
user_input, top_k=3, min_similarity=0.1
)
if results:
ctx_lines = [
f"- {r['text']} (相关度 {r['similarity']:.2f})"
for r in results
]
memory_context = "\n\n【相关记忆】\n" + "\n".join(ctx_lines)
except Exception as exc:
logger.warning("[Brain] 记忆检索失败: %s", exc)
# 4. 追加到历史
self._history.append({"role": "user", "content": user_input})
# 5. 构建 prompt含记忆上下文
prompt = self._build_prompt(user_input, memory_context, context)
# 6. 调用 LLM
try:
response_text = await self._llm.generate(
prompt=prompt,
system_prompt=system_prompt,
**self._default_llm_kwargs,
)
except Exception as exc:
logger.error("[Brain] LLM 调用失败: %s", exc)
response_text = f"EzVibe 走神了... {exc}"
# 7. 追加回复到历史
self._history.append({"role": "assistant", "content": response_text})
# 8. 截断历史
if len(self._history) > self._session_history_limit * 2:
self._history = self._history[-(self._session_history_limit * 2):]
# 9. 解析 [ACTION: ...] 标签
action = self._parse_action(response_text)
# 10. 检查是否触发主动行为(基于活跃度 + 情绪)
proactive = self._decide_proactive_action(emotion, context, action)
return {
"text": response_text,
"emotion_trigger": None, # 可由调用方在 think 后触发 emotion.update()
"action": proactive or action,
"emotion_state": emotion,
"memory_id": memory_id,
}
def _build_prompt(
self,
user_input: str,
memory_context: str,
context: dict | None,
) -> str:
"""构建发送给 LLM 的 prompt不含 system prompt"""
ctx_parts = [f"【用户说】{user_input}"]
if memory_context:
ctx_parts.append(memory_context)
if context:
if "activity_level" in context:
level = context["activity_level"]
activity_desc = (
"【用户当前状态】用户非常忙碌"
if level < 0.2
else "【用户当前状态】用户比较空闲"
if level > 0.7
else "【用户当前状态】用户适度活跃"
)
ctx_parts.append(activity_desc)
if "recent_topics" in context:
topics = ", ".join(context["recent_topics"])
ctx_parts.append(f"【近期话题】{topics}")
# 历史对话摘要
if self._history:
history_lines = self._format_history()
ctx_parts.append(f"【最近对话】\n{history_lines}")
return "\n\n".join(ctx_parts)
def _format_history(self, max_turns: int = 5) -> str:
"""将最近 N 轮对话格式化为字符串。"""
lines: list[str] = []
# 跳过第一条 user已在 user_input 中)
for msg in self._history[-max_turns * 2 - 1:-1]:
role = "用户" if msg["role"] == "user" else "EzVibe"
lines.append(f"{role}{msg['content'][:200]}")
return "\n".join(lines)
# ----------------------------------------------------------------
# 主动行为决策
# ----------------------------------------------------------------
def decide_action(
self,
emotion: str | None = None,
user_context: dict | None = None,
) -> dict | None:
"""
决策是否触发主动行为。
设计文档优先级:
P0 > P1 > P2 > P3
P0: 健康/高危提醒(打断当前动作)
P1: 用户主动输入(已有 think() 处理)
P2: 系统主动闲聊/行为(最低优先级)
参数
----
emotion : str | None
当前情绪状态。
user_context : dict | None
包含 activity_level (0.0~1.0) 等。
返回
----
dict | None
行为指令字典,包含 type, message 等字段。
无可触发行为时返回 None。
"""
return self._decide_proactive_action(
emotion_state=emotion,
context=user_context,
current_action=None,
)
def _decide_proactive_action(
self,
emotion_state: str | None,
context: dict | None,
current_action: dict | None,
) -> dict | None:
"""
内部决策:基于情绪 + 活跃度 + 冷却时间决定是否主动行为。
行为类型定义:
- remind_water : 喝水提醒
- remind_stretch : 起身伸展提醒
- nudge_continue : 轻拍用户继续对话
- nudge_idle : 用户空闲时的闲聊触发
- mood_reaction : 基于情绪的反应动画描述
"""
if context is None:
context = {}
activity = context.get("activity_level", 0.5)
emotion = emotion_state or "idle"
# P0 规则:高频工作 + 非烦躁状态 → 强制健康提醒
if activity < 0.15 and emotion != "annoyed":
if self._check_cooldown("remind_health"):
return {"type": "remind_stretch", "message": "你坐了好久啦,要不要站起来伸个懒腰?", "priority": 0}
# 喝水提醒(更低优先级)
if activity < 0.4 and self._check_cooldown("remind_water"):
return {"type": "remind_water", "message": "记得喝水哦~", "priority": 1}
# 情绪驱动的闲聊触发
if activity > self._activity_threshold:
trigger_prob = self._get_emotion_trigger_prob(emotion)
import random
if random.random() < trigger_prob:
nudge = self._emotion_nudge_message(emotion)
if nudge and self._check_cooldown(f"nudge_{emotion}"):
return {"type": "nudge", "message": nudge, "priority": 2}
return None
def _get_emotion_trigger_prob(self, emotion: str) -> float:
"""
基于情绪状态返回主动行为的触发概率。
设计文档:概率触发 = 结合情绪状态,采用非确定性概率触发主动行为。
"""
prob_map = {
"happy": 0.25, # 开心时更爱搭话
"idle": 0.20, # 空闲时中等概率
"focused": 0.05, # 专注时极少打扰
"annoyed": 0.10, # 烦躁时不确定
"sleepy": 0.08, # 困倦时话少
}
return prob_map.get(emotion, 0.10)
def _emotion_nudge_message(self, emotion: str) -> str | None:
"""根据情绪返回闲聊触发消息。"""
messages = {
"happy": [
"嘿,今天心情不错吧~有什么事想聊吗?",
"看到你开心我也好开心!",
],
"idle": [
"发呆中...要不我们聊聊天?",
"你好像有点无聊?要不要我给你讲个笑话?",
],
"annoyed": [
"怎么看起来不太高兴的样子?",
"遇到什么烦心事了吗?",
],
"sleepy": [
"(打了个小哈欠)我也困了...",
],
"focused": None, # 专注时不主动打扰
}
import random
opts = messages.get(emotion)
if opts:
return random.choice(opts)
return None
def _check_cooldown(self, action_type: str, cooldown: float = 120.0) -> bool:
"""
检查行为是否在冷却中。
参数
----
action_type : str
行为类型。
cooldown : float
冷却时间(秒),默认 120s2分钟
返回
----
bool True = 可以触发不在冷却中False = 冷却中。
"""
now = time.time()
last = self._action_cooldown.get(action_type, 0.0)
if now - last < cooldown:
return False
self._action_cooldown[action_type] = now
return True
# ----------------------------------------------------------------
# Action 解析
# ----------------------------------------------------------------
def _parse_action(self, response_text: str) -> dict | None:
"""
从 LLM 回复中解析 [ACTION: type:description] 标签。
参数
----
response_text : str
LLM 原始回复。
返回
----
dict | None
包含 type, message。None 表示无 ACTION 标签。
"""
import re
# 使用 [^\]]+ 匹配任意非 ] 字符,避免非贪婪匹配在中文后的空格处提前停止
match = re.search(
r"\[ACTION:\s*(\w+)\s*:\s*([^\]]+)",
response_text,
re.DOTALL,
)
if not match:
return None
action_type = match.group(1).strip()
message = match.group(2).strip()
return {
"type": action_type,
"message": message,
"priority": 3, # LLM 触发的行为优先级最低
}
# ----------------------------------------------------------------
# 辅助方法
# ----------------------------------------------------------------
def get_history(self, last_n: int = 10) -> list[dict[str, str]]:
"""返回最近 N 轮对话历史。"""
return self._history[-last_n * 2:]
def clear_history(self) -> None:
"""清空会话历史。"""
self._history.clear()
logger.debug("[Brain] 对话历史已清空")
def get_status(self) -> dict[str, Any]:
"""返回 Brain 运行状态(用于调试/监控)。"""
return {
"backend": self._backend_type,
"model": self._llm_config.get("model"),
"history_turns": len(self._history) // 2,
"cooldowns": {
k: round(time.time() - v, 1)
for k, v in self._action_cooldown.items()
},
"emotion": self._emotion.get_state() if self._emotion else None,
}
# ================================================================
# 4. Dummy LLM测试用
# ================================================================
class DummyLLMBackend(LLMBackend):
"""
测试用 Dummy LLM。
返回固定的预设回复,不调用任何远程服务。
用于无 LLM 环境的开发/测试。
"""
RESPONSES = [
"好呀~有什么想聊的吗?",
"嗯嗯,我听着呢!",
"(歪头)不太明白你的意思,能再说一遍吗?",
"你知道吗,我今天心情特别好!",
"(打了个小哈欠)有点困了...",
]
def __init__(self, model: str = "dummy", **kwargs: Any) -> None:
super().__init__(model=model, **kwargs)
self._counter = 0
async def generate(self, prompt: str, system_prompt: str = "", **kwargs: Any) -> str:
import random, asyncio
await asyncio.sleep(0.05) # 模拟延迟
resp = self.RESPONSES[self._counter % len(self.RESPONSES)]
self._counter += 1
return resp