PyTorch 強化學(xué)習(xí)(DQN)教程

2020-09-07 17:59 更新
原文: https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html

作者  : Adam Paszke

翻譯校驗: Dynmi Wang

本教程介紹了如何使用 PyTorch 在 OpenAI Gym 上的 CartPole-v0 任務(wù)上訓(xùn)練深度 Q-learning(DQN)智能體。

任務(wù)

智能體必須在兩個動作之間做出決定-向左或向右移動小車來使其上的桿保持直立。 您可以在 Gym 網(wǎng)站上找到具有各種算法和可視化的官方排行榜。

cartpole 

當(dāng)智能體觀察環(huán)境的當(dāng)前狀態(tài)并選擇一個動作時,環(huán)境將轉(zhuǎn)換到新狀態(tài),并且還會返回表示該動作結(jié)果的獎勵。 在此任務(wù)中,每前進(jìn)一個時間步,獎勵為+1,并且如果桿子掉落得太遠(yuǎn)或小車離中心的距離超過 2.4 個單位,則對局終止。 這意味著性能更好的操作方案將持續(xù)更長的時間,從而積累更大的回報。

Cartpole任務(wù)的設(shè)計為智能點輸入代表環(huán)境狀態(tài)(位置、速度等)的4個實際值。 但是,神經(jīng)網(wǎng)絡(luò)可以完全通過查看場景來解決任務(wù),因此我們將以小車為中心的一部分屏幕作為輸入。 因此,我們的結(jié)果無法直接與官方排行榜上的結(jié)果進(jìn)行比較-我們的任務(wù)更加艱巨。 不幸的是,這確實減慢了訓(xùn)練速度,因為我們必須渲染所有幀。

嚴(yán)格地說,我們將以當(dāng)前幀和前一個幀之間的差異來呈現(xiàn)狀態(tài)。這將允許代理從一張圖像中考慮桿子的速度。

軟件包

首先,讓我們導(dǎo)入所需的軟件包。 首先,我們需要針對環(huán)境的體育館(使用 <cite>pip install Gym</cite> 進(jìn)行安裝)。 我們還將使用 PyTorch 中的以下內(nèi)容:

  • 神經(jīng)網(wǎng)絡(luò)(torch.nn)
  • 優(yōu)化(torch.optim)
  • 自動微分(torch.autograd)
  • 專門做視覺處理的工具(torchvision-單獨的軟件包)。
import gym
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple
from itertools import count
from PIL import Image


import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T


env = gym.make('CartPole-v0').unwrapped


## set up matplotlib
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display


plt.ion()


## if gpu is to be used
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

經(jīng)驗回放

我們將使用經(jīng)驗回放內(nèi)存來訓(xùn)練我們的 DQN。 它存儲智能體觀察到的轉(zhuǎn)換,使我們以后可以重用此數(shù)據(jù)。 通過從中隨機抽樣,可以構(gòu)建成批的過渡。 實踐表明,這極大穩(wěn)定并改進(jìn)了 DQN 訓(xùn)練過程。

為此,我們需要兩個類:

  • Transition  -一個命名元組,表示我們環(huán)境中的單個轉(zhuǎn)換。 本質(zhì)上它是將(state,action)對映射到緊隨其后的(next_state,reward)結(jié)果,狀態(tài)是屏幕差異圖像,如下所述。
  • ReplayMemory -一個有界大小的循環(huán)緩沖區(qū),用于保存最近觀察到的轉(zhuǎn)換。 它還實現(xiàn)了.sample()方法,從經(jīng)驗庫中隨機選擇一批transitions,方便直接拿去訓(xùn)練智能體。
Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))


class ReplayMemory(object):


    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0


    def push(self, *args):
        """Saves a transition."""
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity


    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)


    def __len__(self):
        return len(self.memory)

現(xiàn)在我們來定義自己的模型。 首先快速回顧一下DQN基礎(chǔ)知識。

DQN 算法

我們的環(huán)境是確定的,故為簡單起見,這里提出的所有方程也都是確定性的。 在強化學(xué)習(xí)文獻(xiàn)中,它們還將包含對環(huán)境中隨機轉(zhuǎn)換的期望。

