Skip to content

Commit 9502856

Browse files
hx235facebook-github-bot
authored andcommitted
Add missing range conflict check between file ingestion and RefitLevel() (#10988)
Summary: **Context:** File ingestion never checks whether the key range it acts on overlaps with an ongoing RefitLevel() (used in `CompactRange()` with `change_level=true`). That's because RefitLevel() doesn't register and make its key range known to file ingestion. Though it checks overlapping with other compactions by https://github.com/facebook/rocksdb/blob/7.8.fb/db/external_sst_file_ingestion_job.cc#L998. RefitLevel() (used in `CompactRange()` with `change_level=true`) doesn't check whether the key range it acts on overlaps with an ongoing file ingestion. That's because file ingestion does not register and make its key range known to other compactions. - Note that non-refitlevel-compaction (e.g, manual compaction w/o RefitLevel() or general compaction) also does not check key range overlap with ongoing file ingestion for the same reason. - But it's fine. Credited to cbi42's discovery, `WaitForIngestFile` was called by background and foreground compactions. They were introduced in 0f88160, 5c64fb6 and 87dfc1d. - Regardless, this PR registers file ingestion like a compaction is a general approach that will also add range conflict check between file ingestion and non-refitlevel-compaction, though it has not been the issue motivated this PR. Above are bugs resulting in two bad consequences: - If file ingestion and RefitLevel() creates files in the same level, then range-overlapped files will be created at that level and caught as corruption by `force_consistency_checks=true` - If file ingestion and RefitLevel() creates file in different levels, then with one further compaction on the ingested file, it can result in two same keys both with seqno 0 in two different levels. Then with iterator's [optimization](https://github.com/facebook/rocksdb/blame/c62f3221698fd273b673d4f7e54eabb8329a4369/db/db_iter.cc#L342-L343) that assumes no two same keys both with seqno 0, it will either break this assertion in debug build or, even worst, return value of this same key for the key after it, which is the wrong value to return, in release build. Therefore we decide to introduce range conflict check for file ingestion and RefitLevel() inspired from the existing range conflict check among compactions. **Summary:** - Treat file ingestion job and RefitLevel() as `Compaction` of new compaction reasons: `CompactionReason::kExternalSstIngestion` and `CompactionReason::kRefitLevel` and register/unregister them. File ingestion is treated as compaction from L0 to different levels and RefitLevel() as compaction from source level to target level. - Check for `RangeOverlapWithCompaction` with other ongoing compactions, `RegisterCompaction()` on this "compaction" before changing the LSM state in `VersionStorageInfo`, and `UnregisterCompaction()` after changing. - Replace scattered fixes (0f88160, 5c64fb6 and 87dfc1d.) that prevents overlapping between file ingestion and non-refit-level compaction with this fix cuz those practices are easy to overlook. - Misc: logic cleanup, see PR comments Pull Request resolved: #10988 Test Plan: - New unit test `DBCompactionTestWithOngoingFileIngestionParam*` that failed pre-fix and passed afterwards. - Made compatible with existing tests, see PR comments - make check - [Ongoing] Stress test rehearsal with normal value and aggressive CI value #10761 Reviewed By: cbi42 Differential Revision: D41535685 Pulled By: hx235 fbshipit-source-id: 549833a577ba1496d20a870583d4caa737da1258
1 parent cc6f323 commit 9502856

19 files changed

Lines changed: 516 additions & 184 deletions

HISTORY.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* Fixed a bug in LockWAL() leading to re-locking mutex (#11020).
1616
* Fixed a heap use after free bug in async scan prefetching when the scan thread and another thread try to read and load the same seek block into cache.
1717
* Fixed a heap use after free in async scan prefetching if dictionary compression is enabled, in which case sync read of the compression dictionary gets mixed with async prefetching
18+
* Fixed a data race bug of `CompactRange()` under `change_level=true` acts on overlapping range with an ongoing file ingestion for level compaction. This will either result in overlapping file ranges corruption at a certain level caught by `force_consistency_checks=true` or protentially two same keys both with seqno 0 in two different levels (i.e, new data ends up in lower/older level). The latter will be caught by assertion in debug build but go silently and result in read returning wrong result in release build. This fix is general so it also replaced previous fixes to a similar problem for `CompactFiles()` (#4665), general `CompactRange()` and auto compaction (commit 5c64fb6 and 87dfc1d).
1819

1920
### New Features
2021
* When an SstPartitionerFactory is configured, CompactRange() now automatically selects for compaction any files overlapping a partition boundary that is in the compaction range, even if no actual entries are in the requested compaction range. With this feature, manual compaction can be used to (re-)establish SST partition points when SstPartitioner changes, without a full compaction.

db/column_family.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1218,6 +1218,7 @@ Compaction* ColumnFamilyData::CompactRange(
12181218
if (result != nullptr) {
12191219
result->SetInputVersion(current_);
12201220
}
1221+
TEST_SYNC_POINT("ColumnFamilyData::CompactRange:Return");
12211222
return result;
12221223
}
12231224

db/compaction/compaction.cc

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -235,12 +235,19 @@ Compaction::Compaction(
235235
inputs_(PopulateWithAtomicBoundaries(vstorage, std::move(_inputs))),
236236
grandparents_(std::move(_grandparents)),
237237
score_(_score),
238-
bottommost_level_(IsBottommostLevel(output_level_, vstorage, inputs_)),
238+
bottommost_level_(
239+
// For simplicity, we don't support the concept of "bottommost level"
240+
// with
241+
// `CompactionReason::kExternalSstIngestion` and
242+
// `CompactionReason::kRefitLevel`
243+
(_compaction_reason == CompactionReason::kExternalSstIngestion ||
244+
_compaction_reason == CompactionReason::kRefitLevel)
245+
? false
246+
: IsBottommostLevel(output_level_, vstorage, inputs_)),
239247
is_full_compaction_(IsFullCompaction(vstorage, inputs_)),
240248
is_manual_compaction_(_manual_compaction),
241249
trim_ts_(_trim_ts),
242250
is_trivial_move_(false),
243-
244251
compaction_reason_(_compaction_reason),
245252
notify_on_compaction_completion_(false),
246253
enable_blob_garbage_collection_(
@@ -255,8 +262,15 @@ Compaction::Compaction(
255262
_blob_garbage_collection_age_cutoff > 1
256263
? mutable_cf_options()->blob_garbage_collection_age_cutoff
257264
: _blob_garbage_collection_age_cutoff),
258-
penultimate_level_(EvaluatePenultimateLevel(
259-
vstorage, immutable_options_, start_level_, output_level_)) {
265+
penultimate_level_(
266+
// For simplicity, we don't support the concept of "penultimate level"
267+
// with `CompactionReason::kExternalSstIngestion` and
268+
// `CompactionReason::kRefitLevel`
269+
_compaction_reason == CompactionReason::kExternalSstIngestion ||
270+
_compaction_reason == CompactionReason::kRefitLevel
271+
? Compaction::kInvalidLevel
272+
: EvaluatePenultimateLevel(vstorage, immutable_options_,
273+
start_level_, output_level_)) {
260274
MarkFilesBeingCompacted(true);
261275
if (is_manual_compaction_) {
262276
compaction_reason_ = CompactionReason::kManualCompaction;

db/compaction/compaction_job.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@ const char* GetCompactionReasonString(CompactionReason compaction_reason) {
9999
return "ForcedBlobGC";
100100
case CompactionReason::kRoundRobinTtl:
101101
return "RoundRobinTtl";
102+
case CompactionReason::kRefitLevel:
103+
return "RefitLevel";
102104
case CompactionReason::kNumOfReasons:
103105
// fall through
104106
default:

db/compaction/compaction_picker.cc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1126,7 +1126,11 @@ void CompactionPicker::RegisterCompaction(Compaction* c) {
11261126
c->output_level() == 0 ||
11271127
!FilesRangeOverlapWithCompaction(*c->inputs(), c->output_level(),
11281128
c->GetPenultimateLevel()));
1129-
if (c->start_level() == 0 ||
1129+
// CompactionReason::kExternalSstIngestion's start level is just a placeholder
1130+
// number without actual meaning as file ingestion technically does not have
1131+
// an input level like other compactions
1132+
if ((c->start_level() == 0 &&
1133+
c->compaction_reason() != CompactionReason::kExternalSstIngestion) ||
11301134
ioptions_.compaction_style == kCompactionStyleUniversal) {
11311135
level0_compactions_in_progress_.insert(c);
11321136
}

db/compaction/compaction_picker_level.cc

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -447,21 +447,21 @@ bool LevelCompactionBuilder::SetupOtherInputsIfNeeded() {
447447
compaction_inputs_.push_back(output_level_inputs_);
448448
}
449449

450+
// In some edge cases we could pick a compaction that will be compacting
451+
// a key range that overlap with another running compaction, and both
452+
// of them have the same output level. This could happen if
453+
// (1) we are running a non-exclusive manual compaction
454+
// (2) AddFile ingest a new file into the LSM tree
455+
// We need to disallow this from happening.
456+
if (compaction_picker_->FilesRangeOverlapWithCompaction(
457+
compaction_inputs_, output_level_,
458+
Compaction::EvaluatePenultimateLevel(
459+
vstorage_, ioptions_, start_level_, output_level_))) {
460+
// This compaction output could potentially conflict with the output
461+
// of a currently running compaction, we cannot run it.
462+
return false;
463+
}
450464
if (!is_l0_trivial_move_) {
451-
// In some edge cases we could pick a compaction that will be compacting
452-
// a key range that overlap with another running compaction, and both
453-
// of them have the same output level. This could happen if
454-
// (1) we are running a non-exclusive manual compaction
455-
// (2) AddFile ingest a new file into the LSM tree
456-
// We need to disallow this from happening.
457-
if (compaction_picker_->FilesRangeOverlapWithCompaction(
458-
compaction_inputs_, output_level_,
459-
Compaction::EvaluatePenultimateLevel(
460-
vstorage_, ioptions_, start_level_, output_level_))) {
461-
// This compaction output could potentially conflict with the output
462-
// of a currently running compaction, we cannot run it.
463-
return false;
464-
}
465465
compaction_picker_->GetGrandparents(vstorage_, start_level_inputs_,
466466
output_level_inputs_, &grandparents_);
467467
}

db/db_bloom_filter_test.cc

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1229,7 +1229,7 @@ TEST_P(ChargeFilterConstructionTestWithParam, Basic) {
12291229
*
12301230
* The test is designed in a way such that the reservation for (p1 - b')
12311231
* will trigger at least another dummy entry insertion
1232-
* (or equivelantly to saying, creating another peak).
1232+
* (or equivalently to saying, creating another peak).
12331233
*
12341234
* kStandard128Ribbon + FullFilter +
12351235
* detect_filter_construct_corruption
@@ -2618,8 +2618,7 @@ TEST_F(DBBloomFilterTest, OptimizeFiltersForHits) {
26182618
BottommostLevelCompaction::kSkip;
26192619
compact_options.change_level = true;
26202620
compact_options.target_level = 7;
2621-
ASSERT_TRUE(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr)
2622-
.IsNotSupported());
2621+
ASSERT_OK(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
26232622

26242623
ASSERT_EQ(trivial_move, 1);
26252624
ASSERT_EQ(non_trivial_move, 0);

db/db_compaction_test.cc

Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6245,6 +6245,231 @@ TEST_P(DBCompactionTestWithParam, FixFileIngestionCompactionDeadlock) {
62456245
Close();
62466246
}
62476247

6248+
class DBCompactionTestWithOngoingFileIngestionParam
6249+
: public DBCompactionTest,
6250+
public testing::WithParamInterface<std::string> {
6251+
public:
6252+
DBCompactionTestWithOngoingFileIngestionParam() : DBCompactionTest() {
6253+
compaction_path_to_test_ = GetParam();
6254+
}
6255+
void SetupOptions() {
6256+
options_ = CurrentOptions();
6257+
options_.create_if_missing = true;
6258+
6259+
if (compaction_path_to_test_ == "RefitLevelCompactRange") {
6260+
options_.num_levels = 7;
6261+
} else {
6262+
options_.num_levels = 3;
6263+
}
6264+
options_.compaction_style = CompactionStyle::kCompactionStyleLevel;
6265+
if (compaction_path_to_test_ == "AutoCompaction") {
6266+
options_.disable_auto_compactions = false;
6267+
options_.level0_file_num_compaction_trigger = 1;
6268+
} else {
6269+
options_.disable_auto_compactions = true;
6270+
}
6271+
}
6272+
6273+
void PauseCompactionThread() {
6274+
sleeping_task_.reset(new test::SleepingBackgroundTask());
6275+
env_->SetBackgroundThreads(1, Env::LOW);
6276+
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
6277+
sleeping_task_.get(), Env::Priority::LOW);
6278+
sleeping_task_->WaitUntilSleeping();
6279+
}
6280+
6281+
void ResumeCompactionThread() {
6282+
if (sleeping_task_) {
6283+
sleeping_task_->WakeUp();
6284+
sleeping_task_->WaitUntilDone();
6285+
}
6286+
}
6287+
6288+
void SetupFilesToForceFutureFilesIngestedToCertainLevel() {
6289+
SstFileWriter sst_file_writer(EnvOptions(), options_);
6290+
std::string dummy = dbname_ + "/dummy.sst";
6291+
ASSERT_OK(sst_file_writer.Open(dummy));
6292+
ASSERT_OK(sst_file_writer.Put("k2", "dummy"));
6293+
ASSERT_OK(sst_file_writer.Finish());
6294+
ASSERT_OK(db_->IngestExternalFile({dummy}, IngestExternalFileOptions()));
6295+
// L2 is made to contain a file overlapped with files to be ingested in
6296+
// later steps on key "k2". This will force future files ingested to L1 or
6297+
// above.
6298+
ASSERT_EQ("0,0,1", FilesPerLevel(0));
6299+
}
6300+
6301+
void SetupSyncPoints() {
6302+
if (compaction_path_to_test_ == "AutoCompaction") {
6303+
SyncPoint::GetInstance()->SetCallBack(
6304+
"ExternalSstFileIngestionJob::Run", [&](void*) {
6305+
SyncPoint::GetInstance()->LoadDependency(
6306+
{{"DBImpl::BackgroundCompaction():AfterPickCompaction",
6307+
"VersionSet::LogAndApply:WriteManifest"}});
6308+
});
6309+
} else if (compaction_path_to_test_ == "NonRefitLevelCompactRange") {
6310+
SyncPoint::GetInstance()->SetCallBack(
6311+
"ExternalSstFileIngestionJob::Run", [&](void*) {
6312+
SyncPoint::GetInstance()->LoadDependency(
6313+
{{"ColumnFamilyData::CompactRange:Return",
6314+
"VersionSet::LogAndApply:WriteManifest"}});
6315+
});
6316+
} else if (compaction_path_to_test_ == "RefitLevelCompactRange") {
6317+
SyncPoint::GetInstance()->SetCallBack(
6318+
"ExternalSstFileIngestionJob::Run", [&](void*) {
6319+
SyncPoint::GetInstance()->LoadDependency(
6320+
{{"DBImpl::CompactRange:PostRefitLevel",
6321+
"VersionSet::LogAndApply:WriteManifest"}});
6322+
});
6323+
} else if (compaction_path_to_test_ == "CompactFiles") {
6324+
SyncPoint::GetInstance()->SetCallBack(
6325+
"ExternalSstFileIngestionJob::Run", [&](void*) {
6326+
SyncPoint::GetInstance()->LoadDependency(
6327+
{{"DBImpl::CompactFilesImpl::PostSanitizeCompactionInputFiles",
6328+
"VersionSet::LogAndApply:WriteManifest"}});
6329+
});
6330+
} else {
6331+
assert(false);
6332+
}
6333+
SyncPoint::GetInstance()->LoadDependency(
6334+
{{"ExternalSstFileIngestionJob::Run", "PreCompaction"}});
6335+
SyncPoint::GetInstance()->EnableProcessing();
6336+
}
6337+
6338+
void RunCompactionOverlappedWithFileIngestion() {
6339+
if (compaction_path_to_test_ == "AutoCompaction") {
6340+
TEST_SYNC_POINT("PreCompaction");
6341+
ResumeCompactionThread();
6342+
// Without proper range conflict check,
6343+
// this would have been `Status::Corruption` about overlapping ranges
6344+
Status s = dbfull()->TEST_WaitForCompact();
6345+
EXPECT_OK(s);
6346+
} else if (compaction_path_to_test_ == "NonRefitLevelCompactRange") {
6347+
CompactRangeOptions cro;
6348+
cro.change_level = false;
6349+
std::string start_key = "k1";
6350+
Slice start(start_key);
6351+
std::string end_key = "k4";
6352+
Slice end(end_key);
6353+
TEST_SYNC_POINT("PreCompaction");
6354+
// Without proper range conflict check,
6355+
// this would have been `Status::Corruption` about overlapping ranges
6356+
Status s = dbfull()->CompactRange(cro, &start, &end);
6357+
EXPECT_OK(s);
6358+
} else if (compaction_path_to_test_ == "RefitLevelCompactRange") {
6359+
CompactRangeOptions cro;
6360+
cro.change_level = true;
6361+
cro.target_level = 5;
6362+
std::string start_key = "k1";
6363+
Slice start(start_key);
6364+
std::string end_key = "k4";
6365+
Slice end(end_key);
6366+
TEST_SYNC_POINT("PreCompaction");
6367+
Status s = dbfull()->CompactRange(cro, &start, &end);
6368+
// Without proper range conflict check,
6369+
// this would have been `Status::Corruption` about overlapping ranges
6370+
// To see this, remove the fix AND replace
6371+
// `DBImpl::CompactRange:PostRefitLevel` in sync point dependency with
6372+
// `DBImpl::ReFitLevel:PostRegisterCompaction`
6373+
EXPECT_TRUE(s.IsNotSupported());
6374+
EXPECT_TRUE(s.ToString().find("some ongoing compaction's output") !=
6375+
std::string::npos);
6376+
} else if (compaction_path_to_test_ == "CompactFiles") {
6377+
ColumnFamilyMetaData cf_meta_data;
6378+
db_->GetColumnFamilyMetaData(&cf_meta_data);
6379+
ASSERT_EQ(cf_meta_data.levels[0].files.size(), 1);
6380+
std::vector<std::string> input_files;
6381+
for (const auto& file : cf_meta_data.levels[0].files) {
6382+
input_files.push_back(file.name);
6383+
}
6384+
TEST_SYNC_POINT("PreCompaction");
6385+
Status s = db_->CompactFiles(CompactionOptions(), input_files, 1);
6386+
// Without proper range conflict check,
6387+
// this would have been `Status::Corruption` about overlapping ranges
6388+
EXPECT_TRUE(s.IsAborted());
6389+
EXPECT_TRUE(
6390+
s.ToString().find(
6391+
"A running compaction is writing to the same output level") !=
6392+
std::string::npos);
6393+
} else {
6394+
assert(false);
6395+
}
6396+
}
6397+
6398+
void DisableSyncPoints() {
6399+
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
6400+
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
6401+
}
6402+
6403+
protected:
6404+
std::string compaction_path_to_test_;
6405+
Options options_;
6406+
std::shared_ptr<test::SleepingBackgroundTask> sleeping_task_;
6407+
};
6408+
6409+
INSTANTIATE_TEST_CASE_P(DBCompactionTestWithOngoingFileIngestionParam,
6410+
DBCompactionTestWithOngoingFileIngestionParam,
6411+
::testing::Values("AutoCompaction",
6412+
"NonRefitLevelCompactRange",
6413+
"RefitLevelCompactRange",
6414+
"CompactFiles"));
6415+
6416+
TEST_P(DBCompactionTestWithOngoingFileIngestionParam, RangeConflictCheck) {
6417+
SetupOptions();
6418+
DestroyAndReopen(options_);
6419+
6420+
if (compaction_path_to_test_ == "AutoCompaction") {
6421+
PauseCompactionThread();
6422+
}
6423+
6424+
if (compaction_path_to_test_ != "RefitLevelCompactRange") {
6425+
SetupFilesToForceFutureFilesIngestedToCertainLevel();
6426+
}
6427+
6428+
// Create s1
6429+
ASSERT_OK(Put("k1", "v"));
6430+
ASSERT_OK(Put("k4", "v"));
6431+
ASSERT_OK(Flush());
6432+
if (compaction_path_to_test_ == "RefitLevelCompactRange") {
6433+
MoveFilesToLevel(6 /* level */);
6434+
ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel(0));
6435+
} else {
6436+
ASSERT_EQ("1,0,1", FilesPerLevel(0));
6437+
}
6438+
6439+
// To coerce following sequence of events
6440+
// Timeline Thread 1 (Ingest s2) Thread 2 (Compact s1)
6441+
// t0 | Decide to output to Lk
6442+
// t1 | Release lock in LogAndApply()
6443+
// t2 | Acquire lock
6444+
// t3 | Decides to compact to Lk
6445+
// | Expected to fail due to range
6446+
// | conflict check with file
6447+
// | ingestion
6448+
// t4 | Release lock in LogAndApply()
6449+
// t5 | Acquire lock again and finish
6450+
// t6 | Acquire lock again and finish
6451+
SetupSyncPoints();
6452+
6453+
// Ingest s2
6454+
port::Thread thread1([&] {
6455+
SstFileWriter sst_file_writer(EnvOptions(), options_);
6456+
std::string s2 = dbname_ + "/ingested_s2.sst";
6457+
ASSERT_OK(sst_file_writer.Open(s2));
6458+
ASSERT_OK(sst_file_writer.Put("k2", "v2"));
6459+
ASSERT_OK(sst_file_writer.Put("k3", "v2"));
6460+
ASSERT_OK(sst_file_writer.Finish());
6461+
ASSERT_OK(db_->IngestExternalFile({s2}, IngestExternalFileOptions()));
6462+
});
6463+
6464+
// Compact s1. Without proper range conflict check,
6465+
// this will encounter overlapping file corruption.
6466+
port::Thread thread2([&] { RunCompactionOverlappedWithFileIngestion(); });
6467+
6468+
thread1.join();
6469+
thread2.join();
6470+
DisableSyncPoints();
6471+
}
6472+
62486473
TEST_F(DBCompactionTest, ConsistencyFailTest) {
62496474
Options options = CurrentOptions();
62506475
options.force_consistency_checks = true;

0 commit comments

Comments
 (0)