pytorch--流水线并行

流水线并行(pipelining )部署实施起来非常困难,因为这需要根据模型的weights把模型分块(通常涉及到对源码的修改),此外,分布式的调度和数据流的依赖也是要考虑的点;
pipelining 库可以让部署变得更加简单;
这个库包含两个部分:
splitting frontend:此部分用于把模型分块,并且捕捉到数据流之间的关系;
distributed runtime:并行地执行pipeline stage在不同的设备上,同时处理好batch的划分、调度、通信和梯度回传;
所以这个库支持以下操作:
1.对于模型的简单划分;
2.丰富的流水线调度策略,包括GPipe, 1F1B, Interleaved 1F1B and Looped BFS;
3.支持跨主机的并行;
4.支持一些常规的并行操作,比如data parallel (DDP, FSDP) or tensor parallel;
关于模型的splitting:
为了构建PipelineStage,需要提供包含了nn.Parameters and nn.Buffers的nn.Module,同时定义了能够执行对应stage的forward函数

class Transformer(nn.Module):
    def __init__(self, model_args: ModelArgs):
        super().__init__()

        self.tok_embeddings = nn.Embedding(...)

        # Using a ModuleDict lets us delete layers witout affecting names,
        # ensuring checkpoints will correctly save and load.
        self.layers = torch.nn.ModuleDict()
        for layer_id in range(model_args.n_layers):
            self.layers[str(layer_id)] = TransformerBlock(...)

        self.output = nn.Linear(...)

    def forward(self, tokens: torch.Tensor):
        # Handling layers being 'None' at runtime enables easy pipeline splitting
        h = self.tok_embeddings(tokens) if self.tok_embeddings else tokens

        for layer in self.layers.values():
            h = layer(h, self.freqs_cis)

        h = self.norm(h) if self.norm else h
        output = self.output(h).float() if self.output else h
        return output

用这种方式定义的模型可以很容易配置stage和初始化,(为了防止OMM error使用meta device),删除对应stage不需要的层,然后构造PipelineStage 来wrap model;

with torch.device("meta"):
    assert num_stages == 2, "This is a simple 2-stage example"

    # we construct the entire model, then delete the parts we do not need for this stage
    # in practice, this can be done using a helper function that automatically divides up layers across stages.
    model = Transformer()

    if stage_index == 0:
        # prepare the first stage model
        del model.layers["1"]
        model.norm = None
        model.output = None

    elif stage_index == 1:
        # prepare the second stage model
        model.tok_embeddings = None
        del model.layers["0"]

    from torch.distributed.pipelining import PipelineStage
    stage = PipelineStage(
        model,
        stage_index,
        num_stages,
        device,
        input_args=example_input_microbatch,
    )

这里还提供自动切分模型的接口函数,这里不做细致赘述;
其中input_args 代表执行时候的input data samples,这个要拿去经过forward去确定输入输出的shapes;当同时使用其他并行trick的时候,output_args 也需要的,因为模型输出大小可能会受到影响;
第一步:构建一个执行的PipelineStage
PipelineStage用于分配通信内存,创造发送、接受操作去通信;它用来存储还未被consume的forward的缓存,同时为stage model执行backward;
PipelineStage需要知道输入输出的shape大小,方便创建通信缓存,shapes必须是固定大小的,也就是训练执行的时候它不能是变化的;
每一个stage model必须是nn.Module的格式;(所以第一步要做的事情就是手动分割模型);
当然也有其他替代方式,可以用图分割去把你的模型自动分割为一系列的nn.Module,这个要求模型必须是torch.Export traceable ;所以能手动更改模型代码是最方便的;
第二步:用PipelineSchedule 去执行
以下是执行的示例代码:

from torch.distributed.pipelining import ScheduleGPipe

# Create a schedule
schedule = ScheduleGPipe(stage, n_microbatches)

# Input data (whole batch)
x = torch.randn(batch_size, in_dim, device=device)

# Run the pipeline with input `x`
# `x` will be divided into microbatches automatically
if rank == 0:
    schedule.step(x)
else:
    output = schedule.step()

以上代码的rank应该指的是进程号,也就是在0进程中,执行stage1,在1进程中执行stage 2;

以下是官方给的关于llama流水线并行的示例代码,会更加清晰明了;

# $ torchrun --nproc-per-node 4 pippy_llama.py
import os
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from torch.distributed.pipelining import SplitPoint, pipeline, ScheduleGPipe

