高性能的ReplayBuffer应该满足以下三点:
随机采样 random sample 的速度要快,尽可能加快读取速度(最为重要)
减少保存的数据量,增加吞吐效率(对分布式而言重要)
保存能简化计算的变量(对特定算法而言重要)
为了达成以上要求,我建议做出以下修改:
把Replay Buffer 的数据都放在连续的内存里,加快读取速度
按trajectory的顺序保存 env transition,避免重复保存next state,减少数据量
分开保存 state 与其他数据,减少数据量
将off-policy 的数据一直保存在显存内
保存 mask = gamma if done else 0 用于计算Q值,而不是保存 done
为on-policy 的PPO 算法保存 noise 用于计算新旧策略的熵
本片文章的目录就是的13点注意事项,以及16点解决方案
本文的重点:ReplayBuffer的数据要放在连续内存里,实验结果如下:

我们使用Numpy库在内存里、使用PyTorch库在显存里 创建了一整块连续的空间,对比了List 和 Tuple 的方案。结果:连续存储空间的明显更节省时间。因此,DRL库的ReplayBuffer 有必要围绕着 连续内存空间 来设计。
代码见 github Yonv1943 ReplayBufferComparison.py ,是一份基于Python Numpy 的实现,代码在保证高性能的同时做到了优雅。我根据这一页的结论还写出来一个DRL库「小雅:ElegantRL」
更新日志
- 2020-06-17 初版 匹配了文字与图片,修改语法错误
- 2020-12-10 第二版 在@UncleDrew 的提醒下修复了失效链接
- 原标题 DRL的经验回放(Experiment Replay Buffer ) 用Numpy实现让它更快一点,改正为Experience Replay
- 2021-03-24 修改标题 DRL的经验回放(Experience Replay Buffer)的三点高性能修改建议
- 新增 3. 将state 与其他数据分开保存
0. 简单介绍「经验回放」
简单介绍「经验回放」(Experimen Replay):当DQN(Deep Q Network)[1]使用一个神经网络用于拟合一个值函数时,Q-learning完成了从离散状态空间到连续状态空间的跨越。为了更好地使用这个叫Q Network的神经网络,我们需要用符合独立同分布(independent and identically distributed,i.i.d.)的数据分批次地训练它。

而DQN使用了1997年就存在的 Experience Replay [2]技术:梯度下降要求用于批次训练的数据符合 i.i.d.,然而与环境交互后产生的数据不能直接用于训练 。因此,我们先把贝尔曼公式(Bellman Equation)需要的数据保存起来,当缓存中的数据足够多时,随机抽样得到的数据就能接近i.i.d.。
而高性能的ReplayBuffer应该满足以下三点:
1. 随机采样 random sample 的速度要快,尽可能加快读取速度(重要)
需要 ReplayBuffer 完成的任务有两个,会降低读写速度的方案不应该采用:
+写入:actor与环境交互,得到 environment transition元组 (state, action, reward, done, next state),或者得到一整条trajectory 写入其中
- 读取:使用ReplayBuffer内部的数据训练网络的时候,需要从中 random sample 出许多批次的数据用于随机梯度下降(Stochastic Gradient Descent)
2. 减少保存的数据量,增加吞吐效率(对分布式而言重要)
深度学习(Deep Learning)中的有监督训练,训练数据一开始就存放在数据集中,因此可以进行数据预处理,预加载等操作。除去offline RL,大部分强化学习任务做不到:数据需要智能体在环境中探索,一点点存入ReplayBuffer中。如果是 同策略(on-policy),那么在探索前还需要清空数据,再重新收集。

因此,ReplayBuffer需要减少需要保存的数据量,增加吞吐效率。
- 这对多进程、分布式的通信很重要,直接影响通信效率。
- 网络训练时,数据一定会从产生地(在内存中的env)传输到使用地(在显存中的优化器 optimizer),数据量越小,吞吐效率越高。
3. 保存能简化计算的变量(对特定算法而言重要)
这是编程常用技巧,有一点动态规划Dynamic Programming 的思想:已经算好的中间变量如果以后有用,就试着保存下来,简化后面的计算。例如:不要保存一个done 这个bool,而是保存为一个mask float,简化TD-errors的计算:
state, action, reward, done, next_state, gamma
mask = 0.0 if done else gamma
bellman equation
原本的: y = r + (1-done) * gamma * next_q_value
改变为: y = r + mask * next_q_value
有一些代码保存了 undone,明显就是有“简化计算”的意识,却没有贯彻到底
为了达成以上要求,我提出的修改方案主要有以下6点:
1. 把Replay Buffer 的数据都放在连续的内存里
从对比实验得出:把数据都放在连续的内存/显存里是最快的。
实验结果如下,可以看到使用了 Numpy将数据放在连续内存中的方法最快,用了13秒(比其他方法快了 50%),而不管在何时传入GPU,对总用时的影响都很小:实验结果如下,可以看到使用了 Numpy将数据放在连续内存中的方法最快,用了13秒(比其他方法快了 50%),而不管在何时传入GPU,对总用时的影响都很小:

