百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分类 > 正文

Pytorch - 手写Allreduce分布式训练

ztj100 2025-07-20 00:01 7 浏览 0 评论

1 介绍

近些年随着深度学习的火爆,模型的参数规模也飞速增长,OpenAI数据显示:

  • 2012年以前,模型计算耗时每2年增长一倍,和摩尔定律保持一致;
  • 2012年后,模型计算耗时每3.4个月翻一倍,远超硬件发展速度;


近一年来,百亿、千亿级的参数模型陆续面世,谷歌、英伟达、阿里、智源研究院更是发布了万亿参数模型。因此,大模型已经成为了未来深度学习的趋势。提到大模型,就不得不提分布式训练,由于模型参数和训练数据的不断增多,只有通过分布式训练才能完成大模型的训练任务。


分布式训练可以分为数据并行、模型并行,流水线并行和混合并行。分布式算法又有典型的parameter server和ring all-reduce。无论是哪一种分布式技术一个核心的关键就是如何进行communication,这是实现分布式训练的基础,因此要想掌握分布式训练或当前流行的大模型训练务必对worker间的通信方式有所了解。


互联网上已经有很多关于分布式训练的通信方面的文章,但是均没有代码层面的例子。我是属于比较愚钝类型的,只有通过自己手动实现一下方能对一些抽象的概念有较深的理解。因此,上一篇Pytorch - 分布式通信原语通过pytorch中的分布式原语库来介绍每个通信原语的行为表现,本篇文章将介绍如何在这些原语上实现分布式训练。


2 整体流程

手动数据并行的分布式训练,整体流程如下:

  • 数据处理:将数据按照rank进行分片,每个rank读取对应的partition;
  • 模型训练:模型构建、forward、loss和backward均与单机相同,不同的是在进行梯度更新之前调用我们自定义的average_gradients 函数进行所有rank间的梯度同步,同步完成之后再调用optimize的step接口进行梯度的更新;
  • 调试执行:启动一个单机2 rank的DDP训练任务;


3 数据处理

3.1 构建数据集

构建通过pytorch中提供的torchvision DataSet来创建MNIST数据集;

  • 参数root为数据下载的目录;
  • 参数train指明当前创建的DataSet使用的是MNIST的训练集还是测试集;
  • 参数download指明是否进行数据的下载;
  • 参数transform指明DataSet中数据的变化方式
  • toTensor() 将数据转换为tensor表示的形式;
  • Normalize是对数据进行归一化;
    dataset = datasets.MNIST(
        root='./data',
        train=True,
        download=True,
        transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307, ), (0.3081, ))
        ]))


3.2 数据切分

通过DataPartitioner对象对dataset进行切分,执行逻辑如下:

  • 构建阶段:将dataset中的数据随机按照sizes的比例分配到不同的partition中
  • 返回阶段:返回参数partition指定的对应数据分片
class DataPartitioner(object):
    """ Partitions a dataset into different chuncks. """
    # 先对index进行shuffle
    # 然后按照size进行partition
    def __init__(self, data, sizes=[0.7, 0.2, 0.1], seed=1234):
        self.data = data
        self.partitions = []
        rng = Random()
        rng.seed(seed)
        data_len = len(data)
        indexes = [x for x in range(0, data_len)]
        rng.shuffle(indexes)

        for frac in sizes:
            part_len = int(frac * data_len)
            self.partitions.append(indexes[0:part_len])
            indexes = indexes[part_len:]

    def use(self, partition):
        return Partition(self.data, self.partitions[partition])


通过下面的Partition对象来实现sub dataset数据的遍历

class Partition(object):
    """ Dataset-like object, but only access a subset of it. """

    def __init__(self, data, index):
        self.data = data
        self.index = index

    def __len__(self):
        return len(self.index)

    def __getitem__(self, index):
        data_idx = self.index[index]
        return self.data[data_idx]


3.3 完整数据处理

def partition_dataset():
    """ Partitioning MNIST """
    dataset = datasets.MNIST(
        './data',
        train=True,
        download=True,
        transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307, ), (0.3081, ))
        ]))
    size = int(dist.get_world_size()) # 获取rank的个数
    total_bach_size = 128
    bsz = int(total_bach_size / float(size)) # 每个rank对应的batch size
    partition_sizes = [1.0 / size for _ in range(size)] # 设置每个rank处理数据量的大小
    partition = DataPartitioner(dataset, partition_sizes) # 数据切分
    partition = partition.use(dist.get_rank()) # 获取当前rank对应的数据

    train_set = torch.utils.data.DataLoader(partition, batch_size=bsz, shuffle=True)
    return train_set, bsz


4 模型训练

4.1 模型构建

由于本例是DDP(数据并行),模型被完整加载到一个GPU上,因此模型的构建单卡训练一致。

class Net(nn.Module):
    """ Network architecture. """

    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x)


4.2 训练

训练主流程同单机训练基本一致,只是在后向传播和梯度更新之间新添加个average_gradients逻辑来在所有rank之间做梯度的平均

def run(rank, size):
    """ Distributed Synchronous SGD Example """
    torch.manual_seed(1234)
    train_set, bsz = partition_dataset()
    model = Net()
    model = model
    model = model.cuda(rank)
    optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)

    num_batches = ceil(len(train_set.dataset) / float(bsz))
    for epoch in range(10):
        epoch_loss = 0.0
        for data, target in train_set:
            data, target = Variable(data), Variable(target)
            data, target = Variable(data.cuda(rank)), Variable(target.cuda(rank))
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            epoch_loss += loss.item()
            loss.backward()
            average_gradients(model)
            optimizer.step()
        print('Rank ',
              dist.get_rank(), ', epoch ', epoch, ': ',
              epoch_loss / num_batches)