# Grab the model
llama = AutoModelForCausalLM.from_pretrained(
    "meta-llama/Llama-2-7b-chat-hf", low_cpu_mem_usage=True
)
print(llama)

tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-2-7b-chat-hf")
tokenizer.pad_token = tokenizer.eos_token
mb_prompts = (
    "How do you", "I like to",
)  # microbatch size = 2

rank = int(os.environ["RANK"])
world_size = int(os.environ["WORLD_SIZE"])
device = torch.device(f"cuda:{rank % torch.cuda.device_count()}")
torch.distributed.init_process_group(rank=rank, world_size=world_size)

llama.to(device).eval()

# Cut model by equal number of layers per rank
layers_per_rank = llama.config.num_hidden_layers // world_size
print(f"layers_per_rank = {layers_per_rank}")
split_spec = {
    f"model.layers.{i * layers_per_rank}": SplitPoint.BEGINNING
    for i in range(1, world_size)
}

# Create a pipeline representation from the model
mb_inputs = tokenizer(mb_prompts, return_tensors="pt", padding=True).to(device)
pipe = pipeline(llama, mb_args=(mb_inputs["input_ids"],))

# Create pipeline stage for each rank
stage = pipe.build_stage(rank, device=device)

# Run time inputs
full_batch_prompts = (
    "How do you", "I like to", "Can I help", "You need to",
    "The weather is", "I found a", "What is your", "You are so",
)  # full batch size = 8
inputs = tokenizer(full_batch_prompts, return_tensors="pt", padding=True).to(device)

# Attach to a schedule
# number of microbatches = 8 // 2 = 4
num_mbs = 4
schedule = ScheduleGPipe(stage, num_mbs)

# Run
if rank == 0:
    args = inputs["input_ids"]
else:
    args = None

output = schedule.step(args)

# Decode
if output is not None:
    next_token_logits = output[0][:, -1, :]
    next_token = torch.argmax(next_token_logits, dim=-1)
    print(tokenizer.batch_decode(next_token))

torchrun --nproc-per-node 8 pippy_llama.py

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/882609.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

10.软件工程知识详解上

软件工程概述 软件开发生命周期 软件定义时期:包括可行性研究和详细需求分析过程,任务是确定软件开发工程必须完成的总目标,具体可分成问题定义、可行性研究、需求分析等。软件开发时期:就是软件的设计与实现,可分成…

汽车总线之----FlexRay总线

Introduction 随着汽车智能化发展,车辆开发的ECU数量不断增加,人们对汽车系统的各个性能方面提出了更高的需求,比如更多的数据交互,更高的传输带宽等。现如今人们广泛接受电子功能来提高驾驶安全性,像ABS防抱死系统&a…

git push出错Push cannot contain secrets

报错原因: 因为你的代码里面包含了github token明文信息,github担心你的token会泄漏,所以就不允许你推送这些内容。 解决办法: 需要先把代码里面的github token信息删除掉,并且删掉之前的历史提交,只要包…

关于ShuffleNetV1中的channel shuffle操作【代码分析】

