PyTorch 分布式數(shù)據(jù)并行入門

2020-09-10 10:30 更新
原文: https://pytorch.org/tutorials/intermediate/ddp_tutorial.html

作者:申力

DistributedDataParallel (DDP)在模塊級別實(shí)現(xiàn)數(shù)據(jù)并行性。 它使用 Torch.distributed 程序包中的通信集合來同步梯度,參數(shù)和緩沖區(qū)。 并行性在流程內(nèi)和跨流程均可用。 在一個(gè)過程中,DDP 將輸入模塊復(fù)制到device_ids中指定的設(shè)備,將輸入沿批次維度分散,然后將輸出收集到output_device,這與 DataParallel 相似。 在整個(gè)過程中,DDP 在正向傳遞中插入必要的參數(shù)同步,在反向傳遞中插入梯度同步。 用戶可以將進(jìn)程映射到可用資源,只要進(jìn)程不共享 GPU 設(shè)備即可。 推薦的方法(通常是最快的方法)是為每個(gè)模塊副本創(chuàng)建一個(gè)過程,即在一個(gè)過程中不進(jìn)行任何模塊復(fù)制。 本教程中的代碼在 8-GPU 服務(wù)器上運(yùn)行,但可以輕松地推廣到其他環(huán)境。

DataParallelDistributedDataParallel之間的比較

在深入探討之前,讓我們澄清一下為什么盡管增加了復(fù)雜性,但還是考慮使用DistributedDataParallel而不是DataParallel

  • 首先,請回顧先前的教程,如果模型太大而無法容納在單個(gè) GPU 上,則必須使用模型并行將其拆分到多個(gè) GPU 中。 DistributedDataParallel與模型并行一起使用; DataParallel目前沒有。
  • DataParallel是單進(jìn)程,多線程,并且只能在單臺機(jī)器上運(yùn)行,而DistributedDataParallel是多進(jìn)程,并且適用于單機(jī)和多機(jī)訓(xùn)練。 因此,即使在單機(jī)訓(xùn)練中,數(shù)據(jù)足夠小以適合單機(jī),DistributedDataParallel仍比DataParallel快。 DistributedDataParallel還預(yù)先復(fù)制模型,而不是在每次迭代時(shí)復(fù)制模型,并避免了全局解釋器鎖定。
  • 如果您的兩個(gè)數(shù)據(jù)都太大而無法容納在一臺計(jì)算機(jī)和上,而您的模型又太大了以至于無法安裝在單個(gè) GPU 上,則可以將模型并行(跨多個(gè) GPU 拆分單個(gè)模型)與DistributedDataParallel結(jié)合使用。 在這種情況下,每個(gè)DistributedDataParallel進(jìn)程都可以并行使用模型,而所有進(jìn)程都將并行使用數(shù)據(jù)。

基本用例

要?jiǎng)?chuàng)建 DDP 模塊,請首先正確設(shè)置過程組。 更多細(xì)節(jié)可以在用 PyTorch 編寫分布式應(yīng)用程序中找到。

import os
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp


from torch.nn.parallel import DistributedDataParallel as DDP


def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'


    # initialize the process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)


    # Explicitly setting seed to make sure that models created in two processes
    # start from same random weights and biases.
    torch.manual_seed(42)


def cleanup():
    dist.destroy_process_group()

現(xiàn)在,讓我們創(chuàng)建一個(gè)玩具模塊,將其與 DDP 封裝在一起,并提供一些虛擬輸入數(shù)據(jù)。 請注意,由于DDP將0級進(jìn)程中的模型狀態(tài)廣播到DDP構(gòu)造函數(shù)中的所有其他進(jìn)程,因此無需擔(dān)心不同的DDP進(jìn)程從不同的模型參數(shù)初始值開始。

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)


    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


def demo_basic(rank, world_size):
    setup(rank, world_size)


    # setup devices for this process, rank 1 uses GPUs [0, 1, 2, 3] and
    # rank 2 uses GPUs [4, 5, 6, 7].
    n = torch.cuda.device_count() // world_size
    device_ids = list(range(rank * n, (rank + 1) * n))


    # create model and move it to device_ids[0]
    model = ToyModel().to(device_ids[0])
    # output_device defaults to device_ids[0]
    ddp_model = DDP(model, device_ids=device_ids)


    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)


    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(device_ids[0])
    loss_fn(outputs, labels).backward()
    optimizer.step()


    cleanup()


def run_demo(demo_fn, world_size):
    mp.spawn(demo_fn,
             args=(world_size,),
             nprocs=world_size,
             join=True)

如您所見,DDP 包裝了較低級別的分布式通信詳細(xì)信息,并提供了干凈的 API,就好像它是本地模型一樣。 對于基本用例,DDP 僅需要幾個(gè) LoC 來設(shè)置流程組。 在將 DDP 應(yīng)用到更高級的用例時(shí),需要注意一些警告。

偏斜的處理速度

