评价此页

PyTorch 中的数据加载优化#

作者Divyansh KhannaRamanish Singh

您将学到什么
  • 如何优化 DataLoader 配置以获得最大吞吐量

  • batch_sizenum_workerspin_memory 的最佳实践

  • 将数据传输与 GPU 计算重叠的高级技术

  • 配置共享内存策略并处理 /dev/shm 问题

先决条件
  • PyTorch v2.0+

  • 对 PyTorch DataLoader 的基本了解

  • (可选)用于特定 GPU 优化的 CUDA 兼容 GPU

简介#

数据加载通常是深度学习流水线中的关键瓶颈。虽然 GPU 处理批次的速度极快,但低效的数据加载会使昂贵的硬件处于闲置状态,等待下一批数据。本教程介绍了优化数据加载配置以最大限度提高训练吞吐量的最佳实践和一些技术。

我们将探索 PyTorch DataLoader 的关键参数,并就如何针对特定工作负载对其进行调整提供实用指导。我们不会孤立地展示每项优化,而是从一个基准训练循环开始,逐步应用优化,并测量每一步的累积加速。

import time

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Set a fixed seed for reproducibility
torch.manual_seed(42)
Using device: cuda

<torch._C.Generator object at 0x7f2105817cd0>

创建一个示例数据集#

首先,让我们创建一个模拟昂贵变换的简单数据集。这将帮助我们演示各种 DataLoader 配置的影响。

class SyntheticDataset(Dataset):
    """A synthetic dataset that simulates expensive data transformations."""

    def __init__(self, size=10000, feature_dim=224, transform_delay=0.001):
        self.size = size
        self.feature_dim = feature_dim
        self.transform_delay = transform_delay

    def __len__(self):
        return self.size

    def __getitem__(self, idx):
        # Generate data lazily to avoid pre-allocating large tensors
        data = torch.randn(3, self.feature_dim, self.feature_dim)
        label = torch.randint(0, 10, (1,)).item()
        if self.transform_delay > 0:
            time.sleep(self.transform_delay)
        return data, label


class SyntheticDatasetBatched(Dataset):
    """Same as SyntheticDataset but with __getitems__ for batched fetching."""

    def __init__(self, size=10000, feature_dim=224, transform_delay=0.001):
        self.size = size
        self.feature_dim = feature_dim
        self.transform_delay = transform_delay

    def __len__(self):
        return self.size

    def __getitem__(self, idx):
        data = torch.randn(3, self.feature_dim, self.feature_dim)
        label = torch.randint(0, 10, (1,)).item()
        if self.transform_delay > 0:
            time.sleep(self.transform_delay)
        return data, label

    def __getitems__(self, indices):
        """Fetch multiple items at once — enables vectorized generation.

        Instead of N individual __getitem__ calls (each with its own
        overhead), this generates the entire batch in one shot using
        vectorized tensor operations.
        """
        n = len(indices)
        # Vectorized generation: one call instead of N individual ones
        data = torch.randn(n, 3, self.feature_dim, self.feature_dim)
        labels = torch.randint(0, 10, (n,))
        # Simulate batch-level I/O: one sleep for the whole batch,
        # not one per sample (e.g., one DB query for N rows)
        if self.transform_delay > 0:
            time.sleep(self.transform_delay)
        return [(data[i], labels[i].item()) for i in range(n)]

共享训练基础设施#

为了衡量每项优化的实际影响,我们定义了一个可重用的训练循环,它接受 DataLoader 并返回计时和损失。这避免了为每个基准测试重复编写训练循环。

我们使用一个带有 高变换延迟 (5ms)小数据集 (500 个样本),以确保流水线在整个过程中始终受数据限制。小数据集意味着 epoch 短(每个 epoch 16 个 batch),因此我们运行多个 epoch —— 使 persistent_workers 在跨 epoch 边界时的优势变得可见。

benchmark_dataset = SyntheticDataset(size=512, feature_dim=224, transform_delay=0.005)


