切换视频源:

Prioritized Experience Replay (DQN) (Tensorflow)

作者: 莫烦 编辑: 莫烦 2017-03-07

学习资料:

要点

本篇教程是基于 Deep Q network (DQN) 的选学教程. 以下教程缩减了在 DQN 方面的介绍, 着重强调 DQN with Prioritized Replay 和 DQN 在代码上不同的地方. 所以还没了解 DQN 的同学们, 有关于 DQN 的知识, 请从 这个视频这个Python教程 开始学习.

这一次还是使用 MountainCar 来进行实验, 因为这次我们不需要重度改变他的 reward 了. 所以只要是没有拿到小旗子, reward=-1, 拿到小旗子时, 我们定义它获得了 +10 的 reward. 比起之前 DQN 中, 这个 reward 定义更加准确. 如果使用这种 reward 定义方式, 可以想象 Natural DQN 会花很久的时间学习, 因为记忆库中只有很少很少的 +10 reward 可以学习. 正负样本不一样. 而使用 Prioritized replay, 就会重视这种少量的, 但值得学习的样本.

Prioritized replay 算法

Prioritized Experience Replay (DQN) (Tensorflow)

这一套算法重点就在我们 batch 抽样的时候并不是随机抽样, 而是按照 Memory 中的样本优先级来抽. 所以这能更有效地找到我们需要学习的样本.

那么样本的优先级是怎么定的呢? 原来我们可以用到 TD-error, 也就是 Q现实 - Q估计 来规定优先学习的程度. 如果 TD-error 越大, 就代表我们的预测精度还有很多上升空间, 那么这个样本就越需要被学习, 也就是优先级 p 越高.

有了 TD-error 就有了优先级 p, 那我们如何有效地根据 p 来抽样呢? 如果每次抽样都需要针对 p 对所有样本排序, 这将会是一件非常消耗计算能力的事. 好在我们还有其他方法, 这种方法不会对得到的样本进行排序. 这就是这篇 paper 中提到的 SumTree.

SumTree 是一种树形结构, 每片树叶存储每个样本的优先级 p, 每个树枝节点只有两个分叉, 节点的值是两个分叉的合, 所以 SumTree 的顶端就是所有 p 的合. 正如下面图片(来自Jaromír Janisch), 最下面一层树叶存储样本的 p, 叶子上一层最左边的 13 = 3 + 10, 按这个规律相加, 顶层的 root 就是全部 p 的合了.

Prioritized Experience Replay (DQN) (Tensorflow)

</a>

抽样时, 我们会将 p 的总合 除以 batch size, 分成 batch size 那么多区间, (n=sum(p)/batch_size). 如果将所有 node 的 priority 加起来是42的话, 我们如果抽6个样本, 这时的区间拥有的 priority 可能是这样.

[0-7], [7-14], [14-21], [21-28], [28-35], [35-42]

然后在每个区间里随机选取一个数. 比如在第区间 [21-28] 里选到了24, 就按照这个 24 从最顶上的42开始向下搜索. 首先看到最顶上 42 下面有两个 child nodes, 拿着手中的24对比左边的 child 29, 如果 左边的 child 比自己手中的值大, 那我们就走左边这条路, 接着再对比 29 下面的左边那个点 13, 这时, 手中的 24 比 13 大, 那我们就走右边的路, 并且将手中的值根据 13 修改一下, 变成 24-13 = 11. 接着拿着 11 和 13 左下角的 12 比, 结果 12 比 11 大, 那我们就选 12 当做这次选到的 priority, 并且也选择 12 对应的数据.

SumTree 有效抽样

注意: 下面的代码和视频中有一点点不同, 下面的代码是根据评论中讨论的进行了修改, 多谢大家的评论.

首先要提的是, 这个 SumTree 的算法是对于 Jaromír Janisch 写的 Sumtree 的修改版. Jaromír Janisch 的代码在更新 sumtree 的时候和抽样的时候多次用到了 recursive 递归结构, 我使用的是 while 循环, 测试要比递归结构运行快. 在 class 中的功能也比它的代码少几个, 我优化了一下.

class SumTree(object):
    # 建立 tree 和 data,
    # 因为 SumTree 有特殊的数据结构,
    # 所以两者都能用一个一维 np.array 来存储
    def __init__(self, capacity):

    # 当有新 sample 时, 添加进 tree 和 data
    def add(self, p, data):

    # 当 sample 被 train, 有了新的 TD-error, 就在 tree 中更新
    def update(self, tree_idx, p):

    # 根据选取的 v 点抽取样本
    def get_leaf(self, v):

    # 获取 sum(priorities)
    @property
    def totoal_p(self):

具体的抽要和更新值的规则和上面说的类似. 具体的代码在这里呈现的话比较累赘, 详细代码请去往我的 Github对应的位置

Memory 类

这个 Memory 类也是基于 Jaromír Janisch 所写的 Memory 进行了修改和优化.

class Memory(object):
    # 建立 SumTree 和各种参数
    def __init__(self, capacity):

    # 存储数据, 更新 SumTree
    def store(self, transition):

    # 抽取 sample
    def sample(self, n):

    # train 完被抽取的 samples 后更新在 tree 中的 sample 的 priority
    def batch_update(self, tree_idx, abs_errors):

具体的代码在这里呈现的话比较累赘, 详细代码请去往我的 Github对应的位置 下面有很多朋友经常问的一个问题, 这个 ISweight 到底怎么算. 需要提到的一点是, 代码中的计算方法是经过了简化的, 将 paper 中的步骤合并了一些. 比如 prob = p / self.tree.total_p; ISWeights = np.power(prob/min_prob, -self.beta)

Prioritized Experience Replay (DQN) (Tensorflow)

