
Hand-written PPO_clip (FrozenLake environment)

Reference: 白话PPO训练 (PPO training in plain language)

[Figure: screenshot of a successful run]

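Algorithm components

Four main parts: replay_buffer, actor_model, old_actor_model, and critic_model (the names used in the full code).

Compared with A2C, PPO introduces one extra component: old_actor_model.

During PPO training, old_actor_model first interacts with the environment to collect experience. A batch of that experience is then used to optimize actor_model, and finally the parameters of actor_model are copied back into old_actor_model.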
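The copy-back step can be sketched in isolation. This is a minimal illustration, not the article's training loop: the single `nn.Linear` layer, the SGD optimizer, and the dummy loss are assumptions chosen only to make the two models diverge and then re-synchronize.

```python
import copy

import torch
import torch.nn as nn

# Stand-in for actor_model (assumption: one linear layer, 16 states -> 4 actions).
actor = nn.Linear(16, 4)
# old_actor starts as an exact copy, as in the article's __init__.
old_actor = copy.deepcopy(actor)

optimizer = torch.optim.SGD(actor.parameters(), lr=0.1)

# One dummy gradient step on the new actor only (the loss here is arbitrary).
x = torch.zeros(1, 16)
x[0, 0] = 1.0
loss = actor(x).sum()
optimizer.zero_grad()
loss.backward()
optimizer.step()

# The two models now differ, because only `actor` was updated.
diverged = not torch.equal(actor.weight, old_actor.weight)

# Copy the new parameters back into the old model.
old_actor.load_state_dict(actor.state_dict())
synced = torch.equal(actor.weight, old_actor.weight)

print(diverged, synced)
```

Keeping old_actor_model frozen between copies is what gives the probability ratio in the PPO objective a fixed denominator during the repeated updates.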

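Hyperparameters

Compared with A2C, PPO_clip adds two hyperparameters: the number of updates per batch and the clipping threshold.

  • times_per_update: how many gradient updates to run on each collected batch of data.
  • clip_param (ε): the threshold in the PPO clipped objective, typically 0.1 or 0.2.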
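To see what clip_param does, here is a small pure-Python sketch of the clipped surrogate for a single sample. The helper name `clipped_surrogate` is hypothetical and is not part of the article's code; it mirrors the `torch.min` / `torch.clamp` expression used there.

```python
def clipped_surrogate(ratio: float, advantage: float, eps: float = 0.2) -> float:
    """Per-sample PPO-clip term: min(r * A, clip(r, 1 - eps, 1 + eps) * A)."""
    clipped_ratio = max(1.0 - eps, min(ratio, 1.0 + eps))
    return min(ratio * advantage, clipped_ratio * advantage)


# Positive advantage: the gain is capped once the ratio exceeds 1 + eps.
gain_capped = clipped_surrogate(1.5, 1.0)
# Positive advantage, ratio below 1 - eps: the unclipped (smaller) term is kept.
gain_unclipped = clipped_surrogate(0.5, 1.0)
# Negative advantage: the term is floored once the ratio drops below 1 - eps.
loss_floored = clipped_surrogate(0.5, -1.0)
# Negative advantage, ratio above 1 + eps: the unclipped (more negative) term is kept.
loss_unclipped = clipped_surrogate(1.5, -1.0)

print(gain_capped, gain_unclipped, loss_floored, loss_unclipped)
```

With ε = 0.2 the four cases evaluate to roughly 1.2, 0.5, -0.8, and -1.5: the objective stops rewarding moves that push the ratio further outside [1 - ε, 1 + ε] in the direction the advantage favors, while still penalizing moves in the wrong direction.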

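Training process

The overall training framework is the same as A2C. The difference is that experience is collected with old_policy, new_policy is then optimized on it, and finally new_policy is copied back into old_policy.

To use experience data efficiently, PPO runs several gradient updates on each batch of experience.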
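The schedule described above can be sketched without any neural network. The function below is a hypothetical stand-in that only counts events: it assumes, as in the full code, that an update is triggered whenever the buffer reaches batch_size, that each trigger runs times_per_update gradient steps, and that the buffer is cleared and the policy copied back afterwards.

```python
def simulate_schedule(total_steps: int, batch_size: int = 64, times_per_update: int = 5):
    """Count gradient steps and policy syncs for a given number of environment steps."""
    buffer = []
    gradient_steps = 0
    syncs = 0
    for step in range(total_steps):
        buffer.append(step)  # stand-in for one (state, action, reward, new_state, done) tuple
        if len(buffer) == batch_size:
            for _ in range(times_per_update):
                gradient_steps += 1  # one actor update and one critic update would go here
            buffer.clear()           # on-policy data is discarded after use
            syncs += 1               # copy new_policy back into old_policy
    return gradient_steps, syncs, len(buffer)


print(simulate_schedule(200))  # (15, 3, 8)
```

For 200 environment steps with the article's defaults, three full batches are consumed, giving 15 gradient steps and 3 syncs, with 8 transitions left waiting in the buffer.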

Objective functions

1. The critic's objective is the same as in A2C.

2. The actor's objective is PPO_clip.
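Written out, the two losses that the full code minimizes look roughly as follows. The notation is an assumption introduced here for illustration: $r_t(\theta)$ is the probability ratio between actor_model and old_actor_model, $\hat{A}_t$ is the GAE advantage, $\hat{R}_t = \hat{A}_t + V(s_t)$ is returns_to_go, $\epsilon$ is clip_param, $c_e$ is entropy_coeff, and $c_v$ is value_loss_coeff.

```latex
% Probability ratio between the new and the old policy
r_t(\theta) = \frac{\pi_\theta(a_t \mid s_t)}{\pi_{\theta_{\text{old}}}(a_t \mid s_t)}

% Actor loss: negated clipped surrogate plus entropy bonus
L_{\text{actor}}(\theta) = -\,\mathbb{E}_t\!\left[
  \min\!\Big( r_t(\theta)\,\hat{A}_t,\;
              \operatorname{clip}\big(r_t(\theta),\, 1-\epsilon,\, 1+\epsilon\big)\,\hat{A}_t \Big)
  + c_e\, \mathcal{H}\big[\pi_\theta(\cdot \mid s_t)\big]
\right]

% Critic loss: scaled mean squared error against the GAE-based return target
L_{\text{critic}}(\phi) = c_v\, \mathbb{E}_t\!\left[ \big( V_\phi(s_t) - \hat{R}_t \big)^2 \right]
```

The ratio is computed in the code as $\exp(\log \pi_\theta - \log \pi_{\theta_{\text{old}}})$, which is the same quantity evaluated in log space.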

Full code

import copy
from typing import Tuple

import gymnasium as gym
import torch
import torch.nn as nn
import tqdm
from torch.distributions import Categorical
from torch.nn import functional as F


class PolicyNetwork(nn.Module):
    def __init__(self, n_observations: int, n_actions: int):
        super(PolicyNetwork, self).__init__()
        self.layer1 = nn.Linear(n_observations, 32)
        self.layer2 = nn.Linear(32, 16)
        self.layer3 = nn.Linear(16, n_actions)

    def forward(self, x: torch.Tensor) -> Categorical:
        x = F.relu(self.layer1(x))
        x = F.relu(self.layer2(x))
        action_logits = self.layer3(x)
        return Categorical(logits=action_logits)


class PPO_clip:
    def __init__(self, env, total_episodes):
        ############# Hyperparameters #############
        self.actor_lr = 0.01
        self.critic_lr = 0.01
        self.batch_size = 64
        self.times_per_update = 5  # number of gradient updates per batch
        self.clip_param = 0.2  # ratio clipping threshold, usually 0.2 or 0.1
        self.entropy_coeff = 0.01
        self.value_loss_coeff = 0.5
        self.gae_lambda = 0.95
        self.discount_rate = 0.9
        self.total_episodes = total_episodes

        ############# Core components of PPO_clip #############
        self.replay_buffer = []
        self.actor_model = PolicyNetwork(16, 4)
        self.old_actor_model = copy.deepcopy(self.actor_model)
        self.critic_model = nn.Sequential(  # does not need to be as complex as the actor model
            nn.Linear(16, 16), nn.ReLU(),
            nn.Linear(16, 1)
        )

        ############# Optimization components #############
        self.actor_optimizer = torch.optim.Adam(self.actor_model.parameters(), lr=self.actor_lr)
        self.critic_optimizer = torch.optim.Adam(self.critic_model.parameters(), lr=self.critic_lr)

        self.env = env
        self.count = 0
        self.success = 0

    def train(self):
        bar = tqdm.tqdm(range(self.total_episodes), desc=f"episode {0} {self.success / (self.count+1e-8)}")
        for i in bar:
            state, info = self.env.reset()
            done = False
            truncated = False

            # Collect experience with old_policy (fixed)
            # Fix: the original `not done or truncated` never exits once truncated is True
            while not (done or truncated):
                action = self.choose_action(state)
                new_state, r, done, truncated, info = self.env.step(action)
                self.append_data(state, action, r, new_state, done)
                state = new_state

                if done or truncated:
                    self.count += 1
                    if new_state == 15:  # state 15 is the goal cell of the 4x4 map
                        self.success += 1

                # Optimize new_policy (updated)
                if len(self.replay_buffer) == self.batch_size:
                    self.optimize_model()
                    self.replay_buffer.clear()
                    # Copy new_policy back into old_policy
                    self.old_actor_model.load_state_dict(self.actor_model.state_dict())

            if i % 100 == 0:
                # Fix: report the success rate before resetting the counters
                # (the original reset them first, so the bar always showed 0)
                bar.set_description(f"episode {i} {self.success / (self.count+1e-8)}")
                self.count = 0
                self.success = 0

    def choose_action(self, state):
        with torch.no_grad():
            policy_dist = self.old_actor_model(self.state_to_input(state))
            action_tensor = policy_dist.sample()
            action = action_tensor.item()
        return action

    def optimize_model(self):
        batch = self.replay_buffer[-self.batch_size:]
        state = torch.stack([self.state_to_input(tup[0]) for tup in batch])
        action = torch.tensor([tup[1] for tup in batch], dtype=torch.long)
        reward = torch.tensor([tup[2] for tup in batch], dtype=torch.float)
        new_state = torch.stack([self.state_to_input(tup[3]) for tup in batch])
        done = torch.tensor([float(tup[4]) for tup in batch], dtype=torch.float)
        # state and new_state are 2-D; the others are 1-D (batch dimension only)

        with torch.no_grad():
            value = self.critic_model(state).squeeze(-1)
            next_value = self.critic_model(new_state).squeeze(-1)
            # old policy is fixed during the updates, so its log-probs need no gradient
            old_log_prob = self.old_actor_model(state).log_prob(action)

        # GAE works noticeably better than a one-step TD error
        advantages, returns_to_go = self.compute_gae_and_returns(
            reward, value, next_value, done, self.discount_rate, self.gae_lambda
        )

        # Several updates on the same batch of data
        for _ in range(self.times_per_update):
            # Update the actor
            policy_dist = self.actor_model(state)
            new_log_prob = policy_dist.log_prob(action)
            # exp(ln(a) - ln(b)) is a / b
            new_div_old_rate = torch.exp(new_log_prob - old_log_prob)
            actor_fn = -(torch.min(new_div_old_rate * advantages,
                                   torch.clamp(new_div_old_rate, 1 - self.clip_param, 1 + self.clip_param) * advantages)
                         + self.entropy_coeff * policy_dist.entropy())
            self.actor_optimizer.zero_grad()
            actor_fn.mean().backward()  # .mean(): backward() needs a scalar
            self.actor_optimizer.step()

            # Update the critic
            v = self.critic_model(state).squeeze(-1)
            critic_fn = F.mse_loss(v, returns_to_go)
            self.critic_optimizer.zero_grad()
            (self.value_loss_coeff * critic_fn).backward()
            self.critic_optimizer.step()

    def compute_gae_and_returns(self,
                                rewards: torch.Tensor,
                                values: torch.Tensor,
                                next_values: torch.Tensor,
                                dones: torch.Tensor,
                                discount_rate: float,
                                lambda_gae: float,
                                ) -> Tuple[torch.Tensor, torch.Tensor]:
        advantages = torch.zeros_like(rewards)
        last_advantage = 0.0
        n_steps = len(rewards)

        # Compute GAE
        for t in reversed(range(n_steps)):
            mask = 1.0 - dones[t]
            delta = rewards[t] + discount_rate * next_values[t] * mask - values[t]
            # The buffer is in chronological order. Applying the mask to both delta and
            # the advantage lets a terminal step sit anywhere in the buffer (done-1 done 1 2)
            advantages[t] = delta + discount_rate * lambda_gae * last_advantage * mask
            last_advantage = advantages[t]

        # Returned to the critic as its TD target
        returns_to_go = advantages + values
        return advantages, returns_to_go

    def append_data(self, state, action, r, new_state, done):
        self.replay_buffer.append((state, action, r, new_state, done))

    def state_to_input(self, state):
        # One-hot encode the discrete state index
        input_dim = 16
        one_hot = torch.zeros(input_dim, dtype=torch.float)
        one_hot[int(state)] = 1
        return one_hot


env = gym.make("FrozenLake-v1", is_slippery=False)
policy = PPO_clip(env, 2000)
policy.train()

# Render the trained policy; this loop runs until interrupted
env = gym.make("FrozenLake-v1", is_slippery=False, render_mode="human")
state, info = env.reset()
done = False
truncated = False
while True:
    with torch.no_grad():
        action = policy.choose_action(state)
    new_state, reward, done, truncated, info = env.step(action)
    state = new_state
    if done or truncated:
        state, info = env.reset()
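The masking logic in compute_gae_and_returns can be checked by hand on a tiny example. The sketch below is a pure-Python re-statement of the same recursion. The helper name `gae` and the three-step episode with constant value estimates of 0.5 are assumptions made up for illustration.

```python
def gae(rewards, values, next_values, dones, gamma, lam):
    """Generalized Advantage Estimation over a chronologically ordered batch."""
    n = len(rewards)
    advantages = [0.0] * n
    last_advantage = 0.0
    for t in reversed(range(n)):
        mask = 1.0 - dones[t]  # 0 at a terminal step: no bootstrapping, no carry-over
        delta = rewards[t] + gamma * next_values[t] * mask - values[t]
        advantages[t] = delta + gamma * lam * last_advantage * mask
        last_advantage = advantages[t]
    returns = [a + v for a, v in zip(advantages, values)]
    return advantages, returns


# Hypothetical three-step episode that ends with reward 1 at the last step.
advantages, returns = gae(
    rewards=[0.0, 0.0, 1.0],
    values=[0.5, 0.5, 0.5],
    next_values=[0.5, 0.5, 0.0],
    dones=[0.0, 0.0, 1.0],
    gamma=0.9,
    lam=0.95,
)
print(advantages)
print(returns)
```

Working backwards: the terminal step gives 1 - 0.5 = 0.5. The step before it gives -0.05 + 0.9 * 0.95 * 0.5 = 0.3775, and the first step gives -0.05 + 0.855 * 0.3775 ≈ 0.2728. Adding the value estimates back yields the critic targets.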