"""
perception/keyboard_mouse_monitor.py
=====================================
Global keyboard/mouse monitor.

Captures keyboard and mouse events through pynput global hooks, aggregates
them in a sliding window to produce an activity score, and feeds every event
to an ActivityDetector (same logic as the detector in scheduler.py).

Design notes
------------
- pynput listeners run on their own threads; events are handed to a worker
  thread through a queue.Queue
- ActivityDetector (defined below, mirrors agent.scheduler) does the
  sliding-window bookkeeping
- Recording can be paused and resumed (pause/resume)
- Optional per-event callback (e.g. to log events into memory)
- Dummy mode: no pynput dependency, meant for headless tests

Usage
-----
    from perception.keyboard_mouse_monitor import KeyboardMouseMonitor

    monitor = KeyboardMouseMonitor(
        window_seconds=300,
        idle_timeout_seconds=60,
        on_event=None,     # optional callback invoked as (event_type, timestamp)
        use_dummy=False,   # True = no real hooks, for tests
    )
    monitor.start()        # start listening in background threads
    monitor.stop()         # stop listening

    # Query activity
    activity = monitor.get_activity()          # float 0.0~1.0
    is_idle = monitor.is_idle()                # bool
    is_engaged = monitor.is_highly_engaged()   # bool
"""

from __future__ import annotations

import atexit
import queue
import threading
import time
import os
import sys
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Callable, Literal, Any

# ---------------------------------------------------------------------------
# Lazy pynput import (dummy mode does not need it)
# ---------------------------------------------------------------------------

# Tri-state cache: None = not probed yet, True/False = result of the probe.
_pynput_available: bool | None = None


def _check_pynput() -> bool:
    """Return True when pynput can be imported; the result is cached."""
    global _pynput_available
    if _pynput_available is None:
        try:
            from pynput import keyboard, mouse  # noqa: F401
        except ImportError:
            _pynput_available = False
        else:
            _pynput_available = True
    return _pynput_available


# ---------------------------------------------------------------------------
# ActivityDetector — sliding-window activity statistics
# (shares its logic with scheduler.py)
# ---------------------------------------------------------------------------

# Event rate that maps to an activity score of 1.0.
_BASELINE_EVENTS_PER_MINUTE = 30.0
# Scores strictly below this are LOW, strictly above the next one are HIGH.
_LOW_ACTIVITY_THRESHOLD = 0.2
_HIGH_ACTIVITY_THRESHOLD = 0.7


class ActivityLevel(Enum):
    """Coarse activity buckets derived from the activity score."""

    IDLE = auto()    # computer idle (no keyboard/mouse input)
    LOW = auto()     # low activity
    NORMAL = auto()  # normal activity
    HIGH = auto()    # highly engaged


@dataclass
class ActivityDetector:
    """
    Sliding-window keyboard/mouse activity detector.

    Parameters
    ----------
    window_seconds : length of the statistics window in seconds
        (default 300 s = 5 minutes)
    idle_timeout_seconds : the computer counts as idle once no event has been
        recorded for longer than this many seconds
    """

    window_seconds: float = 300.0
    idle_timeout_seconds: float = 60.0

    # Internal state: (timestamp, event_type) pairs, guarded by _lock.
    _events: list[tuple[float, Literal["keyboard", "mouse"]]] = field(default_factory=list)
    _lock: threading.Lock = field(default_factory=threading.Lock)

    # ── Public API ────────────────────────────────────────────────────────

    def record(self, event_type: Literal["keyboard", "mouse"], timestamp: float | None = None) -> None:
        """
        Record one keyboard/mouse event.

        ``timestamp`` defaults to ``time.monotonic()``. The event list is
        mutated under the internal lock, and events that fell out of the
        window (relative to ``timestamp``) are dropped in the same step.
        """
        if timestamp is None:
            timestamp = time.monotonic()
        with self._lock:
            self._events.append((timestamp, event_type))
            self._prune(timestamp)

    def get_activity(self) -> float:
        """
        Return the activity score in the range 0.0~1.0.

        The score is the number of events currently inside the window divided
        by a baseline of 30 events per minute of window, capped at 1.0.
        A non-positive ``window_seconds`` yields 0.0 instead of dividing by
        zero.
        """
        baseline = self.window_seconds / 60.0 * _BASELINE_EVENTS_PER_MINUTE
        with self._lock:
            self._prune(time.monotonic())
            event_count = len(self._events)
        if baseline <= 0:
            # An empty window cannot contain any activity.
            return 0.0
        return min(event_count / baseline, 1.0)

    def is_idle(self) -> bool:
        """
        Return True when the computer is idle.

        Idle means either that no event is stored at all, or that the most
        recently appended event is older than ``idle_timeout_seconds``.
        """
        with self._lock:
            if not self._events:
                return True
            last_timestamp = self._events[-1][0]
        return (time.monotonic() - last_timestamp) > self.idle_timeout_seconds

    def is_highly_engaged(self) -> bool:
        """Return True when the activity score is above 0.7 and the computer is not idle."""
        return self.get_activity() > _HIGH_ACTIVITY_THRESHOLD and not self.is_idle()

    def get_activity_level(self) -> ActivityLevel:
        """
        Return the activity bucket.

        IDLE wins over every score; otherwise a score below 0.2 is LOW,
        above 0.7 is HIGH, and anything in between is NORMAL.
        """
        if self.is_idle():
            return ActivityLevel.IDLE
        score = self.get_activity()
        if score < _LOW_ACTIVITY_THRESHOLD:
            return ActivityLevel.LOW
        if score > _HIGH_ACTIVITY_THRESHOLD:
            return ActivityLevel.HIGH
        return ActivityLevel.NORMAL

    @property
    def activity_level(self) -> float:
        """Property form of get_activity(), kept compatible with scheduler.ActivityDetector (0.0~1.0)."""
        return self.get_activity()

    # ── Internal ──────────────────────────────────────────────────────────

    def _prune(self, now: float) -> None:
        """Drop events that are no longer inside the window; callers hold ``_lock``."""
        cutoff = now - self.window_seconds
        # Filter instead of popping from the front: callers may pass explicit
        # timestamps, so the list is not guaranteed to be sorted.
        self._events[:] = [(t, e) for t, e in self._events if t > cutoff]