下面是我的推导, 如果有不正确还请指出. 在paper 中, ISWeight = (N*Pj)^(-beta) / maxi_wi 里面的 maxi_wi 是为了 normalize ISWeight, 所以我们先把他放在一边. 所以单纯的 importance sampling 就是 (N*Pj)^(-beta), 那 maxi_wi = maxi[(N*Pi)^(-beta)].

如果将这两个式子合并,

ISWeight = (N*Pj)^(-beta) / maxi[ (N*Pi)^(-beta) ]

而且如果将 maxi[ (N*Pi)^(-beta) ] 中的 (-beta) 提出来, 这就变成了 mini[ (N*Pi) ] ^ (-beta)

看出来了吧, 有的东西可以抵消掉的. 最后

ISWeight = (Pj / mini[Pi])^(-beta)

这样我们就有了代码中的样子.

还有代码中的 alpha 是一个决定我们要使用多少 ISweight 的影响, 如果 alpha = 0, 我们就没使用到任何 Importance Sampling.

更新方法

基于之前的 DQN 代码, 我们做出以下修改. 我们将 class 的名字改成 DQNPrioritiedReplay, 为了对比 Natural DQN, 我们也保留原来大部分的 DQN 的代码. 我们在 __init__ 中加一个 prioritized 参数来表示 DQN 是否具备 prioritized 能力. 为了对比的需要, 我们的 tf.Session() 也单独传入. 并移除原本在 DQN 代码中的这一句:

self.sess.run(tf.global_variables_initializer())

class DQNPrioritiedReplay:
    def __init__(..., prioritized=True, sess=None)
        self.prioritized = prioritized
        ...
        if self.prioritized:
            self.memory = Memory(capacity=memory_size)
        else:
            self.memory = np.zeros((self.memory_size, n_features*2+2))

        if sess is None:
            self.sess = tf.Session()
            self.sess.run(tf.global_variables_initializer())
        else:
            self.sess = sess

Prioritized Experience Replay (DQN) (Tensorflow)

搭建神经网络时, 我们发现 DQN with Prioritized replay 只多了一个 ISWeights, 这个正是刚刚算法中提到的 Importance-Sampling Weights, 用来恢复被 Prioritized replay 打乱的抽样概率分布.

class DQNPrioritizedReplay:
    def _build_net(self)
        ...
        # self.prioritized 时 eval net 的 input 多加了一个 ISWeights
        self.s = tf.placeholder(tf.float32, [None, self.n_features], name='s')  # input
        self.q_target = tf.placeholder(tf.float32, [None, self.n_actions], name='Q_target')  # for calculating loss
        if self.prioritized:
            self.ISWeights = tf.placeholder(tf.float32, [None, 1], name='IS_weights')

        ...
        # 为了得到 abs 的 TD error 并用于修改这些 sample 的 priority, 我们修改如下
        with tf.variable_scope('loss'):
            if self.prioritized:
                self.abs_errors = tf.reduce_sum(tf.abs(self.q_target - self.q_eval), axis=1)    # for updating Sumtree
                self.loss = tf.reduce_mean(self.ISWeights * tf.squared_difference(self.q_target, self.q_eval))
            else:
                self.loss = tf.reduce_mean(tf.squared_difference(self.q_target, self.q_eval))

因为和 Natural DQN 使用的 Memory 不一样, 所以在存储 transition 的时候方式也略不相同.

class DQNPrioritizedReplay:
    def store_transition(self, s, a, r, s_):
        if self.prioritized:    # prioritized replay
            transition = np.hstack((s, [a, r], s_))
            self.memory.store(transition)
        else:       # random replay
            if not hasattr(self, 'memory_counter'):
                self.memory_counter = 0
            transition = np.hstack((s, [a, r], s_))
            index = self.memory_counter % self.memory_size
            self.memory[index, :] = transition
            self.memory_counter += 1

接下来是相对于 Natural DQN 代码, 我们在 learn() 改变的部分也在如下展示.

class DQNPrioritizedReplay:
    def learn(self):
        ...
        # 相对于 DQN 代码, 改变的部分
        if self.prioritized:
            tree_idx, batch_memory, ISWeights = self.memory.sample(self.batch_size)
        else:
            sample_index = np.random.choice(self.memory_size, size=self.batch_size)
            batch_memory = self.memory[sample_index, :]

        ...

        if self.prioritized:
            _, abs_errors, self.cost = self.sess.run([self._train_op, self.abs_errors, self.loss],
                                         feed_dict={self.s: batch_memory[:, :self.n_features],
                                                    self.q_target: q_target,
                                                    self.ISWeights: ISWeights})
            self.memory.batch_update(tree_idx, abs_errors)   # update priority
        else:
            _, self.cost = self.sess.run([self._train_op, self.loss],
                                         feed_dict={self.s: batch_memory[:, :self.n_features],
                                                    self.q_target: q_target})

        ...

对比结果

Prioritized Experience Replay (DQN) (Tensorflow)

运行我 Github 中的这个 MountainCar 脚本, 我们就不难发现, 我们都从两种方法最初拿到第一个 R=+10 奖励的时候算起, 看看经历过一次 R=+10 后, 他们有没有好好利用这次的奖励, 可以看出, 有 Prioritized replay 的可以高效的利用这些不常拿到的奖励, 并好好学习他们. 所以 Prioritized replay 会更快结束每个 episode, 很快就到达了小旗子.

分享到: Facebook 微博 微信 Twitter
如果你觉得这篇文章或视频对你的学习很有帮助, 请你也分享它, 让它能再次帮助到更多的需要学习的人. 莫烦没有正式的经济来源, 如果你也想支持 莫烦Python 并看到更好的教学内容, 赞助他一点点, 作为鼓励他继续开源的动力.

支持 让教学变得更优秀