
Commit be10ffb

Author: lixinyu (committed)

Update on "DataPipe naming convention update"
This PR changes the naming convention for what was previously called a 'dataset': we now name them DataPipes. This PR specifically covers ListDirFilesIterableDataset and LoadFilesFromDiskIterableDataset, and provides the following ways to import them.

1. Partial import via the `datapipes` module:
```
import torch.utils.data.datapipes as dp
dp.iter.ListDirFiles
dp.iter.LoadFilesFromDisk
```
2. Direct import of the DataPipe classes:
```
from torch.utils.data.datapipes import ListDirFiles, LoadFilesFromDisk
```

This PR also adds support for recursively scanning folders.

Next step: Tar/Zip/Gz dataset -> datapipe.

Differential Revision: [D26120628](https://our.internmc.facebook.com/intern/diff/D26120628)

[ghstack-poisoned]
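For illustration, a minimal sketch of chaining the two DataPipes together under the `iter` namespace shown above. The constructor arguments (`root=...`, `recursive=True`) and the yielded `(path, stream)` pairs are assumptions for this sketch and are not spelled out in the commit message:

```python
import torch.utils.data.datapipes as dp

# Recursively list files under a directory (recursive scanning is the new
# support added in this PR), then load each listed file from disk.
# NOTE: argument names and the (path, stream) output format are assumptions
# for illustration only; check the datapipes source for the exact signatures.
file_lister = dp.iter.ListDirFiles(root="data/", recursive=True)
file_loader = dp.iter.LoadFilesFromDisk(file_lister)

for path, stream in file_loader:
    print(path, stream.read(16))
```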
2 parents 48c97f9 + 8b27c2c commit be10ffb

19 files changed: 475 additions & 133 deletions


Lines changed: 34 additions & 0 deletions
@@ -0,0 +1,34 @@
+name: 'Close stale pull requests'
+on:
+  schedule:
+    - cron: '30 1 * * *'
+  workflow_dispatch:
+
+jobs:
+  stale:
+    if: ${{ github.repository_owner == 'pytorch' }}
+    runs-on: ubuntu-latest
+    steps:
+    - uses: actions/stale@v3
+      with:
+        stale-pr-message: >
+          Looks like this PR hasn't been updated in a while! Going to go ahead and mark this as `stale`.
+          Feel free to update / remove the `stale` label if you feel this is a mistake
+          `stale` pull requests will automatically be closed 30 days after being marked `stale`
+        exempt-pr-labels: "no-stale,open source,high priority"
+        days-before-stale: 60
+        days-before-close: 90
+  stale-open-source:
+    if: ${{ github.repository_owner == 'pytorch' }}
+    runs-on: ubuntu-latest
+    steps:
+    - uses: actions/stale@v3
+      with:
+        stale-pr-message: >
+          Looks like this PR hasn't been updated in a while! Going to go ahead and mark this as `stale`.
+          Feel free to update / remove the `stale` label if you feel this is a mistake
+          `stale` pull requests will automatically be closed 30 days after being marked `stale`
+        exempt-pr-labels: "no-stale,high priority"
+        only-labels: "open source"
+        days-before-stale: 150
+        days-before-close: 180

aten/src/ATen/native/BatchLinearAlgebraKernel.cpp

Lines changed: 3 additions & 0 deletions
@@ -101,9 +101,12 @@ Tensor& orgqr_kernel_impl(Tensor& result, const Tensor& tau, Tensor& infos, int6
 REGISTER_ARCH_DISPATCH(eig_stub, DEFAULT, &eig_kernel_impl);
 REGISTER_AVX_DISPATCH(eig_stub, &eig_kernel_impl);
 REGISTER_AVX2_DISPATCH(eig_stub, &eig_kernel_impl);
+REGISTER_VSX_DISPATCH(eig_stub, &eig_kernel_impl);
 
 REGISTER_ARCH_DISPATCH(orgqr_stub, DEFAULT, &orgqr_kernel_impl);
 REGISTER_AVX_DISPATCH(orgqr_stub, &orgqr_kernel_impl);
 REGISTER_AVX2_DISPATCH(orgqr_stub, &orgqr_kernel_impl);
+REGISTER_VSX_DISPATCH(orgqr_stub, &orgqr_kernel_impl);
+
 
 }} // namespace at::native

benchmarks/distributed/pipeline/pipe.py

Lines changed: 3 additions & 3 deletions
@@ -3,14 +3,14 @@
 import os
 import time
 
-from .benchmark_dataset import BenchmarkLMDataset, collate_sentences_lm
+from benchmark_dataset import BenchmarkLMDataset, collate_sentences_lm
 import torch
 from torch.distributed import rpc
 import torch.nn as nn
 from torch.utils.data import DataLoader
 
 from torch.distributed.pipeline.sync import Pipe
-from torch.testing._internal.distributed.pipeline.utils import convert_to_balance
+from torch.distributed.pipeline.sync.utils import partition_model
 from torch.optim import Adam  # type: ignore
 
 def sizeof_fmt(num, suffix='B'):
@@ -248,7 +248,7 @@ def bench_single_process(args):
     model = blob["model"]
 
     balance = generate_balance(num_devices, len(model))
-    model = convert_to_balance(model, balance)
+    model = partition_model(model, balance)
     p = Pipe(
         model, chunks=args.chunks, checkpoint=args.checkpoint
     )
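For context, a minimal sketch of the renamed helper in use, assuming the `partition_model(module, balance)` behavior implied by this diff (split an `nn.Sequential` across devices according to `balance`, then wrap it in `Pipe`). The layer sizes, the balance of `[2, 2]`, and the single-process RPC setup are illustrative assumptions:

```python
import os
import torch.nn as nn
from torch.distributed import rpc
from torch.distributed.pipeline.sync import Pipe
from torch.distributed.pipeline.sync.utils import partition_model

# Pipe requires the RPC framework to be initialized, even for a single process.
os.environ.setdefault("MASTER_ADDR", "localhost")
os.environ.setdefault("MASTER_PORT", "29500")
rpc.init_rpc("worker", rank=0, world_size=1)

# A toy 4-layer model; balance=[2, 2] asks partition_model to place two layers
# on each of two devices (assumes 2 GPUs), after which Pipe drives the pipeline.
model = nn.Sequential(*[nn.Linear(16, 16) for _ in range(4)])
model = partition_model(model, [2, 2])
pipe = Pipe(model, chunks=4)
```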

c10/cuda/CUDACachingAllocator.cpp

Lines changed: 72 additions & 55 deletions
@@ -57,12 +57,12 @@ namespace {
 
 using stream_set = std::unordered_set<cuda::CUDAStream>;
 
-constexpr size_t kMinBlockSize = 512; // all sizes are rounded to at least 512 bytes
-constexpr size_t kSmallSize = 1048576; // largest "small" allocation is 1 MiB
-constexpr size_t kSmallBuffer = 2097152; // "small" allocations are packed in 2 MiB blocks
-constexpr size_t kLargeBuffer = 20971520; // "large" allocations may be packed in 20 MiB blocks
+constexpr size_t kMinBlockSize = 512;       // all sizes are rounded to at least 512 bytes
+constexpr size_t kSmallSize = 1048576;      // largest "small" allocation is 1 MiB
+constexpr size_t kSmallBuffer = 2097152;    // "small" allocations are packed in 2 MiB blocks
+constexpr size_t kLargeBuffer = 20971520;   // "large" allocations may be packed in 20 MiB blocks
 constexpr size_t kMinLargeAlloc = 10485760; // allocations between 1 and 10 MiB may use kLargeBuffer
-constexpr size_t kRoundLarge = 2097152; // round up large allocations to 2 MiB
+constexpr size_t kRoundLarge = 2097152;     // round up large allocations to 2 MiB
 
 typedef std::bitset<static_cast<size_t>(StatType::NUM_TYPES)> StatTypes;
 
@@ -242,56 +242,57 @@ class DeviceCachingAllocator {
       // Free all non-split cached blocks and retry alloc.
       || (free_cached_blocks() && alloc_block(params, true));
 
-    TORCH_INTERNAL_ASSERT((!block_found && params.err != cudaSuccess) || params.block);
     if (!block_found) {
-      if (params.err == cudaErrorMemoryAllocation) {
-        size_t device_free;
-        size_t device_total;
-        C10_CUDA_CHECK(cudaMemGetInfo(&device_free, &device_total));
-        std::string allowed_info;
-
-        if (set_fraction) {
-          allowed_info = format_size(allowed_memory_maximum) + " allowed; ";
-        }
+      // For any error code other than cudaErrorMemoryAllocation,
+      // alloc_block should have thrown an exception already.
+      TORCH_INTERNAL_ASSERT(params.err == cudaErrorMemoryAllocation);
 
-        stats.num_ooms += 1;
-
-        // "total capacity": total global memory on GPU
-        // "allowed": memory is allowed to use, which set by fraction.
-        // "already allocated": memory allocated by the program using the
-        //                      caching allocator
-        // "free": free memory as reported by the CUDA API
-        // "cached": memory held by the allocator but not used by the program
-        //
-        // The "allocated" amount does not include memory allocated outside
-        // of the caching allocator, such as memory allocated by other programs
-        // or memory held by the driver.
-        //
-        // The sum of "allocated" + "free" + "cached" may be less than the
-        // total capacity due to memory held by the driver and usage by other
-        // programs.
-        //
-        // Note that at this point free_cached_blocks has already returned all
-        // possible "cached" memory to the driver. The only remaining "cached"
-        // memory is split from a larger block that is partially in-use.
-        TORCH_CHECK_WITH(CUDAOutOfMemoryError, false,
-          "CUDA out of memory. Tried to allocate ", format_size(alloc_size),
-          " (GPU ", device, "; ",
-          format_size(device_total), " total capacity; ",
-          format_size(stats.allocated_bytes[static_cast<size_t>(StatType::AGGREGATE)].current),
-          " already allocated; ",
-          format_size(device_free), " free; ",
-          allowed_info,
-          format_size(stats.reserved_bytes[static_cast<size_t>(StatType::AGGREGATE)].current),
-          " reserved in total by PyTorch)");
-      } else {
-        C10_CUDA_CHECK(params.err);
+      size_t device_free;
+      size_t device_total;
+      C10_CUDA_CHECK(cudaMemGetInfo(&device_free, &device_total));
+      std::string allowed_info;
+
+      if (set_fraction) {
+        allowed_info = format_size(allowed_memory_maximum) + " allowed; ";
       }
+
+      stats.num_ooms += 1;
+
+      // "total capacity": total global memory on GPU
+      // "allowed": memory is allowed to use, which set by fraction.
+      // "already allocated": memory allocated by the program using the
+      //                      caching allocator
+      // "free": free memory as reported by the CUDA API
+      // "cached": memory held by the allocator but not used by the program
+      //
+      // The "allocated" amount does not include memory allocated outside
+      // of the caching allocator, such as memory allocated by other programs
+      // or memory held by the driver.
+      //
+      // The sum of "allocated" + "free" + "cached" may be less than the
+      // total capacity due to memory held by the driver and usage by other
+      // programs.
+      //
+      // Note that at this point free_cached_blocks has already returned all
+      // possible "cached" memory to the driver. The only remaining "cached"
+      // memory is split from a larger block that is partially in-use.
+      TORCH_CHECK_WITH(CUDAOutOfMemoryError, false,
+        "CUDA out of memory. Tried to allocate ", format_size(alloc_size),
+        " (GPU ", device, "; ",
+        format_size(device_total), " total capacity; ",
+        format_size(stats.allocated_bytes[static_cast<size_t>(StatType::AGGREGATE)].current),
+        " already allocated; ",
+        format_size(device_free), " free; ",
+        allowed_info,
+        format_size(stats.reserved_bytes[static_cast<size_t>(StatType::AGGREGATE)].current),
+        " reserved in total by PyTorch)");
     }
 
+    TORCH_INTERNAL_ASSERT(params.err == cudaSuccess &&
+                          params.block != nullptr &&
+                          params.block->ptr != nullptr);
     Block* block = params.block;
     Block* remaining = nullptr;
-    TORCH_INTERNAL_ASSERT(block);
 
     const bool already_split = block->is_split();
     if (should_split(block, size)) {
@@ -647,30 +648,46 @@ class DeviceCachingAllocator {
   }
 
   bool alloc_block(AllocParams& p, bool isRetry) {
+    // Defensively checks for preexisting CUDA error state.
+    C10_CUDA_CHECK(cudaGetLastError());
+
     size_t size = p.alloc_size;
     void* ptr;
 
     if (isRetry) {
       stats.num_alloc_retries += 1;
     }
+
     if (set_fraction && total_allocated_memory + size > allowed_memory_maximum) {
       p.err = cudaErrorMemoryAllocation;
+      return false;
     } else {
       p.err = cudaMalloc(&ptr, size);
-    }
-
-    if (p.err != cudaSuccess) {
-      if (!isRetry || p.err == cudaErrorMemoryAllocation)
-        cudaGetLastError(); // clear CUDA error
-      return false;
+      if (p.err != cudaSuccess) {
+        if (p.err == cudaErrorMemoryAllocation) {
+          // If this is the first attempt (!isRetry), we can forgive and clear CUDA's
+          // internal error state.
+          // If this is the second attempt (isRetry), malloc's TORCH_CHECK_WITH will take
+          // over to throw a helpful exception. The user can choose to catch the exception,
+          // free some stuff in their script, and attempt their allocation again.
+          // In this case, we can also forgive and clear CUDA's internal error state.
+          cudaGetLastError();
+        } else {
+          // If the error's unrelated to memory allocation, we should throw immediately.
+          C10_CUDA_CHECK(p.err);
+        }
+        return false;
+      }
     }
 
     total_allocated_memory += size;
     p.block = new Block(p.device(), p.stream(), size, p.pool, (char*)ptr);
     update_stat_array(stats.segment, 1, p.stat_types);
     update_stat_array(stats.reserved_bytes, size, p.stat_types);
 
-    return (p.block != nullptr);
+    // p.block came from new, not cudaMalloc. It should not be nullptr here.
+    TORCH_INTERNAL_ASSERT(p.block != nullptr && p.block->ptr != nullptr);
+    return true;
   }
 
   bool free_cached_blocks()
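The comment added to alloc_block describes the intended user-facing behavior: on the retry path the allocator raises a catchable out-of-memory error rather than leaving CUDA in a bad state, so the user can free memory and try again. A minimal Python sketch of that pattern, assuming the error surfaces as a RuntimeError whose message starts with "CUDA out of memory" (matching the TORCH_CHECK_WITH string above); the tensor shape and the single retry are placeholders:

```python
import torch

def allocate_with_retry(shape, device="cuda"):
    # First attempt; may raise the "CUDA out of memory" error produced by
    # TORCH_CHECK_WITH(CUDAOutOfMemoryError, ...) in the caching allocator.
    try:
        return torch.empty(shape, device=device)
    except RuntimeError as e:
        if "CUDA out of memory" not in str(e):
            raise  # unrelated CUDA error: re-raise immediately
        # Release cached blocks held by the allocator, then retry once,
        # mirroring the "catch, free some stuff, and attempt again" flow
        # described in the alloc_block comment.
        torch.cuda.empty_cache()
        return torch.empty(shape, device=device)
```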

test/distributed/pipeline/sync/skip/test_gpipe.py

Lines changed: 2 additions & 2 deletions
@@ -11,7 +11,7 @@
 from torch.distributed.pipeline.sync import Pipe
 from torch.distributed.pipeline.sync.skip import pop, skippable, stash
 from torch.distributed.pipeline.sync.skip.portal import PortalBlue, PortalCopy, PortalOrange
-from torch.testing._internal.distributed.pipeline.utils import convert_to_balance
+from torch.distributed.pipeline.sync.utils import partition_model
 
 
 @pytest.mark.skipif(not torch.cuda.is_available(), reason="cuda required")
@@ -53,7 +53,7 @@ def forward(self, input):
             return output
 
     model = nn.Sequential(Layer1(), Layer2(), Layer3())
-    model = convert_to_balance(model, balance)
+    model = partition_model(model, balance)
     model = Pipe(model, chunks=3, checkpoint=checkpoint)
 
     in_device = model.devices[0]

test/quantization/test_quantize_fx.py

Lines changed: 105 additions & 2 deletions
@@ -347,10 +347,11 @@ def forward(self, x):
         qconfig_dict = {'': qconfig}
         prepared = prepare_fx(m, qconfig_dict)
         quantized = convert_fx(prepared, debug=True)
-        qparams = (quantized._scale_0, quantized._zero_point_0)
+        qparams = (quantized._input_scale_0, quantized._input_zero_point_0)
         weight_obs = qconfig.weight()
         weight_obs(quantized.weight)
-        ref_qparams = weight_obs.calculate_qparams()
+        # Get the actual value to avoid tensor size mismatch error, torch.Size([]) vs torch.Size([1])
+        ref_qparams = (weight_obs.calculate_qparams()[0].item(), weight_obs.calculate_qparams()[1].item())
         self.assertEqual(qparams, ref_qparams)
 
     def test_conv_bn_relu(self):
@@ -983,6 +984,46 @@ def forward(self, x):
         # make sure it runs
         m(torch.randn(2, 1, 3, 3))
 
+    def test_qconfig_for_call_func(self):
+        class Linear(torch.nn.Module):
+            def __init__(self):
+                super().__init__()
+                self.w = torch.ones(5, 5)
+                self.b = torch.zeros(5)
+
+            def forward(self, x):
+                return torch.nn.functional.linear(x, self.w, self.b)
+
+        class M(torch.nn.Module):
+            def __init__(self):
+                super().__init__()
+                self.mods1 = torch.nn.Sequential(
+                    Linear(),
+                    Linear()
+                )
+                self.mods2 = Linear()
+
+            def forward(self, x):
+                x = self.mods1(x)
+                x = self.mods2(x)
+                return x
+
+        model = M().eval()
+        qconfig_dict = {"": default_qconfig, "module_name": [("mods2", None)]}
+        m = prepare_fx(model, qconfig_dict)
+        m(torch.rand(5, 5))
+
+        m = convert_fx(m)
+        node_list = [
+            ns.call_function(torch.quantize_per_tensor),
+            ns.call_function(torch.ops.quantized.linear),
+            ns.call_function(torch.ops.quantized.linear),
+            ns.call_method('dequantize'),
+            ns.call_function(torch.nn.functional.linear)
+        ]
+        self.checkGraphModuleNodes(m, expected_node_list=node_list)
+        m(torch.rand(5, 5))
+
     def test_preserve_attributes(self):
         class M(torch.nn.Module):
             def __init__(self):
@@ -1455,6 +1496,68 @@ def test_convtranspose_per_channel_fails_early(self):
             str(context.exception) ==
             'Per channel weight observer is not supported yet for ConvTranspose{n}d.')
 
+    @skipIfNoFBGEMM
+    def test_qparams_buffers(self):
+        class Linear(torch.nn.Module):
+            def __init__(self):
+                super().__init__()
+                self.w = torch.ones(5, 5)
+                self.b = torch.zeros(5)
+
+            def forward(self, x):
+                return torch.nn.functional.linear(x, self.w, self.b)
+
+        class M(torch.nn.Module):
+            def __init__(self):
+                super().__init__()
+                self.mods1 = torch.nn.Sequential(
+                    Linear(),
+                    Linear()
+                )
+                self.mods2 = Linear()
+
+            def forward(self, x):
+                x = self.mods1(x)
+                x = self.mods2(x)
+                return x
+
+        model = M().eval()
+        qconfig_dict = {"": default_qconfig}
+        m = prepare_fx(model, qconfig_dict)
+        m(torch.rand(5, 5))
+
+        m = convert_fx(m)
+        keys = m.state_dict().keys()
+        quant_scale_count = quant_zero_point = scale_count = zero_point_count = 0
+        for k in keys:
+            if 'input_scale' in k:
+                quant_scale_count = quant_scale_count + 1
+            elif 'input_zero_point' in k:
+                quant_zero_point = quant_zero_point + 1
+            elif 'scale' in k:
+                scale_count = scale_count + 1
+            elif 'zero_point' in k:
+                zero_point_count = zero_point_count + 1
+
+        # Expect each quantized linear op to have a scale and zero point
+        self.assertTrue(scale_count == 3, "Expect each quantized linear op to have a scale in state_dict")
+        self.assertTrue(zero_point_count == 3, "Expect each quantized linear op to have a zero_point in state_dict")
+        # ensure it runs
+        m(torch.rand(5, 5))
+        # ensure it is scriptable
+        scripted = torch.jit.script(m)
+        scripted_keys = scripted.state_dict().keys()
+        self.assertTrue(scripted_keys == keys, "Expected the scripted model to preserve the state_dict")
+        assert hasattr(m, "mods1_0_input_scale_0")
+        assert hasattr(m, "mods1_0_input_zero_point_0")
+        assert hasattr(m, "mods1_0_scale_0")
+        assert hasattr(m, "mods1_0_zero_point_0")
+        assert hasattr(m, "mods1_1_scale_0")
+        assert hasattr(m, "mods1_1_zero_point_0")
+        assert hasattr(m, "mods2_scale_0")
+        assert hasattr(m, "mods2_zero_point_0")
+
+
 @skipIfNoFBGEMM
 class TestQuantizeFxOps(QuantizationTestCase):
     """Unit tests for individual ops