# ---------------------------------------------------------------------------
# KeyboardMouseMonitor — global keyboard/mouse listener
# ---------------------------------------------------------------------------

@dataclass
class KeyboardMouseMonitor:
    """
    Global keyboard/mouse listener built on pynput global hooks.

    Parameters
    ----------
    window_seconds : statistics window passed to the ActivityDetector
    idle_timeout_seconds : idle timeout passed to the ActivityDetector
    on_event : optional callback invoked as (event_type, timestamp) for every
        recorded event; it runs on the worker thread, outside the monitor's
        lock, so it may call pause()/resume()/stop() on this monitor
    use_dummy : True = never start pynput listeners (only the worker thread),
        intended for tests that exercise the API

    Example
    -------
        monitor = KeyboardMouseMonitor(on_event=lambda t, ts: print(f"{t} @ {ts}"))
        monitor.start()
        time.sleep(10)
        print(monitor.get_activity())
        monitor.stop()
    """

    window_seconds: float = 300.0
    idle_timeout_seconds: float = 60.0
    on_event: Callable[[Literal["keyboard", "mouse"], float], Any] | None = None
    use_dummy: bool = False

    # Internal state. _activity and _queue are rebuilt in __post_init__.
    _activity: ActivityDetector = field(
        default_factory=lambda: ActivityDetector(window_seconds=300.0, idle_timeout_seconds=60.0)
    )
    _running: bool = False
    _paused: bool = False
    _lock: threading.Lock = field(default_factory=threading.Lock)
    _queue: queue.Queue = field(default_factory=queue.Queue)
    _listener_thread: threading.Thread | None = None
    _worker_thread: threading.Thread | None = None
    # Set by stop(); replaced by a fresh Event on every start() so that the
    # threads of a finished run can never be revived by a later run.
    _stop_event: threading.Event = field(default_factory=threading.Event)

    def __post_init__(self) -> None:
        self._activity = ActivityDetector(
            window_seconds=self.window_seconds,
            idle_timeout_seconds=self.idle_timeout_seconds,
        )
        self._queue = queue.Queue()

    # ── Lifecycle ─────────────────────────────────────────────────────────

    def start(self) -> None:
        """
        Start the background threads; calling it while running is a no-op.

        Dummy mode is used when ``use_dummy`` is set or pynput cannot be
        imported. ``stop`` is registered with ``atexit`` as a cleanup hook.
        """
        with self._lock:
            if self._running:
                return
            self._running = True
            self._paused = False
            self._stop_event = threading.Event()
            # Threads are created under the lock so a concurrent stop() sees
            # either no run at all or a fully started one.
            if self.use_dummy or not _check_pynput():
                self._start_dummy()
            else:
                self._start_real()
        # Register exit cleanup.
        atexit.register(self.stop)

    def stop(self) -> None:
        """
        Stop all background threads; calling it while stopped is a no-op.

        Signals the threads, waits up to 3 s for the listener thread and up to
        2 s for the worker thread, then discards whatever is left in the
        queue. May be called from the ``on_event`` callback.
        """
        with self._lock:
            if not self._running:
                return
            self._running = False
            stop_event = self._stop_event
            to_join = ((self._listener_thread, 3.0), (self._worker_thread, 2.0))

        stop_event.set()
        # Sentinel wakes a worker that is blocked on the queue.
        try:
            self._queue.put_nowait(None)
        except queue.Full:
            pass  # the worker still exits through the stop event

        current = threading.current_thread()
        for thread, timeout in to_join:
            # Joining the current thread would raise RuntimeError (stop()
            # invoked from the on_event callback).
            if thread is not None and thread is not current and thread.is_alive():
                thread.join(timeout=timeout)

        # A leftover sentinel would terminate the worker of the next start().
        self._drain_queue()
        atexit.unregister(self.stop)

    def pause(self) -> None:
        """Pause event recording (listeners keep running, events are dropped)."""
        with self._lock:
            self._paused = True

    def resume(self) -> None:
        """Resume event recording."""
        with self._lock:
            self._paused = False

    # ── Activity queries ──────────────────────────────────────────────────

    def get_activity(self) -> float:
        """Return the activity score in the range 0.0~1.0."""
        return self._activity.get_activity()

    def is_idle(self) -> bool:
        """Return the detector's idle verdict."""
        return self._activity.is_idle()

    def is_highly_engaged(self) -> bool:
        """Return the detector's high-engagement verdict."""
        return self._activity.is_highly_engaged()

    def get_activity_level(self) -> ActivityLevel:
        """Return the detector's activity bucket."""
        return self._activity.get_activity_level()

    def get_event_count(self, window_seconds: float | None = None) -> int:
        """
        Return the number of stored events newer than ``window_seconds`` (debug aid).

        ``window_seconds`` defaults to the monitor's own window.
        """
        if window_seconds is None:
            window_seconds = self.window_seconds
        with self._activity._lock:
            cutoff = time.monotonic() - window_seconds
            return sum(1 for t, _ in self._activity._events if t > cutoff)

    # ── Internals ─────────────────────────────────────────────────────────

    def _start_dummy(self) -> None:
        """Dummy mode: start only the worker thread, no pynput."""
        self._worker_thread = threading.Thread(target=self._worker_loop, daemon=True, name="km-dummy")
        self._worker_thread.start()

    def _start_real(self) -> None:
        """Real mode: start the worker thread and the pynput listener thread."""
        # The worker consumes the queue.
        self._worker_thread = threading.Thread(target=self._worker_loop, daemon=True, name="km-worker")
        self._worker_thread.start()
        # The pynput listeners are supervised from their own thread.
        self._listener_thread = threading.Thread(target=self._listener_loop, daemon=True, name="km-listener")
        self._listener_thread.start()

    def _drain_queue(self) -> None:
        """Discard every item currently in the queue."""
        while True:
            try:
                self._queue.get_nowait()
            except queue.Empty:
                return

    def _listener_loop(self) -> None:
        """Run the pynput listeners until stop() signals, then shut them down."""
        from pynput import keyboard, mouse

        stop_event = self._stop_event

        def enqueue(event_type: str) -> None:
            try:
                self._queue.put_nowait((event_type, time.monotonic()))
            except queue.Full:
                pass  # dropping an event is preferable to blocking the hook

        def on_key(event: keyboard.Key | keyboard.KeyCode | None) -> None:
            # pynput sends KeyCode for ordinary keys and Key for special keys;
            # None is ignored.
            if event is not None:
                enqueue("keyboard")

        def on_mouse(x: int, y: int, button: mouse.Button, pressed: bool) -> None:
            # Count presses only; releases would double-count every click.
            if pressed:
                enqueue("mouse")

        # Move/scroll/release callbacks are simply left unset.
        listeners = [
            keyboard.Listener(on_press=on_key, suppress=False),
            mouse.Listener(on_click=on_mouse, suppress=False),
        ]
        try:
            for listener in listeners:
                listener.start()
            stop_event.wait()
        finally:
            # Without this the global hooks outlive stop().
            for listener in listeners:
                listener.stop()

    def _worker_loop(self) -> None:
        """
        Consume the queue and record events in the ActivityDetector.

        Exits on the ``None`` sentinel or once the stop event is set. While
        paused, events are discarded and ``on_event`` is not invoked.
        """
        stop_event = self._stop_event
        pending = self._queue
        while not stop_event.is_set():
            try:
                item = pending.get(timeout=0.5)
            except queue.Empty:
                continue
            if item is None:
                break  # stop sentinel

            # Only the flag is read under the lock; running the callback with
            # the lock held would deadlock callbacks that call pause()/stop().
            with self._lock:
                paused = self._paused
            if paused:
                continue

            try:
                event_type, timestamp = item
                self._activity.record(event_type, timestamp)
                if self.on_event is not None:
                    self.on_event(event_type, timestamp)
            except Exception:
                # Best effort: a malformed item or a failing callback must not
                # kill the worker.
                import logging

                logging.getLogger(__name__).debug(
                    "keyboard/mouse event handling failed", exc_info=True
                )