Method Used Times (second) Detail
List 24 list()
NamedTuple 20 collections.namedtuple
Array(CPU) 13 numpy.array/torch.tensor(CPU)
Array(GPU) 13 torch.tensor (GPU)
2. 按trajectory的顺序保存 env transition,避免重复保存next state
一般我们会说 env transition 是一个元组 (state, action, reward, next_state) ,我们保存了这些状态转移元组,来描述环境与策略。如果我们按 trajectory的顺序保存trainsition,那么不必重复保存 next_state。因为它可以在下一行找到。
- 优点:如果state 的数据量是action 的2倍以上,那么数据量将降低至 60% ~ 50%。
- 缺点:random sample 的时候,需要使用random id 寻址两次而不是一次
ReplayBuffer (in order)
state1, reward1, mask1, action1, state2, ...
state2, reward2, mask2, action2, state3, ...
...
数据量降低带来的提升非常明显,还收顺便节省显存、内存。而寻址两次并不慢,可以承受。值得一提的是,按顺序保存,给分布式下的ReplayBuffer提出了更高的要求,不同workers 收集到的 env transition 在汇集的时候,一定要以trajectory 为单位。对于无终止状态的环境,无法以 trajectory为单位。此时我们需要给每一个 workers 对应配置一个ReplayBuffer,而不是直接把它们都存到同一个内。
class ReplayBuffer:
class ReplayBufferMP:
def __init__(...):
...
self.buffers = [ReplayBuffer(...) for _ in range(rollout_num)]
...
3. 分开保存 state 与其他数据,减少数据量
在以图像作为state 的任务中(Atari Game),很有必要分开保存state 与其他数据。图片的格式是 uint8 (0255),而其他数据是 float32。结合上面第二点:“不重复保存 next state”,ReplayBuffer 的数据量将降低至 30% 12.5% 。分开保存之后,如果state 是图片,它也可以直接保存成 (n, height, width, channel) 的格式,减少 flatten 的麻烦。
byte: = # 一个字节
uint8: =
float16: ==
float32: ====
此外,结合上面第二点“不重复保存 next state”,我们可以直接在state 上寻址两次。来获取state 以及 next state。
buf_state.shape == (sum_of_trajectory_len, state_dim) 是按trajectory顺序保存的 state
self.buf_state[indices],
self.buf_state[indices + 1]
此外,随机抽样时需要产生 indices。如果允许在同一批次中产生重复的抽样,那么算法速度更快。很意外的是:重复抽样竟然效果更好。(我猜测这是因为:这种方式丰富了 batch 数据的多样性,因此更不容易过拟合)
# indices = rd.choice(self.memo_len, batch_size, replace=False) # why perform worse?
# indices = rd.choice(self.memo_len, batch_size, replace=True) # why perform better?
# same as:
indices = rd.randint(self.now_len, size=batch_size)
buffer[indices]
4. 将off-policy 的数据一直保存在显存内
异策略 off-policy:可以使用与“被更新的策略”相异的策略收集到的ReplayBuffer数据用于更新的算法。同策略 on-policy 则不能。因此异策略 的ReplayBuffer中,有很多数据在达到最大容量前能被保留。因此有必要将 off-policy 的数据一直保存在显存内,减少数据吞吐量。
5. 保存mask = gamma if done else 0 用于计算Q值,而不是保存 done
不要保存一个done 这个bool,而是保存为一个mask float,简化TD-errors的计算:
state, action, reward, done, next_state, gamma
mask = 0.0 if done else gamma
bellman equation
原本的: y = r + (1-done) * gamma * next_q_value
改变为: y = r + mask * next_q_value
有一些代码保存了 undone,明显就是有“简化计算”的意识,却没有贯彻到底
6. 为on-policy 的PPO 算法保存 noise 用于计算新旧策略的熵
在随机策略中,动作由高斯噪声产生。PPO为了计算旧策略的熵需要知道noise的值,PPO算法计算新旧策略的熵 logprob 的代码如下:
# in AgentZoo.py AgentPPO.update_net()
action = action_avg + action_std * noise
noise = (action_avg - action)/(action_std)
a_std_log = np.log(action_std)
sqrt_2pi_log = np.log(np.sqrt(2*pi))
logprob = -(noise.pow(2).__mul__(0.5) + a_std_log + sqrt_2pi_log).sum(1)
# same as below
logprob = -((action_avg - action) ** 2 /(2 * (action_std ** 2)) + a_std_log + sqrt_2pi_log).sum(1)
1. 附录:用于对比的其他ReplayBuffer方案
- 使用List:维护一个长度可变的List,到达最大容量时,删除最早加入的记忆,这个方案最简单。此列表的每个元素为这样的一个元组(state, action, reward, mask, next_state)。但是随机抽样的速度较慢,将位于不同内存空间的元素组成批次数据时较慢。
self.buffer.append(memory_tuple)
del self.buffer[:del_len]
- 使用NamedTuple:具体实现与List相同。维护一个长度可变的NamedTuple。(其实这也是一个List,只是使用了Python内建的 NamedTuple为 state, action, reward, mask, next_state 命名)。代码十分简洁优雅。但是随机抽样的速度较慢。PyTorch 官网上的RL入门教程就使用了这种方法。
from collections import namedtuple
self.transition = namedtuple(
'Transition', ('reward', 'mask', 'state', 'action', 'next_state',)
)
self.buffer.append(self.transition(*args))
'''convert tuple into array'''
arrays = self.transition(*zip(*[self.buffer[i] for i in indices]))
- 使用Numpy.ndarray:直接划出一整块连续的内存空间。设置更新的指针。到达最大容量时,依照指针让新的记忆覆盖旧的记忆。抽样时使用Numpy自带的方法。速度较快,且代码优雅。
self.buffer = np.empty((memo_max_len, memo_dim), dtype=np.float32)
self.buffer[self.next_idx] = np.hstack(memo_tuple)
tensor = torch.tensor(buffer, device=device)
'''convert array into torch.tensor'''
tensors = (
tensor[:, 0:1], # rewards
tensor[:, 1:2], # masks, mark == (1-float(done)) * gamma
tensor[:, 2: ], # states
tensor[:, : ], # actions
tensor[:, : ], # next_states
)