""" EzVibe Behavior Scheduler ========================== 设计文档对应章节:调度与行为策略 核心职责 • 优先级控制:P0 健康提醒 > P1 用户输入响应 > P2 主动闲聊 • 冷却时间控制:同类行为存在 CD 限制,防止重复打扰 • 活跃度判断:基于键鼠频率检测用户是否处于高强度工作 • 概率触发:结合情绪状态,采用非确定性概率触发主动行为 与设计文档对照 • 打扰控制三要素:冷却时间 + 活跃度判断 + 概率触发 • 优先级完全按文档分层,P0 可打断 P1/P2 行为优先级定义 P0 (priority=0) : 健康/高危提醒(打断所有当前动作) P1 (priority=1) : 用户主动输入响应(已在 AgentBrain.think() 处理) P2 (priority=2) : 系统主动闲聊/行为(最低优先级) P3 (priority=3) : LLM 自触发的闲聊行为 """ from __future__ import annotations import logging import time import uuid from dataclasses import dataclass, field from typing import Any, Callable logger = logging.getLogger(__name__) # ================================================================ # 1. 行为数据结构 # ================================================================ @dataclass class Behavior: """ 单个行为的定义。 参数 ---- name : str 行为唯一标识(如 "remind_water")。 action_fn : callable 实际执行的行为函数,签名: () -> dict | None。 返回 dict 表示执行了行为(包含 message 等字段),None 表示跳过。 priority : int 优先级(0=最高,3=最低)。 cooldown : float 冷却时间(秒)。同类行为触发后需等待这么久才能再次触发。 enabled : bool 是否启用。 tags : list[str] 标签分组(如 ["health", "reminder"]),用于跨行为冷却。 probability : float 基础触发概率(0.0~1.0),0.0=从不触发,1.0=总是触发。 description : str 行为的人类可读描述(用于调试/日志)。 """ name: str action_fn: Callable[[], dict | None] priority: int = 2 cooldown: float = 120.0 enabled: bool = True tags: list[str] = field(default_factory=list) probability: float = 1.0 description: str = "" # 内部状态(运行时不暴露给构造器) _last_triggered: float = field(default=0.0, repr=False) def is_ready(self, current_time: float) -> bool: """判断行为是否可触发(冷却已过)。""" return current_time - self._last_triggered >= self.cooldown def mark_triggered(self, current_time: float) -> None: """标记为已触发。""" self._last_triggered = current_time @dataclass class Reminder: """ 提醒事项(延迟执行的任务)。 参数 ---- id : str 唯一标识。 message : str 提醒内容。 trigger_at : float 触发时间戳(Unix timestamp)。 priority : int 优先级。 dismissed : bool 是否已忽略。 """ id: str message: str trigger_at: float priority: int = 0 dismissed: bool = False @property def is_due(self) -> bool: return time.time() >= self.trigger_at and not self.dismissed # ================================================================ # 2. 活跃度检测器 # ================================================================ class ActivityDetector: """ 基于键鼠事件频率判断用户活跃度。 逻辑 ---- 1. 每次 detect() 调用传入事件类型(key_press / mouse_click) 2. 维护一个滑动时间窗口(如最近 60 秒) 3. 计算事件频率(F = 事件数 / 窗口秒数) 4. 归一化到 0.0~1.0 阈值参考(可配置) ---- - activity < 0.15 : 极度专注(可能正在深度工作/游戏中) - activity < 0.3 : 比较专注 - 0.3 ≤ activity < 0.7 : 适度活跃 - activity ≥ 0.7 : 非常活跃 """ DEFAULT_WINDOW = 60.0 # 滑动时间窗口(秒) DEFAULT_MAX_RATE = 3.0 # 归一化上限(3次/秒 = 极高活跃度) def __init__( self, window_seconds: float = DEFAULT_WINDOW, max_event_rate: float = DEFAULT_MAX_RATE, ) -> None: self._window = window_seconds self._max_rate = max_event_rate self._events: list[float] = [] # 时间戳列表 def detect(self, event_type: str = "any") -> float: """ 记录一个事件,并返回当前活跃度。 参数 ---- event_type : str 事件类型(key_press / mouse_click / any)。 返回 ---- float 活跃度 0.0(完全空闲)~ 1.0(极度活跃)。 """ now = time.time() self._events.append(now) self._prune(now) return self.activity_level def _prune(self, now: float) -> None: """清理超过时间窗口的事件。""" cutoff = now - self._window self._events = [t for t in self._events if t > cutoff] @property def activity_level(self) -> float: """ 返回当前活跃度(0.0~1.0)。 计算: F = 事件数 / 窗口秒数,归一化到 max_rate。 """ self._prune(time.time()) if not self._events: return 0.0 rate = len(self._events) / self._window return min(rate / self._max_rate, 1.0) def is_highly_engaged(self, threshold: float = 0.15) -> bool: """判断用户是否处于高强度专注状态(不主动打扰)。""" return self.activity_level < threshold def is_idle(self, threshold: float = 0.7) -> bool: """判断用户是否空闲(可以主动搭话)。""" return self.activity_level > threshold def get_status(self) -> dict[str, Any]: """返回检测器状态(用于调试)。""" return { "window_seconds": self._window, "event_count": len(self._events), "activity_level": round(self.activity_level, 4), } # ================================================================ # 3. 行为调度器(主类) # ================================================================ class BehaviorScheduler: """ 行为调度器:优先级控制 + 冷却时间 + 活跃度判断 + 概率触发。 设计文档定位 智能层(Agent Core)的行为策略引擎, 与 AgentBrain、EmotionEngine 协同工作。 工作流程 -------- 1. 定时调用 check_and_trigger()(如每 5~10 秒一次) 2. 收集所有 enabled 行为 3. 按优先级排序(P0 → P3) 4. 对每个行为: - 检查冷却状态 - 检查活跃度限制 - 概率掷骰 - 执行 action_fn() 5. 返回所有可执行的行为列表 参数 ---- emotion_engine : EmotionEngine | None 情绪引擎引用(用于行为触发概率调制)。 activity_detector : ActivityDetector | None 活跃度检测器(None 时使用默认实例)。 default_cooldown : float 默认冷却时间(秒),默认 120s。 示例 ---- >>> sched = BehaviorScheduler() >>> sched.register_behavior( ... name="remind_water", ... action_fn=lambda: {"message": "喝点水吧!", "type": "remind_water"}, ... priority=0, ... cooldown=300.0, ... tags=["health", "reminder"], ... ) >>> await sched.check_and_trigger(user_activity_level=0.5) """ PRIORITY_HIGHEST = 0 # 健康/高危提醒 PRIORITY_USER_INPUT = 1 # 用户主动输入(通常不在 scheduler 层面处理) PRIORITY_PROACTIVE = 2 # 系统主动闲聊/行为 PRIORITY_LLM_TRIGGER = 3 # LLM 自触发 def __init__( self, emotion_engine: Any = None, activity_detector: ActivityDetector | None = None, default_cooldown: float = 120.0, ) -> None: self._behaviors: dict[str, Behavior] = {} self._reminders: dict[str, Reminder] = {} self._emotion = emotion_engine self._activity = activity_detector or ActivityDetector() self._default_cooldown = default_cooldown self._last_triggered_behavior: str | None = None # 注册内置行为 self._register_builtin_behaviors() logger.info( "BehaviorScheduler initialized | behaviors=%d | default_cooldown=%.0fs", len(self._behaviors), default_cooldown, ) # ---------------------------------------------------------------- # 行为注册 # ---------------------------------------------------------------- def register_behavior( self, name: str, action_fn: Callable[[], dict | None], priority: int = 2, cooldown: float | None = None, enabled: bool = True, tags: list[str] | None = None, probability: float = 1.0, description: str = "", ) -> None: """ 注册一个新行为。 参数 ---- name : str 行为唯一标识。 action_fn : callable 执行函数,返回行为结果 dict 或 None。 priority : int 优先级(0=最高,3=最低)。 cooldown : float | None 冷却时间(秒),None 使用调度器的默认值。 enabled : bool 是否启用。 tags : list[str] | None 标签(用于跨行为共享冷却)。 probability : float 基础触发概率(0.0~1.0)。 description : str 人类可读描述。 """ behavior = Behavior( name=name, action_fn=action_fn, priority=priority, cooldown=cooldown if cooldown is not None else self._default_cooldown, enabled=enabled, tags=tags or [], probability=probability, description=description or name, ) self._behaviors[name] = behavior logger.debug( "[Scheduler] Registered behavior: name=%s priority=%d cooldown=%.0fs", name, priority, behavior.cooldown, ) def unregister_behavior(self, name: str) -> bool: """注销一个行为。""" if name in self._behaviors: del self._behaviors[name] return True return False def enable_behavior(self, name: str) -> None: """启用行为。""" if name in self._behaviors: self._behaviors[name].enabled = True def disable_behavior(self, name: str) -> None: """禁用行为(软删除,不注销)。""" if name in self._behaviors: self._behaviors[name].enabled = False # ---------------------------------------------------------------- # 内置行为 # ---------------------------------------------------------------- def _register_builtin_behaviors(self) -> None: """注册内置提醒行为。""" def _water_reminder() -> dict | None: return { "type": "remind_water", "message": "记得喝水哦~", "priority": 0, } def _stretch_reminder() -> dict | None: return { "type": "remind_stretch", "message": "坐了好久啦,站起来伸展一下?", "priority": 0, } def _idle_nudge() -> dict | None: import random messages = [ "嘿,发呆中... 要不要聊聊天?", "(偷偷观察)看起来有点无聊?", "有什么想说的吗?我在听~", ] return { "type": "nudge", "message": random.choice(messages), "priority": 2, } def _happy_nudge() -> dict | None: import random messages = [ "今天心情不错的样子!有什么好事吗?", "嘿,看到你开心我也开心~", ] return { "type": "nudge", "message": random.choice(messages), "priority": 2, } self.register_behavior( name="remind_water", action_fn=_water_reminder, priority=0, cooldown=1800.0, # 30 分钟 tags=["health", "hydration"], probability=1.0, description="喝水提醒", ) self.register_behavior( name="remind_stretch", action_fn=_stretch_reminder, priority=0, cooldown=3600.0, # 60 分钟 tags=["health", "posture"], probability=1.0, description="久坐伸展提醒", ) self.register_behavior( name="idle_nudge", action_fn=_idle_nudge, priority=2, cooldown=300.0, # 5 分钟 tags=["social", "engagement"], probability=0.3, # 概率触发 description="空闲闲聊触发", ) self.register_behavior( name="happy_nudge", action_fn=_happy_nudge, priority=2, cooldown=600.0, # 10 分钟 tags=["social", "positive"], probability=0.25, description="开心时主动搭话", ) # ---------------------------------------------------------------- # 核心调度 # ---------------------------------------------------------------- async def check_and_trigger( self, user_activity_level: float | None = None, ) -> list[dict]: """ 检查并触发可执行的行为。 这是调度器的主入口,应定时调用(如每 5~10 秒一次)。 参数 ---- user_activity_level : float | None 用户活跃度(0.0~1.0),None 时使用 ActivityDetector 的值。 返回 ---- list[dict] 所有被触发的行为列表。 """ now = time.time() activity = ( user_activity_level if user_activity_level is not None else self._activity.activity_level ) emotion = self._emotion.get_state() if self._emotion else "idle" triggered: list[dict] = [] # 按优先级排序(0 最高) sorted_behaviors = sorted( [b for b in self._behaviors.values() if b.enabled], key=lambda b: b.priority, ) for behavior in sorted_behaviors: # 1. 冷却检查 if not behavior.is_ready(now): continue # 2. 活跃度检查(P0 健康提醒不受此限制) if behavior.priority > 0: if self._is_activity_restricted(behavior, activity, emotion): continue # 3. 概率触发(结合情绪调制) effective_prob = self._modulate_probability(behavior.probability, emotion) if effective_prob <= 0.0: continue import random if random.random() > effective_prob: continue # 4. 执行行为 result = behavior.action_fn() if result is not None: behavior.mark_triggered(now) result.setdefault("behavior_name", behavior.name) result.setdefault("priority", behavior.priority) triggered.append(result) self._last_triggered_behavior = behavior.name logger.info( "[Scheduler] Triggered: name=%s type=%s priority=%d emotion=%s activity=%.2f", behavior.name, result.get("type"), behavior.priority, emotion, activity, ) # 5. 处理到期的提醒 due_reminders = self._collect_due_reminders() triggered.extend(due_reminders) return triggered def _is_activity_restricted( self, behavior: Behavior, activity: float, emotion: str, ) -> bool: """ 判断行为是否因活跃度/情绪原因被限制。 规则(设计文档): - 极度专注(activity < 0.15)→ 禁止 P2/P3 主动打扰 - 专注状态(activity < 0.3)→ 减少 P2 打扰 - 烦躁状态 → 禁止 P0 健康提醒(避免激怒用户) """ # 极度专注时不主动打扰(P1 及以下) if behavior.priority >= 1 and activity < 0.15: return True # 专注时减少闲聊打扰(P2/P3) if behavior.priority >= 2 and activity < 0.3: return True # 烦躁时禁止所有主动打扰(P0 健康提醒 + P2 闲聊) if emotion == "annoyed" and behavior.priority <= 1: return True return False def _modulate_probability( self, base_prob: float, emotion: str, ) -> float: """ 基于情绪状态调制触发概率。 设计文档:概率触发 = 结合情绪状态,采用非确定性概率触发。 规则 ---- - happy → 概率 × 1.3(更爱搭话) - idle → 概率 × 1.0(基准) - focused → 概率 × 0.2(专注时不打扰) - annoyed → 概率 × 0.5(烦躁时减少打扰) - sleepy → 概率 × 0.3(困倦时话少) """ multipliers = { "happy": 1.3, "idle": 1.0, "focused": 0.2, "annoyed": 0.5, "sleepy": 0.3, } mult = multipliers.get(emotion, 1.0) return min(base_prob * mult, 1.0) # ---------------------------------------------------------------- # 提醒管理 # ---------------------------------------------------------------- def add_reminder( self, message: str, delay_seconds: float, priority: int = 0, ) -> str: """ 添加一个延迟执行的提醒。 参数 ---- message : str 提醒内容。 delay_seconds : float 延迟秒数。 priority : int 优先级。 返回 ---- str 提醒 ID(可用于 cancel_reminder)。 """ reminder_id = str(uuid.uuid4())[:8] reminder = Reminder( id=reminder_id, message=message, trigger_at=time.time() + delay_seconds, priority=priority, ) self._reminders[reminder_id] = reminder logger.debug( "[Scheduler] Reminder added: id=%s delay=%.0fs message=%r", reminder_id, delay_seconds, message, ) return reminder_id def cancel_reminder(self, reminder_id: str) -> bool: """取消一个提醒。""" if reminder_id in self._reminders: self._reminders[reminder_id].dismissed = True return True return False def dismiss_reminder(self, reminder_id: str) -> bool: """将提醒标记为已处理。""" return self.cancel_reminder(reminder_id) def get_active_reminders(self) -> list[dict]: """ 返回所有未处理且未到期的提醒。 返回 ---- list[dict] 每项包含 id, message, trigger_at, is_due 等。 """ results = [] for rem in self._reminders.values(): if rem.dismissed: continue results.append({ "id": rem.id, "message": rem.message, "trigger_at": rem.trigger_at, "is_due": rem.is_due, "priority": rem.priority, }) return sorted(results, key=lambda r: r["trigger_at"]) def _collect_due_reminders(self) -> list[dict]: """收集所有到期提醒并从队列移除。""" now = time.time() due = [] for rem_id, rem in list(self._reminders.items()): if rem.is_due: due.append({ "type": "reminder", "message": rem.message, "priority": rem.priority, "reminder_id": rem_id, }) rem.dismissed = True # 标记为已处理 return due # ---------------------------------------------------------------- # 辅助方法 # ---------------------------------------------------------------- def get_active_behaviors(self) -> list[str]: """返回所有已注册且已启用的行为名称。""" return [b.name for b in self._behaviors.values() if b.enabled] def get_behavior_status(self, name: str) -> dict[str, Any] | None: """返回指定行为的详细状态(含冷却倒计时)。""" behavior = self._behaviors.get(name) if not behavior: return None elapsed = time.time() - behavior._last_triggered cooldown_remaining = max(0.0, behavior.cooldown - elapsed) return { "name": behavior.name, "priority": behavior.priority, "enabled": behavior.enabled, "probability": behavior.probability, "cooldown_seconds": behavior.cooldown, "cooldown_remaining": round(cooldown_remaining, 1), "is_ready": behavior.is_ready(time.time()), "tags": behavior.tags, "description": behavior.description, } def get_status(self) -> dict[str, Any]: """ 返回调度器完整状态(用于调试/监控面板)。 返回 ---- dict 包含 behaviors、activity、reminders 等子状态。 """ now = time.time() behavior_summaries = {} for name, b in self._behaviors.items(): elapsed = now - b._last_triggered behavior_summaries[name] = { "enabled": b.enabled, "priority": b.priority, "cooldown_remaining": round(max(0.0, b.cooldown - elapsed), 1), "is_ready": b.is_ready(now), } return { "behaviors": behavior_summaries, "activity": self._activity.get_status(), "reminders": self.get_active_reminders(), "last_triggered_behavior": self._last_triggered_behavior, "total_behaviors": len(self._behaviors), "active_behaviors": len(self.get_active_behaviors()), }