# ---------------------------------------------------------------------------
# ScreenCapture — screenshots (mss)
# ---------------------------------------------------------------------------

@dataclass
class ScreenCapture:
    """
    Screenshot helper built on mss.

    Usage
    -----
        cap = ScreenCapture()
        screenshot = cap.capture(monitor=0)   # PIL.Image
        screenshot.save("screen.png")
        text = cap.extract_text(screenshot)   # optional OCR (needs pytesseract)
    """

    # Tri-state: None = not probed yet, True = mss importable, False = import
    # failed. Unannotated, so @dataclass treats it as a plain class attribute.
    _mss = None
    _ocr_available: bool | None = None

    def __post_init__(self) -> None:
        self._load_mss()

    def _load_mss(self) -> bool:
        """Probe for mss once and return whether it is importable."""
        if self._mss is None:
            try:
                import mss  # noqa: F401
            except ImportError:
                self._mss = False
            else:
                self._mss = True
        return bool(self._mss)

    def is_available(self) -> bool:
        """Return True when mss is usable."""
        return self._mss is True

    @staticmethod
    def _to_image(shot: Any) -> Any:
        """Convert an mss screenshot (BGRA bytes) into an RGB PIL.Image."""
        from PIL import Image

        return Image.frombytes("RGB", shot.size, shot.bgra, "raw", "BGRX")

    def capture(self, monitor: int = 0) -> Any | None:
        """
        Capture one display.

        ``monitor`` is zero-based: 0 is the first display reported by mss
        (the primary screen), 1 the second, and so on; -1 captures the
        combined virtual screen. Returns a PIL.Image, or None when mss is
        unavailable, the index is out of range, or the capture fails.
        """
        if not self._mss:
            return None
        try:
            import mss

            with mss.mss() as sct:
                displays = sct.monitors
                # mss keeps the combined virtual screen at index 0 and the
                # physical displays from index 1 on.
                index = monitor + 1
                if not 0 <= index < len(displays):
                    return None
                return self._to_image(sct.grab(displays[index]))
        except Exception:
            # Best effort by contract: any capture failure is reported as None.
            return None

    def capture_region(self, x: int, y: int, width: int, height: int) -> Any | None:
        """
        Capture a rectangular screen region.

        Parameters
        ----------
        x, y : top-left corner in pixels
        width, height : size of the region in pixels

        Returns a PIL.Image, or None when mss is unavailable or the capture
        fails.
        """
        if not self._mss:
            return None
        try:
            import mss

            region = {"left": x, "top": y, "width": width, "height": height}
            with mss.mss() as sct:
                return self._to_image(sct.grab(region))
        except Exception:
            # Best effort by contract: any capture failure is reported as None.
            return None

    def extract_text(self, image: Any) -> str:
        """
        Extract text from a screenshot via OCR.

        Requires the pytesseract package plus the Tesseract-OCR engine with
        the ``chi_sim`` and ``eng`` language data installed
        (macOS: ``brew install tesseract tesseract-lang``).

        Returns the recognised text (newline separated), or "" when
        pytesseract is missing or recognition fails.
        """
        try:
            import pytesseract
        except ImportError:
            self._ocr_available = False
            return ""
        self._ocr_available = True
        try:
            return pytesseract.image_to_string(image, lang="chi_sim+eng")
        except Exception:
            # Best effort by contract: OCR failures yield an empty string.
            return ""

    def get_monitors(self) -> list[dict]:
        """
        Return the geometry of all displays as reported by mss.

        Entry 0 is the combined virtual screen; entry ``i + 1`` is the display
        selected by ``capture(monitor=i)``. Returns [] when mss is unavailable
        or the query fails.
        """
        if not self._mss:
            return []
        try:
            import mss

            with mss.mss() as sct:
                return [dict(display) for display in sct.monitors]
        except Exception:
            return []


# ---------------------------------------------------------------------------
# Global singleton monitor (convenient for cross-module use)
# ---------------------------------------------------------------------------

_global_monitor: KeyboardMouseMonitor | None = None


def get_global_monitor(
    window_seconds: float = 300.0,
    idle_timeout: float = 60.0,
) -> KeyboardMouseMonitor:
    """
    Return the global singleton KeyboardMouseMonitor.

    The monitor is created and started on the first call; the arguments are
    only used for that creation and are ignored afterwards.
    """
    global _global_monitor
    if _global_monitor is None:
        _global_monitor = KeyboardMouseMonitor(
            window_seconds=window_seconds,
            idle_timeout_seconds=idle_timeout,
        )
        _global_monitor.start()
    return _global_monitor


def stop_global_monitor() -> None:
    """Stop and discard the global singleton, if there is one."""
    global _global_monitor
    if _global_monitor is not None:
        _global_monitor.stop()
        _global_monitor = None