我們的目標(biāo)是訓(xùn)練并得到一種試圖最大化帶衰減的累積獎勵 的策略,其中 也稱為收獲。 衰減率 應(yīng)該是 和 之間的常數(shù),以確??偤褪諗?。 它使不確定的遠(yuǎn)期回報對于我們的智能體而言不如對它可以相當(dāng)有信心的近期回報重要。

Q-learning背后的思想是,如果我們有一個函數(shù) ,它可以告訴我們我們的回報是什么,如果我們要在特定狀態(tài)下采取行動,那么我們可以輕松地構(gòu)建一個最大化收獲的策略:

但是,我們并不了解世界的一切,因此我們無法訪問 。 但是,由于神經(jīng)網(wǎng)絡(luò)是通用的函數(shù)逼近器,因此我們可以輕松創(chuàng)建一個并訓(xùn)練它為類似于 的函數(shù)。

對于我們的訓(xùn)練更新規(guī)則,我們將假設(shè)--某些策略的每個 函數(shù)都遵循 Bellman 方程:

等式兩邊之間的差異稱為時間差誤差 

為了盡量減小這個錯誤,我們將使用 Huber 損失。 當(dāng)誤差較小時,Huber 損失類似于均方誤差;而當(dāng)誤差較大時,表現(xiàn)為平均絕對誤差--這使得當(dāng) 的估計值非常嘈雜時,對異常值的魯棒性更強。 我們通過從經(jīng)驗回放中取樣的一批轉(zhuǎn)換 來計算:

Q-Network

我們的模型將是一個卷積神經(jīng)網(wǎng)絡(luò),該卷積神經(jīng)網(wǎng)絡(luò)將吸收當(dāng)前屏幕補丁與先前屏幕補丁之間的差異。 它有兩個輸出,分別表示 和 (其中 是網(wǎng)絡(luò)的輸入)。 實際上,網(wǎng)絡(luò)正嘗試預(yù)測在給定當(dāng)前輸入的情況下執(zhí)行每個action的預(yù)期收獲。

class DQN(nn.Module):


    def __init__(self, h, w, outputs):
        super(DQN, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=5, stride=2)
        self.bn1 = nn.BatchNorm2d(16)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=5, stride=2)
        self.bn2 = nn.BatchNorm2d(32)
        self.conv3 = nn.Conv2d(32, 32, kernel_size=5, stride=2)
        self.bn3 = nn.BatchNorm2d(32)


        # Number of Linear input connections depends on output of conv2d layers
        # and therefore the input image size, so compute it.
        def conv2d_size_out(size, kernel_size = 5, stride = 2):
            return (size - (kernel_size - 1) - 1) // stride  + 1
        convw = conv2d_size_out(conv2d_size_out(conv2d_size_out(w)))
        convh = conv2d_size_out(conv2d_size_out(conv2d_size_out(h)))
        linear_input_size = convw * convh * 32
        self.head = nn.Linear(linear_input_size, outputs)


    # Called with either one element to determine next action, or a batch
    # during optimization. Returns tensor([[left0exp,right0exp]...]).
    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        return self.head(x.view(x.size(0), -1))

獲取輸入

以下代碼是用于從環(huán)境中提取和處理渲染圖像的實用程序。 它使用torchvision包,可輕松組合圖像變換。 運行單元后,它將顯示它提取的示例幀。

resize = T.Compose([T.ToPILImage(),
                    T.Resize(40, interpolation=Image.CUBIC),
                    T.ToTensor()])


def get_cart_location(screen_width):
    world_width = env.x_threshold * 2
    scale = screen_width / world_width
    return int(env.state[0] * scale + screen_width / 2.0)  # MIDDLE OF CART