4.3 梯度平均

梯度平均逻辑如下:

  • 遍历模型中的所有参数;
  • 对每个参数调用dist.all_reduce,并求平均;
  • pytorch中分布式原语的使用,可以参考上一篇文章:Pytorch - 分布式通信原语(附源码)
def average_gradients(model):
    """ Gradient averaging. """
    size = float(dist.get_world_size())
    for param in model.parameters():
        dist.all_reduce(param.grad.data, op=dist.reduce_op.SUM)
        param.grad.data /= size


5 调试执行

代码执行环境:

image: pytorch/pytorch:1.11.0-cuda11.3-cudnn8-runtime

gpu: v100


/workspace/communication# python all_reduce_train.py
Rank  0 , epoch  0 :  1.330785048033383
Rank  1 , epoch  0 :  1.3148830299819712
Rank  0 , epoch  1 :  0.549655341136176
Rank  1 , epoch  1 :  0.5391304553317617
Rank  0 , epoch  2 :  0.43256897175871234
Rank  1 , epoch  2 :  0.42089327191238973
Rank  0 , epoch  3 :  0.37275126312714396
Rank  1 , epoch  3 :  0.3543623070409303
Rank  0 , epoch  4 :  0.31136283705801343
Rank  1 , epoch  4 :  0.3075531961868948
Rank  0 , epoch  5 :  0.29167098775982603
Rank  1 , epoch  5 :  0.2841323056836118
Rank  0 , epoch  6 :  0.26905299833556734
Rank  1 , epoch  6 :  0.26066392272520167
Rank  0 , epoch  7 :  0.25440651411885645
Rank  1 , epoch  7 :  0.2499371356706121
Rank  0 , epoch  8 :  0.2421310727260133
Rank  1 , epoch  8 :  0.2329997108149122
Rank  0 , epoch  9 :  0.22838556196199042
Rank  1 , epoch  9 :  0.2229069949927996

相关推荐

爬取电影视频数据(电影资源爬虫)

本文的文字及图片来源于网络,仅供学习、交流使用,不具有任何商业用途,如有问题请及时联系我们以作处理。作者:yangrq1018原文链接:https://segmentfault.com/a/11900...

Python效率倍增的10个实用代码片段

引言Python是一门功能强大且灵活的编程语言,广泛应用于数据分析、Web开发、人工智能等多个领域。它的简洁语法和高可读性让开发者能够快速上手,但在实际工作中,我们常常会遇到一些重复性或繁琐的任务。这...

Python数据处理:深入理解序列化与反序列化

在现代编程实践中,数据的序列化与反序列化是数据持久化、网络通信等领域不可或缺的技术。本文将深入探讨Python中数据序列化与反序列化的概念、实现方式以及数据验证的重要性,并提供丰富的代码示例。...

亿纬锂能:拟向PKL买地,在马来西亚建立锂电池制造厂

亿纬锂能5月12日公告,亿纬马来西亚与PEMAJUKELANGLAMASDN.BHD.(PKL)签订《MEMORANDUMOFUNDERSTANDING》(谅解备忘录),亿纬马来西亚拟向PKL购买标的...

一个超强的机器学习库(spark机器学习库)

简介PyCaret...

30天学会Python编程:9. Python文件与IO操作

9.1文件操作基础9.1.1文件操作流程9.1.2文件打开模式表9-1Python文件打开模式...

Python的Pickle序列化与反序列化(python反序列化json)

动动小手,点击关注...

python进阶突破内置模块——数据序列化与格式

数据序列化是将数据结构或对象转换为可存储/传输格式的过程,反序列化则是逆向操作。Python提供了多种工具来处理不同场景下的序列化需求。一、核心内置模块...

微信聊天记录可视化工具详细介绍(微信聊天记录分析报告小程序)

功能概要能做什么...

Python常用文件操作库使用详解(python中文件操作的相关函数有哪些)

Python生态系统提供了丰富的文件操作库,可以处理各种复杂的文件操作需求。本教程将介绍Python中最常用的文件操作库及其实际应用。一、标准库核心模块1.1os模块-操作系统接口主要功能...

Vue3+Django4全新技术实战全栈项目(已完结)

获课》aixuetang.xyz/5739/Django与推荐算法的集成及模型部署实践...

性能调优方面,经常要优化跑的最慢的代码,教你一种快速的方法

在我们遇到性能问题的时候,很多时候需要去查看性能的瓶颈在哪里,本篇文章就是提供了多种常用的方案来监控函数的运行时间。1.time首先说明,time模块很多是系统相关的,在不同的OS中可能会有一些精度差...

Python解决读取excel数据慢的问题

前言:在做自动化测试的时候,我思考了一个问题,就是如果我们的测试用例随着项目的推进越来越多时,我们做自动化回归的时间也就越来越长,其中影响自动化测试速度的一个原因就是测试用例的读取问题。用例越多,所消...

【Python机器学习系列】基于Flask来构建API调用机器学习模型服务

这是我的第364篇...

不会用mmdet工具?速看MMDetection工具的终极指南

来源:计算机视觉工坊添加微信:dddvisiona,备注:目标检测,拉你入群。文末附行业细分群...

取消回复欢迎 发表评论: