Files
TexasPoker-AI/eval_elo.py
2026-05-11 17:32:43 +08:00

446 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
eval_elo.py — head-to-head evaluation script for Deep CFR models.
Plays two checkpoints from different training iterations (Model A / Model B)
against each other for 100,000 hands (the default) and reports Model A's
win rate against Model B in big blinds per 100 hands (bb/100).
=== Core design ===
1. Multi-process: ProcessPoolExecutor(mp_context='spawn'), 22 workers by default
2. Seat fairness: 50,000 hands with Model A as P0 (small blind), 50,000 hands with Model A as P1 (big blind)
3. Strict inference: avg_strategy + legal_mask renormalization + numpy.random.choice sampling
4. bb/100 metric: (total chips won / BB) / (total hands / 100)
Usage:
python eval_elo.py --model_a ckpt_10000.pt --model_b ckpt_5000.pt
python eval_elo.py --model_a checkpoints/ckpt_iter_10000.pt --model_b checkpoints/ckpt_iter_5000.pt --num_games 10000
"""
# ── Must run BEFORE `import torch`: pin the native linear-algebra backends
#    (OpenMP / MKL / OpenBLAS) to a single thread each, to prevent OpenMP
#    deadlocks when worker processes are started with the 'spawn' method ──
import os
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["OPENBLAS_NUM_THREADS"] = "1"
import argparse
import random
import multiprocessing as mp
from typing import Dict, List, Tuple
import numpy as np
import torch
import pyspiel
from concurrent.futures import ProcessPoolExecutor, as_completed
torch.set_num_threads(1)
# ── Add this script's own directory (the poker/ root) to sys.path so the project-local imports below resolve ──
import sys
_POKER_DIR = os.path.abspath(os.path.dirname(__file__))
if _POKER_DIR not in sys.path:
sys.path.insert(0, _POKER_DIR)
from env_adapter import (
HUNL_SB_BB_STRING,
BetTranslator,
extract_env_state,
CFR_ACTIONS,
NUM_CFR_ACTIONS,
STACK_NORMALIZE,
)
from card_model.config import PAD_TOKEN, BOARD_SIZE
from card_model.data_generator import extract_cards_from_state
from card_model.model import CardModel
from cfr_net import CFRNetwork, CARD_DIM, ENV_DIM, NUM_ACTIONS
# ───────────────────── Constants ─────────────────────
STREET_NORMALIZE = 3.0 # divisor applied to env_info["street"] in build_env_features
BIG_BLIND = 100 # one big blind = 100 chips; used to convert chip winnings into bb/100
NUM_WORKERS = 22 # default number of parallel worker processes (overridable via --num_workers)
# Default CardModel weights file, resolved relative to this script's directory.
CARD_MODEL_CHECKPOINT = os.path.join(_POKER_DIR, "card_model", "data", "best_card_model.pt")
# ───────────────────── Global per-worker state ─────────────────────
# Filled in by _init_worker inside each worker process and read by worker_play_games.
_WORKER_STATE: Dict = {}
# ───────────────────── 特征构建 ─────────────────────
def build_env_features(env_info: dict) -> torch.Tensor:
    """Turn the dict returned by ``extract_env_state`` into a 5-element float32 tensor.

    Element order: pot, P0 stack, P1 stack (each divided by
    ``STACK_NORMALIZE``), street (divided by ``STREET_NORMALIZE``), and the
    position value cast to float.
    """
    chip_keys = ("pot", "p0_stack", "p1_stack")
    values = [env_info[key] / STACK_NORMALIZE for key in chip_keys]
    values.append(env_info["street"] / STREET_NORMALIZE)
    values.append(float(env_info["position"]))
    return torch.tensor(values, dtype=torch.float32)
def build_card_features(card_model: CardModel, state) -> torch.Tensor:
    """Run ``card_model`` on the acting player's cards and return its histogram output.

    The hole cards are always taken for ``state.current_player()`` so the
    features never leak the opponent's hand. The board is right-padded with
    ``PAD_TOKEN`` up to ``BOARD_SIZE`` entries before being fed to the model.

    Returns:
        The second output of ``card_model`` with the batch dimension removed,
        moved to CPU (annotated in the original code as a 50-bin equity
        histogram).
    """
    acting_player = state.current_player()
    hole_cards, board_cards = extract_cards_from_state(state, player_id=acting_player)

    # Build inputs on whatever device the model's parameters live on.
    device = next(card_model.parameters()).device
    pad_count = BOARD_SIZE - len(board_cards)
    board_row = board_cards + [PAD_TOKEN] * pad_count

    x_hole = torch.tensor([hole_cards], dtype=torch.int64, device=device)
    x_board = torch.tensor([board_row], dtype=torch.int64, device=device)

    with torch.no_grad():
        _ignored, histogram = card_model(x_hole, x_board)

    return histogram.squeeze(0).cpu()
# ───────────────────── Worker 初始化 ─────────────────────
def _init_worker(
    model_a_state_dict: dict,
    model_b_state_dict: dict,
    card_state_dict: dict,
) -> None:
    """Per-process initializer for the ``ProcessPoolExecutor`` workers.

    Populates the module-level ``_WORKER_STATE`` dict with objects private to
    this worker process:

    - ``"game"``: an OpenSpiel game loaded from ``HUNL_SB_BB_STRING``
    - ``"cfr_net_a"`` / ``"cfr_net_b"``: ``CFRNetwork`` instances in eval mode
    - ``"card_model"``: a ``CardModel`` in eval mode, shared by both players
    - ``"translator"``: a ``BetTranslator``

    Args:
        model_a_state_dict: weights for Model A's ``CFRNetwork``.
        model_b_state_dict: weights for Model B's ``CFRNetwork``.
        card_state_dict: weights for the ``CardModel``.

    Weights arrive as plain state dicts (the loaders in this file move them to
    CPU) so that no CUDA objects have to cross the process boundary.
    """

    def _restore_cfr_net(weights: dict) -> CFRNetwork:
        # Fresh network with the project-wide dimensions, loaded and frozen in eval mode.
        net = CFRNetwork(
            card_dim=CARD_DIM, env_dim=ENV_DIM, num_actions=NUM_ACTIONS,
        )
        net.load_state_dict(weights)
        net.eval()
        return net

    # The game object is created inside the worker rather than passed in from the parent.
    _WORKER_STATE["game"] = pyspiel.load_game(HUNL_SB_BB_STRING)

    _WORKER_STATE["cfr_net_a"] = _restore_cfr_net(model_a_state_dict)
    _WORKER_STATE["cfr_net_b"] = _restore_cfr_net(model_b_state_dict)

    # Both models use the same card encoder.
    card_encoder = CardModel()
    card_encoder.load_state_dict(card_state_dict)
    card_encoder.eval()
    _WORKER_STATE["card_model"] = card_encoder

    _WORKER_STATE["translator"] = BetTranslator()
# ───────────────────── 单步推理 ─────────────────────
def _choose_action(
    cfr_net: CFRNetwork,
    card_model: CardModel,
    translator: BetTranslator,
    state,
) -> int:
    """Sample one action for the acting player and return the engine-native action ID.

    Inference rules followed here:

    1. Card features come from ``build_card_features``, which reads the
       *current* player's hole cards.
    2. ``cfr_net.get_strategy`` returns two values; the first is discarded and
       only the second (the average strategy) is used.
    3. The action is *sampled* (never argmax) from the average strategy after
       multiplying by the legal-action mask and renormalizing.
    4. The sampled CFR action index is converted with
       ``translator.cfr_to_engine_action``.

    Args:
        cfr_net: policy network of the player to act.
        card_model: shared card encoder.
        translator: maps between CFR action indices and engine actions.
        state: the OpenSpiel state at a player decision node.

    Returns:
        The engine action ID to pass to ``state.apply_action``.
    """
    env_info = extract_env_state(state, translator)
    env_features = build_env_features(env_info)
    card_features = build_card_features(card_model, state)
    legal_mask = env_info["legal_mask"]

    # Add a leading batch dimension of 1 to every network input.
    card_input = card_features.unsqueeze(0)
    env_input = env_features.unsqueeze(0)
    legal_mask_tensor = torch.tensor([legal_mask], dtype=torch.float32)

    with torch.no_grad():
        _, avg_strategy = cfr_net.get_strategy(card_input, env_input, legal_mask_tensor)
    avg_strat = avg_strategy.squeeze(0).cpu().numpy()

    # Zero out illegal actions and renormalize in float64 so the probabilities
    # sum to 1 within numpy.random.choice's tolerance.
    legal_mask_np = np.array(legal_mask, dtype=np.float64)
    masked_probs = avg_strat.astype(np.float64) * legal_mask_np
    prob_sum = masked_probs.sum()

    # A NaN/inf sum means the network output is unusable; treat it like the
    # "no probability mass on legal actions" case instead of normalizing by it.
    if np.isfinite(prob_sum) and prob_sum > 1e-9:
        masked_probs = masked_probs / prob_sum
    else:
        num_legal = int(legal_mask_np.sum())
        if num_legal > 0:
            # Fallback: uniform over the legal actions.
            masked_probs = legal_mask_np / num_legal
        else:
            # Last resort: all mass on index 1 (CALL, per the original comment).
            # Start from a fresh zero vector: the masked array may still hold
            # NaNs here, which would make numpy.random.choice raise.
            masked_probs = np.zeros_like(legal_mask_np)
            masked_probs[1] = 1.0

    cfr_action_idx = int(np.random.choice(NUM_CFR_ACTIONS, p=masked_probs))
    return translator.cfr_to_engine_action(state, cfr_action_idx)
# ───────────────────── Worker 对战函数 ─────────────────────
def worker_play_games(
    num_games: int,
    model_a_is_p0: bool,
) -> float:
    """Play ``num_games`` hands inside a worker and return Model A's total chip result.

    The game itself is never changed; seat rotation is done purely by the
    order of the two networks in a seat-indexed list:

    - ``model_a_is_p0`` truthy:  Model A acts as player 0, Model B as player 1
    - ``model_a_is_p0`` falsy:   Model B acts as player 0, Model A as player 1

    ``state.returns()`` is indexed by player ID, so Model A's result for each
    hand is read at the index of the seat it occupies.

    Args:
        num_games: number of hands to play.
        model_a_is_p0: whether Model A sits in seat 0.

    Returns:
        Sum over all hands of ``state.returns()`` at Model A's seat.
    """
    net_a = _WORKER_STATE["cfr_net_a"]
    net_b = _WORKER_STATE["cfr_net_b"]
    card_model = _WORKER_STATE["card_model"]
    translator = _WORKER_STATE["translator"]
    game = _WORKER_STATE["game"]

    seat_of_a = 0 if model_a_is_p0 else 1
    nets_by_seat = [net_b, net_a] if seat_of_a else [net_a, net_b]

    chips_for_a = 0.0
    for _ in range(num_games):
        state = game.new_initial_state()
        while not state.is_terminal():
            if not state.is_chance_node():
                # Decision node: the network seated at the acting player chooses.
                acting_net = nets_by_seat[state.current_player()]
                state.apply_action(
                    _choose_action(acting_net, card_model, translator, state)
                )
                continue
            # Chance node: sample an outcome according to its probability.
            outcome_actions, outcome_probs = zip(*state.chance_outcomes())
            dealt = random.choices(outcome_actions, weights=outcome_probs, k=1)[0]
            state.apply_action(dealt)
        chips_for_a += state.returns()[seat_of_a]
    return chips_for_a
# ───────────────────── 加载模型权重 ─────────────────────
def load_cfr_state_dict(checkpoint_path: str) -> dict:
    """Load a ``CFRNetwork`` state dict from ``checkpoint_path`` onto the CPU.

    Two on-disk layouts are accepted:

    - training archive: a dict holding the weights under ``"model_state_dict"``
      (and optionally an ``"iteration"`` entry, which is printed);
    - bare weights: the loaded object is itself the state dict.

    NOTE(review): ``weights_only=False`` lets ``torch.load`` unpickle arbitrary
    objects — only point this at checkpoints from a trusted source.

    Returns:
        A new dict mapping each parameter name to its tensor moved to CPU.
    """
    print(f"[加载] 正在读取: {checkpoint_path}")
    ckpt = torch.load(checkpoint_path, map_location="cpu", weights_only=False)

    is_training_archive = isinstance(ckpt, dict) and "model_state_dict" in ckpt
    if not is_training_archive:
        weights = ckpt
        print("[加载] 纯模型权重格式")
    else:
        weights = ckpt["model_state_dict"]
        iter_info = ckpt.get("iteration", "未知")
        print(f"[加载] 训练存档格式, iteration={iter_info}")

    # Guarantee every tensor is on the CPU before it is sent to workers.
    return {name: tensor.cpu() for name, tensor in weights.items()}
def load_card_state_dict(checkpoint_path: str) -> dict:
    """Load a ``CardModel`` state dict from ``checkpoint_path`` onto the CPU.

    Accepts either a dict that stores the weights under ``"model_state_dict"``
    or a bare state dict.

    NOTE(review): ``weights_only=False`` lets ``torch.load`` unpickle arbitrary
    objects — only point this at checkpoints from a trusted source.

    Returns:
        A new dict mapping each parameter name to its tensor moved to CPU.
    """
    print(f"[加载] CardModel: {checkpoint_path}")
    ckpt = torch.load(checkpoint_path, map_location="cpu", weights_only=False)
    wrapped = isinstance(ckpt, dict) and "model_state_dict" in ckpt
    weights = ckpt["model_state_dict"] if wrapped else ckpt
    return {name: tensor.cpu() for name, tensor in weights.items()}
# ───────────────────── 主函数 ─────────────────────
def _split_games_evenly(num_games: int, num_chunks: int) -> List[int]:
    """Split ``num_games`` into at most ``num_chunks`` non-empty, near-equal chunk sizes."""
    base, extra = divmod(num_games, num_chunks)
    sizes = [base + (1 if i < extra else 0) for i in range(num_chunks)]
    return [size for size in sizes if size > 0]


def main():
    """Command-line entry point: play Model A against Model B and print bb/100.

    Steps:
        1. Parse and validate CLI arguments.
        2. Load both CFR checkpoints and the CardModel weights on the CPU.
        3. Split the games into worker tasks: half with Model A in seat P0,
           half with Model A in seat P1 (an odd ``--num_games`` is rounded
           down to the nearest even number).
        4. Run the tasks in a spawn-based ``ProcessPoolExecutor``.
        5. Report bb/100 = (chips won / BIG_BLIND) / (games played / 100).

    Only games from tasks that actually finished count toward the
    denominator, so a failed task cannot dilute the reported win rate.

    Raises:
        SystemExit: on invalid arguments, or when every task failed and no
            win rate can be computed.
    """
    parser = argparse.ArgumentParser(
        description="Deep CFR 模型对抗评估 — 计算 Model A 对 Model B 的 bb/100"
    )
    parser.add_argument(
        "--model_a", type=str, required=True,
        help="Model A 的 checkpoint 路径"
    )
    parser.add_argument(
        "--model_b", type=str, required=True,
        help="Model B 的 checkpoint 路径"
    )
    parser.add_argument(
        "--num_games", type=int, default=100_000,
        help="总对战局数 (默认 100,000)"
    )
    parser.add_argument(
        "--num_workers", type=int, default=NUM_WORKERS,
        help=f"并行 Worker 数 (默认 {NUM_WORKERS})"
    )
    parser.add_argument(
        "--card_model", type=str, default=CARD_MODEL_CHECKPOINT,
        help=f"CardModel 权重路径 (默认 {CARD_MODEL_CHECKPOINT})"
    )
    args = parser.parse_args()
    total_games = args.num_games
    num_workers = args.num_workers

    # Reject values that would otherwise surface later as ZeroDivisionError.
    if num_workers < 1:
        parser.error("--num_workers 必须 >= 1")
    if total_games < 2:
        parser.error("--num_games 必须 >= 2 (每个位置至少 1 局)")

    # ── 1. Load model weights ──
    model_a_state_dict = load_cfr_state_dict(args.model_a)
    model_b_state_dict = load_cfr_state_dict(args.model_b)
    card_state_dict = load_card_state_dict(args.card_model)

    # ── 2. Build tasks: identical chunking for both seat assignments ──
    games_per_side = total_games // 2
    chunk_sizes = _split_games_evenly(games_per_side, num_workers)
    # Each task is (num_games, model_a_is_p0).
    tasks: List[Tuple[int, bool]] = (
        [(games, True) for games in chunk_sizes]
        + [(games, False) for games in chunk_sizes]
    )
    actual_total = sum(t[0] for t in tasks)
    actual_a_as_p0 = sum(t[0] for t in tasks if t[1])
    actual_a_as_p1 = sum(t[0] for t in tasks if not t[1])

    print(f"\n{'='*70}")
    print(f" Deep CFR 模型对抗评估")
    print(f" Model A: {args.model_a}")
    print(f" Model B: {args.model_b}")
    print(f" 总局数: {actual_total} (A做P0: {actual_a_as_p0}, A做P1: {actual_a_as_p1})")
    print(f" Workers: {num_workers} | BB = {BIG_BLIND} 筹码")
    print(f"{'='*70}\n")

    # ── 3. Run the matches in worker processes ──
    total_chips_won_by_a = 0.0
    games_done = {True: 0, False: 0}  # keyed by model_a_is_p0
    completed = 0
    failed = 0
    spawn_ctx = mp.get_context('spawn')
    with ProcessPoolExecutor(
        max_workers=num_workers,
        initializer=_init_worker,
        initargs=(model_a_state_dict, model_b_state_dict, card_state_dict),
        mp_context=spawn_ctx,
    ) as executor:
        pending = {
            executor.submit(worker_play_games, num_games_task, model_a_is_p0):
                (task_idx, num_games_task, model_a_is_p0)
            for task_idx, (num_games_task, model_a_is_p0) in enumerate(tasks)
        }
        # Consume results in completion order so progress reflects reality.
        for future in as_completed(pending):
            task_idx, num_games_task, model_a_is_p0 = pending[future]
            try:
                chips = future.result()
            except Exception as e:
                # Top-level boundary: report the failure and keep going, but
                # do NOT count this task's games toward the denominator.
                failed += 1
                print(f" [警告] 任务 {task_idx} 执行失败: {e}")
                continue
            total_chips_won_by_a += chips
            games_done[bool(model_a_is_p0)] += num_games_task
            completed += 1
            position_str = "A=P0(SB)" if model_a_is_p0 else "A=P1(BB)"
            if completed % 5 == 0 or completed + failed == len(pending):
                print(f" [进度] {completed}/{len(pending)} 任务完成 "
                      f"({position_str}, {num_games_task}局, "
                      f"Model A 筹码: {chips:+.0f})")

    games_played = games_done[True] + games_done[False]
    if games_played == 0:
        raise SystemExit("[错误] 所有任务均失败, 无法计算 bb/100")
    if failed:
        # Losing a task also unbalances the seats, so make that visible.
        print(f" [警告] {failed} 个任务失败; 仅统计已完成的 {games_played} 局 "
              f"(A做P0: {games_done[True]}, A做P1: {games_done[False]}), "
              f"位置可能不再均衡")

    # ── 4. bb/100 = (chips won / BB) / (games played / 100) ──
    bb_per_100 = (total_chips_won_by_a / BIG_BLIND) / (games_played / 100)

    # ── 5. Print the result ──
    sign = "+" if bb_per_100 >= 0 else ""
    print(f"\n{'='*70}")
    print(f" 评估完成!")
    print(f" 总局数: {games_played}")
    print(f" Model A 总筹码收益: {total_chips_won_by_a:+.0f}")
    print(f" 经过 {games_played:,} 局对抗Model A 对 Model B 的百手赢率为: "
          f"{sign}{bb_per_100:.1f} bb/100")
    print(f"{'='*70}")


# The guard is required with the 'spawn' start method: worker processes
# re-import this module and must not re-run main().
if __name__ == "__main__":
    main()