Files
EzVibe/perception/keyboard_mouse_monitor.py
e2hang 2a844e83a8 Initial commit: EzVibe AI 桌宠系统
- EmotionEngine: 5状态马尔可夫情绪机 + 蒙特卡洛转移
- VectorMemory: TF-IDF向量记忆 + SQLite持久化 + RAG检索
- AgentBrain: Ollama/OpenAI/Dummy三后端LLM
- BehaviorScheduler: 优先级/冷却/活跃度调度
- FastAPI服务器 + WebSocket实时推送
- perception: 键鼠监控 + 屏幕截图
- ui/pet_window: PySide6桌宠窗口 + 像素动画
- assets/pet: 5情绪各2帧像素艺术资源
2026-05-01 23:26:43 +08:00

490 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
perception/keyboard_mouse_monitor.py
=====================================
全局键盘/鼠标监听器 —— 使用 pynput 全局钩子捕获所有键鼠事件,
通过滑动窗口统计活跃度,并将事件传递给 ActivityDetector来自 scheduler.py
设计要点
--------
- pynput.Listener 运行在独立线程,通过 queue.Queue 传递事件到主线程
- ActivityDetector来自 agent.scheduler负责滑动窗口统计
- 支持暂停/恢复监听pause/resume
- 可选:事件触发回调(用于记录到 memory
- Dummy 模式:不依赖 pynput用于 headless 测试
用法
----
from perception.keyboard_mouse_monitor import KeyboardMouseMonitor
monitor = KeyboardMouseMonitor(
window_seconds=300,
idle_timeout_seconds=60,
on_event=None, # 可选:每次键鼠事件回调 (event_type, timestamp)
use_dummy=False, # True=模拟事件,用于测试
)
monitor.start() # 后台线程开始监听
monitor.stop() # 停止监听
# 查询活跃度
activity = monitor.get_activity() # float 0.0~1.0
is_idle = monitor.is_idle() # bool
is_engaged = monitor.is_highly_engaged() # bool
"""
from __future__ import annotations

import atexit
import logging
import os
import queue
import sys
import threading
import time
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Callable, Literal, Any
# ---------------------------------------------------------------------------
# Lazy pynput import (not needed in dummy mode)
# ---------------------------------------------------------------------------
_pynput_available: bool | None = None


def _check_pynput() -> bool:
    """Report whether ``pynput`` can be imported, probing at most once.

    The outcome of the first probe is cached in the module-level
    ``_pynput_available`` flag, so later calls return the cached answer
    without retrying the import.
    """
    global _pynput_available
    if _pynput_available is not None:
        return _pynput_available
    try:
        from pynput import keyboard, mouse  # noqa: F401
    except ImportError:
        _pynput_available = False
    else:
        _pynput_available = True
    return _pynput_available
# ---------------------------------------------------------------------------
# ActivityDetector - sliding-window activity statistics (shares its logic with scheduler.py)
# ---------------------------------------------------------------------------
class ActivityLevel(Enum):
    """Coarse activity buckets derived from keyboard/mouse input."""

    # The explicit values are the ones ``auto()`` would assign (1, 2, 3, 4).
    IDLE = 1    # computer is idle (no keyboard/mouse input)
    LOW = 2     # low activity
    NORMAL = 3  # normal activity
    HIGH = 4    # highly focused
@dataclass
class ActivityDetector:
    """Sliding-window keyboard/mouse activity detector.

    Parameters
    ----------
    window_seconds : length of the statistics window in seconds
        (default 300 s, i.e. 5 minutes).
    idle_timeout_seconds : the computer is considered idle once this many
        seconds have passed without any event.
    """

    window_seconds: float = 300.0
    idle_timeout_seconds: float = 60.0
    # Internal state: (timestamp, event_type) pairs, guarded by ``_lock``.
    _events: list[tuple[float, Literal["keyboard", "mouse"]]] = field(default_factory=list)
    _lock: threading.Lock = field(default_factory=threading.Lock)

    # Un-annotated on purpose so the dataclass does not treat them as fields.
    _NORMAL_EVENTS_PER_MINUTE = 30.0  # event rate that maps to activity 1.0
    _LOW_THRESHOLD = 0.2              # below this the level is LOW
    _HIGH_THRESHOLD = 0.7             # above this the level is HIGH

    # -- Public API ---------------------------------------------------------
    def record(self, event_type: Literal["keyboard", "mouse"], timestamp: float | None = None) -> None:
        """Record one keyboard/mouse event (guarded by the internal lock).

        ``timestamp`` is expected on the ``time.monotonic()`` clock; when
        omitted, the current monotonic time is used.
        """
        if timestamp is None:
            timestamp = time.monotonic()
        with self._lock:
            self._events.append((timestamp, event_type))
            self._prune(timestamp)

    def get_activity(self) -> float:
        """Return an activity index in the range 0.0~1.0.

        The index is the number of events inside the sliding window divided
        by a baseline of 30 events per minute, capped at 1.0. A non-positive
        ``window_seconds`` yields 0.0 instead of dividing by zero.
        """
        with self._lock:
            self._prune(time.monotonic())
            raw = len(self._events)
        baseline = self.window_seconds / 60.0 * self._NORMAL_EVENTS_PER_MINUTE
        if baseline <= 0:
            # Degenerate window: nothing can be inside it.
            return 0.0
        return min(raw / baseline, 1.0)

    def is_idle(self) -> bool:
        """Return True when no event was seen for more than ``idle_timeout_seconds``.

        A detector that has no stored events is reported as idle.
        """
        with self._lock:
            if not self._events:
                return True
            last = self._events[-1][0]
        return (time.monotonic() - last) > self.idle_timeout_seconds

    def is_highly_engaged(self) -> bool:
        """Return True when activity is above 0.7 and the computer is not idle."""
        return self.get_activity() > self._HIGH_THRESHOLD and not self.is_idle()

    def get_activity_level(self) -> ActivityLevel:
        """Return the activity bucket as an ``ActivityLevel`` member.

        Idle wins over everything else; otherwise < 0.2 is LOW, > 0.7 is
        HIGH and anything in between is NORMAL.
        """
        act = self.get_activity()
        if self.is_idle():
            return ActivityLevel.IDLE
        if act < self._LOW_THRESHOLD:
            return ActivityLevel.LOW
        if act > self._HIGH_THRESHOLD:
            return ActivityLevel.HIGH
        return ActivityLevel.NORMAL

    @property
    def activity_level(self) -> float:
        """Property form of ``get_activity()`` (0.0~1.0), kept for compatibility
        with ``scheduler.ActivityDetector``."""
        return self.get_activity()

    # -- Internals ----------------------------------------------------------
    def _prune(self, now: float) -> None:
        """Drop events older than the window; the caller must hold ``_lock``."""
        cutoff = now - self.window_seconds
        # In-place slice assignment keeps the same list object alive.
        self._events[:] = [(t, e) for t, e in self._events if t > cutoff]
# ---------------------------------------------------------------------------
# KeyboardMouseMonitor - global keyboard/mouse monitoring
# ---------------------------------------------------------------------------
@dataclass
class KeyboardMouseMonitor:
    """Global keyboard/mouse monitor built on pynput global hooks.

    Parameters
    ----------
    window_seconds : statistics window handed to the ActivityDetector.
    idle_timeout_seconds : idle threshold handed to the ActivityDetector.
    on_event : optional callback invoked as ``on_event(event_type, timestamp)``
        for every recorded event. It runs on the worker thread, outside the
        monitor's lock, so it may call ``pause()``/``resume()``/``stop()``.
    use_dummy : when True no pynput listener is started; only the worker
        thread runs (meant for tests that exercise the API).

    Example
    -------
        monitor = KeyboardMouseMonitor(on_event=lambda t, ts: print(f"{t} @ {ts}"))
        monitor.start()
        time.sleep(10)
        print(monitor.get_activity())
        monitor.stop()
    """

    window_seconds: float = 300.0
    idle_timeout_seconds: float = 60.0
    on_event: Callable[[Literal["keyboard", "mouse"], float], Any] | None = None
    use_dummy: bool = False
    # Internal state. ``_activity`` and ``_queue`` are rebuilt in __post_init__.
    _activity: ActivityDetector = field(
        default_factory=lambda: ActivityDetector(window_seconds=300.0, idle_timeout_seconds=60.0)
    )
    _running: bool = False
    _paused: bool = False
    _lock: threading.Lock = field(default_factory=threading.Lock)
    _queue: queue.Queue = field(default_factory=queue.Queue)
    _listener_thread: threading.Thread | None = None
    _worker_thread: threading.Thread | None = None
    # Live pynput listeners, kept so that stop() can actually shut them down.
    _listeners: tuple = field(default=(), init=False, repr=False, compare=False)

    def __post_init__(self) -> None:
        """Build the detector from the public settings and reset the queue."""
        self._activity = ActivityDetector(
            window_seconds=self.window_seconds,
            idle_timeout_seconds=self.idle_timeout_seconds,
        )
        self._queue = queue.Queue()

    # -- Lifecycle ----------------------------------------------------------
    def start(self) -> None:
        """Start the monitoring threads; calling it while running is a no-op.

        Falls back to dummy mode when ``use_dummy`` is set or pynput cannot
        be imported. ``stop`` is registered with ``atexit`` for cleanup.
        """
        with self._lock:
            if self._running:
                return
            self._running = True
            self._paused = False
        if self.use_dummy or not _check_pynput():
            self._start_dummy()
        else:
            self._start_real()
        # Make sure the hooks are released at interpreter exit.
        atexit.register(self.stop)

    def stop(self) -> None:
        """Stop the pynput listeners and the worker; a no-op when not running."""
        with self._lock:
            if not self._running:
                return
            self._running = False
            listeners, self._listeners = self._listeners, ()
        # Without this the listener thread would block in join() forever and
        # the global hooks would stay installed.
        for listener in listeners:
            try:
                listener.stop()
            except Exception:
                logging.getLogger(__name__).exception("failed to stop pynput listener")
        # Wake the worker in case it is blocked on the queue.
        try:
            self._queue.put_nowait(None)
        except queue.Full:
            pass
        current = threading.current_thread()
        for thread, timeout in ((self._listener_thread, 3.0), (self._worker_thread, 2.0)):
            # Skip the calling thread: stop() may be invoked from on_event,
            # and a thread cannot join itself.
            if thread is not None and thread is not current and thread.is_alive():
                thread.join(timeout=timeout)
        # Remove leftovers (including an unconsumed stop sentinel) so that a
        # later start() does not terminate its worker immediately.
        self._drain_queue()
        atexit.unregister(self.stop)

    def pause(self) -> None:
        """Pause event recording (the listeners keep running, events are dropped)."""
        with self._lock:
            self._paused = True

    def resume(self) -> None:
        """Resume event recording."""
        with self._lock:
            self._paused = False

    # -- Activity queries ---------------------------------------------------
    def get_activity(self) -> float:
        """Return the activity index (0.0~1.0) from the detector."""
        return self._activity.get_activity()

    def is_idle(self) -> bool:
        """Return the detector's idle flag."""
        return self._activity.is_idle()

    def is_highly_engaged(self) -> bool:
        """Return the detector's high-engagement flag."""
        return self._activity.is_highly_engaged()

    def get_activity_level(self) -> ActivityLevel:
        """Return the detector's activity bucket."""
        return self._activity.get_activity_level()

    def get_event_count(self, window_seconds: float | None = None) -> int:
        """Return the number of stored events newer than ``window_seconds`` (debug aid).

        Defaults to the monitor's own ``window_seconds``.
        """
        if window_seconds is None:
            window_seconds = self.window_seconds
        with self._activity._lock:
            cutoff = time.monotonic() - window_seconds
            return sum(1 for t, _ in self._activity._events if t > cutoff)

    # -- Internals ----------------------------------------------------------
    def _start_dummy(self) -> None:
        """Dummy mode: start only the worker thread, no pynput."""
        self._worker_thread = threading.Thread(target=self._worker_loop, daemon=True, name="km-dummy")
        self._worker_thread.start()

    def _start_real(self) -> None:
        """Real mode: start the worker thread, then the pynput listener thread."""
        # The worker consumes the queue.
        self._worker_thread = threading.Thread(target=self._worker_loop, daemon=True, name="km-worker")
        self._worker_thread.start()
        # The pynput listeners are driven from their own thread.
        self._listener_thread = threading.Thread(target=self._listener_loop, daemon=True, name="km-listener")
        self._listener_thread.start()

    def _enqueue(self, event_type: Literal["keyboard", "mouse"]) -> None:
        """Timestamp an event and queue it without ever blocking the hook thread."""
        try:
            self._queue.put_nowait((event_type, time.monotonic()))
        except queue.Full:
            # Dropping one event is preferable to stalling the input hook.
            pass

    def _drain_queue(self) -> None:
        """Discard everything currently sitting in the queue."""
        while True:
            try:
                self._queue.get_nowait()
            except queue.Empty:
                return

    def _listener_loop(self) -> None:
        """Create and run the pynput listeners; blocks until they are stopped."""
        from pynput import keyboard, mouse

        def on_key(event: keyboard.Key | keyboard.KeyCode | None) -> None:
            # pynput passes a KeyCode for ordinary keys and a Key for special
            # ones; None is ignored. Returning False would stop the listener,
            # so nothing is returned here.
            if event is not None:
                self._enqueue("keyboard")

        def on_mouse(x: int, y: int, button: mouse.Button, pressed: bool) -> None:
            # Count presses only; releases would double-count every click.
            if pressed:
                self._enqueue("mouse")

        # Unused callbacks are simply omitted instead of being passed False.
        kl = keyboard.Listener(on_press=on_key, suppress=False)
        ml = mouse.Listener(on_click=on_mouse, suppress=False)
        with self._lock:
            if not self._running:
                # stop() ran before the hooks were installed.
                return
            # Publish first so stop() can always reach both listeners.
            self._listeners = (kl, ml)
            kl.start()
            ml.start()
        try:
            kl.join()
            ml.join()
        except Exception:
            # pynput re-raises errors from the hook thread in join().
            logging.getLogger(__name__).exception("pynput listener terminated unexpectedly")
            kl.stop()
            ml.stop()

    def _worker_loop(self) -> None:
        """Consume queued events, feeding the detector and the ``on_event`` callback.

        Ends when ``_running`` is cleared or the ``None`` sentinel arrives.
        Events that arrive while paused are discarded.
        """
        log = logging.getLogger(__name__)
        while self._running:
            try:
                item = self._queue.get(timeout=0.5)
            except queue.Empty:
                continue
            if item is None:
                break  # stop sentinel
            try:
                event_type, timestamp = item
                with self._lock:
                    paused = self._paused
                if paused:
                    continue
                self._activity.record(event_type, timestamp)
                # Invoked outside the lock so the callback may call back
                # into pause()/resume()/stop() without deadlocking.
                if self.on_event is not None:
                    self.on_event(event_type, timestamp)
            except Exception:
                # Never let a bad event or callback kill the worker, but
                # leave a trace for debugging.
                log.exception("failed to process input event %r", item)
# ---------------------------------------------------------------------------
# ScreenCapture - screen capture (mss)
# ---------------------------------------------------------------------------
@dataclass
class ScreenCapture:
    """Screen capture helper built on ``mss``.

    Usage
    -----
        cap = ScreenCapture()
        screenshot = cap.capture(monitor=0)   # PIL.Image
        screenshot.save("screen.png")
        text = cap.extract_text(screenshot)   # optional OCR, needs pytesseract
    """

    # Tri-state mss probe result: None = not probed yet, otherwise True/False.
    _mss = None
    # Whether the last extract_text() call could import pytesseract.
    _ocr_available: bool | None = None

    def __post_init__(self) -> None:
        """Probe for mss once at construction time."""
        self._load_mss()

    def _load_mss(self) -> bool:
        """Probe for ``mss`` (cached after the first call) and return whether it imports."""
        if self._mss is None:
            try:
                import mss  # noqa: F401
            except ImportError:
                self._mss = False
            else:
                self._mss = True
        # Return the cached verdict, including a cached failure.
        return bool(self._mss)

    def is_available(self) -> bool:
        """Return True when mss was imported successfully."""
        return self._mss is True

    @staticmethod
    def _to_image(shot: Any) -> Any:
        """Convert an mss screenshot into an RGB ``PIL.Image``."""
        from PIL import Image

        # mss exposes raw BGRA bytes; the "BGRX" raw decoder reorders the
        # channels so red and blue are not swapped.
        return Image.frombytes("RGB", shot.size, shot.bgra, "raw", "BGRX")

    def capture(self, monitor: int = 0) -> Any | None:
        """Capture one display (0 = primary screen).

        Returns a ``PIL.Image``, or None when mss is unavailable, the index
        is invalid, or the capture fails.
        """
        if not self._mss or monitor < 0:
            return None
        try:
            import mss

            with mss.mss() as sct:
                # mss reserves monitors[0] for the union of all screens;
                # physical displays start at index 1.
                return self._to_image(sct.grab(sct.monitors[monitor + 1]))
        except Exception:
            # Best effort: callers get None, the cause goes to the debug log.
            logging.getLogger(__name__).debug("screen capture failed", exc_info=True)
            return None

    def capture_region(self, x: int, y: int, width: int, height: int) -> Any | None:
        """Capture a rectangular screen region.

        Parameters
        ----------
        x, y : top-left corner in pixels.
        width, height : size of the region in pixels.

        Returns a ``PIL.Image``, or None when mss is unavailable or the
        capture fails.
        """
        if not self._mss:
            return None
        try:
            import mss

            region = {"left": x, "top": y, "width": width, "height": height}
            with mss.mss() as sct:
                return self._to_image(sct.grab(region))
        except Exception:
            logging.getLogger(__name__).debug("region capture failed", exc_info=True)
            return None

    def extract_text(self, image: Any) -> str:
        """Extract text from a screenshot via OCR.

        Requires pytesseract plus the Tesseract-OCR binary:
            macOS:   brew install tesseract tesseract-lang
            Windows: install the Tesseract-OCR binary, then pip install pytesseract

        Returns the recognised text (newline separated), or an empty string
        when pytesseract is missing or recognition fails.
        """
        try:
            import pytesseract
        except ImportError:
            self._ocr_available = False
            return ""
        self._ocr_available = True
        try:
            return pytesseract.image_to_string(image, lang="chi_sim+eng")
        except Exception:
            logging.getLogger(__name__).debug("OCR failed", exc_info=True)
            return ""

    def get_monitors(self) -> list[dict]:
        """Return the geometry dicts of the physical displays.

        The list is indexed like ``capture(monitor=...)``: the combined
        virtual screen that mss keeps at index 0 is left out. Returns an
        empty list when mss is unavailable or the query fails.
        """
        if not self._mss:
            return []
        try:
            import mss

            with mss.mss() as sct:
                return list(sct.monitors[1:])
        except Exception:
            logging.getLogger(__name__).debug("monitor enumeration failed", exc_info=True)
            return []
# ---------------------------------------------------------------------------
# Global singleton monitor (convenient for use across modules)
# ---------------------------------------------------------------------------
_global_monitor: KeyboardMouseMonitor | None = None


def get_global_monitor(
    window_seconds: float = 300.0,
    idle_timeout: float = 60.0,
) -> KeyboardMouseMonitor:
    """Return the process-wide KeyboardMouseMonitor, creating and starting it on first use.

    The arguments only take effect on the call that creates the singleton;
    later calls return the existing instance unchanged.
    """
    global _global_monitor
    if _global_monitor is not None:
        return _global_monitor
    monitor = KeyboardMouseMonitor(
        window_seconds=window_seconds,
        idle_timeout_seconds=idle_timeout,
    )
    # Publish before starting, so the instance stays registered even if
    # start() raises.
    _global_monitor = monitor
    monitor.start()
    return monitor
def stop_global_monitor() -> None:
    """Stop the global singleton monitor and forget it; a no-op when none exists."""
    global _global_monitor
    monitor = _global_monitor
    if not monitor:
        return
    monitor.stop()
    # Cleared only after a successful stop so the next getter builds a new one.
    _global_monitor = None