PyTorch Deep Learning in Practice (25): Inverse Reinforcement Learning (Inverse RL)
ztj100 2025-04-26 22:45
I. Principles of Inverse Reinforcement Learning
1. Core Idea of Inverse Reinforcement Learning
Inverse Reinforcement Learning (IRL) aims to infer the reward function from expert demonstrations rather than learn a policy directly. It differs from reinforcement learning as follows:
| Dimension | Reinforcement Learning (RL) | Inverse Reinforcement Learning (IRL) |
| --- | --- | --- |
| Input | Known reward function | Expert trajectories (state-action sequences) |
| Output | Optimal policy | Inferred reward function + imitation policy |
| Objective | Maximize cumulative reward | Make the expert trajectories optimal under the inferred reward function |
| Typical applications | Games, robot control | Imitation learning, autonomous-driving policy inference |
2. Maximum Entropy IRL Framework
Maximum entropy IRL infers the reward function by maximizing the likelihood of the expert trajectories. Core formula:
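In its standard form (a sketch of the usual maximum entropy IRL statement, with the reward parameterized by $\theta$ and $Z(\theta)$ denoting the partition function over trajectories), a trajectory $\tau = (s_0, a_0, s_1, a_1, \dots)$ is modeled as exponentially more likely the higher its cumulative reward:

$$
P(\tau \mid \theta) = \frac{1}{Z(\theta)} \exp\Big(\sum_{t} r_\theta(s_t, a_t)\Big),
\qquad
Z(\theta) = \sum_{\tau} \exp\Big(\sum_{t} r_\theta(s_t, a_t)\Big)
$$

and the reward parameters are chosen to maximize the log-likelihood of the expert demonstrations $\mathcal{D}$:

$$
\theta^{*} = \arg\max_{\theta} \sum_{\tau \in \mathcal{D}} \log P(\tau \mid \theta)
$$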
II. Generative Adversarial Imitation Learning (GAIL)
Generative Adversarial Imitation Learning (GAIL) combines the ideas of IRL and GANs:
- Generator (policy network): produces trajectories that imitate the expert's behavior
- Discriminator (reward network): distinguishes expert trajectories from generated trajectories
- Adversarial training: the policy network tries to fool the discriminator until it can no longer tell where a trajectory came from
Mathematical formulation:
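With the sign convention used in the code below (the discriminator $D(s,a) \in (0,1)$ outputs values close to 1 for expert state-action pairs), the GAIL objective can be written as the following minimax problem, a sketch following the standard formulation with an optional policy-entropy bonus $H(\pi)$ weighted by $\lambda$:

$$
\min_{\pi}\; \max_{D}\;
\mathbb{E}_{(s,a)\sim\pi_E}\big[\log D(s,a)\big]
+ \mathbb{E}_{(s,a)\sim\pi}\big[\log\big(1 - D(s,a)\big)\big]
- \lambda H(\pi)
$$

where $\pi_E$ is the expert policy. Minimizing over $\pi$ is equivalent to maximizing the surrogate reward $r(s,a) = -\log(1 - D(s,a))$, which is exactly what the reward computation in the implementation below returns.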
III. GAIL Implementation Steps (Based on Gymnasium)
Using the MuJoCo HalfCheetah environment as the running example, we implement GAIL in four steps:
- Collect expert data: generate expert trajectories with a pre-trained policy
- Build the policy network: a PPO-based generator
- Build the discriminator network: a binary classifier that separates expert data from generated data
- Adversarial training: alternately optimize the generator and the discriminator
IV. Code Implementation
Generating the expert data:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import gymnasium as gym
from torch.distributions import Normal
from collections import deque
import random
import time

# ================== Configuration ==================
class SACConfig:
    env_name = "HalfCheetah-v5"   # same environment as GAIL
    hidden_dim = 256              # hidden layer width
    actor_lr = 3e-4               # policy network learning rate
    critic_lr = 3e-4              # value network learning rate
    alpha_lr = 3e-4               # temperature (alpha) learning rate
    gamma = 0.99                  # discount factor
    tau = 0.005                   # soft-update coefficient
    buffer_size = 100000          # replay buffer capacity
    batch_size = 256              # batch size
    max_episodes = 1000           # maximum training episodes (adjust as needed)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# ================== Policy network (Actor) ==================
class Actor(nn.Module):
    def __init__(self, state_dim, action_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, SACConfig.hidden_dim),
            nn.ReLU(),
            nn.Linear(SACConfig.hidden_dim, SACConfig.hidden_dim),
            nn.ReLU(),
            nn.Linear(SACConfig.hidden_dim, action_dim),
            nn.Tanh()  # assumes the action space is [-1, 1]
        )
        # State-independent log standard deviation
        self.log_std = nn.Parameter(torch.zeros(action_dim))

    def forward(self, state):
        mean = self.net(state)
        std = self.log_std.exp()
        return mean, std

# ================== Value network (Critic) ==================
class Critic(nn.Module):
    def __init__(self, state_dim, action_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim + action_dim, SACConfig.hidden_dim),
            nn.ReLU(),
            nn.Linear(SACConfig.hidden_dim, SACConfig.hidden_dim),
            nn.ReLU(),
            nn.Linear(SACConfig.hidden_dim, 1)
        )

    def forward(self, state, action):
        x = torch.cat([state, action], dim=-1)
        return self.net(x)
# ================== SAC training system ==================
class SACTrainer:
    def __init__(self):
        self.env = gym.make(SACConfig.env_name)
        self.state_dim = self.env.observation_space.shape[0]
        self.action_dim = self.env.action_space.shape[0]
        # Networks
        self.actor = Actor(self.state_dim, self.action_dim).to(SACConfig.device)
        self.critic1 = Critic(self.state_dim, self.action_dim).to(SACConfig.device)
        self.critic2 = Critic(self.state_dim, self.action_dim).to(SACConfig.device)
        self.target_critic1 = Critic(self.state_dim, self.action_dim).to(SACConfig.device)
        self.target_critic2 = Critic(self.state_dim, self.action_dim).to(SACConfig.device)
        self.target_critic1.load_state_dict(self.critic1.state_dict())
        self.target_critic2.load_state_dict(self.critic2.state_dict())
        # Optimizers
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=SACConfig.actor_lr)
        self.critic1_optimizer = optim.Adam(self.critic1.parameters(), lr=SACConfig.critic_lr)
        self.critic2_optimizer = optim.Adam(self.critic2.parameters(), lr=SACConfig.critic_lr)
        # Automatic temperature (alpha) tuning; target entropy = -|A|
        self.target_entropy = -float(self.action_dim)
        self.log_alpha = torch.zeros(1, requires_grad=True, device=SACConfig.device)
        self.alpha_optimizer = optim.Adam([self.log_alpha], lr=SACConfig.alpha_lr)
        # Replay buffer
        self.buffer = deque(maxlen=SACConfig.buffer_size)
    def select_action(self, state):
        state = torch.FloatTensor(state).to(SACConfig.device)
        mean, std = self.actor(state)
        dist = Normal(mean, std)
        action = dist.sample()
        # Clip to the environment's action range
        return np.clip(action.detach().cpu().numpy(), -1.0, 1.0)

    def update(self):
        if len(self.buffer) < SACConfig.batch_size:
            return
        # Sample a mini-batch from the replay buffer
        samples = random.sample(self.buffer, SACConfig.batch_size)
        states, actions, rewards, next_states, dones = zip(*samples)
        states = torch.FloatTensor(np.array(states)).to(SACConfig.device)
        actions = torch.FloatTensor(np.array(actions)).to(SACConfig.device)
        rewards = torch.FloatTensor(np.array(rewards)).unsqueeze(-1).to(SACConfig.device)
        next_states = torch.FloatTensor(np.array(next_states)).to(SACConfig.device)
        dones = torch.FloatTensor(np.array(dones)).unsqueeze(-1).to(SACConfig.device)
        # ----- Critic update -----
        with torch.no_grad():
            next_mean, next_std = self.actor(next_states)
            next_dist = Normal(next_mean, next_std)
            next_actions = next_dist.sample()
            next_log_probs = next_dist.log_prob(next_actions).sum(-1, keepdim=True)
            target_q1 = self.target_critic1(next_states, next_actions)
            target_q2 = self.target_critic2(next_states, next_actions)
            target_q = torch.min(target_q1, target_q2) - self.log_alpha.exp() * next_log_probs
            target_q = rewards + SACConfig.gamma * (1 - dones) * target_q
        current_q1 = self.critic1(states, actions)
        current_q2 = self.critic2(states, actions)
        critic1_loss = nn.MSELoss()(current_q1, target_q)
        critic2_loss = nn.MSELoss()(current_q2, target_q)
        self.critic1_optimizer.zero_grad()
        critic1_loss.backward()
        self.critic1_optimizer.step()
        self.critic2_optimizer.zero_grad()
        critic2_loss.backward()
        self.critic2_optimizer.step()
        # ----- Actor update (reparameterized sample so gradients reach the actor) -----
        mean, std = self.actor(states)
        dist = Normal(mean, std)
        new_actions = dist.rsample()
        log_probs = dist.log_prob(new_actions).sum(-1, keepdim=True)
        q1 = self.critic1(states, new_actions)
        q2 = self.critic2(states, new_actions)
        actor_loss = (self.log_alpha.exp() * log_probs - torch.min(q1, q2)).mean()
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()
        # ----- Temperature (alpha) update -----
        alpha_loss = -(self.log_alpha * (log_probs + self.target_entropy).detach()).mean()
        self.alpha_optimizer.zero_grad()
        alpha_loss.backward()
        self.alpha_optimizer.step()
        # ----- Soft-update the target critics -----
        for param, target_param in zip(self.critic1.parameters(), self.target_critic1.parameters()):
            target_param.data.copy_(SACConfig.tau * param.data + (1 - SACConfig.tau) * target_param.data)
        for param, target_param in zip(self.critic2.parameters(), self.target_critic2.parameters()):
            target_param.data.copy_(SACConfig.tau * param.data + (1 - SACConfig.tau) * target_param.data)
    def train_and_save_expert_data(self, save_path="expert_data.npy"):
        expert_states = []
        expert_actions = []
        for episode in range(SACConfig.max_episodes):
            state, _ = self.env.reset()
            episode_reward = 0
            while True:
                action = self.select_action(state)
                next_state, reward, terminated, truncated, _ = self.env.step(action)
                done = terminated or truncated
                self.buffer.append((state, action, reward, next_state, float(done)))
                # Collect expert data from the second (better-trained) half of training
                if episode > SACConfig.max_episodes // 2:
                    expert_states.append(state)
                    expert_actions.append(action)
                state = next_state
                episode_reward += reward
                self.update()
                if done:
                    break
            if (episode + 1) % 100 == 0:
                print(f"Episode {episode+1} | Reward: {episode_reward:.1f}")
        # Save the expert data
        np.save(save_path, {
            'states': np.array(expert_states),
            'actions': np.array(expert_actions)
        })
        print(f"Expert data saved to {save_path}")
if __name__ == "__main__":
    start = time.time()
    start_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start))
    print(f"Start time: {start_str}")
    print("Training expert policy...")
    trainer = SACTrainer()
    trainer.train_and_save_expert_data()
    end = time.time()
    end_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end))
    print(f"Finished at: {end_str}")
    print(f"Total elapsed: {end - start:.2f} s")
GAIL implementation code:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.distributions import Normal
import gymnasium as gym
from collections import deque
import time
import random

class GAILConfig:
    env_name = "HalfCheetah-v5"
    expert_data_path = "expert_data.npy"
    hidden_dim = 256
    policy_lr = 3e-4
    discriminator_lr = 1e-4
    gamma = 0.99
    lam = 0.95                # GAE lambda
    clip_epsilon = 0.2        # PPO clipping range
    batch_size = 64
    max_episodes = 100
    max_steps = 1000          # maximum steps per episode
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class Policy(nn.Module):
    # PPO actor-critic used as the GAIL generator
    def __init__(self, state_dim, action_dim):
        super().__init__()
        self.actor = nn.Sequential(
            nn.Linear(state_dim, GAILConfig.hidden_dim),
            nn.ReLU(),
            nn.Linear(GAILConfig.hidden_dim, GAILConfig.hidden_dim),
            nn.ReLU(),
            nn.Linear(GAILConfig.hidden_dim, action_dim)
        )
        self.critic = nn.Sequential(
            nn.Linear(state_dim, GAILConfig.hidden_dim),
            nn.ReLU(),
            nn.Linear(GAILConfig.hidden_dim, GAILConfig.hidden_dim),
            nn.ReLU(),
            nn.Linear(GAILConfig.hidden_dim, 1)
        )
        # State-independent log standard deviation of the Gaussian policy
        self.log_std = nn.Parameter(torch.zeros(action_dim))

    def forward(self, state):
        action_mean = self.actor(state)
        value = self.critic(state)
        return action_mean, value

    def act(self, state):
        with torch.no_grad():
            action_mean, value = self.forward(state)
            dist = Normal(action_mean, self.log_std.exp())
            action = dist.sample()
            log_prob = dist.log_prob(action).sum(-1)
        return action.cpu().numpy(), log_prob.cpu().numpy(), value.cpu().numpy()

class Discriminator(nn.Module):
    # Binary classifier: outputs the probability that (state, action) comes from the expert
    def __init__(self, state_dim, action_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim + action_dim, GAILConfig.hidden_dim),
            nn.ReLU(),
            nn.Linear(GAILConfig.hidden_dim, GAILConfig.hidden_dim),
            nn.ReLU(),
            nn.Linear(GAILConfig.hidden_dim, 1),
            nn.Sigmoid()
        )

    def forward(self, state, action):
        x = torch.cat([state, action], dim=-1)
        return self.net(x)
class ReplayBuffer:
    def __init__(self):
        self.buffer = deque(maxlen=100000)

    def add(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        samples = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*samples)
        return (
            torch.FloatTensor(np.array(states)).to(GAILConfig.device),
            torch.FloatTensor(np.array(actions)).to(GAILConfig.device),
            torch.FloatTensor(np.array(rewards)).unsqueeze(-1).to(GAILConfig.device),
            torch.FloatTensor(np.array(next_states)).to(GAILConfig.device),
            torch.FloatTensor(np.array(dones)).unsqueeze(-1).to(GAILConfig.device)
        )

    def __len__(self):
        return len(self.buffer)
class GAILTrainer:
    def __init__(self):
        self.env = gym.make(GAILConfig.env_name)
        self.state_dim = self.env.observation_space.shape[0]
        self.action_dim = self.env.action_space.shape[0]
        self.policy = Policy(self.state_dim, self.action_dim).to(GAILConfig.device)
        self.discriminator = Discriminator(self.state_dim, self.action_dim).to(GAILConfig.device)
        self.optimizer_policy = optim.Adam(self.policy.parameters(), lr=GAILConfig.policy_lr)
        self.optimizer_discriminator = optim.Adam(self.discriminator.parameters(), lr=GAILConfig.discriminator_lr)
        # Load expert demonstrations
        self.expert_data = np.load(GAILConfig.expert_data_path, allow_pickle=True).item()
        self.expert_states = torch.FloatTensor(self.expert_data['states']).to(GAILConfig.device)
        self.expert_actions = torch.FloatTensor(self.expert_data['actions']).to(GAILConfig.device)
        self.buffer = ReplayBuffer()

    def compute_reward(self, states, actions):
        # Surrogate reward r(s, a) = -log(1 - D(s, a)): larger when D judges the pair expert-like
        with torch.no_grad():
            d = self.discriminator(states, actions)
            return -torch.log(1 - d + 1e-8)

    def update_discriminator(self):
        if len(self.buffer) < GAILConfig.batch_size:
            return 0.0
        # Generated (policy) samples vs. a random mini-batch of expert samples
        states, actions, _, _, _ = self.buffer.sample(GAILConfig.batch_size)
        idx = np.random.randint(0, len(self.expert_states), GAILConfig.batch_size)
        expert_states = self.expert_states[idx]
        expert_actions = self.expert_actions[idx]
        real_output = self.discriminator(expert_states, expert_actions)
        fake_output = self.discriminator(states, actions)
        # Binary cross-entropy: expert pairs -> 1, generated pairs -> 0
        loss_real = -torch.log(real_output + 1e-8).mean()
        loss_fake = -torch.log(1 - fake_output + 1e-8).mean()
        loss = loss_real + loss_fake
        self.optimizer_discriminator.zero_grad()
        loss.backward()
        self.optimizer_discriminator.step()
        return loss.item()

    def compute_gae(self, rewards, values, dones):
        # Generalized Advantage Estimation over a single episode
        advantages = torch.zeros_like(rewards)
        last_advantage = 0
        for t in reversed(range(len(rewards))):
            next_value = 0 if t == len(rewards) - 1 else values[t + 1]
            next_non_terminal = 1.0 - dones[t]
            delta = rewards[t] + GAILConfig.gamma * next_value * next_non_terminal - values[t]
            advantages[t] = delta + GAILConfig.gamma * GAILConfig.lam * next_non_terminal * last_advantage
            last_advantage = advantages[t]
        returns = advantages + values
        return advantages, returns
    def update_policy(self, states, actions, log_probs, rewards, dones):
        # Value estimates used for GAE are treated as fixed targets (no gradient)
        with torch.no_grad():
            _, values = self.policy(states)
            values = values.squeeze(-1)
        # Compute GAE advantages and returns
        advantages, returns = self.compute_gae(rewards.squeeze(-1), values, dones.squeeze(-1))
        # Normalize advantages
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
        # Re-evaluate log-probs and values under the current policy parameters
        action_means, new_values = self.policy(states)
        dist = Normal(action_means, self.policy.log_std.exp())
        new_log_probs = dist.log_prob(actions).sum(-1, keepdim=True)
        # Clipped PPO surrogate objective
        ratio = (new_log_probs - log_probs).exp()
        surr1 = ratio * advantages.unsqueeze(-1)
        surr2 = torch.clamp(ratio, 1 - GAILConfig.clip_epsilon, 1 + GAILConfig.clip_epsilon) * advantages.unsqueeze(-1)
        policy_loss = -torch.min(surr1, surr2).mean()
        # Value-function loss
        value_loss = 0.5 * (new_values - returns.unsqueeze(-1)).pow(2).mean()
        # Combined loss
        total_loss = policy_loss + value_loss
        self.optimizer_policy.zero_grad()
        total_loss.backward()
        self.optimizer_policy.step()
        return total_loss.item()
    def train(self):
        for episode in range(GAILConfig.max_episodes):
            state, _ = self.env.reset()
            episode_reward = 0
            episode_states = []
            episode_actions = []
            episode_log_probs = []
            episode_dones = []
            for _ in range(GAILConfig.max_steps):
                state_tensor = torch.FloatTensor(state).unsqueeze(0).to(GAILConfig.device)
                action, log_prob, _ = self.policy.act(state_tensor)
                action = action[0]
                next_state, reward, terminated, truncated, _ = self.env.step(action)
                done = terminated or truncated
                episode_states.append(state)
                episode_actions.append(action)
                episode_log_probs.append(log_prob)
                episode_dones.append(done)
                episode_reward += reward  # true environment reward, kept for logging only
                self.buffer.add(state, action, reward, next_state, done)
                state = next_state
                if done:
                    break
            # Convert the episode to tensors
            states_tensor = torch.FloatTensor(np.array(episode_states)).to(GAILConfig.device)
            actions_tensor = torch.FloatTensor(np.array(episode_actions)).to(GAILConfig.device)
            log_probs_tensor = torch.FloatTensor(np.array(episode_log_probs)).to(GAILConfig.device)  # shape (T, 1)
            dones_tensor = torch.FloatTensor(np.array(episode_dones)).unsqueeze(-1).to(GAILConfig.device)
            # The policy is trained on the discriminator's reward, not the environment reward
            rewards_tensor = self.compute_reward(states_tensor, actions_tensor)
            # Update the generator (policy)
            p_loss = self.update_policy(
                states_tensor,
                actions_tensor,
                log_probs_tensor,
                rewards_tensor,
                dones_tensor
            )
            # Update the discriminator
            d_loss = self.update_discriminator()
            if (episode + 1) % 10 == 0:
                print(f"Episode {episode+1} | Reward: {episode_reward:.1f} | Policy Loss: {p_loss:.2f} | D Loss: {d_loss:.2f}")

if __name__ == "__main__":
    print("Initializing environment...")
    trainer = GAILTrainer()
    trainer.train()
V. Key Code Walkthrough
1. Discriminator network
- Takes the concatenation of state and action as input and outputs the probability that the pair comes from the expert (Sigmoid activation)
- The loss is binary cross-entropy, separating expert data from generated data, as sketched below
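The two log terms used in update_discriminator are equivalent (up to the small $10^{-8}$ stabilizer) to binary cross-entropy with expert pairs labeled 1 and generated pairs labeled 0. A minimal sketch of the same loss written with nn.BCELoss, assuming the Discriminator class defined above; the argument names here are illustrative:

```python
import torch
import torch.nn as nn

bce = nn.BCELoss()

def discriminator_bce_loss(discriminator, expert_s, expert_a, policy_s, policy_a):
    # Expert (state, action) pairs are labeled 1, generated pairs are labeled 0
    real_output = discriminator(expert_s, expert_a)   # D(s, a) for expert data
    fake_output = discriminator(policy_s, policy_a)   # D(s, a) for generated data
    loss_real = bce(real_output, torch.ones_like(real_output))    # -mean(log D)
    loss_fake = bce(fake_output, torch.zeros_like(fake_output))   # -mean(log(1 - D))
    return loss_real + loss_fake
```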
2. Reward computation
- The generator's reward comes from the discriminator's output: $r(s,a) = -\log(1 - D(s,a))$
- The closer $D(s,a)$ is to 1 (i.e., the more expert-like the pair), the larger the reward; the snippet below illustrates this
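A quick numerical check of how this reward behaves (plain PyTorch, independent of the project code):

```python
import torch

d = torch.tensor([0.1, 0.5, 0.9, 0.99])   # sample discriminator outputs
reward = -torch.log(1 - d + 1e-8)          # same formula as compute_reward
print(reward)   # approximately tensor([0.1054, 0.6931, 2.3026, 4.6052])
```

The reward grows sharply as $D(s,a)$ approaches 1, so the generator is pushed toward state-action pairs the discriminator can no longer tell apart from the expert's.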
3. Adversarial training loop
- Step 1: the generator samples trajectories and stores them in the replay buffer
- Step 2: the discriminator is updated on generated data and expert data
- Step 3: the generator is updated (via PPO) using rewards computed by the discriminator; see the loop sketch after this list
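Putting the three steps together, one GAIL iteration looks roughly like the following sketch. Method names follow the GAILTrainer class above; collect_trajectory is a hypothetical helper standing in for the rollout loop inside train:

```python
# One GAIL iteration, schematically (see GAILTrainer.train for the full rollout loop)

# Step 1: roll out the current policy and store transitions in the replay buffer
states, actions, log_probs, dones = collect_trajectory(trainer.policy, trainer.env)  # hypothetical helper

# Step 2: update the discriminator on expert vs. generated samples
trainer.update_discriminator()

# Step 3: score the rollout with the discriminator (not the environment reward)
# and run a PPO update of the generator on that surrogate reward
rewards = trainer.compute_reward(states, actions)
trainer.update_policy(states, actions, log_probs, rewards, dones)
```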
VI. Sample Training Output
Initializing environment...
Episode 10 | Reward: -953.6 | Policy Loss: 78.33 | D Loss: 1.23
Episode 20 | Reward: -1001.0 | Policy Loss: 58.94 | D Loss: 1.05
Episode 30 | Reward: -1096.4 | Policy Loss: 50.88 | D Loss: 0.88
Episode 40 | Reward: -1108.3 | Policy Loss: 34.90 | D Loss: 0.78
Episode 50 | Reward: -1144.0 | Policy Loss: 34.77 | D Loss: 0.66
Episode 60 | Reward: -1292.6 | Policy Loss: 33.78 | D Loss: 0.60
Episode 70 | Reward: -1403.1 | Policy Loss: 38.53 | D Loss: 0.59
Episode 80 | Reward: -1741.3 | Policy Loss: 29.86 | D Loss: 0.45
Episode 90 | Reward: -2023.8 | Policy Loss: 45.42 | D Loss: 0.45
Episode 100 | Reward: -2192.2 | Policy Loss: 111.91 | D Loss: 0.36
In the next article, we will explore Multi-Objective Reinforcement Learning (Multi-Objective RL) and implement a Pareto-front-based optimization algorithm!
Notes
1. Install dependencies (HalfCheetah-v5 requires the MuJoCo extra of Gymnasium):
pip install torch numpy "gymnasium[mujoco]"
2. Expert data generation:
- Use a pre-trained policy (such as SAC) to generate expert trajectories in the target environment and save them as expert_data.npy
- Example data format:
expert_data = {
    'states': np.array([s0, s1, ..., sn]),   # state sequence
    'actions': np.array([a0, a1, ..., an])   # action sequence
}
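Saving and re-loading this dictionary works exactly as in the two scripts above: np.save pickles the dict, and np.load(..., allow_pickle=True).item() recovers it. A minimal round-trip sketch with dummy arrays; the shapes assume HalfCheetah-v5 (17-dimensional observations, 6-dimensional actions):

```python
import numpy as np

# Dummy placeholder data, used only to illustrate the file format
expert_data = {
    'states': np.random.randn(1000, 17).astype(np.float32),   # 17-dim HalfCheetah-v5 observations
    'actions': np.random.randn(1000, 6).astype(np.float32),   # 6-dim actions
}
np.save("expert_data.npy", expert_data)

loaded = np.load("expert_data.npy", allow_pickle=True).item()
print(loaded['states'].shape, loaded['actions'].shape)   # (1000, 17) (1000, 6)
```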
3. Full training requires GPU acceleration (≥ 8 GB of VRAM recommended)