Files
EzVibe/agent/health_tracker.py
e2hang 96cb28fe08 feat: multi-layer eventFilter for window drag + text input fix + Qt platform xcb
- Fix window drag: install eventFilter on live2d_container, central, and
  _live2d_widget; fix super() call in dynamic class
- Fix text input: remove WA_TransparentForMouseEvents from _chat_container
- Force QT_QPA_PLATFORM=xcb on Linux (wayland has mouse event issues)
- Add HealthTracker module, update AgentBrain with health integration
- Update scheduler and memory modules
- Add v5_modify documentation

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-19 23:36:58 +08:00

477 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
EzVibe HealthTracker
====================
结构化健康数据追踪器SQLite
设计文档对应章节Phase 1.3 — 新增 HealthTracker 模块
功能
• 记录精确时间戳的健康事件(喝水、起身、伸展、离开屏幕)
• 提供快速查询接口(今日统计、连续静坐时长)
• 生成注入 LLM System Prompt 的健康上下文
• 管理久坐警报
数据模型
• health_events — 健康事件表(事件流)
• daily_stats — 每日统计表(聚合)
• sedentary_alerts — 久坐警报表
与设计文档对照
• HealthTracker结构化时序数据健康事件
• VectorMemory语义数据偏好、习惯、对话
"""
from __future__ import annotations
import json
import logging
import time
import uuid
from datetime import datetime, date
from pathlib import Path
from typing import Any
logger = logging.getLogger(__name__)
# ============================================================================
# 数据库Schema
# ============================================================================
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS health_events (
id TEXT PRIMARY KEY,
event_type TEXT NOT NULL, -- 'water' | 'stand' | 'stretch' | 'screen_break'
timestamp REAL NOT NULL, -- Unix timestamp
source TEXT NOT NULL, -- 'user_action' | 'reminder_confirmed' | 'implicit_detected'
metadata TEXT NOT NULL, -- JSON: {"count": 1, "note": ""}
created_at REAL NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_event_type ON health_events(event_type);
CREATE INDEX IF NOT EXISTS idx_timestamp ON health_events(timestamp DESC);
CREATE TABLE IF NOT EXISTS daily_stats (
date TEXT PRIMARY KEY, -- 'YYYY-MM-DD'
water_count INTEGER DEFAULT 0,
stand_count INTEGER DEFAULT 0,
stretch_count INTEGER DEFAULT 0,
sedentary_minutes INTEGER DEFAULT 0,
screen_time_minutes INTEGER DEFAULT 0,
last_stand_at REAL,
last_water_at REAL,
updated_at REAL NOT NULL
);
CREATE TABLE IF NOT EXISTS sedentary_alerts (
id TEXT PRIMARY KEY,
triggered_at REAL NOT NULL,
acknowledged INTEGER DEFAULT 0,
acknowledged_at REAL,
message TEXT
);
"""
# ============================================================================
# 数据库助手
# ============================================================================
import sqlite3
def _dict_from_row(row: tuple, columns: list[str]) -> dict:
return dict(zip(columns, row))
# ============================================================================
# HealthTracker 主类
# ============================================================================
class HealthTracker:
"""
结构化健康数据追踪器。
参数
----
db_path : str
SQLite 数据库路径,默认 `data/health.db`。
"""
def __init__(self, db_path: str = "data/health.db") -> None:
self._db_path = Path(db_path)
self._db_path.parent.mkdir(parents=True, exist_ok=True)
self._conn: sqlite3.Connection | None = None
self._init_db()
def _init_db(self) -> None:
"""初始化数据库(创建表)。"""
conn = sqlite3.connect(str(self._db_path), check_same_thread=False)
conn.execute("PRAGMA foreign_keys = ON;")
conn.executescript(SCHEMA_SQL)
conn.commit()
self._conn = conn
logger.info("[HealthTracker] Database initialized: %s", self._db_path)
def close(self) -> None:
if self._conn:
self._conn.close()
self._conn = None
# ── 事件记录 ──────────────────────────────────────────────────────────
async def record_water(self, count: int = 1, source: str = "user_action") -> str:
"""记录喝水事件。返回事件 ID。"""
return await self._record_event("water", count=count, source=source)
async def record_stand(self, source: str = "implicit_detected") -> str:
"""记录起身事件。返回事件 ID。"""
return await self._record_event("stand", count=1, source=source)
async def record_stretch(self, source: str = "reminder_confirmed") -> str:
"""记录伸展事件。返回事件 ID。"""
return await self._record_event("stretch", count=1, source=source)
async def record_screen_break(self, source: str = "implicit_detected") -> str:
"""记录离开屏幕事件。返回事件 ID。"""
return await self._record_event("screen_break", count=1, source=source)
async def _record_event(
self,
event_type: str,
count: int = 1,
source: str = "user_action",
metadata: dict | None = None,
) -> str:
"""通用事件记录。"""
if self._conn is None:
raise RuntimeError("Database not initialized")
event_id = str(uuid.uuid4())[:12]
now = time.time()
meta = json.dumps(metadata or {"count": count})
self._conn.execute(
"""
INSERT INTO health_events (id, event_type, timestamp, source, metadata, created_at)
VALUES (?, ?, ?, ?, ?, ?)
""",
(event_id, event_type, now, source, meta, now),
)
self._conn.commit()
# 更新每日统计
self._update_daily_stats(event_type, now)
logger.debug(
"[HealthTracker] Recorded event: type=%s id=%s source=%s",
event_type, event_id, source,
)
return event_id
def _update_daily_stats(self, event_type: str, timestamp: float) -> None:
"""更新每日统计表。"""
if self._conn is None:
return
today = datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d")
now = time.time()
# 插入或更新
if event_type == "water":
self._conn.execute(
"""
INSERT INTO daily_stats (date, water_count, last_water_at, updated_at)
VALUES (?, 1, ?, ?)
ON CONFLICT(date) DO UPDATE SET
water_count = water_count + 1,
last_water_at = excluded.last_water_at,
updated_at = excluded.updated_at
""",
(today, timestamp, now),
)
elif event_type == "stand":
self._conn.execute(
"""
INSERT INTO daily_stats (date, stand_count, last_stand_at, updated_at)
VALUES (?, 1, ?, ?)
ON CONFLICT(date) DO UPDATE SET
stand_count = stand_count + 1,
last_stand_at = excluded.last_stand_at,
updated_at = excluded.updated_at
""",
(today, timestamp, now),
)
elif event_type == "stretch":
self._conn.execute(
"""
INSERT INTO daily_stats (date, stretch_count, updated_at)
VALUES (?, 1, ?)
ON CONFLICT(date) DO UPDATE SET
stretch_count = stretch_count + 1,
updated_at = excluded.updated_at
""",
(today, now),
)
self._conn.commit()
# ── 查询接口 ──────────────────────────────────────────────────────────
def get_today_stats(self) -> dict[str, Any]:
"""
返回今日统计:
返回
----
dict 包含 water_count, stand_count, sedentary_minutes, last_water_at, last_stand_at
"""
if self._conn is None:
return {}
today = date.today().strftime("%Y-%m-%d")
row = self._conn.execute(
"SELECT * FROM daily_stats WHERE date = ?", (today,)
).fetchone()
if row is None:
return {
"water_count": 0,
"stand_count": 0,
"stretch_count": 0,
"sedentary_minutes": 0,
"last_water_at": None,
"last_stand_at": None,
}
cols = [
"date", "water_count", "stand_count", "stretch_count",
"sedentary_minutes", "screen_time_minutes", "last_stand_at",
"last_water_at", "updated_at",
]
stats = _dict_from_row(row, cols)
return {
"water_count": stats["water_count"],
"stand_count": stats["stand_count"],
"stretch_count": stats["stretch_count"],
"sedentary_minutes": stats["sedentary_minutes"],
"last_water_at": stats["last_water_at"],
"last_stand_at": stats["last_stand_at"],
}
def get_consecutive_sedentary_minutes(self) -> int:
"""
返回当前连续静坐分钟数。
计算逻辑:查找最近一次 stand 事件之后的无事件时长。
"""
if self._conn is None:
return 0
# 查找最近起身时间
row = self._conn.execute(
"SELECT timestamp FROM health_events WHERE event_type = 'stand' ORDER BY timestamp DESC LIMIT 1"
).fetchone()
if row is None:
# 无起身记录,查找电脑空闲开始时间(通过 daily_stats 的 sedentary_minutes
today = date.today().strftime("%Y-%m-%d")
stat_row = self._conn.execute(
"SELECT sedentary_minutes FROM daily_stats WHERE date = ?", (today,)
).fetchone()
if stat_row:
return stat_row[0]
return 0
last_stand = row[0]
consecutive = (time.time() - last_stand) / 60.0
return int(consecutive)
def get_last_event_time(self, event_type: str) -> float | None:
"""返回指定事件类型的最近一次时间戳。"""
if self._conn is None:
return None
row = self._conn.execute(
"SELECT timestamp FROM health_events WHERE event_type = ? ORDER BY timestamp DESC LIMIT 1",
(event_type,),
).fetchone()
return row[0] if row else None
async def get_health_context_for_llm(self) -> str:
"""
生成注入 LLM System Prompt 的健康上下文。
返回格式
--------
【用户健康状态】
- 今日喝水3 次(上次 14:30
- 连续静坐120 分钟 ⚠️
- 上次起身16:00
"""
stats = self.get_today_stats()
consecutive = self.get_consecutive_sedentary_minutes()
# 格式化时间
def fmt_time(ts: float | None) -> str:
if ts is None:
return "无记录"
return datetime.fromtimestamp(ts).strftime("%H:%M")
water_count = stats.get("water_count", 0)
stand_count = stats.get("stand_count", 0)
last_water = fmt_time(stats.get("last_water_at"))
last_stand = fmt_time(stats.get("last_stand_at"))
# 警告符号
warning = " ⚠️" if consecutive > 60 else ""
lines = [
"【用户健康状态】",
f"- 今日喝水:{water_count} 次(上次 {last_water}",
f"- 连续静坐:{consecutive} 分钟{warning}",
f"- 上次起身:{last_stand}",
f"- 今日起身:{stand_count}",
]
return "\n".join(lines)
# ── 警报管理 ──────────────────────────────────────────────────────────
async def trigger_sedentary_alert(self, message: str) -> str:
"""触发久坐警报。返回警报 ID。"""
if self._conn is None:
raise RuntimeError("Database not initialized")
alert_id = str(uuid.uuid4())[:12]
now = time.time()
self._conn.execute(
"""
INSERT INTO sedentary_alerts (id, triggered_at, message)
VALUES (?, ?, ?)
""",
(alert_id, now, message),
)
self._conn.commit()
logger.info("[HealthTracker] Sedentary alert triggered: id=%s message=%s", alert_id, message)
return alert_id
def get_pending_alerts(self) -> list[dict]:
"""返回未确认的久坐警报列表。"""
if self._conn is None:
return []
rows = self._conn.execute(
"SELECT id, triggered_at, message FROM sedentary_alerts WHERE acknowledged = 0 ORDER BY triggered_at DESC"
).fetchall()
return [
{"id": r[0], "triggered_at": r[1], "message": r[2]}
for r in rows
]
async def acknowledge_alert(self, alert_id: str) -> bool:
"""确认警报。返回是否成功。"""
if self._conn is None:
return False
now = time.time()
cur = self._conn.execute(
"SELECT id FROM sedentary_alerts WHERE id = ? AND acknowledged = 0",
(alert_id,),
).fetchone()
if cur is None:
return False
self._conn.execute(
"UPDATE sedentary_alerts SET acknowledged = 1, acknowledged_at = ? WHERE id = ?",
(now, alert_id),
)
self._conn.commit()
return True
# ── 统计聚合 ─────────────────────────────────────────────────────────
def get_weekly_report(self) -> dict[str, Any]:
"""返回本周健康报告7天汇总"""
if self._conn is None:
return {}
# 计算本周开始日期(周一)
today = date.today()
weekday = today.weekday() # 0=Monday
week_start = (today - datetime.timedelta(days=weekday)).strftime("%Y-%m-%d")
rows = self._conn.execute(
"""
SELECT date, water_count, stand_count, stretch_count, sedentary_minutes
FROM daily_stats
WHERE date >= ?
ORDER BY date ASC
""",
(week_start,),
).fetchall()
total_water = sum(r[1] for r in rows)
total_stand = sum(r[2] for r in rows)
total_stretch = sum(r[3] for r in rows)
avg_sedentary = (
sum(r[4] for r in rows) // len(rows) if rows else 0
)
return {
"week_start": week_start,
"days_recorded": len(rows),
"total_water": total_water,
"total_stand": total_stand,
"total_stretch": total_stretch,
"avg_sedentary_minutes": avg_sedentary,
"daily_breakdown": [
{"date": r[0], "water": r[1], "stand": r[2], "stretch": r[3]}
for r in rows
],
}
def get_streak(self, event_type: str) -> int:
"""
返回连续完成某事件的日数(用于成就系统)。
计算:从今天往前推算连续有记录的天数。
"""
if self._conn is None:
return 0
today = date.today()
streak = 0
for i in range(365): # 最多回查1年
check_date = (today - datetime.timedelta(days=i)).strftime("%Y-%m-%d")
row = self._conn.execute(
"SELECT COUNT(*) FROM health_events WHERE event_type = ? AND date(timestamp, 'unixepoch', 'localtime', '+8 hours') = ?",
(event_type, check_date),
).fetchone()
if row and row[0] > 0:
streak += 1
else:
break
return streak
def update_sedentary_minutes(self, minutes: int) -> None:
"""更新今日连续静坐分钟数(由外部计时器调用)。"""
if self._conn is None:
return
today = date.today().strftime("%Y-%m-%d")
now = time.time()
self._conn.execute(
"""
INSERT INTO daily_stats (date, sedentary_minutes, updated_at)
VALUES (?, ?, ?)
ON CONFLICT(date) DO UPDATE SET
sedentary_minutes = excluded.sedentary_minutes,
updated_at = excluded.updated_at
""",
(today, minutes, now),
)
self._conn.commit()
def __del__(self) -> None:
self.close()