1. 官方给出的代码 旷视科技在自己的开源GitHub上给出的channel shuffle相关代码如下图所示: 分析上图中的代码,旷视科技将channel shuffle这个操作视为一个函数,函数传入的参数是输入张量x,x的shape为(batchsize, num_ch…

Ceph 基本架构(一)

Ceph架构图 Ceph整体组成 Ceph 是一个开源的分布式存储系统,设计用于提供优秀的性能、可靠性和可扩展性。Ceph 的架构主要由几个核心组件构成,每个组件都有特定的功能,共同协作以实现高可用性和数据的一致性。 以下是 Ceph 的整体架构及其…

大数据处理从零开始————3.Hadoop伪分布式和分布式搭建

1.伪分布式搭建(不会用,了解就好不需要搭建) 这里接上一节。 1.1 伪分布式集群概述 伪分布式集群就是只有⼀个服务器节点的分布式集群。在这种模式中,我们也是只需要⼀台机器。 但与本地模式不同,伪分布式采⽤了分布式…

新手操作指引:快速上手腾讯混元大模型

引言 腾讯混元大模型是一款功能强大的AI工具,适用于文本生成、图像创作和视频生成等多种应用场景。对于新手用户,快速上手并充分利用这一工具可能会有些挑战。本文将提供详细的新手操作指引,帮助您轻松开始使用腾讯混元大模型。 步骤一&…

计算机毕业设计 基于Python内蒙古旅游景点数据分析系统 Django+Vue 前后端分离 附源码 讲解 文档

🍊作者:计算机编程-吉哥 🍊简介:专业从事JavaWeb程序开发,微信小程序开发,定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事,生活就是快乐的。 🍊心愿:点…

006——队列

目录 队列: 单端队列: 存储结构: 顺序队列 思路1:r指针指向尾元素的下一个位置 思路2:r指针指向真正的尾元素 如何解决假溢出的问题? 链式队列 双端队列 存储方式: 顺式存储 代码案例…

Redis基础数据结构之 Sorted Set 有序集合 源码解读

目录标题 Sorted Set 是什么?Sorted Set 数据结构跳表(skiplist)跳表节点的结构定义跳表的定义跳表节点查询层数设置 Sorted Set 基本操作 Sorted Set 是什么? 有序集合(Sorted Set)是 Redis 中一种重要的数据类型,…

国央企如何完善黑名单排查体系?

国央企完善黑名单排查体系的关键在于建立健全的供应商管理机制、风险评估体系和信息共享平台。以下是一些具体措施: 1.建立黑名单库:国央企可以依据外部黑名单数据(如政府监管部门、行业协会、第三方征信机构公布的黑名单)和内部…

瑞芯微RK3588开发板Linux系统添加自启动命令的方法,深圳触觉智能Arm嵌入式鸿蒙硬件方案商

本文适用于触觉智能所有Linux系统的开发板、主板添加自启动命令的方法,本次使用了触觉智能的EVB3588开发板演示,搭载了瑞芯微RK3588旗舰芯片。 该开发板为核心板加底板设计,为工业场景设计研发的模块化产品,10年以上稳定供货,帮助…

免费分享:全月地质图

数据详情 世界第一幅1∶250万月球全月地质图 数据属性 数据名称:月球1:250万全月地质图 数据时间:- 空间位置:月球 数据格式:jpg 空间分辨率:1:250万 坐标系:- 下载方法 打开数字地球开放平台网站&…

跨境商家如何在1688找优质供应商货源,新手卖家必看

选产品和找供应,是每个跨境人不可避免的,但是盲目的选品,无疑是大海捞针。如果你选择的商品没有固定的供应商,要上1688找又得花不少时间,店雷达选品工具就能够帮助我们解决这个问题。据我所知,很多跨境同行…

STM32上实现FFT算法精准测量正弦波信号的幅值、频率和相位差(标准库)

在研究声音、电力或任何形式的波形时,我们常常需要穿过表面看本质。FFT(快速傅里叶变换)就是这样一种强大的工具,它能够揭示隐藏在复杂信号背后的频率成分。本文将带你走进FFT的世界,了解它是如何将时域信号转化为频域…

最新绿豆影视系统 /反编译版源码/PC+WAP+APP端 /附搭建教程+软件

源码简介: 最新的绿豆影视系统5.1.8,这可是个反编译版的源码哦!它不仅支持PC端、WAP端,还有APP端,一应俱全。而且附上了搭建教程和软件,安卓和苹果双端都能用,实用方便! 优化内容&…

设计模式 组合模式(Composite Pattern)

组合模式简绍 组合模式(Composite Pattern)是一种结构型设计模式,它允许你将对象组合成树形结构来表示“部分-整体”的层次结构。组合模式使得客户端可以用一致的方式处理单个对象和组合对象。这样,可以在不知道对象具体类型的条…

K8S容器实例Pod安装curl-vim-telnet工具

在没有域名的情况下,有时候需要调试接口等需要此工具 安装curl、telnet、vim等 直接使用 apk add curlapk add vimapk add tennet

裸土检测算法实际应用、裸土覆盖检测算法、裸土检测算法

裸土检测算法主要用于环境保护、农业管理、城市规划和土地管理等领域,通过图像识别技术来检测和识别地表上的裸露土壤。这种技术可以帮助管理者实时监控裸土面积,及时采取措施,防止水土流失、环境污染和生态退化。 一、技术实现 裸土检测算…

内核驱动开发之系统移植

系统移植 系统移植:定制linux操作系统 系统移植是驱动开发的前导,驱动开发是系统运行起来之后,在内核中新增一些子功能而已 系统移植就四个部分: 交叉编译环境搭建好bootloader的选择和移植:BootLoader有一些很成熟…