def get_screen():
    # Returned screen requested by gym is 400x600x3, but is sometimes larger
    # such as 800x1200x3\. Transpose it into torch order (CHW).
    screen = env.render(mode='rgb_array').transpose((2, 0, 1))
    # Cart is in the lower half, so strip off the top and bottom of the screen
    _, screen_height, screen_width = screen.shape
    screen = screen[:, int(screen_height*0.4):int(screen_height * 0.8)]
    view_width = int(screen_width * 0.6)
    cart_location = get_cart_location(screen_width)
    if cart_location < view_width // 2:
        slice_range = slice(view_width)
    elif cart_location > (screen_width - view_width // 2):
        slice_range = slice(-view_width, None)
    else:
        slice_range = slice(cart_location - view_width // 2,
                            cart_location + view_width // 2)
    # Strip off the edges, so that we have a square image centered on a cart
    screen = screen[:, :, slice_range]
    # Convert to float, rescale, convert to torch tensor
    # (this doesn't require a copy)
    screen = np.ascontiguousarray(screen, dtype=np.float32) / 255
    screen = torch.from_numpy(screen)
    # Resize, and add a batch dimension (BCHW)
    return resize(screen).unsqueeze(0).to(device)


env.reset()
plt.figure()
plt.imshow(get_screen().cpu().squeeze(0).permute(1, 2, 0).numpy(),
           interpolation='none')
plt.title('Example extracted screen')
plt.show()

訓(xùn)練

超參數(shù)和配置

該單元實例化我們的模型及其優(yōu)化器,并定義超參數(shù):

  • select_action- 將根據(jù) epsilon-greedy策略選擇一個行為。 簡而言之,我們有時會使用我們的模型來選擇行為,有時我們只會對其中一個進(jìn)行統(tǒng)一采樣。 選擇隨機行為的概率將從EPS_START開始,并朝EPS_END呈指數(shù)衰減。 EPS_DECAY控制衰減率。
  • plot_durations- 一個幫助繪制迭代次數(shù)持續(xù)時間,以及過去100迭代次數(shù)的平均值(官方評估中使用的度量)。 迭代次數(shù)將在包含主訓(xùn)練循環(huán)的單元下方,并在每次迭代之后更新。
BATCH_SIZE = 128
GAMMA = 0.999
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 200
TARGET_UPDATE = 10


## 獲取屏幕大小,以便我們可以根據(jù)從ai-gym返回的形狀正確初始化層。
## 這一點上的平常尺寸接近3x40x90,這是在get_screen()中抑制和縮小的渲染緩沖區(qū)的結(jié)果。
init_screen = get_screen()
_, _, screen_height, screen_width = init_screen.shape


## Get number of actions from gym action space
n_actions = env.action_space.n


policy_net = DQN(screen_height, screen_width, n_actions).to(device)
target_net = DQN(screen_height, screen_width, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()


optimizer = optim.RMSprop(policy_net.parameters())
memory = ReplayMemory(10000)


steps_done = 0


def select_action(state):
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1\. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample > eps_threshold:
        with torch.no_grad():
            # t.max(1) will return largest column value of each row.
            # second column on max result is index of where max element was
            # found, so we pick action with the larger expected reward.
            return policy_net(state).max(1)[1].view(1, 1)
    else:
        return torch.tensor([[random.randrange(n_actions)]], device=device, dtype=torch.long)


episode_durations = []


def plot_durations():
    plt.figure(2)
    plt.clf()
    durations_t = torch.tensor(episode_durations, dtype=torch.float)
    plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(durations_t.numpy())
    # Take 100 episode averages and plot them too
    if len(durations_t) >= 100:
        means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())


    plt.pause(0.001)  # pause a bit so that plots are updated
    if is_ipython:
        display.clear_output(wait=True)
        display.display(plt.gcf())

訓(xùn)練循環(huán)

最后,訓(xùn)練我們的模型,其實就是優(yōu)化我們的智能體。

在這里,您可以找到執(zhí)行優(yōu)化步驟的optimize_model函數(shù)。 它首先對一批進(jìn)行采樣,將所有張量連接成一個張量,計算 和 ,然后將它們組合成我們的損失。 根據(jù)定義,如果 為對局結(jié)束狀態(tài),則設(shè)置 。 我們還使用目標(biāo)網(wǎng)絡(luò)來計算 ,以提高穩(wěn)定性。 目標(biāo)網(wǎng)絡(luò)的權(quán)重大部分時間保持不變,但每隔一段時間就會更新一次價值網(wǎng)絡(luò)權(quán)重。這是一組固定的步驟,但為了簡單起見,我們將使用迭代次數(shù)。

def optimize_model():
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    # 轉(zhuǎn)置批樣本(有關(guān)詳細(xì)說明,請參閱https://stackoverflow.com/a/19343/3343043)。
    # 這會將轉(zhuǎn)換的批處理數(shù)組轉(zhuǎn)換為批處理數(shù)組的轉(zhuǎn)換。
    batch = Transition(*zip(*transitions))


    # 計算非最終狀態(tài)的掩碼并連接批處理元素(最終狀態(tài)將是模擬結(jié)束后的狀態(tài))
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                          batch.next_state)), device=device, dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state
                                                if s is not None])
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)


    # 計算Q(s_t, a)-模型計算 Q(s_t),然后選擇所采取行動的列。
    # 這些是根據(jù)策略網(wǎng)絡(luò)對每個批處理狀態(tài)所采取的操作。
    state_action_values = policy_net(state_batch).gather(1, action_batch)


    # 計算下一個狀態(tài)的V(s_{t+1})。
    # 非最終狀態(tài)下一個狀態(tài)的預(yù)期操作值是基于“舊”目標(biāo)網(wǎng)絡(luò)計算的;選擇max(1)[0]的最佳獎勵。
    # 這是基于掩碼合并的,這樣當(dāng)狀態(tài)為最終狀態(tài)時,我們將獲得預(yù)期狀態(tài)值或0。


    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0].detach()
    # 計算期望Q值
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch


    # 計算Huber損失
    loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))


    # 優(yōu)化模型
    optimizer.zero_grad()
    loss.backward()
    for param in policy_net.parameters():
        param.grad.data.clamp_(-1, 1)
    optimizer.step()

