Crashes and valgrind errors when using GetMergeOperands #9066
Closed
Labels: bug (Confirmed RocksDB bugs)
Description
Hi!
I found a bug when reading data with GetMergeOperands. Under stress there are two possible situations:
- The PinnableSlice receives a copy of the data from a Slice in MergeContext::operand_list_, but that Slice points into a memtable that has already been flushed and deleted.
- The same, but the Slice points into the contents of a block that has just been removed from the block cache.
The test below reproduces the bug. It sometimes crashes with SIGSEGV, but the errors are easier to catch under valgrind.
This is a read from a freed block (reproduced on RocksDB 6.25.3):
==31050== Invalid read of size 1
==31050== at 0x4C2EB4E: memcpy@@GLIBC_2.14 (vg_replace_strmem.c:1035)
==31050== by 0x573B54A: std::string::_M_replace_safe(unsigned long, unsigned long, char const*, unsigned long) (in /usr/lib64/libstdc++.so.6.0.19)
==31050== by 0x60831B: PinSelf (slice.h:178)
==31050== by 0x60831B: rocksdb::DBImpl::GetImpl(rocksdb::ReadOptions const&, rocksdb::Slice const&, rocksdb::DBImpl::GetImplOptions&) (db_impl.cc:1908)
==31050== by 0x61F638: rocksdb::DBImpl::GetMergeOperands(rocksdb::ReadOptions const&, rocksdb::ColumnFamilyHandle*, rocksdb::Slice const&, rocksdb::PinnableSlice*, rocksdb::GetMergeOperandsOptions*, int*) (db_impl.h:192)
==31050== Address 0x655e971 is 17 bytes inside a block of size 2,399 free'd
==31050== at 0x4C2BB8F: operator delete[](void*) (vg_replace_malloc.c:651)
==31050== by 0x6AE9E7: operator() (memory_allocator.h:20)
==31050== by 0x6AE9E7: ~unique_ptr (unique_ptr.h:377)
==31050== by 0x6AE9E7: ~BlockContents (format.h:232)
==31050== by 0x6AE9E7: rocksdb::Block::~Block() (block.cc:910)
==31050== by 0x545CB8: rocksdb::cache_entry_roles_detail::RegisteredDeleter<rocksdb::Block, (rocksdb::CacheEntryRole)0>::Delete(rocksdb::Slice const&, void*) (cache_entry_roles.h:93)
==31050== by 0x5CA884: Free (lru_cache.h:205)
==31050== by 0x5CA884: rocksdb::LRUCacheShard::Release(rocksdb::Cache::Handle*, bool) (lru_cache.cc:546)
==31050== by 0x470184: DoCleanup (cleanable.h:62)
==31050== by 0x470184: Reset (cleanable.h:38)
==31050== by 0x470184: ReleasePinnedData (pinned_iterators_manager.h:71)
==31050== by 0x470184: rocksdb::PinnedIteratorsManager::~PinnedIteratorsManager() (pinned_iterators_manager.h:24)
==31050== by 0x48DC3A: rocksdb::Version::Get(rocksdb::ReadOptions const&, rocksdb::LookupKey const&, rocksdb::PinnableSlice*, std::string*, rocksdb::Status*, rocksdb::MergeContext*, unsigned long*, bool*, bool*, unsigned long*, rocksdb::ReadCallback*, bool*, bool) (functional:2030)
==31050== by 0x607CA5: rocksdb::DBImpl::GetImpl(rocksdb::ReadOptions const&, rocksdb::Slice const&, rocksdb::DBImpl::GetImplOptions&) (db_impl.cc:1882)
==31050== by 0x61F638: rocksdb::DBImpl::GetMergeOperands(rocksdb::ReadOptions const&, rocksdb::ColumnFamilyHandle*, rocksdb::Slice const&, rocksdb::PinnableSlice*, rocksdb::GetMergeOperandsOptions*, int*) (db_impl.h:192)
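Reading this trace, the block seems to be released when Version::Get tears down its PinnedIteratorsManager (Version::Get is called from DBImpl::GetImpl at db_impl.cc:1882), while the operand is only copied later by PinSelf (db_impl.cc:1908). The toy model below is a sketch of that ordering problem, not RocksDB code: ToyBlock, ToyView and ToyPinManager are hypothetical stand-ins, and a "released" block throws instead of being freed so the bad ordering can be observed without undefined behavior.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a block held by the block cache. Instead of
// really freeing memory, it records whether its handle has been released.
struct ToyBlock {
  explicit ToyBlock(std::string b) : bytes(std::move(b)) {}
  std::string bytes;
  bool released = false;
};

// Non-owning view into a ToyBlock, playing the role of a pinned Slice.
struct ToyView {
  const ToyBlock* block;
  std::size_t offset;
  std::size_t size;

  // Copies the bytes out (like PinSelf). Throws instead of reading
  // released memory, so the bug is observable without UB.
  std::string Copy() const {
    if (block->released) throw std::logic_error("read after release");
    return block->bytes.substr(offset, size);
  }
};

// Hypothetical scope-bound pin manager: runs its release callbacks when
// destroyed, loosely modelled on PinnedIteratorsManager.
class ToyPinManager {
 public:
  ToyPinManager() = default;
  ToyPinManager(const ToyPinManager&) = delete;
  ToyPinManager& operator=(const ToyPinManager&) = delete;
  ~ToyPinManager() {
    for (auto& release : releases_) release();
  }
  void Pin(ToyBlock* b) {
    releases_.push_back([b] { b->released = true; });
  }

 private:
  std::vector<std::function<void()>> releases_;
};

// Failing order: the views escape the scope that holds the pins.
std::vector<ToyView> GetOperandViews(ToyBlock* block) {
  ToyPinManager pins;
  pins.Pin(block);
  return {{block, 0, 3}, {block, 3, 3}};
}  // pins are released here, so the returned views now dangle

// Safe order: copy the operands while the pins are still held.
std::vector<std::string> GetOperandCopies(ToyBlock* block) {
  ToyPinManager pins;
  pins.Pin(block);
  std::vector<ToyView> views = {{block, 0, 3}, {block, 3, 3}};
  std::vector<std::string> out;
  for (const auto& v : views) out.push_back(v.Copy());
  return out;
}
```

GetOperandViews mirrors the order seen in the trace (release, then copy); GetOperandCopies copies first and releases afterwards.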
And this is a read from a flushed memtable (reproduced on RocksDB 6.5.2; I couldn't reproduce it on 6.25.3):
==6741== Invalid read of size 8
==6741== at 0x4C2E8AF: memcpy@@GLIBC_2.14 (vg_replace_strmem.c:1035)
==6741== by 0x573B54A: std::string::_M_replace_safe(unsigned long, unsigned long, char const*, unsigned long) (in /usr/lib64/libstdc++.so.6.0.19)
==6741== by 0x5A1FF3: rocksdb::DBImpl::GetImpl(rocksdb::ReadOptions const&, rocksdb::Slice const&, rocksdb::DBImpl::GetImplOptions) (in rocksdb/test)
==6741== by 0x5B9E69: rocksdb::DBImpl::GetMergeOperands(rocksdb::ReadOptions const&, rocksdb::ColumnFamilyHandle*, rocksdb::Slice const&, rocksdb::PinnableSlice*, rocksdb::GetMergeOperandsOptions*, int*) (in rocksdb/test)
==6741== Address 0xa46bb58 is 72 bytes inside a block of size 9,424 free'd
==6741== at 0x4C2BB8F: operator delete[](void*) (vg_replace_malloc.c:651)
==6741== by 0x4C707C: rocksdb::Arena::~Arena() (in rocksdb/test)
==6741== by 0x467378: rocksdb::MemTable::~MemTable() (in rocksdb/test)
==6741== by 0x58106C: rocksdb::SuperVersion::~SuperVersion() (in rocksdb/test)
==6741== by 0x5E427C: rocksdb::DBImpl::BackgroundCallFlush(rocksdb::Env::Priority) (in rocksdb/test)
==6741== by 0x675B63: rocksdb::ThreadPoolImpl::Impl::BGThread(unsigned long) (in rocksdb/test)
==6741== by 0x675CDD: rocksdb::ThreadPoolImpl::Impl::BGThreadWrapper(void*) (in rocksdb/test)
==6741== Block was alloc'd at
==6741== at 0x4C2AC38: operator new[](unsigned long) (vg_replace_malloc.c:433)
==6741== by 0x4C74D2: rocksdb::Arena::AllocateNewBlock(unsigned long) (in rocksdb/test)
==6741== by 0x469B73: rocksdb::ConcurrentArena::AllocateAligned(unsigned long, unsigned long, rocksdb::Logger*) (in rocksdb/test)
==6741== by 0x4C7EBD: rocksdb::(anonymous namespace)::SkipListRep::Allocate(unsigned long, char**) (in rocksdb/test)
==6741== by 0x465CCA: rocksdb::MemTable::Add(unsigned long, rocksdb::ValueType, rocksdb::Slice const&, rocksdb::Slice const&, bool, rocksdb::MemTablePostProcessInfo*, void**) (in rocksdb/test)
==6741== by 0x4B08F2: rocksdb::MemTableInserter::MergeCF(unsigned int, rocksdb::Slice const&, rocksdb::Slice const&) (in rocksdb/test)
==6741== by 0x4A9617: rocksdb::WriteBatchInternal::Iterate(rocksdb::WriteBatch const*, rocksdb::WriteBatch::Handler*, unsigned long, unsigned long) (in rocksdb/test)
==6741== by 0x4A9DB9: rocksdb::WriteBatch::Iterate(rocksdb::WriteBatch::Handler*) const (in rocksdb/test)
==6741== by 0x4ACFF8: rocksdb::WriteBatchInternal::InsertInto(rocksdb::WriteThread::WriteGroup&, unsigned long, rocksdb::ColumnFamilyMemTables*, rocksdb::FlushScheduler*, rocksdb::TrimHistoryScheduler*, bool, unsigned long, rocksdb::DB*, bool, bool, bool) (in rocksdb/test)
==6741== by 0x5CFCE3: rocksdb::DBImpl::WriteImpl(rocksdb::WriteOptions const&, rocksdb::WriteBatch*, rocksdb::WriteCallback*, unsigned long*, unsigned long, bool, unsigned long*, unsigned long, rocksdb::PreReleaseCallback*) (in rocksdb/test)
==6741== by 0x5D0921: rocksdb::DBImpl::Write(rocksdb::WriteOptions const&, rocksdb::WriteBatch*) (in rocksdb/test)
==6741== by 0x5D0E50: rocksdb::DB::Merge(rocksdb::WriteOptions const&, rocksdb::ColumnFamilyHandle*, rocksdb::Slice const&, rocksdb::Slice const&) (in rocksdb/test)
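Here the memtable's arena is freed from ~SuperVersion in the flush thread. A minimal sketch of the lifetime rule this implies, assuming the super version is reference counted (modelled with std::shared_ptr; ToyMemTable, ToySuperVersion and ReadOperands are hypothetical, not RocksDB types): operand bytes may be copied only while the reader still holds a reference that keeps the memtable alive.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical toy memtable: operand bytes live in storage it owns,
// standing in for the memtable's arena.
struct ToyMemTable {
  std::vector<std::string> operands;
};

// Hypothetical toy super version: the memtables a reader may look at.
// Dropping the last reference to it destroys the memtables it owns.
struct ToySuperVersion {
  std::vector<std::shared_ptr<ToyMemTable>> mems;
};

// Copies every operand while `sv` is held, so a concurrent "flush" that
// drops its own reference cannot free the memtables mid-copy.
std::vector<std::string> ReadOperands(std::shared_ptr<const ToySuperVersion> sv) {
  std::vector<std::string> out;
  for (const auto& mem : sv->mems)
    for (const auto& op : mem->operands) out.push_back(op);
  return out;  // the copies are owned by the caller and outlive `sv`
}
```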
This is how I build the test (test.cc is placed in the RocksDB root directory):
g++ --std=c++11 -Iinclude test.cc librocksdb.a -lzstd -lsnappy -lz -ldl -lbz2 -lpthread -o test
The source:
#include <atomic>
#include <cassert>
#include <cstdlib>
#include <string>
#include <thread>
#include <vector>
#include "rocksdb/db.h"
#include "rocksdb/merge_operator.h"
// a simple string appending merge operator
class StringAppendOperator : public rocksdb::AssociativeMergeOperator {
 public:
  virtual bool Merge(
      const rocksdb::Slice& key,
      const rocksdb::Slice* existing_value,
      const rocksdb::Slice& value,
      std::string* new_value,
      rocksdb::Logger* logger) const override {
    // result = existing value (if any) followed by the new operand
    if (existing_value)
      new_value->append(existing_value->data(), existing_value->size());
    new_value->append(value.data(), value.size());
    return true;
  }
  virtual const char* Name() const override {
    return "StringAppendOperator";
  }
};
int main()
{
  rocksdb::DB* db;
  rocksdb::Options options;
  options.create_if_missing = true;
  options.merge_operator.reset(new StringAppendOperator);
  options.write_buffer_size = 128 * 1024;
  options.max_write_buffer_number = 16;
  options.min_write_buffer_number_to_merge = 4;
  rocksdb::Status status = rocksdb::DB::Open(options, "/tmp/testdb", &db);
  assert(status.ok());
  //
  // Let's crash it!
  //
  const size_t KEYS = 100;
  const size_t COMMITS = 1000;
  const size_t READERS = 5;
  std::atomic<bool> finished{false};
  // writer thread
  std::thread writer([&]()
  {
    for (size_t i = 0; i < COMMITS; i++)
    {
      rocksdb::WriteOptions opts;
      size_t SEED = rand();
      for (unsigned long num = 0; num < 10000; num++)
        db->Merge(opts, std::string("KEY") + std::to_string(num % KEYS),
                  std::string((SEED % 10000) + 10, 'x'));
      //db->Flush(rocksdb::FlushOptions());
    }
    // tell the readers to stop
    finished = true;
  });
  // single read
  auto one_read = [KEYS, db](const std::string & key) -> std::string
  {
    rocksdb::GetMergeOperandsOptions opts;
    rocksdb::ReadOptions ropts;
    int number_of_operands = 10;
    std::vector<rocksdb::PinnableSlice> pinned(number_of_operands);
    rocksdb::Status s;
    for (int i = 0; i < 10; i++) // guard against an endless loop: 10 attempts only
    {
      opts.expected_max_number_of_operands = pinned.size();
      s = db->GetMergeOperands(ropts, db->DefaultColumnFamily(), key, pinned.data(), &opts, &number_of_operands);
      if (s.IsIncomplete() ||
          (size_t)number_of_operands > pinned.size()) // paranoia
      {
        // the buffer was too small:
        // retry with the reported count plus some headroom
        pinned = std::vector<rocksdb::PinnableSlice>(number_of_operands + 10);
      }
      else
        break;
    }
    if (s.IsNotFound())
      return "";
    std::string res;
    for (size_t i = 0; i < (size_t)number_of_operands; i++)
      res.append(pinned[i].data(), pinned[i].size());
    return res;
  };
  // reader threads
  auto reader_func = [db, &finished, &one_read]()
  {
    while (!finished)
      for (int i = 0; i < 100; i++)
        one_read(std::string("KEY") + std::to_string(i % KEYS));
  };
  std::vector<std::thread> readers;
  for (size_t i = 0; i < READERS; i++)
    readers.emplace_back(reader_func);
  // Wait for completion
  writer.join();
  for (auto & r : readers)
    r.join();
  delete db;
}
And here is a patch that fixes the problem for me, though I'm not sure it is entirely correct:
diff --git a/db/memtable.cc b/db/memtable.cc
index dd6604514..e82655d2d 100644
--- a/db/memtable.cc
+++ b/db/memtable.cc
@@ -694,13 +694,13 @@ static bool SaveValue(void* arg, const char* entry) {
// Preserve the value with the goal of returning it as part of
// raw merge operands to the user
merge_context->PushOperand(
- v, s->inplace_update_support == false /* operand_pinned */);
+ v, false /* operand_pinned */);
}
} else if (!s->do_merge) {
// Preserve the value with the goal of returning it as part of
// raw merge operands to the user
merge_context->PushOperand(
- v, s->inplace_update_support == false /* operand_pinned */);
+ v, false /* operand_pinned */);
} else if (s->value != nullptr) {
s->value->assign(v.data(), v.size());
}
@@ -743,7 +743,7 @@ static bool SaveValue(void* arg, const char* entry) {
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
*(s->merge_in_progress) = true;
merge_context->PushOperand(
- v, s->inplace_update_support == false /* operand_pinned */);
+ v, s->do_merge /* operand_pinned */);
if (s->do_merge && merge_operator->ShouldMerge(
merge_context->GetOperandsDirectionBackward())) {
*(s->status) = MergeHelper::TimedFullMerge(
diff --git a/db/version_set.cc b/db/version_set.cc
index 36e9c527d..45a2f221f 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -1730,7 +1730,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
tracing_get_id);
// Pin blocks that we read to hold merge operands
- if (merge_operator_) {
+ if (merge_operator_ && do_merge) {
pinned_iters_mgr.StartPinning();
}
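As I understand it, the patch relies on MergeContext taking its own copy of an operand when `operand_pinned` is false, instead of keeping a Slice into the memtable or block. The sketch below is a simplified toy model of that contract only; ToySlice and ToyMergeContext are hypothetical names and do not reproduce RocksDB's actual MergeContext.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Hypothetical minimal non-owning slice, standing in for rocksdb::Slice.
struct ToySlice {
  const char* data;
  std::size_t size;
};

// Hypothetical model of an operand list honouring an `operand_pinned` flag.
class ToyMergeContext {
 public:
  void PushOperand(const ToySlice& op, bool operand_pinned) {
    if (operand_pinned) {
      // Caller promises the bytes outlive this context: keep a view only.
      operands_.push_back(op);
    } else {
      // The bytes may be freed soon: take an owned copy and point at it.
      copies_.emplace_back(op.data, op.size);
      const std::string& owned = copies_.back();
      operands_.push_back(ToySlice{owned.data(), owned.size()});
    }
  }
  const std::vector<ToySlice>& operands() const { return operands_; }

 private:
  // std::deque::emplace_back never relocates existing elements, so the
  // pointers stored in operands_ stay valid as more copies are added.
  std::deque<std::string> copies_;
  std::vector<ToySlice> operands_;
};
```

The trade-off is an extra copy per operand on the GetMergeOperands path in exchange for not depending on when the memtable or block pin is dropped.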