Skip to content

Commit d47c871

Browse files
riversand963facebook-github-bot
authored andcommitted
Fix data race to VersionSet::io_status_ (#7034)
Summary: After #6949 , VersionSet::io_status_ can be concurrently accessed by multiple threads without lock, causing tsan test to fail. For example, a bg flush thread resets io_status_ before calling LogAndApply(), while another thread already in the process of LogAndApply() reads io_status_. This is a bug. We do not have to reset io_status_ each time we call LogAndApply(). io_status_ is part of the state of VersionSet, and it indicates the outcome of preceding MANIFEST/CURRENT files IO operations. Its value should be updated only when: 1. MANIFEST/CURRENT files IO fail for the first time. 2. MANIFEST/CURRENT files IO succeed as part of recovering from a prior failure without process restart, e.g. calling Resume(). Test Plan (devserver): COMPILE_WITH_TSAN=1 make check COMPILE_WITH_TSAN=1 make db_test2 ./db_test2 --gtest_filter=DBTest2.CompactionStall Pull Request resolved: #7034 Reviewed By: zhichao-cao Differential Revision: D22247137 Pulled By: riversand963 fbshipit-source-id: 77b83e05390f3ee3cd2d96d3fdd6fe4f225e3216
1 parent b9d51b8 commit d47c871

5 files changed

Lines changed: 28 additions & 34 deletions

File tree

db/compaction/compaction_job.cc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -724,7 +724,6 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
724724
cfd->internal_stats()->AddCompactionStats(
725725
compact_->compaction->output_level(), thread_pri_, compaction_stats_);
726726

727-
versions_->SetIOStatus(IOStatus::OK());
728727
if (status.ok()) {
729728
status = InstallCompactionResults(mutable_cf_options);
730729
}

db/db_impl/db_impl_compaction_flush.cc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2705,7 +2705,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
27052705
for (const auto& f : *c->inputs(0)) {
27062706
c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
27072707
}
2708-
versions_->SetIOStatus(IOStatus::OK());
27092708
status = versions_->LogAndApply(c->column_family_data(),
27102709
*c->mutable_cf_options(), c->edit(),
27112710
&mutex_, directories_.GetDbDir());
@@ -2763,7 +2762,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
27632762
}
27642763
}
27652764

2766-
versions_->SetIOStatus(IOStatus::OK());
27672765
status = versions_->LogAndApply(c->column_family_data(),
27682766
*c->mutable_cf_options(), c->edit(),
27692767
&mutex_, directories_.GetDbDir());

db/memtable_list.cc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,6 @@ Status MemTableList::TryInstallMemtableFlushResults(
470470
}
471471

472472
// this can release and reacquire the mutex.
473-
vset->SetIOStatus(IOStatus::OK());
474473
s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu,
475474
db_directory);
476475
*io_s = vset->io_status();

db/version_set.cc

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3878,10 +3878,6 @@ Status VersionSet::ProcessManifestWrites(
38783878
}
38793879
#endif // NDEBUG
38803880

3881-
uint64_t new_manifest_file_size = 0;
3882-
Status s;
3883-
IOStatus io_s;
3884-
38853881
assert(pending_manifest_file_number_ == 0);
38863882
if (!descriptor_log_ ||
38873883
manifest_file_size_ > db_options_->max_manifest_file_size) {
@@ -3911,6 +3907,9 @@ Status VersionSet::ProcessManifestWrites(
39113907
}
39123908
}
39133909

3910+
uint64_t new_manifest_file_size = 0;
3911+
Status s;
3912+
IOStatus io_s;
39143913
{
39153914
FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_);
39163915
mu->Unlock();
@@ -3947,9 +3946,9 @@ Status VersionSet::ProcessManifestWrites(
39473946
std::string descriptor_fname =
39483947
DescriptorFileName(dbname_, pending_manifest_file_number_);
39493948
std::unique_ptr<FSWritableFile> descriptor_file;
3950-
s = NewWritableFile(fs_, descriptor_fname, &descriptor_file,
3951-
opt_file_opts);
3952-
if (s.ok()) {
3949+
io_s = NewWritableFile(fs_, descriptor_fname, &descriptor_file,
3950+
opt_file_opts);
3951+
if (io_s.ok()) {
39533952
descriptor_file->SetPreallocationBlockSize(
39543953
db_options_->manifest_preallocation_size);
39553954

@@ -3958,7 +3957,10 @@ Status VersionSet::ProcessManifestWrites(
39583957
nullptr, db_options_->listeners));
39593958
descriptor_log_.reset(
39603959
new log::Writer(std::move(file_writer), 0, false));
3961-
s = WriteCurrentStateToManifest(curr_state, descriptor_log_.get());
3960+
s = WriteCurrentStateToManifest(curr_state, descriptor_log_.get(),
3961+
io_s);
3962+
} else {
3963+
s = io_s;
39623964
}
39633965
}
39643966