class SmallTransformerModel(nn.Module):

    def __init__(self):
        super().__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=7, stride=4, padding=3),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d((7, 7)),
        )
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=64, nhead=4, dim_feedforward=128, batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=2)
        self.classifier = nn.Linear(64, 10)

    def forward(self, x):
        x = self.features(x)  # (B, 64, 7, 7)
        B, C, H, W = x.shape
        x = x.view(B, C, H * W).permute(0, 2, 1)  # (B, 49, 64)
        x = self.transformer(x)  # (B, 49, 64)
        x = x.mean(dim=1)  # (B, 64)
        return self.classifier(x)


def create_model():
    """Create a conv+transformer model for benchmarking."""
    return SmallTransformerModel().to(device)


def train_and_benchmark(loader, max_batches=160, epochs=10, prefetch_device=None):
    """Train a model over multiple epochs and return elapsed time and average loss.

    Running multiple epochs (10) with a small dataset ensures many epoch
    boundaries, making persistent_workers' startup savings visible.

    Args:
        loader: A DataLoader to iterate over.
        max_batches: Maximum total number of batches to process across all epochs.
        epochs: Number of epochs to iterate (re-iterates the loader each epoch).
        prefetch_device: If set, wraps the loader in a DataPrefetcher each epoch
            for overlapping H2D transfers. Data arrives already on device.

    Returns:
        Tuple of (elapsed_seconds, average_loss).
    """
    model = create_model()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    criterion = nn.CrossEntropyLoss()

    start_time = time.perf_counter()
    total_loss = 0.0
    num_batches = 0

    for epoch in range(epochs):
        if prefetch_device is not None:
            data_iter = DataPrefetcher(loader, prefetch_device)
        else:
            data_iter = loader

        for data, labels in data_iter:
            if prefetch_device is None:
                data = data.to(device, non_blocking=True)
                labels = labels.to(device, non_blocking=True)

            output = model(data)
            loss = criterion(output, labels)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            num_batches += 1

            if num_batches >= max_batches:
                break
        if num_batches >= max_batches:
            break

    if torch.cuda.is_available():
        torch.cuda.synchronize()
    elapsed = time.perf_counter() - start_time

    return elapsed, total_loss / num_batches

基准训练循环#

我们的起点:一个简单的 DataLoader,没有多进程,没有固定内存(pinned memory),且采用默认设置。这建立了我们将要改进的性能底线。

baseline_loader = DataLoader(
    benchmark_dataset,
    batch_size=32,
    shuffle=True,
    num_workers=0,
    pin_memory=False,
)

print("\n=== Progressive Optimization Results ===")
print("\nBaseline (num_workers=0, pin_memory=False):")
baseline_time, baseline_loss = train_and_benchmark(baseline_loader)
print(f"  Time: {baseline_time:.4f}s | Loss: {baseline_loss:.4f}")
prev_time = baseline_time
=== Progressive Optimization Results ===

Baseline (num_workers=0, pin_memory=False):
  Time: 32.8404s | Loss: 2.3171

批次大小(Batch Size)优化#

batch_size 参数控制一次处理多少个样本。选择合适的批次大小涉及权衡几个因素

内存注意事项

  • 较大的批次大小需要更多的 GPU 内存来存储输入、激活和梯度

  • 显存溢出(OOM)错误在大批次大小时很常见

  • 中等批次大小(32-128)通常能提供最佳平衡

训练动力学

  • 批次大小的变化会影响有效学习率,通常需要进行调整

  • 较大的批次提供更稳定的梯度估计,但泛化表现可能有所不同

注意

在更改批次大小时,除非您是在进行推理,否则请记住调整优化器参数,尤其是学习率调度

由于批次大小取决于模型(不是“加上就好”的优化),我们对其进行隔离测试,而不是将其纳入逐步优化链中。

# Example: Testing different batch sizes
batch_dataset = SyntheticDataset(size=1000, transform_delay=0)


def benchmark_batch_size(batch_size, num_batches=10):
    """Benchmark data loading with a specific batch size."""
    loader = DataLoader(batch_dataset, batch_size=batch_size, shuffle=True)
    start = time.perf_counter()
    for i, (data, labels) in enumerate(loader):
        if i >= num_batches:
            break
        data = data.to(device, non_blocking=True)
        _ = data.sum()
    if torch.cuda.is_available():
        torch.cuda.synchronize()
    elapsed = time.perf_counter() - start
    return elapsed