在 DDP 中,構(gòu)造函數(shù),轉(zhuǎn)發(fā)方法和輸出的微分是分布式同步點(diǎn)。 期望不同的過程以相同的順序到達(dá)同步點(diǎn),并在大致相同的時(shí)間進(jìn)入每個(gè)同步點(diǎn)。 否則,快速流程可能會提早到達(dá),并在等待流浪者時(shí)超時(shí)。 因此,用戶負(fù)責(zé)平衡流程之間的工作負(fù)載分配。 有時(shí),由于例如網(wǎng)絡(luò)延遲,資源爭用,不可預(yù)測的工作量峰值,不可避免地會出現(xiàn)偏斜的處理速度。 為了避免在這些情況下超時(shí),請?jiān)谡{(diào)用 init_process_group 時(shí)傳遞足夠大的timeout值。

保存和加載檢查點(diǎn)

在訓(xùn)練過程中通常使用torch.savetorch.load來檢查點(diǎn)模塊并從檢查點(diǎn)中恢復(fù)。 有關(guān)更多詳細(xì)信息,請參見保存和加載模型。 使用 DDP 時(shí),一種優(yōu)化方法是僅在一個(gè)進(jìn)程中保存模型,然后將其加載到所有進(jìn)程中,從而減少寫開銷。 這是正確的,因?yàn)樗羞^程都從相同的參數(shù)開始,并且梯度在向后傳遞中同步,因此優(yōu)化程序應(yīng)將參數(shù)設(shè)置為相同的值。 如果使用此優(yōu)化,請確保在保存完成之前不要啟動(dòng)所有進(jìn)程。 此外,在加載模塊時(shí),您需要提供適當(dāng)?shù)?code>map_location參數(shù),以防止進(jìn)程進(jìn)入其他設(shè)備。 如果缺少map_location,則torch.load將首先將該模塊加載到 CPU,然后將每個(gè)參數(shù)復(fù)制到其保存位置,這將導(dǎo)致同一臺機(jī)器上的所有進(jìn)程使用同一組設(shè)備。

def demo_checkpoint(rank, world_size):
    setup(rank, world_size)


    # setup devices for this process, rank 1 uses GPUs [0, 1, 2, 3] and
    # rank 2 uses GPUs [4, 5, 6, 7].
    n = torch.cuda.device_count() // world_size
    device_ids = list(range(rank * n, (rank + 1) * n))


    model = ToyModel().to(device_ids[0])
    # output_device defaults to device_ids[0]
    ddp_model = DDP(model, device_ids=device_ids)


    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)


    CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint"
    if rank == 0:
        # All processes should see same parameters as they all start from same
        # random parameters and gradients are synchronized in backward passes.
        # Therefore, saving it in one process is sufficient.
        torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)


    # Use a barrier() to make sure that process 1 loads the model after process
    # 0 saves it.
    dist.barrier()
    # configure map_location properly
    rank0_devices = [x - rank * len(device_ids) for x in device_ids]
    device_pairs = zip(rank0_devices, device_ids)
    map_location = {'cuda:%d' % x: 'cuda:%d' % y for x, y in device_pairs}
    ddp_model.load_state_dict(
        torch.load(CHECKPOINT_PATH, map_location=map_location))


    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(device_ids[0])
    loss_fn = nn.MSELoss()
    loss_fn(outputs, labels).backward()
    optimizer.step()


    # Use a barrier() to make sure that all processes have finished reading the
    # checkpoint
    dist.barrier()


    if rank == 0:
        os.remove(CHECKPOINT_PATH)


    cleanup()

將 DDP 與模型并行性結(jié)合

DDP 還可以與多 GPU 模型一起使用,但是不支持進(jìn)程內(nèi)的復(fù)制。 您需要為每個(gè)模塊副本創(chuàng)建一個(gè)進(jìn)程,與每個(gè)進(jìn)程的多個(gè)副本相比,通??梢蕴岣咝阅?。 當(dāng)訓(xùn)練具有大量數(shù)據(jù)的大型模型時(shí),DDP 包裝多 GPU 模型特別有用。 使用此功能時(shí),需要小心地實(shí)現(xiàn)多 GPU 模型,以避免使用硬編碼的設(shè)備,因?yàn)闀⒉煌哪P透北痉胖玫讲煌脑O(shè)備上。

class ToyMpModel(nn.Module):
    def __init__(self, dev0, dev1):
        super(ToyMpModel, self).__init__()
        self.dev0 = dev0
        self.dev1 = dev1
        self.net1 = torch.nn.Linear(10, 10).to(dev0)
        self.relu = torch.nn.ReLU()
        self.net2 = torch.nn.Linear(10, 5).to(dev1)


    def forward(self, x):
        x = x.to(self.dev0)
        x = self.relu(self.net1(x))
        x = x.to(self.dev1)
        return self.net2(x)

將多 GPU 模型傳遞給 DDP 時(shí),不得設(shè)置device_idsoutput_device。 輸入和輸出數(shù)據(jù)將通過應(yīng)用程序或模型forward()方法放置在適當(dāng)?shù)脑O(shè)備中。

def demo_model_parallel(rank, world_size):
    setup(rank, world_size)


    # setup mp_model and devices for this process
    dev0 = rank * 2
    dev1 = rank * 2 + 1
    mp_model = ToyMpModel(dev0, dev1)
    ddp_mp_model = DDP(mp_model)


    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_mp_model.parameters(), lr=0.001)


    optimizer.zero_grad()
    # outputs will be on dev1
    outputs = ddp_mp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(dev1)
    loss_fn(outputs, labels).backward()
    optimizer.step()


    cleanup()


if __name__ == "__main__":
    run_demo(demo_basic, 2)
    run_demo(demo_checkpoint, 2)


    if torch.cuda.device_count() >= 8:
        run_demo(demo_model_parallel, 4)


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號