Skip to content

Commit 340ed4f

Browse files
riversand963facebook-github-bot
authored andcommitted
Add support for timestamp in Get/Put (#5079)
Summary: It's useful to be able to (optionally) associate key-value pairs with user-provided timestamps. This PR is an early effort towards this goal and continues the work of #4942. A suite of new unit tests exist in DBBasicTestWithTimestampWithParam. Support for timestamp requires the user to provide timestamp as a slice in `ReadOptions` and `WriteOptions`. All timestamps of the same database must share the same length, format, etc. The format of the timestamp is the same throughout the same database, and the user is responsible for providing a comparator function (Comparator) to order the <key, timestamp> tuples. Once created, the format and length of the timestamp cannot change (at least for now). Test plan (on devserver): ``` $COMPILE_WITH_ASAN=1 make -j32 all $./db_basic_test --gtest_filter=Timestamp/DBBasicTestWithTimestampWithParam.PutAndGet/* $make check ``` All tests must pass. We also run the following db_bench tests to verify whether there is regression on Get/Put while timestamp is not enabled. ``` $TEST_TMPDIR=/dev/shm ./db_bench -benchmarks=fillseq,readrandom -num=1000000 $TEST_TMPDIR=/dev/shm ./db_bench -benchmarks=fillrandom -num=1000000 ``` Repeat for 6 times for both versions. Results are as follows: ``` | | readrandom | fillrandom | | master | 16.77 MB/s | 47.05 MB/s | | PR5079 | 16.44 MB/s | 47.03 MB/s | ``` Pull Request resolved: #5079 Differential Revision: D15132946 Pulled By: riversand963 fbshipit-source-id: 833a0d657eac21182f0f206c910a6438154c742c
1 parent cb1bf09 commit 340ed4f

14 files changed

Lines changed: 318 additions & 40 deletions

HISTORY.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
* Partitions of partitioned indexes no longer affect the read amplification statistics.
66
* Due to a refactoring, block cache eviction statistics for indexes are temporarily broken. We plan to reintroduce them in a later phase.
77
* options.keep_log_file_num will be enforced strictly all the time. File names of all log files will be tracked, which may take significantly amount of memory if options.keep_log_file_num is large and either of options.max_log_file_size or options.log_file_time_to_roll is set.
8+
* Add initial support for Get/Put with user timestamps. Users can specify timestamps via ReadOptions and WriteOptions when calling DB::Get and DB::Put.
89

910
### New Features
1011
* Add an option `snap_refresh_nanos` (default to 0.1s) to periodically refresh the snapshot list in compaction jobs. Assign to 0 to disable the feature.

db/db_basic_test.cc

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1284,6 +1284,157 @@ TEST_F(DBBasicTest, MultiGetBatchedMultiLevel) {
12841284
}
12851285
}
12861286
}
1287+
1288+
class DBBasicTestWithTimestampWithParam
1289+
: public DBTestBase,
1290+
public testing::WithParamInterface<bool> {
1291+
public:
1292+
DBBasicTestWithTimestampWithParam()
1293+
: DBTestBase("/db_basic_test_with_timestamp") {}
1294+
1295+
protected:
1296+
class TestComparator : public Comparator {
1297+
private:
1298+
const Comparator* cmp_without_ts_;
1299+
1300+
public:
1301+
explicit TestComparator(size_t ts_sz)
1302+
: Comparator(ts_sz), cmp_without_ts_(nullptr) {
1303+
cmp_without_ts_ = BytewiseComparator();
1304+
}
1305+
1306+
const char* Name() const override { return "TestComparator"; }
1307+
1308+
void FindShortSuccessor(std::string*) const override {}
1309+
1310+
void FindShortestSeparator(std::string*, const Slice&) const override {}
1311+
1312+
int Compare(const Slice& a, const Slice& b) const override {
1313+
int r = CompareWithoutTimestamp(a, b);
1314+
if (r != 0 || 0 == timestamp_size()) {
1315+
return r;
1316+
}
1317+
return CompareTimestamp(
1318+
Slice(a.data() + a.size() - timestamp_size(), timestamp_size()),
1319+
Slice(b.data() + b.size() - timestamp_size(), timestamp_size()));
1320+
}
1321+
1322+
int CompareWithoutTimestamp(const Slice& a, const Slice& b) const override {
1323+
assert(a.size() >= timestamp_size());
1324+
assert(b.size() >= timestamp_size());
1325+
Slice k1 = StripTimestampFromUserKey(a, timestamp_size());
1326+
Slice k2 = StripTimestampFromUserKey(b, timestamp_size());
1327+
1328+
return cmp_without_ts_->Compare(k1, k2);
1329+
}
1330+
1331+
int CompareTimestamp(const Slice& ts1, const Slice& ts2) const override {
1332+
if (!ts1.data() && !ts2.data()) {
1333+
return 0;
1334+
} else if (ts1.data() && !ts2.data()) {
1335+
return 1;
1336+
} else if (!ts1.data() && ts2.data()) {
1337+
return -1;
1338+
}
1339+
assert(ts1.size() == ts2.size());
1340+
uint64_t low1 = 0;
1341+
uint64_t low2 = 0;
1342+
uint64_t high1 = 0;
1343+
uint64_t high2 = 0;
1344+
auto* ptr1 = const_cast<Slice*>(&ts1);
1345+
auto* ptr2 = const_cast<Slice*>(&ts2);
1346+
if (!GetFixed64(ptr1, &low1) || !GetFixed64(ptr1, &high1) ||
1347+
!GetFixed64(ptr2, &low2) || !GetFixed64(ptr2, &high2)) {
1348+
assert(false);
1349+
}
1350+
if (high1 < high2) {
1351+
return 1;
1352+
} else if (high1 > high2) {
1353+
return -1;
1354+
}
1355+
if (low1 < low2) {
1356+
return 1;
1357+
} else if (low1 > low2) {
1358+
return -1;
1359+
}
1360+
return 0;
1361+
}
1362+
};
1363+
1364+
Slice EncodeTimestamp(uint64_t low, uint64_t high, std::string* ts) {
1365+
assert(nullptr != ts);
1366+
ts->clear();
1367+
PutFixed64(ts, low);
1368+
PutFixed64(ts, high);
1369+
assert(ts->size() == sizeof(low) + sizeof(high));
1370+
return Slice(*ts);
1371+
}
1372+
};
1373+
1374+
TEST_P(DBBasicTestWithTimestampWithParam, PutAndGet) {
1375+
const int kNumKeysPerFile = 8192;
1376+
const size_t kNumTimestamps = 6;
1377+
bool memtable_only = GetParam();
1378+
Options options = CurrentOptions();
1379+
options.create_if_missing = true;
1380+
options.env = env_;
1381+
options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
1382+
std::string tmp;
1383+
size_t ts_sz = EncodeTimestamp(0, 0, &tmp).size();
1384+
TestComparator test_cmp(ts_sz);
1385+
options.comparator = &test_cmp;
1386+
BlockBasedTableOptions bbto;
1387+
bbto.filter_policy.reset(NewBloomFilterPolicy(
1388+
10 /*bits_per_key*/, false /*use_block_based_builder*/));
1389+
bbto.whole_key_filtering = true;
1390+
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
1391+
DestroyAndReopen(options);
1392+
CreateAndReopenWithCF({"pikachu"}, options);
1393+
size_t num_cfs = handles_.size();
1394+
ASSERT_EQ(2, num_cfs);
1395+
std::vector<std::string> write_ts_strs(kNumTimestamps);
1396+
std::vector<std::string> read_ts_strs(kNumTimestamps);
1397+
std::vector<Slice> write_ts_list;
1398+
std::vector<Slice> read_ts_list;
1399+
1400+
for (size_t i = 0; i != kNumTimestamps; ++i) {
1401+
write_ts_list.emplace_back(EncodeTimestamp(i * 2, 0, &write_ts_strs[i]));
1402+
read_ts_list.emplace_back(EncodeTimestamp(1 + i * 2, 0, &read_ts_strs[i]));
1403+
const Slice& write_ts = write_ts_list.back();
1404+
WriteOptions wopts;
1405+
wopts.timestamp = &write_ts;
1406+
for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
1407+
for (size_t j = 0; j != (kNumKeysPerFile - 1) / kNumTimestamps; ++j) {
1408+
ASSERT_OK(Put(cf, "key" + std::to_string(j),
1409+
"value_" + std::to_string(j) + "_" + std::to_string(i),
1410+
wopts));
1411+
}
1412+
if (!memtable_only) {
1413+
ASSERT_OK(Flush(cf));
1414+
}
1415+
}
1416+
}
1417+
const auto& verify_db_func = [&]() {
1418+
for (size_t i = 0; i != kNumTimestamps; ++i) {
1419+
ReadOptions ropts;
1420+
ropts.timestamp = &read_ts_list[i];
1421+
for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
1422+
ColumnFamilyHandle* cfh = handles_[cf];
1423+
for (size_t j = 0; j != (kNumKeysPerFile - 1) / kNumTimestamps; ++j) {
1424+
std::string value;
1425+
ASSERT_OK(db_->Get(ropts, cfh, "key" + std::to_string(j), &value));
1426+
ASSERT_EQ("value_" + std::to_string(j) + "_" + std::to_string(i),
1427+
value);
1428+
}
1429+
}
1430+
}
1431+
};
1432+
verify_db_func();
1433+
}
1434+
1435+
INSTANTIATE_TEST_CASE_P(Timestamp, DBBasicTestWithTimestampWithParam,
1436+
::testing::Bool());
1437+
12871438
} // namespace rocksdb
12881439