# Benchmark different batch sizes
print("\nBatch size comparison (isolated benchmark):")
for bs in [16, 32, 64, 128]:
    elapsed = benchmark_batch_size(bs)
    print(f"  Batch size {bs:3d}: {elapsed:.4f}s for 10 batches")
Batch size comparison (isolated benchmark):
  Batch size  16: 0.1916s for 10 batches
  Batch size  32: 0.3705s for 10 batches
  Batch size  64: 0.6136s for 10 batches
  Batch size 128: 0.9224s for 10 batches

工作进程数 (num_workers)#

num_workers 参数控制用于数据加载的子进程数量。这对于并行化昂贵的数据变换至关重要。

工作原理

  • 每个工作进程维护一个批次队列(由 prefetch_factor 控制)

  • 工作进程并行准备批次并将其转移到主进程

  • 如果 in_order=True(默认),批次将按顺序返回

何时增加 ``num_workers``

  • 当变换计算开销很大时(增强、解码)

  • 当数据从慢速存储加载时(网络驱动器、HDD)

  • 当您观察到由于数据加载导致的 GPU 空闲时间时

何时 ``num_workers=0`` 可能更快

  • 当变换开销很小时(简单的张量操作)

  • 当数据已在内存中时

  • 进程间通信 (IPC) 的开销超过了并行化的好处

注意

寻找最佳 num_workers 需要微调:增加工作进程直到吞吐量趋于平台期。过多的工作进程会浪费 CPU 内存(每个工作进程持有自己的数据集对象副本和预取批次),并可能导致 /dev/shm 耗尽。一个好的起点是每个 GPU 配备 2-4 个工作进程;使用不同的值进行分析以找到适合您工作负载的最佳平衡点。

让我们将 num_workers=4prefetch_factor=2 添加到我们的训练循环中并测量改进效果

workers_loader = DataLoader(
    benchmark_dataset,
    batch_size=32,
    shuffle=True,
    num_workers=4,
    prefetch_factor=2,
    pin_memory=False,
)

print("\n+ num_workers=4, prefetch_factor=2:")
workers_time, workers_loss = train_and_benchmark(workers_loader)
print(f"  Time: {workers_time:.4f}s | Loss: {workers_loss:.4f}")
print(
    f"  Speedup vs baseline: {baseline_time / workers_time:.2f}x | vs previous: {prev_time / workers_time:.2f}x"
)
prev_time = workers_time
+ num_workers=4, prefetch_factor=2:
  Time: 9.9969s | Loss: 2.3169
  Speedup vs baseline: 3.29x | vs previous: 3.29x

理解 pin_memory#

pin_memory 参数通过使用页锁定(pinned)内存来加快 CPU 到 GPU 的数据传输。

固定内存(Pinned memory)的工作原理

  • 固定内存不能被操作系统交换到磁盘

  • 这使得向 GPU 的 DMA(直接内存访问)传输速度更快

  • CPU 到 GPU 的传输可以异步发生

最佳实践

  1. 在 DataLoader 中使用 pin_memory=True(推荐做法)

  2. 将数据移动到 GPU 时结合使用 non_blocking=True

  3. 避免手动调用 tensor.pin_memory() 后接 .to(device, non_blocking=True) - 这样做较慢,因为 pin_memory() 是阻塞的

安全模式

# Recommended: Let DataLoader handle pinning
loader = DataLoader(dataset, pin_memory=True)
for data, labels in loader:
    data = data.to(device, non_blocking=True)
    labels = labels.to(device, non_blocking=True)

另请参阅

更多详情,请参阅 pin_memory 教程

让我们将 pin_memory=True 添加到我们的配置中

pinmem_loader = DataLoader(
    benchmark_dataset,
    batch_size=32,
    shuffle=True,
    num_workers=4,
    prefetch_factor=2,
    pin_memory=torch.cuda.is_available(),
)

