Files
EzVibe/agent/scheduler.py
e2hang 2a844e83a8 Initial commit: EzVibe AI 桌宠系统
- EmotionEngine: 5状态马尔可夫情绪机 + 蒙特卡洛转移
- VectorMemory: TF-IDF向量记忆 + SQLite持久化 + RAG检索
- AgentBrain: Ollama/OpenAI/Dummy三后端LLM
- BehaviorScheduler: 优先级/冷却/活跃度调度
- FastAPI服务器 + WebSocket实时推送
- perception: 键鼠监控 + 屏幕截图
- ui/pet_window: PySide6桌宠窗口 + 像素动画
- assets/pet: 5情绪各2帧像素艺术资源
2026-05-01 23:26:43 +08:00

705 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
EzVibe Behavior Scheduler
==========================
设计文档对应章节:调度与行为策略
核心职责
• 优先级控制P0 健康提醒 > P1 用户输入响应 > P2 主动闲聊
• 冷却时间控制:同类行为存在 CD 限制,防止重复打扰
• 活跃度判断:基于键鼠频率检测用户是否处于高强度工作
• 概率触发:结合情绪状态,采用非确定性概率触发主动行为
与设计文档对照
• 打扰控制三要素:冷却时间 + 活跃度判断 + 概率触发
• 优先级完全按文档分层P0 可打断 P1/P2
行为优先级定义
P0 (priority=0) : 健康/高危提醒(打断所有当前动作)
P1 (priority=1) : 用户主动输入响应(已在 AgentBrain.think() 处理)
P2 (priority=2) : 系统主动闲聊/行为(最低优先级)
P3 (priority=3) : LLM 自触发的闲聊行为
"""
from __future__ import annotations
import logging
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, Callable
logger = logging.getLogger(__name__)
# ================================================================
# 1. 行为数据结构
# ================================================================
@dataclass
class Behavior:
"""
单个行为的定义。
参数
----
name : str
行为唯一标识(如 "remind_water")。
action_fn : callable
实际执行的行为函数,签名: () -> dict | None。
返回 dict 表示执行了行为(包含 message 等字段None 表示跳过。
priority : int
优先级0=最高3=最低)。
cooldown : float
冷却时间(秒)。同类行为触发后需等待这么久才能再次触发。
enabled : bool
是否启用。
tags : list[str]
标签分组(如 ["health", "reminder"]),用于跨行为冷却。
probability : float
基础触发概率0.0~1.00.0=从不触发1.0=总是触发。
description : str
行为的人类可读描述(用于调试/日志)。
"""
name: str
action_fn: Callable[[], dict | None]
priority: int = 2
cooldown: float = 120.0
enabled: bool = True
tags: list[str] = field(default_factory=list)
probability: float = 1.0
description: str = ""
# 内部状态(运行时不暴露给构造器)
_last_triggered: float = field(default=0.0, repr=False)
def is_ready(self, current_time: float) -> bool:
"""判断行为是否可触发(冷却已过)。"""
return current_time - self._last_triggered >= self.cooldown
def mark_triggered(self, current_time: float) -> None:
"""标记为已触发。"""
self._last_triggered = current_time
@dataclass
class Reminder:
"""
提醒事项(延迟执行的任务)。
参数
----
id : str
唯一标识。
message : str
提醒内容。
trigger_at : float
触发时间戳Unix timestamp
priority : int
优先级。
dismissed : bool
是否已忽略。
"""
id: str
message: str
trigger_at: float
priority: int = 0
dismissed: bool = False
@property
def is_due(self) -> bool:
return time.time() >= self.trigger_at and not self.dismissed
# ================================================================
# 2. 活跃度检测器
# ================================================================
class ActivityDetector:
"""
基于键鼠事件频率判断用户活跃度。
逻辑
----
1. 每次 detect() 调用传入事件类型key_press / mouse_click
2. 维护一个滑动时间窗口(如最近 60 秒)
3. 计算事件频率F = 事件数 / 窗口秒数)
4. 归一化到 0.0~1.0
阈值参考(可配置)
----
- activity < 0.15 : 极度专注(可能正在深度工作/游戏中)
- activity < 0.3 : 比较专注
- 0.3 ≤ activity < 0.7 : 适度活跃
- activity ≥ 0.7 : 非常活跃
"""
DEFAULT_WINDOW = 60.0 # 滑动时间窗口(秒)
DEFAULT_MAX_RATE = 3.0 # 归一化上限3次/秒 = 极高活跃度)
def __init__(
self,
window_seconds: float = DEFAULT_WINDOW,
max_event_rate: float = DEFAULT_MAX_RATE,
) -> None:
self._window = window_seconds
self._max_rate = max_event_rate
self._events: list[float] = [] # 时间戳列表
def detect(self, event_type: str = "any") -> float:
"""
记录一个事件,并返回当前活跃度。
参数
----
event_type : str
事件类型key_press / mouse_click / any
返回
----
float 活跃度 0.0(完全空闲)~ 1.0(极度活跃)。
"""
now = time.time()
self._events.append(now)
self._prune(now)
return self.activity_level
def _prune(self, now: float) -> None:
"""清理超过时间窗口的事件。"""
cutoff = now - self._window
self._events = [t for t in self._events if t > cutoff]
@property
def activity_level(self) -> float:
"""
返回当前活跃度0.0~1.0)。
计算: F = 事件数 / 窗口秒数,归一化到 max_rate。
"""
self._prune(time.time())
if not self._events:
return 0.0
rate = len(self._events) / self._window
return min(rate / self._max_rate, 1.0)
def is_highly_engaged(self, threshold: float = 0.15) -> bool:
"""判断用户是否处于高强度专注状态(不主动打扰)。"""
return self.activity_level < threshold
def is_idle(self, threshold: float = 0.7) -> bool:
"""判断用户是否空闲(可以主动搭话)。"""
return self.activity_level > threshold
def get_status(self) -> dict[str, Any]:
"""返回检测器状态(用于调试)。"""
return {
"window_seconds": self._window,
"event_count": len(self._events),
"activity_level": round(self.activity_level, 4),
}
# ================================================================
# 3. 行为调度器(主类)
# ================================================================
class BehaviorScheduler:
"""
行为调度器:优先级控制 + 冷却时间 + 活跃度判断 + 概率触发。
设计文档定位
智能层Agent Core的行为策略引擎
与 AgentBrain、EmotionEngine 协同工作。
工作流程
--------
1. 定时调用 check_and_trigger()(如每 5~10 秒一次)
2. 收集所有 enabled 行为
3. 按优先级排序P0 → P3
4. 对每个行为:
- 检查冷却状态
- 检查活跃度限制
- 概率掷骰
- 执行 action_fn()
5. 返回所有可执行的行为列表
参数
----
emotion_engine : EmotionEngine | None
情绪引擎引用(用于行为触发概率调制)。
activity_detector : ActivityDetector | None
活跃度检测器None 时使用默认实例)。
default_cooldown : float
默认冷却时间(秒),默认 120s。
示例
----
>>> sched = BehaviorScheduler()
>>> sched.register_behavior(
... name="remind_water",
... action_fn=lambda: {"message": "喝点水吧!", "type": "remind_water"},
... priority=0,
... cooldown=300.0,
... tags=["health", "reminder"],
... )
>>> await sched.check_and_trigger(user_activity_level=0.5)
"""
PRIORITY_HIGHEST = 0 # 健康/高危提醒
PRIORITY_USER_INPUT = 1 # 用户主动输入(通常不在 scheduler 层面处理)
PRIORITY_PROACTIVE = 2 # 系统主动闲聊/行为
PRIORITY_LLM_TRIGGER = 3 # LLM 自触发
def __init__(
self,
emotion_engine: Any = None,
activity_detector: ActivityDetector | None = None,
default_cooldown: float = 120.0,
) -> None:
self._behaviors: dict[str, Behavior] = {}
self._reminders: dict[str, Reminder] = {}
self._emotion = emotion_engine
self._activity = activity_detector or ActivityDetector()
self._default_cooldown = default_cooldown
self._last_triggered_behavior: str | None = None
# 注册内置行为
self._register_builtin_behaviors()
logger.info(
"BehaviorScheduler initialized | behaviors=%d | default_cooldown=%.0fs",
len(self._behaviors),
default_cooldown,
)
# ----------------------------------------------------------------
# 行为注册
# ----------------------------------------------------------------
def register_behavior(
self,
name: str,
action_fn: Callable[[], dict | None],
priority: int = 2,
cooldown: float | None = None,
enabled: bool = True,
tags: list[str] | None = None,
probability: float = 1.0,
description: str = "",
) -> None:
"""
注册一个新行为。
参数
----
name : str
行为唯一标识。
action_fn : callable
执行函数,返回行为结果 dict 或 None。
priority : int
优先级0=最高3=最低)。
cooldown : float | None
冷却时间None 使用调度器的默认值。
enabled : bool
是否启用。
tags : list[str] | None
标签(用于跨行为共享冷却)。
probability : float
基础触发概率0.0~1.0)。
description : str
人类可读描述。
"""
behavior = Behavior(
name=name,
action_fn=action_fn,
priority=priority,
cooldown=cooldown if cooldown is not None else self._default_cooldown,
enabled=enabled,
tags=tags or [],
probability=probability,
description=description or name,
)
self._behaviors[name] = behavior
logger.debug(
"[Scheduler] Registered behavior: name=%s priority=%d cooldown=%.0fs",
name, priority, behavior.cooldown,
)
def unregister_behavior(self, name: str) -> bool:
"""注销一个行为。"""
if name in self._behaviors:
del self._behaviors[name]
return True
return False
def enable_behavior(self, name: str) -> None:
"""启用行为。"""
if name in self._behaviors:
self._behaviors[name].enabled = True
def disable_behavior(self, name: str) -> None:
"""禁用行为(软删除,不注销)。"""
if name in self._behaviors:
self._behaviors[name].enabled = False
# ----------------------------------------------------------------
# 内置行为
# ----------------------------------------------------------------
def _register_builtin_behaviors(self) -> None:
"""注册内置提醒行为。"""
def _water_reminder() -> dict | None:
return {
"type": "remind_water",
"message": "记得喝水哦~",
"priority": 0,
}
def _stretch_reminder() -> dict | None:
return {
"type": "remind_stretch",
"message": "坐了好久啦,站起来伸展一下?",
"priority": 0,
}
def _idle_nudge() -> dict | None:
import random
messages = [
"嘿,发呆中... 要不要聊聊天?",
"(偷偷观察)看起来有点无聊?",
"有什么想说的吗?我在听~",
]
return {
"type": "nudge",
"message": random.choice(messages),
"priority": 2,
}
def _happy_nudge() -> dict | None:
import random
messages = [
"今天心情不错的样子!有什么好事吗?",
"嘿,看到你开心我也开心~",
]
return {
"type": "nudge",
"message": random.choice(messages),
"priority": 2,
}
self.register_behavior(
name="remind_water",
action_fn=_water_reminder,
priority=0,
cooldown=1800.0, # 30 分钟
tags=["health", "hydration"],
probability=1.0,
description="喝水提醒",
)
self.register_behavior(
name="remind_stretch",
action_fn=_stretch_reminder,
priority=0,
cooldown=3600.0, # 60 分钟
tags=["health", "posture"],
probability=1.0,
description="久坐伸展提醒",
)
self.register_behavior(
name="idle_nudge",
action_fn=_idle_nudge,
priority=2,
cooldown=300.0, # 5 分钟
tags=["social", "engagement"],
probability=0.3, # 概率触发
description="空闲闲聊触发",
)
self.register_behavior(
name="happy_nudge",
action_fn=_happy_nudge,
priority=2,
cooldown=600.0, # 10 分钟
tags=["social", "positive"],
probability=0.25,
description="开心时主动搭话",
)
# ----------------------------------------------------------------
# 核心调度
# ----------------------------------------------------------------
async def check_and_trigger(
self,
user_activity_level: float | None = None,
) -> list[dict]:
"""
检查并触发可执行的行为。
这是调度器的主入口,应定时调用(如每 5~10 秒一次)。
参数
----
user_activity_level : float | None
用户活跃度0.0~1.0None 时使用 ActivityDetector 的值。
返回
----
list[dict] 所有被触发的行为列表。
"""
now = time.time()
activity = (
user_activity_level
if user_activity_level is not None
else self._activity.activity_level
)
emotion = self._emotion.get_state() if self._emotion else "idle"
triggered: list[dict] = []
# 按优先级排序0 最高)
sorted_behaviors = sorted(
[b for b in self._behaviors.values() if b.enabled],
key=lambda b: b.priority,
)
for behavior in sorted_behaviors:
# 1. 冷却检查
if not behavior.is_ready(now):
continue
# 2. 活跃度检查P0 健康提醒不受此限制)
if behavior.priority > 0:
if self._is_activity_restricted(behavior, activity, emotion):
continue
# 3. 概率触发(结合情绪调制)
effective_prob = self._modulate_probability(behavior.probability, emotion)
if effective_prob <= 0.0:
continue
import random
if random.random() > effective_prob:
continue
# 4. 执行行为
result = behavior.action_fn()
if result is not None:
behavior.mark_triggered(now)
result.setdefault("behavior_name", behavior.name)
result.setdefault("priority", behavior.priority)
triggered.append(result)
self._last_triggered_behavior = behavior.name
logger.info(
"[Scheduler] Triggered: name=%s type=%s priority=%d emotion=%s activity=%.2f",
behavior.name,
result.get("type"),
behavior.priority,
emotion,
activity,
)
# 5. 处理到期的提醒
due_reminders = self._collect_due_reminders()
triggered.extend(due_reminders)
return triggered
def _is_activity_restricted(
self,
behavior: Behavior,
activity: float,
emotion: str,
) -> bool:
"""
判断行为是否因活跃度/情绪原因被限制。
规则(设计文档):
- 极度专注activity < 0.15)→ 禁止 P2/P3 主动打扰
- 专注状态activity < 0.3)→ 减少 P2 打扰
- 烦躁状态 → 禁止 P0 健康提醒(避免激怒用户)
"""
# 极度专注时不主动打扰P1 及以下)
if behavior.priority >= 1 and activity < 0.15:
return True
# 专注时减少闲聊打扰P2/P3
if behavior.priority >= 2 and activity < 0.3:
return True
# 烦躁时禁止所有主动打扰P0 健康提醒 + P2 闲聊)
if emotion == "annoyed" and behavior.priority <= 1:
return True
return False
def _modulate_probability(
self,
base_prob: float,
emotion: str,
) -> float:
"""
基于情绪状态调制触发概率。
设计文档:概率触发 = 结合情绪状态,采用非确定性概率触发。
规则
----
- happy → 概率 × 1.3(更爱搭话)
- idle → 概率 × 1.0(基准)
- focused → 概率 × 0.2(专注时不打扰)
- annoyed → 概率 × 0.5(烦躁时减少打扰)
- sleepy → 概率 × 0.3(困倦时话少)
"""
multipliers = {
"happy": 1.3,
"idle": 1.0,
"focused": 0.2,
"annoyed": 0.5,
"sleepy": 0.3,
}
mult = multipliers.get(emotion, 1.0)
return min(base_prob * mult, 1.0)
# ----------------------------------------------------------------
# 提醒管理
# ----------------------------------------------------------------
def add_reminder(
self,
message: str,
delay_seconds: float,
priority: int = 0,
) -> str:
"""
添加一个延迟执行的提醒。
参数
----
message : str
提醒内容。
delay_seconds : float
延迟秒数。
priority : int
优先级。
返回
----
str 提醒 ID可用于 cancel_reminder
"""
reminder_id = str(uuid.uuid4())[:8]
reminder = Reminder(
id=reminder_id,
message=message,
trigger_at=time.time() + delay_seconds,
priority=priority,
)
self._reminders[reminder_id] = reminder
logger.debug(
"[Scheduler] Reminder added: id=%s delay=%.0fs message=%r",
reminder_id, delay_seconds, message,
)
return reminder_id
def cancel_reminder(self, reminder_id: str) -> bool:
"""取消一个提醒。"""
if reminder_id in self._reminders:
self._reminders[reminder_id].dismissed = True
return True
return False
def dismiss_reminder(self, reminder_id: str) -> bool:
"""将提醒标记为已处理。"""
return self.cancel_reminder(reminder_id)
def get_active_reminders(self) -> list[dict]:
"""
返回所有未处理且未到期的提醒。
返回
----
list[dict] 每项包含 id, message, trigger_at, is_due 等。
"""
results = []
for rem in self._reminders.values():
if rem.dismissed:
continue
results.append({
"id": rem.id,
"message": rem.message,
"trigger_at": rem.trigger_at,
"is_due": rem.is_due,
"priority": rem.priority,
})
return sorted(results, key=lambda r: r["trigger_at"])
def _collect_due_reminders(self) -> list[dict]:
"""收集所有到期提醒并从队列移除。"""
now = time.time()
due = []
for rem_id, rem in list(self._reminders.items()):
if rem.is_due:
due.append({
"type": "reminder",
"message": rem.message,
"priority": rem.priority,
"reminder_id": rem_id,
})
rem.dismissed = True # 标记为已处理
return due
# ----------------------------------------------------------------
# 辅助方法
# ----------------------------------------------------------------
def get_active_behaviors(self) -> list[str]:
"""返回所有已注册且已启用的行为名称。"""
return [b.name for b in self._behaviors.values() if b.enabled]
def get_behavior_status(self, name: str) -> dict[str, Any] | None:
"""返回指定行为的详细状态(含冷却倒计时)。"""
behavior = self._behaviors.get(name)
if not behavior:
return None
elapsed = time.time() - behavior._last_triggered
cooldown_remaining = max(0.0, behavior.cooldown - elapsed)
return {
"name": behavior.name,
"priority": behavior.priority,
"enabled": behavior.enabled,
"probability": behavior.probability,
"cooldown_seconds": behavior.cooldown,
"cooldown_remaining": round(cooldown_remaining, 1),
"is_ready": behavior.is_ready(time.time()),
"tags": behavior.tags,
"description": behavior.description,
}
def get_status(self) -> dict[str, Any]:
"""
返回调度器完整状态(用于调试/监控面板)。
返回
----
dict 包含 behaviors、activity、reminders 等子状态。
"""
now = time.time()
behavior_summaries = {}
for name, b in self._behaviors.items():
elapsed = now - b._last_triggered
behavior_summaries[name] = {
"enabled": b.enabled,
"priority": b.priority,
"cooldown_remaining": round(max(0.0, b.cooldown - elapsed), 1),
"is_ready": b.is_ready(now),
}
return {
"behaviors": behavior_summaries,
"activity": self._activity.get_status(),
"reminders": self.get_active_reminders(),
"last_triggered_behavior": self._last_triggered_behavior,
"total_behaviors": len(self._behaviors),
"active_behaviors": len(self.get_active_behaviors()),
}