在下面,您可以找到主要的訓(xùn)練循環(huán)。 首先,我們重置環(huán)境并初始化state Tensor。 然后,我們采樣一個動作,執(zhí)行它,觀察下一個屏幕和獎勵(總是 1),并一次優(yōu)化我們的模型。 當(dāng)情節(jié)結(jié)束(我們的模型失?。r,我們重新開始循環(huán)。

下面,將 <cite>num_episodes</cite> 設(shè)置得較小。 您應(yīng)該下載筆記本并運行更多的片段,例如 300 多個片段,以實現(xiàn)有意義的持續(xù)時間改進(jìn)。

num_episodes = 50
for i_episode in range(num_episodes):
    # Initialize the environment and state
    env.reset()
    last_screen = get_screen()
    current_screen = get_screen()
    state = current_screen - last_screen
    for t in count():
        # Select and perform an action
        action = select_action(state)
        _, reward, done, _ = env.step(action.item())
        reward = torch.tensor([reward], device=device)


        # Observe new state
        last_screen = current_screen
        current_screen = get_screen()
        if not done:
            next_state = current_screen - last_screen
        else:
            next_state = None


        # Store the transition in memory
        memory.push(state, action, next_state, reward)


        # Move to the next state
        state = next_state


        # Perform one step of the optimization (on the target network)
        optimize_model()
        if done:
            episode_durations.append(t + 1)
            plot_durations()
            break
    # Update the target network, copying all weights and biases in DQN
    if i_episode % TARGET_UPDATE == 0:
        target_net.load_state_dict(policy_net.state_dict())


print('Complete')
env.render()
env.close()
plt.ioff()
plt.show()

下面這張圖是對整個DQN算法的總結(jié):

../_images/reinforcement_learning_diagram.jpg

行為可以是隨機選擇的,也可以是基于一個策略,從Gym環(huán)境中獲取下一步的樣本。我們將結(jié)果記錄在經(jīng)驗回放庫中,并在每次迭代中運行優(yōu)化步驟。優(yōu)化從經(jīng)驗回放中隨機抽取一批來訓(xùn)練新策略。 “舊的”target_net也用于優(yōu)化計算預(yù)期的Q值;它偶爾會更新以保持其最新。

腳本的總運行時間:(0 分鐘 0.000 秒)

Download Python source code: reinforcement_q_learning.py Download Jupyter notebook: reinforcement_q_learning.ipynb

由獅身人面像畫廊生成的畫廊



以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號