if torch.cuda.is_available():
    print("\n+ pin_memory=True:")
    pinmem_time, pinmem_loss = train_and_benchmark(pinmem_loader)
    print(f"  Time: {pinmem_time:.4f}s | Loss: {pinmem_loss:.4f}")
    print(
        f"  Speedup vs baseline: {baseline_time / pinmem_time:.2f}x | vs previous: {prev_time / pinmem_time:.2f}x"
    )
    print(
        "  (pin_memory benefit is modest here because CPU transform time dominates H2D transfer)"
    )
    prev_time = pinmem_time
else:
    print("\n+ pin_memory: skipped (CUDA not available)")
    pinmem_time = workers_time
+ pin_memory=True:
  Time: 9.8940s | Loss: 2.3209
  Speedup vs baseline: 3.32x | vs previous: 1.01x
  (pin_memory benefit is modest here because CPU transform time dominates H2D transfer)

持久化工作进程 (Persistent Workers)#

默认情况下,工作进程会在 epoch 之间关闭并重新启动。这在每个 epoch 边界都会产生启动开销(导入模块、派生进程、重新初始化数据集)。

设置 persistent_workers=True 可以使工作进程在 epoch 之间保持活动状态,从而消除这种重复的启动成本。

何时帮助最大

  • 在较小的数据集上训练多个 epoch 时

  • 当数据集 __init__ 开销很大时(例如加载元数据)

  • 当结合高 num_workers

让我们对比一下在多个 epoch 中使用和不使用持久化工作进程的情况

non_persistent_loader = DataLoader(
    benchmark_dataset,
    batch_size=32,
    shuffle=True,
    num_workers=4,
    prefetch_factor=2,
    pin_memory=torch.cuda.is_available(),
    persistent_workers=False,
)

persistent_loader = DataLoader(
    benchmark_dataset,
    batch_size=32,
    shuffle=True,
    num_workers=4,
    prefetch_factor=2,
    pin_memory=torch.cuda.is_available(),
    persistent_workers=True,
)

print("\n+ persistent_workers=True (10 epochs):")
non_persistent_time, _ = train_and_benchmark(non_persistent_loader)
persistent_time, persistent_loss = train_and_benchmark(persistent_loader)
print(f"  Without persistent_workers: {non_persistent_time:.4f}s")
print(f"  With persistent_workers:    {persistent_time:.4f}s")
print(
    f"  Speedup vs baseline: {baseline_time / persistent_time:.2f}x | vs previous: {prev_time / persistent_time:.2f}x"
)
prev_time = persistent_time
+ persistent_workers=True (10 epochs):
  Without persistent_workers: 9.9208s
  With persistent_workers:    8.7311s
  Speedup vs baseline: 3.76x | vs previous: 1.13x

将 H2D 传输与 GPU 计算重叠#

为了获得最大吞吐量,您可以将主机到设备 (H2D) 数据传输与 GPU 计算重叠。这可以确保 GPU 永远不会因等待数据而闲置。

这个想法是在处理当前批次的同时,预取下一批次到 GPU。

注意

当 H2D 传输时间与 GPU 计算时间有意义地重叠时,DataPrefetcher 显示出最大的益处。如果数据加载已经很快,流同步开销可能会超过收益。

class DataPrefetcher:
    """Prefetches data to GPU while previous batch is being processed."""

    def __init__(self, loader, device):
        self.loader = iter(loader)
        self.device = device
        self.stream = torch.cuda.Stream() if torch.cuda.is_available() else None
        self.next_data = None
        self.next_labels = None
        self.preload()

    def preload(self):
        try:
            self.next_data, self.next_labels = next(self.loader)
        except StopIteration:
            self.next_data = None
            self.next_labels = None
            return

        if self.stream is not None:
            with torch.cuda.stream(self.stream):
                self.next_data = self.next_data.to(self.device, non_blocking=True)
                self.next_labels = self.next_labels.to(self.device, non_blocking=True)

    def __iter__(self):
        return self

    def __next__(self):
        if self.stream is not None:
            torch.cuda.current_stream().wait_stream(self.stream)

        data = self.next_data
        labels = self.next_labels

        if data is None:
            raise StopIteration

        # Ensure tensors are ready
        if self.stream is not None:
            data.record_stream(torch.cuda.current_stream())
            labels.record_stream(torch.cuda.current_stream())

        self.preload()
        return data, labels


