【DQN】使用标准DQN(优化)进行CartPole游戏的经典强化学习训练

By e2hang at 24 小时前 • 0人收藏 • 3人看过

一、无经验回放

先放一个没有经验池(经验回放)的代码

import gymnasium as gym
import random
import torch
import torch.optim as optim
import torch.nn.functional as F
import torch.nn as nn
import time

# 创建环境
#env = gym.make("CartPole-v1", render_mode="human")  # human模式会用pyglet显示窗口
env = gym.make('CartPole-v1')
# 重置环境
observation, info = env.reset()
print("初始观察值:", observation)

#记住DQN训练的是Q*,输出的也是Q*,而不是动作,动作要根据Q*判断并反馈
class DQN(nn.Module):
    def __init__(self, state_size, action_size):
        super(DQN, self).__init__()
        self.l1 = nn.Linear(state_size, 128)
        self.l3 = nn.Linear(128, 64)
        self.l4 = nn.Linear(64, 32)
        self.l5 = nn.Linear(32, action_size)

    def forward(self, x):
        x = F.relu(self.l1(x))
        x = F.relu(self.l3(x))
        x = F.relu(self.l4(x))
        x = self.l5(x)
        return x


#定义参数
epsilon = 0.95
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
gamma = 0.99
lrs = 0.005
epsilon_decay = 0.995
epsilon_min = 0.01
batch_size = 64
#memory = deque(maxlen=10000)
num_episodes = 500

#初始化
ez = DQN(state_size, action_size)
optimizer = optim.Adam(ez.parameters(), lr = lrs)
criterion = nn.MSELoss()

#训练
for i in range(500):
    state, _ = env.reset()   # gym >=0.26 返回 (obs, info)
    total_reward = 0.0
    done = False

    while not done:
        # ε-greedy 选择动作
        if random.random() < epsilon:
            action = env.action_space.sample()
        else:
            action = ez(torch.tensor(state, dtype=torch.float32)).argmax().item()

        # 与环境交互
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        total_reward += reward

        # 计算 Q(s, a)
        q_values = ez(torch.tensor(state, dtype=torch.float32))
        now_Q = q_values[action]

        # 计算 target
        next_q_value = ez(torch.tensor(next_state, dtype=torch.float32)).max().item()
        target_Q = reward + gamma * next_q_value * (0 if done else 1)

        # 计算损失
        target_Q = torch.tensor(target_Q, dtype=torch.float32)
        loss = criterion(now_Q, target_Q)

        # 更新网络
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # 状态更新
        state = next_state

    print(f"Episode {i}, total_reward = {total_reward}")

# 测试函数
def test_agent(env, model, episodes=10):
    total_rewards = []
    for ep in range(episodes):
        state, _ = env.reset()
        done = False
        total_reward = 0.0
        while not done:
            # 关闭梯度计算,加速
            with torch.no_grad():
                action = model(torch.tensor(state, dtype=torch.float32)).argmax().item()
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            state = next_state
            total_reward += reward
        total_rewards.append(total_reward)
        print(f"Test Episode {ep+1}: reward = {total_reward}")

    avg_reward = sum(total_rewards) / episodes
    print(f"Average reward over {episodes} episodes: {avg_reward}")

# 调用测试
test_agent(env, ez, episodes=10)

#最佳数据:平均150

这里很明显可以看到,测试的平均reward只有150,学习了,但是学习得很差


二、有经验回放

这里放有经验回放的代码,精髓是deque和随机batch抽取

还有是epsilon - greedy算法,这里epsilon随着训练过程而减小,帮助训练过程收敛

from collections import deque
import numpy as np
import random
import torch
import torch.optim as optim
import torch.nn as nn
import gymnasium as gym
import torch.nn.functional as F


class DQN(nn.Module):
    def __init__(self, state_size, action_size):
        super(DQN, self).__init__()
        self.l1 = nn.Linear(state_size, 128)
        self.l2 = nn.Linear(128, 32)
        self.l4 = nn.Linear(32, action_size)

    def forward(self, x):
        x = F.relu(self.l1(x))
        x = F.relu(self.l2(x))
        x = self.l4(x)
        return x

lr = 0.001
gamma = 0.99
epsilon = 1.0
memory = deque(maxlen=1000000)
batch_size = 64
epsilon_decay = 0.995
epsilon_min = 0.01

env = gym.make('CartPole-v1')
state_size = env.observation_space.shape[0]
action_size = env.action_space.n


ez = DQN(state_size, action_size)
optimizer = optim.Adam(ez.parameters(), lr=lr)
criterion = nn.MSELoss()

def replay():
    if(len(memory) < batch_size):
        return

    #随机选一组64个训练
    batch = random.sample(memory, batch_size)
    states, actions, rewards, next_states, dones = zip(*batch)
    states = torch.from_numpy(np.stack(states))  # 直接把 batch 堆成 array 再转 tensor
    next_states = torch.from_numpy(np.stack(next_states))
    rewards = torch.from_numpy(np.array(rewards, dtype=np.float32))
    actions = torch.from_numpy(np.array(actions, dtype=np.int64))
    dones = torch.from_numpy(np.array(dones, dtype=np.float32))

    q_values = ez(states).gather(1, actions.unsqueeze(1)).squeeze()
    next_q_values = ez(next_states).max(1)[0]
    target = rewards + gamma * next_q_values * (1 - dones)

    loss = criterion(q_values, target.detach())
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

for e in range(350):
    state, _ = env.reset()
    done = False
    total_reward = 0
    while not done:
        if random.random() < epsilon:
            action = env.action_space.sample()
        else:
            action = ez(torch.tensor(state, dtype=torch.float32)).argmax().item()

        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        # 在存储到 memory 时:
        memory.append((np.array(state, dtype=np.float32),
                       action,
                       reward,
                       np.array(next_state, dtype=np.float32),
                       done))
        state = next_state
        total_reward += reward
        replay()

    epsilon = max(epsilon * epsilon_decay, epsilon_min)
    print(f"Episode {e + 1}: Reward = {total_reward}")

torch.save(ez.state_dict(), "cartpole_dqn.pth")
print("Model saved to cartpole_dqn.pth")

# 测试函数
def test_agent(env, model, episodes=10):
    env = gym.make('CartPole-v1', render_mode='human')
    total_rewards = []
    for ep in range(episodes):
        state, _ = env.reset()
        done = False
        total_reward = 0.0
        while not done:
            # 关闭梯度计算,加速
            with torch.no_grad():
                action = model(torch.tensor(state, dtype=torch.float32)).argmax().item()
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            state = next_state
            total_reward += reward
        total_rewards.append(total_reward)
        print(f"Test Episode {ep+1}: reward = {total_reward}")

    avg_reward = sum(total_rewards) / episodes
    print(f"Average reward over {episodes} episodes: {avg_reward}")

# 调用测试
test_agent(env, ez, episodes=10)

训练结果如下

dc7e4c08100287899297ebd2ff9ebdfc.png

不过这个版本也是有问题的,没有抑制过拟合(添加噪声等),也没有使用DDQN(目标网络w-)


三、总结

很明显可以感觉到,DQN是十分随机的,很多时候都会有各种情况导致训练无法收敛/过拟合,使用技巧和方法去优化DQN是极为重要的

登录后方可回帖

登 录
信息栏
欢迎来到滑稽社论坛!注册会员即可发帖!

你好啊

Loading...