@@ -3994,7 +3996,6 @@ Status VersionSet::ProcessManifestWrites(
39943996
#endif /* !NDEBUG */
39953997
io_s = descriptor_log_->AddRecord(record);
39963998
if (!io_s.ok()) {
3997-
io_status_ = io_s;
39983999
s = io_s;
39994000
break;
40004001
}
@@ -4005,12 +4006,9 @@ Status VersionSet::ProcessManifestWrites(
40054006
"VersionSet::ProcessManifestWrites:AfterSyncManifest", &io_s);
40064007
}
40074008
if (!io_s.ok()) {
4008-
io_status_ = io_s;
40094009
s = io_s;
40104010
ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n",
40114011
s.ToString().c_str());
4012-
} else if (io_status_.IsIOError()) {
4013-
io_status_ = io_s;
40144012
}
40154013
}
40164014

@@ -4020,10 +4018,7 @@ Status VersionSet::ProcessManifestWrites(
40204018
io_s = SetCurrentFile(fs_, dbname_, pending_manifest_file_number_,
40214019
db_directory);
40224020
if (!io_s.ok()) {
4023-
io_status_ = io_s;
40244021
s = io_s;
4025-
} else if (io_status_.IsIOError()) {
4026-
io_status_ = io_s;
40274022
}
40284023
TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:AfterNewManifest");
40294024
}
@@ -4044,6 +4039,14 @@ Status VersionSet::ProcessManifestWrites(
40444039
mu->Lock();
40454040
}
40464041

4042+
if (!io_s.ok()) {
4043+
if (io_status_.ok()) {
4044+
io_status_ = io_s;
4045+
}
4046+
} else if (!io_status_.ok()) {
4047+
io_status_ = io_s;
4048+
}
4049+
40474050
// Append the old manifest file to the obsolete_manifest_ list to be deleted
40484051
// by PurgeObsoleteFiles later.
40494052
if (s.ok() && new_descriptor_log) {
@@ -5297,7 +5300,7 @@ void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
52975300

52985301
Status VersionSet::WriteCurrentStateToManifest(
52995302
const std::unordered_map<uint32_t, MutableCFState>& curr_state,
5300-
log::Writer* log) {
5303+
log::Writer* log, IOStatus& io_s) {
53015304
// TODO: Break up into multiple records to reduce memory usage on recovery?
53025305

53035306
// WARNING: This method doesn't hold a mutex!!
@@ -5306,6 +5309,7 @@ Status VersionSet::WriteCurrentStateToManifest(
53065309
// LogAndApply. Column family manipulations can only happen within LogAndApply
53075310
// (the same single thread), so we're safe to iterate.
53085311

5312+
assert(io_s.ok());
53095313
if (db_options_->write_dbid_to_manifest) {
53105314
VersionEdit edit_for_db_id;
53115315
assert(!db_id_.empty());
@@ -5315,10 +5319,9 @@ Status VersionSet::WriteCurrentStateToManifest(
53155319
return Status::Corruption("Unable to Encode VersionEdit:" +
53165320
edit_for_db_id.DebugString(true));
53175321
}
5318-
IOStatus io_s = log->AddRecord(db_id_record);
5322+
io_s = log->AddRecord(db_id_record);
53195323
if (!io_s.ok()) {
5320-
io_status_ = io_s;
5321-
return std::move(io_s);
5324+
return io_s;
53225325
}
53235326
}
53245327

@@ -5345,10 +5348,9 @@ Status VersionSet::WriteCurrentStateToManifest(
53455348
return Status::Corruption(
53465349
"Unable to Encode VersionEdit:" + edit.DebugString(true));
53475350
}
5348-
IOStatus io_s = log->AddRecord(record);
5351+
io_s = log->AddRecord(record);
53495352
if (!io_s.ok()) {
5350-
io_status_ = io_s;
5351-
return std::move(io_s);
5353+
return io_s;
53525354
}
53535355
}
53545356

@@ -5398,10 +5400,9 @@ Status VersionSet::WriteCurrentStateToManifest(
53985400
return Status::Corruption(
53995401
"Unable to Encode VersionEdit:" + edit.DebugString(true));
54005402
}
5401-
IOStatus io_s = log->AddRecord(record);
5403+
io_s = log->AddRecord(record);
54025404
if (!io_s.ok()) {
5403-
io_status_ = io_s;
5404-
return std::move(io_s);
5405+
return io_s;
54055406
}
54065407
}
54075408
}

db/version_set.h

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1159,10 +1159,7 @@ class VersionSet {
11591159
static uint64_t GetTotalSstFilesSize(Version* dummy_versions);
11601160

11611161
// Get the IO Status returned by written Manifest.
1162-
IOStatus io_status() const { return io_status_; }
1163-
1164-
// Set the IO Status to OK. Called before Manifest write if needed.
1165-
void SetIOStatus(const IOStatus& s) { io_status_ = s; }
1162+
const IOStatus& io_status() const { return io_status_; }
11661163

11671164
protected:
11681165
using VersionBuilderMap =
@@ -1205,7 +1202,7 @@ class VersionSet {
12051202
// Save current contents to *log
12061203
Status WriteCurrentStateToManifest(
12071204
const std::unordered_map<uint32_t, MutableCFState>& curr_state,
1208-
log::Writer* log);
1205+
log::Writer* log, IOStatus& io_s);
12091206

12101207
void AppendVersion(ColumnFamilyData* column_family_data, Version* v);
12111208

0 commit comments

Comments
 (0)