# Integrate prefetcher into the training loop.
if torch.cuda.is_available():
    print("\n+ DataPrefetcher (overlapping H2D transfer):")
    prefetch_time, prefetch_loss = train_and_benchmark(
        persistent_loader, prefetch_device=device
    )
    print(f"  Time: {prefetch_time:.4f}s | Loss: {prefetch_loss:.4f}")
    print(
        f"  Speedup vs baseline: {baseline_time / prefetch_time:.2f}x | vs previous: {prev_time / prefetch_time:.2f}x"
    )
    prev_time = prefetch_time
else:
    print("\n+ DataPrefetcher: skipped (CUDA not available)")
    prefetch_time = persistent_time
+ DataPrefetcher (overlapping H2D transfer):
  Time: 8.7203s | Loss: 2.3194
  Speedup vs baseline: 3.77x | vs previous: 1.00x

数据集级优化:__getitems__#

除了调整 DataLoader 参数外,您还可以优化数据集本身。PyTorch 的 DataLoader 支持通过 __getitems__ 实现的批量提取协议:如果您的数据集定义了此方法,提取器将使用索引列表调用它一次,而不是为每个样本重复调用 __getitem__

工作原理

  • 默认提取器执行:[dataset[idx] for idx in batch_indices]

  • 使用 __getitems__dataset.__getitems__(batch_indices)

何时会有帮助

  • 当每个样本的开销很大时(例如打开连接、解析标头、获取锁)

  • 当数据可以更高效地批量获取时(例如用一个 SQL 查询获取 N 行而不是执行 N 次查询,或矢量化生成张量)

  • 当变换具有固定的启动成本,可以分摊到整个批次中时

预期签名

def __getitems__(self, indices: list[int]) -> list:
    # Fetch all items at once and return as a list
    ...

我们的 SyntheticDatasetBatched 实现了 __getitems__,以便在一次矢量化调用中生成整个批次(具有单一的分摊延迟),而不是 N 次单独调用,每次都有各自的延迟。让我们将此添加到我们的累积配置中

benchmark_dataset_batched = SyntheticDatasetBatched(
    size=512, feature_dim=224, transform_delay=0.005
)

batched_loader = DataLoader(
    benchmark_dataset_batched,
    batch_size=32,
    shuffle=True,
    num_workers=4,
    prefetch_factor=2,
    pin_memory=torch.cuda.is_available(),
    persistent_workers=True,
)

print("\n+ __getitems__ (batched dataset fetching):")
batched_time, batched_loss = train_and_benchmark(batched_loader)
print(f"  Time: {batched_time:.4f}s | Loss: {batched_loss:.4f}")
print(
    f"  Speedup vs baseline: {baseline_time / batched_time:.2f}x | vs previous: {prev_time / batched_time:.2f}x"
)
prev_time = batched_time
+ __getitems__ (batched dataset fetching):
  Time: 2.8529s | Loss: 2.3205
  Speedup vs baseline: 11.51x | vs previous: 3.06x

in_order 参数#

默认情况下 (in_order=True),DataLoader 按与数据集索引相同的顺序返回批次。这需要缓存从工作进程乱序到达的批次。

何时考虑 ``in_order=False``

  • 当您不需要确定性顺序时(例如不设置检查点时)

  • 当您观察到由于批次缓存导致的训练峰值时

  • 当最大化吞吐量比可重复性更重要时

注意

in_order=False 可能不会增加平均吞吐量,但它可以减少方差,并消除当一个工作进程比其他进程慢时由队头阻塞引起的偶尔缓慢批次。

快照频率 (snapshot_every_n_steps)#

当使用 torchdata 的 StatefulDataLoader(用于设置检查点)时,snapshot_every_n_steps 参数控制 DataLoader 状态保存的频率。

权衡

  • 较高频率(较小的 n): 开销较多,但作业失败时数据丢失较少

  • 较低频率(较大的 n): 开销较少,但恢复时重放的样本较多

根据您的容错要求和重新处理数据的成本进行选择。

共享内存和 set_sharing_strategy#

当使用 num_workers > 0 的多进程时,PyTorch 需要在工作进程和主进程之间传输张量。共享策略决定了如何完成这一过程。