12891440
int main(int argc, char** argv) {

db/db_impl/db_impl.cc

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1376,7 +1376,16 @@ ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const {
13761376
Status DBImpl::Get(const ReadOptions& read_options,
13771377
ColumnFamilyHandle* column_family, const Slice& key,
13781378
PinnableSlice* value) {
1379-
return GetImpl(read_options, column_family, key, value);
1379+
if (nullptr == read_options.timestamp) {
1380+
return GetImpl(read_options, column_family, key, value);
1381+
}
1382+
Slice akey;
1383+
std::string buf;
1384+
Status s = AppendTimestamp(key, *(read_options.timestamp), &akey, &buf);
1385+
if (s.ok()) {
1386+
s = GetImpl(read_options, column_family, akey, value);
1387+
}
1388+
return s;
13801389
}
13811390

13821391
Status DBImpl::GetImpl(const ReadOptions& read_options,

db/db_impl/db_impl_write.cc

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1677,11 +1677,25 @@ size_t DBImpl::GetWalPreallocateBlockSize(uint64_t write_buffer_size) const {
16771677
// can call if they wish
16781678
Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
16791679
const Slice& key, const Slice& value) {
1680-
// Pre-allocate size of write batch conservatively.
1681-
// 8 bytes are taken by header, 4 bytes for count, 1 byte for type,
1682-
// and we allocate 11 extra bytes for key length, as well as value length.
1683-
WriteBatch batch(key.size() + value.size() + 24);
1684-
Status s = batch.Put(column_family, key, value);
1680+
if (nullptr == opt.timestamp) {
1681+
// Pre-allocate size of write batch conservatively.
1682+
// 8 bytes are taken by header, 4 bytes for count, 1 byte for type,
1683+
// and we allocate 11 extra bytes for key length, as well as value length.
1684+
WriteBatch batch(key.size() + value.size() + 24);
1685+
Status s = batch.Put(column_family, key, value);
1686+
if (!s.ok()) {
1687+
return s;
1688+
}
1689+
return Write(opt, &batch);
1690+
}
1691+
Slice akey;
1692+
std::string buf;
1693+
Status s = AppendTimestamp(key, *(opt.timestamp), &akey, &buf);
1694+
if (!s.ok()) {
1695+
return s;
1696+
}
1697+
WriteBatch batch(akey.size() + value.size() + 24);
1698+
s = batch.Put(column_family, akey, value);
16851699
if (!s.ok()) {
16861700
return s;
16871701
}

db/dbformat.h

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,17 @@ inline Slice ExtractUserKey(const Slice& internal_key) {
151151
return Slice(internal_key.data(), internal_key.size() - 8);
152152
}
153153

154+
inline Slice ExtractUserKeyAndStripTimestamp(const Slice& internal_key,
155+
size_t ts_sz) {
156+
assert(internal_key.size() >= 8 + ts_sz);
157+
return Slice(internal_key.data(), internal_key.size() - 8 - ts_sz);
158+
}
159+
160+
inline Slice StripTimestampFromUserKey(const Slice& user_key, size_t ts_sz) {
161+
assert(user_key.size() >= ts_sz);
162+
return Slice(user_key.data(), user_key.size() - ts_sz);
163+
}
164+
154165
inline uint64_t ExtractInternalKeyFooter(const Slice& internal_key) {
155166
assert(internal_key.size() >= 8);
156167
const size_t n = internal_key.size();
@@ -658,4 +669,20 @@ struct ParsedInternalKeyComparator {
658669
const InternalKeyComparator* cmp;
659670
};
660671

672+
// TODO (yanqin): this causes extra memory allocation and copy. Should be
673+
// addressed in the future.
674+
inline Status AppendTimestamp(const Slice& key, const Slice& timestamp,
675+
Slice* ret_key, std::string* ret_buf) {
676+
assert(ret_key != nullptr);
677+
assert(ret_buf != nullptr);
678+
if (key.data() + key.size() == timestamp.data()) {
679+
*ret_key = Slice(key.data(), key.size() + timestamp.size());
680+
} else {
681+
ret_buf->assign(key.data(), key.size());
682+
ret_buf->append(timestamp.data(), timestamp.size());
683+
*ret_key = Slice(*ret_buf);
684+
}
685+
return Status::OK();
686+
}
687+
661688
} // namespace rocksdb

db/memtable.cc

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,8 @@ bool MemTable::Add(SequenceNumber s, ValueType type,
493493
p = EncodeVarint32(p, val_size);
494494
memcpy(p, value.data(), val_size);
495495
assert((unsigned)(p + val_size - buf) == (unsigned)encoded_len);
496+
size_t ts_sz = GetInternalKeyComparator().user_comparator()->timestamp_size();
497+
496498
if (!allow_concurrent) {
497499
// Extract prefix for insert with hint.
498500
if (insert_with_hint_prefix_extractor_ != nullptr &&
@@ -525,7 +527,7 @@ bool MemTable::Add(SequenceNumber s, ValueType type,
525527
bloom_filter_->Add(prefix_extractor_->Transform(key));
526528
}
527529
if (bloom_filter_ && moptions_.memtable_whole_key_filtering) {
528-
bloom_filter_->Add(key);
530+
bloom_filter_->Add(StripTimestampFromUserKey(key, ts_sz));
529531
}
530532

531533
// The first sequence number inserted into the memtable
@@ -559,7 +561,7 @@ bool MemTable::Add(SequenceNumber s, ValueType type,
559561
bloom_filter_->AddConcurrently(prefix_extractor_->Transform(key));
560562
}
561563
if (bloom_filter_ && moptions_.memtable_whole_key_filtering) {
562-
bloom_filter_->AddConcurrently(key);
564+
bloom_filter_->AddConcurrently(StripTimestampFromUserKey(key, ts_sz));
563565
}
564566

565567
// atomically update first_seqno_ and earliest_seqno_.
@@ -632,8 +634,10 @@ static bool SaveValue(void* arg, const char* entry) {
632634
// all entries with overly large sequence numbers.
633635
uint32_t key_length;
634636
const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
635-
if (s->mem->GetInternalKeyComparator().user_comparator()->Equal(
636-
Slice(key_ptr, key_length - 8), s->key->user_key())) {
637+
Slice user_key_slice = Slice(key_ptr, key_length - 8);
638+
if (s->mem->GetInternalKeyComparator()
639+
.user_comparator()
640+
->CompareWithoutTimestamp(user_key_slice, s->key->user_key()) == 0) {
637641
// Correct user key
638642
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
639643
ValueType type;
@@ -767,11 +771,13 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
767771
bool found_final_value = false;
768772
bool merge_in_progress = s->IsMergeInProgress();
769773
bool may_contain = true;
774+
size_t ts_sz = GetInternalKeyComparator().user_comparator()->timestamp_size();
770775
if (bloom_filter_) {
771776
// when both memtable_whole_key_filtering and prefix_extractor_ are set,
772777
// only do whole key filtering for Get() to save CPU
773778
if (moptions_.memtable_whole_key_filtering) {
774-
may_contain = bloom_filter_->MayContain(user_key);
779+
may_contain =
780+
bloom_filter_->MayContain(StripTimestampFromUserKey(user_key, ts_sz));
775781
} else {
776782
assert(prefix_extractor_);
777783
may_contain =

0 commit comments

Comments
 (0)