Skip to content

Commit 2730fe6

Browse files
Yi Wufacebook-github-bot
authored andcommitted
Fix ingested file and direcotry not being sync (#5435)
Summary: It it not safe to assume application had sync the SST file before ingest it into DB. Also the directory to put the ingested file needs to be fsync, otherwise the file can be lost. For integrity of RocksDB we need to sync the ingested file and directory before apply the change to manifest. Also syncing after writing global sequence when write_global_seqno=true was removed in #4172. Adding it back. Fixes #5287. Pull Request resolved: #5435 Test Plan: Test ingest file with ldb command and observe fsync/fdatasync in strace output. Tried both move_files=true and move_files=false. https://gist.github.com/yiwu-arbug/650a4023f57979056d83485fa863bef9 More test suggestions are welcome. Differential Revision: D15941675 Pulled By: riversand963 fbshipit-source-id: 389533f3923065a96df2cdde23ff4724a1810d78
1 parent 1bfeffa commit 2730fe6

9 files changed

Lines changed: 270 additions & 44 deletions

HISTORY.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
* Fix a bug in WAL replay of secondary instance by skipping write batches with older sequence numbers than the current last sequence number.
3030
* Fix flush's/compaction's merge processing logic which allowed `Put`s covered by range tombstones to reappear. Note `Put`s may exist even if the user only ever called `Merge()` due to an internal conversion during compaction to the bottommost level.
3131
* Fix/improve memtable earliest sequence assignment and WAL replay so that WAL entries of unflushed column families will not be skipped after replaying the MANIFEST and increasing db sequence due to another flushed/compacted column family.
32+
* Fix ingested file and directory not being fsync.
3233
* Return TryAgain status in place of Corruption when new tail is not visible to TransactionLogIterator.
3334
* Fix a bug caused by secondary not skipping the beginning of new MANIFEST.
3435

db/db_impl/db_impl.cc

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -861,16 +861,6 @@ Directory* DBImpl::GetDataDir(ColumnFamilyData* cfd, size_t path_id) const {
861861
return ret_dir;
862862
}
863863

864-
Directory* DBImpl::Directories::GetDataDir(size_t path_id) const {
865-
assert(path_id < data_dirs_.size());
866-
Directory* ret_dir = data_dirs_[path_id].get();
867-
if (ret_dir == nullptr) {
868-
// Should use db_dir_
869-
return db_dir_.get();
870-
}
871-
return ret_dir;
872-
}
873-
874864
Status DBImpl::SetOptions(
875865
ColumnFamilyHandle* column_family,
876866
const std::unordered_map<std::string, std::string>& options_map) {
@@ -3644,7 +3634,7 @@ Status DBImpl::IngestExternalFiles(
36443634
auto* cfd = static_cast<ColumnFamilyHandleImpl*>(arg.column_family)->cfd();
36453635
ingestion_jobs.emplace_back(env_, versions_.get(), cfd,
36463636
immutable_db_options_, env_options_,
3647-
&snapshots_, arg.options);
3637+
&snapshots_, arg.options, &directories_);
36483638
}
36493639
std::vector<std::pair<bool, Status>> exec_results;
36503640
for (size_t i = 0; i != num_cfs; ++i) {

db/db_impl/db_impl.h

Lines changed: 32 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,38 @@ struct JobContext;
7777
struct ExternalSstFileInfo;
7878
struct MemTableInfo;
7979

80+
// Class to maintain directories for all database paths other than main one.
81+
class Directories {
82+
public:
83+
Status SetDirectories(Env* env, const std::string& dbname,
84+
const std::string& wal_dir,
85+
const std::vector<DbPath>& data_paths);
86+
87+
Directory* GetDataDir(size_t path_id) const {
88+
assert(path_id < data_dirs_.size());
89+
Directory* ret_dir = data_dirs_[path_id].get();
90+
if (ret_dir == nullptr) {
91+
// Should use db_dir_
92+
return db_dir_.get();
93+
}
94+
return ret_dir;
95+
}
96+
97+
Directory* GetWalDir() {
98+
if (wal_dir_) {
99+
return wal_dir_.get();
100+
}
101+
return db_dir_.get();
102+
}
103+
104+
Directory* GetDbDir() { return db_dir_.get(); }
105+
106+
private:
107+
std::unique_ptr<Directory> db_dir_;
108+
std::vector<std::unique_ptr<Directory>> data_dirs_;
109+
std::unique_ptr<Directory> wal_dir_;
110+
};
111+
80112
// While DB is the public interface of RocksDB, and DBImpl is the actual
81113
// class implementing it. It's the entrance of the core RocksdB engine.
82114
// All other DB implementations, e.g. TransactionDB, BlobDB, etc, wrap a
@@ -1047,30 +1079,6 @@ class DBImpl : public DB {
10471079
}
10481080
};
10491081

1050-
// Class to maintain directories for all database paths other than main one.
1051-
class Directories {
1052-
public:
1053-
Status SetDirectories(Env* env, const std::string& dbname,
1054-
const std::string& wal_dir,
1055-
const std::vector<DbPath>& data_paths);
1056-
1057-
Directory* GetDataDir(size_t path_id) const;
1058-
1059-
Directory* GetWalDir() {
1060-
if (wal_dir_) {
1061-
return wal_dir_.get();
1062-
}
1063-
return db_dir_.get();
1064-
}
1065-
1066-
Directory* GetDbDir() { return db_dir_.get(); }
1067-
1068-
private:
1069-
std::unique_ptr<Directory> db_dir_;
1070-
std::vector<std::unique_ptr<Directory>> data_dirs_;
1071-
std::unique_ptr<Directory> wal_dir_;
1072-
};
1073-
10741082
struct LogFileNumberSize {
10751083
explicit LogFileNumberSize(uint64_t _number) : number(_number) {}
10761084
void AddSize(uint64_t new_size) { size += new_size; }

db/db_impl/db_impl_open.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -265,9 +265,9 @@ Status DBImpl::CreateAndNewDirectory(Env* env, const std::string& dirname,
265265
return env->NewDirectory(dirname, directory);
266266
}
267267

268-
Status DBImpl::Directories::SetDirectories(
269-
Env* env, const std::string& dbname, const std::string& wal_dir,
270-
const std::vector<DbPath>& data_paths) {
268+
Status Directories::SetDirectories(Env* env, const std::string& dbname,
269+
const std::string& wal_dir,
270+
const std::vector<DbPath>& data_paths) {
271271
Status s = DBImpl::CreateAndNewDirectory(env, dbname, &db_dir_);
272272
if (!s.ok()) {
273273
return s;

db/external_sst_file_basic_test.cc

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include "port/port.h"
1010
#include "port/stack_trace.h"
1111
#include "rocksdb/sst_file_writer.h"
12+
#include "test_util/fault_injection_test_env.h"
1213
#include "test_util/testutil.h"
1314

1415
namespace rocksdb {
@@ -20,6 +21,7 @@ class ExternalSSTFileBasicTest
2021
public:
2122
ExternalSSTFileBasicTest() : DBTestBase("/external_sst_file_basic_test") {
2223
sst_files_dir_ = dbname_ + "/sst_files/";
24+
fault_injection_test_env_.reset(new FaultInjectionTestEnv(Env::Default()));
2325
DestroyAndRecreateExternalSSTFilesDir();
2426
}
2527

@@ -140,6 +142,7 @@ class ExternalSSTFileBasicTest
140142

141143
protected:
142144
std::string sst_files_dir_;
145+
std::unique_ptr<FaultInjectionTestEnv> fault_injection_test_env_;
143146
};
144147

145148
TEST_F(ExternalSSTFileBasicTest, Basic) {
@@ -689,6 +692,59 @@ TEST_F(ExternalSSTFileBasicTest, FadviseTrigger) {
689692
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
690693
}
691694

695+
TEST_F(ExternalSSTFileBasicTest, SyncFailure) {
696+
Options options;
697+
options.create_if_missing = true;
698+
options.env = fault_injection_test_env_.get();
699+
700+
std::vector<std::pair<std::string, std::string>> test_cases = {
701+
{"ExternalSstFileIngestionJob::BeforeSyncIngestedFile",
702+
"ExternalSstFileIngestionJob::AfterSyncIngestedFile"},
703+
{"ExternalSstFileIngestionJob::BeforeSyncDir",
704+
"ExternalSstFileIngestionJob::AfterSyncDir"},
705+
{"ExternalSstFileIngestionJob::BeforeSyncGlobalSeqno",
706+
"ExternalSstFileIngestionJob::AfterSyncGlobalSeqno"}};
707+
708+
for (size_t i = 0; i < test_cases.size(); i++) {
709+
SyncPoint::GetInstance()->SetCallBack(test_cases[i].first, [&](void*) {
710+
fault_injection_test_env_->SetFilesystemActive(false);
711+
});
712+
SyncPoint::GetInstance()->SetCallBack(test_cases[i].second, [&](void*) {
713+
fault_injection_test_env_->SetFilesystemActive(true);
714+
});
715+
SyncPoint::GetInstance()->EnableProcessing();
716+
717+
DestroyAndReopen(options);
718+
if (i == 2) {
719+
ASSERT_OK(Put("foo", "v1"));
720+
}
721+
722+
Options sst_file_writer_options;
723+
std::unique_ptr<SstFileWriter> sst_file_writer(
724+
new SstFileWriter(EnvOptions(), sst_file_writer_options));
725+
std::string file_name =
726+
sst_files_dir_ + "sync_failure_test_" + ToString(i) + ".sst";
727+
ASSERT_OK(sst_file_writer->Open(file_name));
728+
ASSERT_OK(sst_file_writer->Put("bar", "v2"));
729+
ASSERT_OK(sst_file_writer->Finish());
730+
731+
IngestExternalFileOptions ingest_opt;
732+
if (i == 0) {
733+
ingest_opt.move_files = true;
734+
}
735+
const Snapshot* snapshot = db_->GetSnapshot();
736+
if (i == 2) {
737+
ingest_opt.write_global_seqno = true;
738+
}
739+
ASSERT_FALSE(db_->IngestExternalFile({file_name}, ingest_opt).ok());
740+
db_->ReleaseSnapshot(snapshot);
741+
742+
SyncPoint::GetInstance()->DisableProcessing();
743+
SyncPoint::GetInstance()->ClearAllCallBacks();
744+
Destroy(options);
745+
}
746+
}
747+
692748
TEST_P(ExternalSSTFileBasicTest, IngestionWithRangeDeletions) {
693749
int kNumLevels = 7;
694750
Options options = CurrentOptions();

db/external_sst_file_ingestion_job.cc

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@
77

88
#include "db/external_sst_file_ingestion_job.h"
99

10-
#include <cinttypes>
1110
#include <algorithm>
11+
#include <cinttypes>
1212
#include <string>
13+
#include <unordered_set>
1314
#include <vector>
1415

16+
#include "db/db_impl/db_impl.h"
1517
#include "db/version_edit.h"
1618
#include "file/file_util.h"
1719
#include "table/merging_iterator.h"
@@ -86,6 +88,7 @@ Status ExternalSstFileIngestionJob::Prepare(
8688
}
8789

8890
// Copy/Move external files into DB
91+
std::unordered_set<size_t> ingestion_path_ids;
8992
for (IngestedFileInfo& f : files_to_ingest_) {
9093
f.fd = FileDescriptor(next_file_number++, 0, f.file_size);
9194
f.copy_file = false;
@@ -95,8 +98,26 @@ Status ExternalSstFileIngestionJob::Prepare(
9598
f.fd.GetPathId());
9699
if (ingestion_options_.move_files) {
97100
status = env_->LinkFile(path_outside_db, path_inside_db);
98-
if (status.IsNotSupported() &&
99-
ingestion_options_.failed_move_fall_back_to_copy) {
101+
if (status.ok()) {
102+
// It is unsafe to assume application had sync the file and file
103+
// directory before ingest the file. For integrity of RocksDB we need
104+
// to sync the file.
105+
std::unique_ptr<WritableFile> file_to_sync;
106+
status = env_->ReopenWritableFile(path_inside_db, &file_to_sync,
107+
env_options_);
108+
if (status.ok()) {
109+
TEST_SYNC_POINT(
110+
"ExternalSstFileIngestionJob::BeforeSyncIngestedFile");
111+
status = SyncIngestedFile(file_to_sync.get());
112+
TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncIngestedFile");
113+
if (!status.ok()) {
114+
ROCKS_LOG_WARN(db_options_.info_log,
115+
"Failed to sync ingested file %s: %s",
116+
path_inside_db.c_str(), status.ToString().c_str());
117+
}
118+
}
119+
} else if (status.IsNotSupported() &&
120+
ingestion_options_.failed_move_fall_back_to_copy) {
100121
// Original file is on a different FS, use copy instead of hard linking.
101122
f.copy_file = true;
102123
}
@@ -107,6 +128,7 @@ Status ExternalSstFileIngestionJob::Prepare(
107128
if (f.copy_file) {
108129
TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Prepare:CopyFile",
109130
nullptr);
131+
// CopyFile also sync the new file.
110132
status = CopyFile(env_, path_outside_db, path_inside_db, 0,
111133
db_options_.use_fsync);
112134
}
@@ -115,8 +137,25 @@ Status ExternalSstFileIngestionJob::Prepare(
115137
break;
116138
}
117139
f.internal_file_path = path_inside_db;
140+
ingestion_path_ids.insert(f.fd.GetPathId());
141+
}
142+
143+
TEST_SYNC_POINT("ExternalSstFileIngestionJob::BeforeSyncDir");
144+
if (status.ok()) {
145+
for (auto path_id : ingestion_path_ids) {
146+
status = directories_->GetDataDir(path_id)->Fsync();
147+
if (!status.ok()) {
148+
ROCKS_LOG_WARN(db_options_.info_log,
149+
"Failed to sync directory %" ROCKSDB_PRIszt
150+
" while ingest file: %s",
151+
path_id, status.ToString().c_str());
152+
break;
153+
}
154+
}
118155
}
156+
TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncDir");
119157

158+
// TODO: The following is duplicated with Cleanup().
120159
if (!status.ok()) {
121160
// We failed, remove all files that we copied into the db
122161
for (IngestedFileInfo& f : files_to_ingest_) {
@@ -559,6 +598,18 @@ Status ExternalSstFileIngestionJob::AssignGlobalSeqnoForIngestedFile(
559598
std::string seqno_val;
560599
PutFixed64(&seqno_val, seqno);
561600
status = rwfile->Write(file_to_ingest->global_seqno_offset, seqno_val);
601+
if (status.ok()) {
602+
TEST_SYNC_POINT("ExternalSstFileIngestionJob::BeforeSyncGlobalSeqno");
603+
status = SyncIngestedFile(rwfile.get());
604+
TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncGlobalSeqno");
605+
if (!status.ok()) {
606+
ROCKS_LOG_WARN(db_options_.info_log,
607+
"Failed to sync ingested file %s after writing global "
608+
"sequence number: %s",
609+
file_to_ingest->internal_file_path.c_str(),
610+
status.ToString().c_str());
611+
}
612+
}
562613
if (!status.ok()) {
563614
return status;
564615
}
@@ -599,6 +650,16 @@ bool ExternalSstFileIngestionJob::IngestedFileFitInLevel(
599650
return true;
600651
}
601652

653+
template <typename TWritableFile>
654+
Status ExternalSstFileIngestionJob::SyncIngestedFile(TWritableFile* file) {
655+
assert(file != nullptr);
656+
if (db_options_.use_fsync) {
657+
return file->Fsync();
658+
} else {
659+
return file->Sync();
660+
}
661+
}
662+
602663
} // namespace rocksdb
603664

604665
#endif // !ROCKSDB_LITE

db/external_sst_file_ingestion_job.h

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
namespace rocksdb {
2222

23+
class Directories;
24+
2325
struct IngestedFileInfo {
2426
// External file path
2527
std::string external_file_path;
@@ -77,16 +79,20 @@ class ExternalSstFileIngestionJob {
7779
Env* env, VersionSet* versions, ColumnFamilyData* cfd,
7880
const ImmutableDBOptions& db_options, const EnvOptions& env_options,
7981
SnapshotList* db_snapshots,
80-
const IngestExternalFileOptions& ingestion_options)
82+
const IngestExternalFileOptions& ingestion_options,
83+
Directories* directories)
8184
: env_(env),
8285
versions_(versions),
8386
cfd_(cfd),
8487
db_options_(db_options),
8588
env_options_(env_options),
8689
db_snapshots_(db_snapshots),
8790
ingestion_options_(ingestion_options),
91+
directories_(directories),
8892
job_start_time_(env_->NowMicros()),
89-
consumed_seqno_(false) {}
93+
consumed_seqno_(false) {
94+
assert(directories != nullptr);
95+
}
9096

9197
// Prepare the job by copying external files into the DB.
9298
Status Prepare(const std::vector<std::string>& external_files_paths,
@@ -153,6 +159,10 @@ class ExternalSstFileIngestionJob {
153159
bool IngestedFileFitInLevel(const IngestedFileInfo* file_to_ingest,
154160
int level);
155161

162+
// Helper method to sync given file.
163+
template <typename TWritableFile>
164+
Status SyncIngestedFile(TWritableFile* file);
165+
156166
Env* env_;
157167
VersionSet* versions_;
158168
ColumnFamilyData* cfd_;
@@ -161,6 +171,7 @@ class ExternalSstFileIngestionJob {
161171
SnapshotList* db_snapshots_;
162172
autovector<IngestedFileInfo> files_to_ingest_;
163173
const IngestExternalFileOptions& ingestion_options_;
174+
Directories* directories_;
164175
VersionEdit edit_;
165176
uint64_t job_start_time_;
166177
bool consumed_seqno_;

0 commit comments

Comments
 (0)