可用策略

PyTorch 通过 torch.multiprocessing.set_sharing_strategy() 提供两种共享策略

  1. file_descriptor (大多数系统的默认策略)

    • 使用文件描述符共享内存

    • 受系统打开文件描述符限制 (ulimit -n) 的约束

    • 对于小张量更高效

  2. file_system

    • 使用 /dev/shm 中的共享内存文件

    • 不受文件描述符数量限制

    • 更适合大量张量

    • 变换开销低

如何更改策略

import torch.multiprocessing as mp

# Switch to file_system strategy
# Must be called before creating any DataLoader workers
mp.set_sharing_strategy('file_system')

选择正确的策略

场景

推荐策略

原因

大量小张量

file_descriptor (默认)

每个张量的开销较低

少量大张量

file_system

避免文件描述符限制

高 num_workers

file_system

避免文件描述符耗尽

警告

set_sharing_strategy() 必须在创建任何 num_workers > 0 的 DataLoader 之前 调用。在此之后更改对现有工作进程没有影响。

处理共享内存不足 (/dev/shm)#

当使用 num_workers > 0 时,PyTorch 使用共享内存 (/dev/shm) 在工作进程和主进程之间高效传递数据。如果您遇到如下错误:

RuntimeError: unable to open shared memory object </torch_xxx>
ERROR: Unexpected bus error encountered in worker

这通常意味着您已耗尽了分配的共享内存。

解决方案

1. 增加 /dev/shm 大小(如果可以)

2. 减少 DataLoader 的内存压力

# Reduce number of workers
DataLoader(dataset, num_workers=2)  # Instead of 8+

# Reduce prefetch factor
DataLoader(dataset, num_workers=4, prefetch_factor=1)  # Instead of 2

# Use smaller batch sizes
DataLoader(dataset, batch_size=16)  # Smaller batches = less shm

3. 切换共享策略

import torch.multiprocessing as mp
mp.set_sharing_strategy('file_system')

4. 清理泄露的共享内存

# List shared memory segments
ls -la /dev/shm/

# Remove orphaned PyTorch segments (be careful!)
rm /dev/shm/torch_*

注意

如果工作进程在没有正确清理的情况下崩溃,可能会发生共享内存泄露。

最终总结#

这是应用于我们训练循环的每项优化的累积效果。每一行都包含之前行的所有优化

配置

对比基准

对比上一步

基准 (num_workers=0, 无内存固定)

1.00x

+ num_workers=4, prefetch_factor=2

~2.7x

~2.7x

+ pin_memory=True

~2.8x

~1.0x

+ persistent_workers=True

~3.7x

~1.3x

+ DataPrefetcher (H2D 重叠)

~3.6x

~1.0x

+ __getitems__ (批量提取)

~10x

~2.9x

注意

这些结果是基于我们的基准数据集。实际加速效果将取决于您的具体工作负载、硬件、数据集大小和变换复杂度。

总结与最佳实践#

  1. 从中等批次大小开始 (32-128),如果显存允许则逐步扩大。

  2. 当变换开销大时,使用 ``num_workers > 0``。从 2-4 个工作进程开始,根据内存容量增加。越高并不总是越好。

  3. 使用加速器时,启用 ``pin_memory=True``

  4. 使用 ``persistent_workers=True`` 以避免 epoch 之间的工作进程重启开销。

  5. 分析您的流水线 以识别数据集访问、变换等过程中的 CPU 瓶颈。

  6. 为 GPU 工作负载实现数据预取,以将数据传输与计算重叠。

  7. 遇到文件描述符限制时,使用 ``file_system`` 共享策略

结论#

在本教程中,我们学习了如何逐步优化 PyTorch 数据加载流水线 —— 从原始的单进程基准到使用多进程工作进程、固定内存、持久化工作进程、基于 CUDA 流的预取以及使用 __getitems__ 的批量数据集提取的全面优化配置。每项优化都针对不同的瓶颈,它们结合在一起可以使吞吐量提高一个数量级。这些应被视为最佳实践,具体性能取决于特定的工作负载。

其他资源#

脚本总运行时间: (1 分 25.251 秒)