注意
跳转到末尾 下载完整示例代码。
PyTorch 中的数据加载优化#
作者:Divyansh Khanna,Ramanish Singh
如何优化 DataLoader 配置以获得最大吞吐量
batch_size、num_workers和pin_memory的最佳实践将数据传输与 GPU 计算重叠的高级技术
配置共享内存策略并处理
/dev/shm问题
PyTorch v2.0+
对 PyTorch DataLoader 的基本了解
(可选)用于特定 GPU 优化的 CUDA 兼容 GPU
简介#
数据加载通常是深度学习流水线中的关键瓶颈。虽然 GPU 处理批次的速度极快,但低效的数据加载会使昂贵的硬件处于闲置状态,等待下一批数据。本教程介绍了优化数据加载配置以最大限度提高训练吞吐量的最佳实践和一些技术。
我们将探索 PyTorch DataLoader 的关键参数,并就如何针对特定工作负载对其进行调整提供实用指导。我们不会孤立地展示每项优化,而是从一个基准训练循环开始,逐步应用优化,并测量每一步的累积加速。
import time
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
# Set a fixed seed for reproducibility
torch.manual_seed(42)
Using device: cuda
<torch._C.Generator object at 0x7f2105817cd0>
创建一个示例数据集#
首先,让我们创建一个模拟昂贵变换的简单数据集。这将帮助我们演示各种 DataLoader 配置的影响。
class SyntheticDataset(Dataset):
"""A synthetic dataset that simulates expensive data transformations."""
def __init__(self, size=10000, feature_dim=224, transform_delay=0.001):
self.size = size
self.feature_dim = feature_dim
self.transform_delay = transform_delay
def __len__(self):
return self.size
def __getitem__(self, idx):
# Generate data lazily to avoid pre-allocating large tensors
data = torch.randn(3, self.feature_dim, self.feature_dim)
label = torch.randint(0, 10, (1,)).item()
if self.transform_delay > 0:
time.sleep(self.transform_delay)
return data, label
class SyntheticDatasetBatched(Dataset):
"""Same as SyntheticDataset but with __getitems__ for batched fetching."""
def __init__(self, size=10000, feature_dim=224, transform_delay=0.001):
self.size = size
self.feature_dim = feature_dim
self.transform_delay = transform_delay
def __len__(self):
return self.size
def __getitem__(self, idx):
data = torch.randn(3, self.feature_dim, self.feature_dim)
label = torch.randint(0, 10, (1,)).item()
if self.transform_delay > 0:
time.sleep(self.transform_delay)
return data, label
def __getitems__(self, indices):
"""Fetch multiple items at once — enables vectorized generation.
Instead of N individual __getitem__ calls (each with its own
overhead), this generates the entire batch in one shot using
vectorized tensor operations.
"""
n = len(indices)
# Vectorized generation: one call instead of N individual ones
data = torch.randn(n, 3, self.feature_dim, self.feature_dim)
labels = torch.randint(0, 10, (n,))
# Simulate batch-level I/O: one sleep for the whole batch,
# not one per sample (e.g., one DB query for N rows)
if self.transform_delay > 0:
time.sleep(self.transform_delay)
return [(data[i], labels[i].item()) for i in range(n)]
基准训练循环#
我们的起点:一个简单的 DataLoader,没有多进程,没有固定内存(pinned memory),且采用默认设置。这建立了我们将要改进的性能底线。
baseline_loader = DataLoader(
benchmark_dataset,
batch_size=32,
shuffle=True,
num_workers=0,
pin_memory=False,
)
print("\n=== Progressive Optimization Results ===")
print("\nBaseline (num_workers=0, pin_memory=False):")
baseline_time, baseline_loss = train_and_benchmark(baseline_loader)
print(f" Time: {baseline_time:.4f}s | Loss: {baseline_loss:.4f}")
prev_time = baseline_time
=== Progressive Optimization Results ===
Baseline (num_workers=0, pin_memory=False):
Time: 32.8404s | Loss: 2.3171
批次大小(Batch Size)优化#
batch_size 参数控制一次处理多少个样本。选择合适的批次大小涉及权衡几个因素
内存注意事项
较大的批次大小需要更多的 GPU 内存来存储输入、激活和梯度
显存溢出(OOM)错误在大批次大小时很常见
中等批次大小(32-128)通常能提供最佳平衡
训练动力学
批次大小的变化会影响有效学习率,通常需要进行调整
较大的批次提供更稳定的梯度估计,但泛化表现可能有所不同
注意
在更改批次大小时,除非您是在进行推理,否则请记住调整优化器参数,尤其是学习率调度
由于批次大小取决于模型(不是“加上就好”的优化),我们对其进行隔离测试,而不是将其纳入逐步优化链中。
# Example: Testing different batch sizes
batch_dataset = SyntheticDataset(size=1000, transform_delay=0)
def benchmark_batch_size(batch_size, num_batches=10):
"""Benchmark data loading with a specific batch size."""
loader = DataLoader(batch_dataset, batch_size=batch_size, shuffle=True)
start = time.perf_counter()
for i, (data, labels) in enumerate(loader):
if i >= num_batches:
break
data = data.to(device, non_blocking=True)
_ = data.sum()
if torch.cuda.is_available():
torch.cuda.synchronize()
elapsed = time.perf_counter() - start
return elapsed
# Benchmark different batch sizes
print("\nBatch size comparison (isolated benchmark):")
for bs in [16, 32, 64, 128]:
elapsed = benchmark_batch_size(bs)
print(f" Batch size {bs:3d}: {elapsed:.4f}s for 10 batches")
Batch size comparison (isolated benchmark):
Batch size 16: 0.1916s for 10 batches
Batch size 32: 0.3705s for 10 batches
Batch size 64: 0.6136s for 10 batches
Batch size 128: 0.9224s for 10 batches
工作进程数 (num_workers)#
num_workers 参数控制用于数据加载的子进程数量。这对于并行化昂贵的数据变换至关重要。
工作原理
每个工作进程维护一个批次队列(由
prefetch_factor控制)工作进程并行准备批次并将其转移到主进程
如果
in_order=True(默认),批次将按顺序返回
何时增加 ``num_workers``
当变换计算开销很大时(增强、解码)
当数据从慢速存储加载时(网络驱动器、HDD)
当您观察到由于数据加载导致的 GPU 空闲时间时
何时 ``num_workers=0`` 可能更快
当变换开销很小时(简单的张量操作)
当数据已在内存中时
进程间通信 (IPC) 的开销超过了并行化的好处
注意
寻找最佳 num_workers 需要微调:增加工作进程直到吞吐量趋于平台期。过多的工作进程会浪费 CPU 内存(每个工作进程持有自己的数据集对象副本和预取批次),并可能导致 /dev/shm 耗尽。一个好的起点是每个 GPU 配备 2-4 个工作进程;使用不同的值进行分析以找到适合您工作负载的最佳平衡点。
让我们将 num_workers=4 和 prefetch_factor=2 添加到我们的训练循环中并测量改进效果
workers_loader = DataLoader(
benchmark_dataset,
batch_size=32,
shuffle=True,
num_workers=4,
prefetch_factor=2,
pin_memory=False,
)
print("\n+ num_workers=4, prefetch_factor=2:")
workers_time, workers_loss = train_and_benchmark(workers_loader)
print(f" Time: {workers_time:.4f}s | Loss: {workers_loss:.4f}")
print(
f" Speedup vs baseline: {baseline_time / workers_time:.2f}x | vs previous: {prev_time / workers_time:.2f}x"
)
prev_time = workers_time
+ num_workers=4, prefetch_factor=2:
Time: 9.9969s | Loss: 2.3169
Speedup vs baseline: 3.29x | vs previous: 3.29x
理解 pin_memory#
pin_memory 参数通过使用页锁定(pinned)内存来加快 CPU 到 GPU 的数据传输。
固定内存(Pinned memory)的工作原理
固定内存不能被操作系统交换到磁盘
这使得向 GPU 的 DMA(直接内存访问)传输速度更快
CPU 到 GPU 的传输可以异步发生
最佳实践
在 DataLoader 中使用
pin_memory=True(推荐做法)将数据移动到 GPU 时结合使用
non_blocking=True避免手动调用
tensor.pin_memory()后接.to(device, non_blocking=True)- 这样做较慢,因为pin_memory()是阻塞的
安全模式
# Recommended: Let DataLoader handle pinning
loader = DataLoader(dataset, pin_memory=True)
for data, labels in loader:
data = data.to(device, non_blocking=True)
labels = labels.to(device, non_blocking=True)
另请参阅
更多详情,请参阅 pin_memory 教程
让我们将 pin_memory=True 添加到我们的配置中
pinmem_loader = DataLoader(
benchmark_dataset,
batch_size=32,
shuffle=True,
num_workers=4,
prefetch_factor=2,
pin_memory=torch.cuda.is_available(),
)
if torch.cuda.is_available():
print("\n+ pin_memory=True:")
pinmem_time, pinmem_loss = train_and_benchmark(pinmem_loader)
print(f" Time: {pinmem_time:.4f}s | Loss: {pinmem_loss:.4f}")
print(
f" Speedup vs baseline: {baseline_time / pinmem_time:.2f}x | vs previous: {prev_time / pinmem_time:.2f}x"
)
print(
" (pin_memory benefit is modest here because CPU transform time dominates H2D transfer)"
)
prev_time = pinmem_time
else:
print("\n+ pin_memory: skipped (CUDA not available)")
pinmem_time = workers_time
+ pin_memory=True:
Time: 9.8940s | Loss: 2.3209
Speedup vs baseline: 3.32x | vs previous: 1.01x
(pin_memory benefit is modest here because CPU transform time dominates H2D transfer)
持久化工作进程 (Persistent Workers)#
默认情况下,工作进程会在 epoch 之间关闭并重新启动。这在每个 epoch 边界都会产生启动开销(导入模块、派生进程、重新初始化数据集)。
设置 persistent_workers=True 可以使工作进程在 epoch 之间保持活动状态,从而消除这种重复的启动成本。
何时帮助最大
在较小的数据集上训练多个 epoch 时
当数据集
__init__开销很大时(例如加载元数据)当结合高
num_workers时
让我们对比一下在多个 epoch 中使用和不使用持久化工作进程的情况
non_persistent_loader = DataLoader(
benchmark_dataset,
batch_size=32,
shuffle=True,
num_workers=4,
prefetch_factor=2,
pin_memory=torch.cuda.is_available(),
persistent_workers=False,
)
persistent_loader = DataLoader(
benchmark_dataset,
batch_size=32,
shuffle=True,
num_workers=4,
prefetch_factor=2,
pin_memory=torch.cuda.is_available(),
persistent_workers=True,
)
print("\n+ persistent_workers=True (10 epochs):")
non_persistent_time, _ = train_and_benchmark(non_persistent_loader)
persistent_time, persistent_loss = train_and_benchmark(persistent_loader)
print(f" Without persistent_workers: {non_persistent_time:.4f}s")
print(f" With persistent_workers: {persistent_time:.4f}s")
print(
f" Speedup vs baseline: {baseline_time / persistent_time:.2f}x | vs previous: {prev_time / persistent_time:.2f}x"
)
prev_time = persistent_time
+ persistent_workers=True (10 epochs):
Without persistent_workers: 9.9208s
With persistent_workers: 8.7311s
Speedup vs baseline: 3.76x | vs previous: 1.13x
将 H2D 传输与 GPU 计算重叠#
为了获得最大吞吐量,您可以将主机到设备 (H2D) 数据传输与 GPU 计算重叠。这可以确保 GPU 永远不会因等待数据而闲置。
这个想法是在处理当前批次的同时,预取下一批次到 GPU。
注意
当 H2D 传输时间与 GPU 计算时间有意义地重叠时,DataPrefetcher 显示出最大的益处。如果数据加载已经很快,流同步开销可能会超过收益。
class DataPrefetcher:
"""Prefetches data to GPU while previous batch is being processed."""
def __init__(self, loader, device):
self.loader = iter(loader)
self.device = device
self.stream = torch.cuda.Stream() if torch.cuda.is_available() else None
self.next_data = None
self.next_labels = None
self.preload()
def preload(self):
try:
self.next_data, self.next_labels = next(self.loader)
except StopIteration:
self.next_data = None
self.next_labels = None
return
if self.stream is not None:
with torch.cuda.stream(self.stream):
self.next_data = self.next_data.to(self.device, non_blocking=True)
self.next_labels = self.next_labels.to(self.device, non_blocking=True)
def __iter__(self):
return self
def __next__(self):
if self.stream is not None:
torch.cuda.current_stream().wait_stream(self.stream)
data = self.next_data
labels = self.next_labels
if data is None:
raise StopIteration
# Ensure tensors are ready
if self.stream is not None:
data.record_stream(torch.cuda.current_stream())
labels.record_stream(torch.cuda.current_stream())
self.preload()
return data, labels
# Integrate prefetcher into the training loop.
if torch.cuda.is_available():
print("\n+ DataPrefetcher (overlapping H2D transfer):")
prefetch_time, prefetch_loss = train_and_benchmark(
persistent_loader, prefetch_device=device
)
print(f" Time: {prefetch_time:.4f}s | Loss: {prefetch_loss:.4f}")
print(
f" Speedup vs baseline: {baseline_time / prefetch_time:.2f}x | vs previous: {prev_time / prefetch_time:.2f}x"
)
prev_time = prefetch_time
else:
print("\n+ DataPrefetcher: skipped (CUDA not available)")
prefetch_time = persistent_time
+ DataPrefetcher (overlapping H2D transfer):
Time: 8.7203s | Loss: 2.3194
Speedup vs baseline: 3.77x | vs previous: 1.00x
数据集级优化:__getitems__#
除了调整 DataLoader 参数外,您还可以优化数据集本身。PyTorch 的 DataLoader 支持通过 __getitems__ 实现的批量提取协议:如果您的数据集定义了此方法,提取器将使用索引列表调用它一次,而不是为每个样本重复调用 __getitem__。
工作原理
默认提取器执行:
[dataset[idx] for idx in batch_indices]使用
__getitems__:dataset.__getitems__(batch_indices)
何时会有帮助
当每个样本的开销很大时(例如打开连接、解析标头、获取锁)
当数据可以更高效地批量获取时(例如用一个 SQL 查询获取 N 行而不是执行 N 次查询,或矢量化生成张量)
当变换具有固定的启动成本,可以分摊到整个批次中时
预期签名
def __getitems__(self, indices: list[int]) -> list:
# Fetch all items at once and return as a list
...
我们的 SyntheticDatasetBatched 实现了 __getitems__,以便在一次矢量化调用中生成整个批次(具有单一的分摊延迟),而不是 N 次单独调用,每次都有各自的延迟。让我们将此添加到我们的累积配置中
benchmark_dataset_batched = SyntheticDatasetBatched(
size=512, feature_dim=224, transform_delay=0.005
)
batched_loader = DataLoader(
benchmark_dataset_batched,
batch_size=32,
shuffle=True,
num_workers=4,
prefetch_factor=2,
pin_memory=torch.cuda.is_available(),
persistent_workers=True,
)
print("\n+ __getitems__ (batched dataset fetching):")
batched_time, batched_loss = train_and_benchmark(batched_loader)
print(f" Time: {batched_time:.4f}s | Loss: {batched_loss:.4f}")
print(
f" Speedup vs baseline: {baseline_time / batched_time:.2f}x | vs previous: {prev_time / batched_time:.2f}x"
)
prev_time = batched_time
+ __getitems__ (batched dataset fetching):
Time: 2.8529s | Loss: 2.3205
Speedup vs baseline: 11.51x | vs previous: 3.06x
in_order 参数#
默认情况下 (in_order=True),DataLoader 按与数据集索引相同的顺序返回批次。这需要缓存从工作进程乱序到达的批次。
何时考虑 ``in_order=False``
当您不需要确定性顺序时(例如不设置检查点时)
当您观察到由于批次缓存导致的训练峰值时
当最大化吞吐量比可重复性更重要时
注意
in_order=False 可能不会增加平均吞吐量,但它可以减少方差,并消除当一个工作进程比其他进程慢时由队头阻塞引起的偶尔缓慢批次。
快照频率 (snapshot_every_n_steps)#
当使用 torchdata 的 StatefulDataLoader(用于设置检查点)时,snapshot_every_n_steps 参数控制 DataLoader 状态保存的频率。
权衡
较高频率(较小的 n): 开销较多,但作业失败时数据丢失较少
较低频率(较大的 n): 开销较少,但恢复时重放的样本较多
根据您的容错要求和重新处理数据的成本进行选择。
最终总结#
这是应用于我们训练循环的每项优化的累积效果。每一行都包含之前行的所有优化
配置 |
对比基准 |
对比上一步 |
|---|---|---|
基准 (num_workers=0, 无内存固定) |
1.00x |
— |
+ num_workers=4, prefetch_factor=2 |
~2.7x |
~2.7x |
+ pin_memory=True |
~2.8x |
~1.0x |
+ persistent_workers=True |
~3.7x |
~1.3x |
+ DataPrefetcher (H2D 重叠) |
~3.6x |
~1.0x |
+ __getitems__ (批量提取) |
~10x |
~2.9x |
注意
这些结果是基于我们的基准数据集。实际加速效果将取决于您的具体工作负载、硬件、数据集大小和变换复杂度。
总结与最佳实践#
从中等批次大小开始 (32-128),如果显存允许则逐步扩大。
当变换开销大时,使用 ``num_workers > 0``。从 2-4 个工作进程开始,根据内存容量增加。越高并不总是越好。
使用加速器时,启用 ``pin_memory=True``。
使用 ``persistent_workers=True`` 以避免 epoch 之间的工作进程重启开销。
分析您的流水线 以识别数据集访问、变换等过程中的 CPU 瓶颈。
为 GPU 工作负载实现数据预取,以将数据传输与计算重叠。
遇到文件描述符限制时,使用 ``file_system`` 共享策略。
结论#
在本教程中,我们学习了如何逐步优化 PyTorch 数据加载流水线 —— 从原始的单进程基准到使用多进程工作进程、固定内存、持久化工作进程、基于 CUDA 流的预取以及使用 __getitems__ 的批量数据集提取的全面优化配置。每项优化都针对不同的瓶颈,它们结合在一起可以使吞吐量提高一个数量级。这些应被视为最佳实践,具体性能取决于特定的工作负载。
其他资源#
脚本总运行时间: (1 分 25.251 秒)