Crashes and valgrind errors when using GetMergeOperands #9066

@abuldakov

Hi!
I found a bug while reading data with GetMergeOperands. Under stress, there are two possible failure modes:

  1. A PinnableSlice copies data from a Slice in MergeContext::operand_list_ that points into a memtable which has already been flushed and deleted.
  2. The same, except the Slice points into the contents of a block that was recently evicted from the block cache.
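
Both cases come down to a non-owning view that outlives the buffer it points into. Here is a minimal sketch of that lifetime problem, not RocksDB code: `View` is a hypothetical stand-in for `rocksdb::Slice`, and the `shared_ptr` stands in for whatever owns the bytes (a cached block or a memtable arena). Copying the bytes is only safe while the owner is still alive.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>

// Hypothetical stand-in for rocksdb::Slice: a non-owning pointer plus length.
struct View {
  const char* data;
  std::size_t size;
};

// Copies the viewed bytes while the owner is alive, then releases the owner.
// Returns true if the copy still holds the original bytes afterwards.
bool DemoCopyBeforeRelease() {
  // The shared_ptr models the owner of the bytes (assumption for illustration).
  std::shared_ptr<const std::string> owner =
      std::make_shared<const std::string>("operand-bytes");
  std::weak_ptr<const std::string> watcher = owner;

  View v{owner->data(), owner->size()};

  // Safe order: copy first, release the owner afterwards.
  std::string copy(v.data, v.size);
  owner.reset();  // models cache eviction or memtable destruction after a flush

  // The owner is gone, so v now dangles. Reading v.data at this point would
  // be the use-after-free that valgrind reports in the traces below.
  assert(watcher.expired());
  return copy == "operand-bytes";
}
```

In the traces, the copy (PinSelf) appears to run after the owner has been released, which is the reverse of the order shown here.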

The test below reproduces the bug. It sometimes crashes with SIGSEGV, but the errors are easier to catch under valgrind.
This is a read from a freed block (reproduced on RocksDB 6.25.3):

==31050== Invalid read of size 1
==31050==    at 0x4C2EB4E: memcpy@@GLIBC_2.14 (vg_replace_strmem.c:1035)
==31050==    by 0x573B54A: std::string::_M_replace_safe(unsigned long, unsigned long, char const*, unsigned long) (in /usr/lib64/libstdc++.so.6.0.19)
==31050==    by 0x60831B: PinSelf (slice.h:178)
==31050==    by 0x60831B: rocksdb::DBImpl::GetImpl(rocksdb::ReadOptions const&, rocksdb::Slice const&, rocksdb::DBImpl::GetImplOptions&) (db_impl.cc:1908)
==31050==    by 0x61F638: rocksdb::DBImpl::GetMergeOperands(rocksdb::ReadOptions const&, rocksdb::ColumnFamilyHandle*, rocksdb::Slice const&, rocksdb::PinnableSlice*, rocksdb::GetMergeOperandsOptions*, int*) (db_impl.h:192)
==31050==  Address 0x655e971 is 17 bytes inside a block of size 2,399 free'd
==31050==    at 0x4C2BB8F: operator delete[](void*) (vg_replace_malloc.c:651)
==31050==    by 0x6AE9E7: operator() (memory_allocator.h:20)
==31050==    by 0x6AE9E7: ~unique_ptr (unique_ptr.h:377)
==31050==    by 0x6AE9E7: ~BlockContents (format.h:232)
==31050==    by 0x6AE9E7: rocksdb::Block::~Block() (block.cc:910)
==31050==    by 0x545CB8: rocksdb::cache_entry_roles_detail::RegisteredDeleter<rocksdb::Block, (rocksdb::CacheEntryRole)0>::Delete(rocksdb::Slice const&, void*) (cache_entry_roles.h:93)
==31050==    by 0x5CA884: Free (lru_cache.h:205)
==31050==    by 0x5CA884: rocksdb::LRUCacheShard::Release(rocksdb::Cache::Handle*, bool) (lru_cache.cc:546)
==31050==    by 0x470184: DoCleanup (cleanable.h:62)
==31050==    by 0x470184: Reset (cleanable.h:38)
==31050==    by 0x470184: ReleasePinnedData (pinned_iterators_manager.h:71)
==31050==    by 0x470184: rocksdb::PinnedIteratorsManager::~PinnedIteratorsManager() (pinned_iterators_manager.h:24)
==31050==    by 0x48DC3A: rocksdb::Version::Get(rocksdb::ReadOptions const&, rocksdb::LookupKey const&, rocksdb::PinnableSlice*, std::string*, rocksdb::Status*, rocksdb::MergeContext*, unsigned long*, bool*, bool*, unsigned long*, rocksdb::ReadCallback*, bool*, bool) (functional:2030)
==31050==    by 0x607CA5: rocksdb::DBImpl::GetImpl(rocksdb::ReadOptions const&, rocksdb::Slice const&, rocksdb::DBImpl::GetImplOptions&) (db_impl.cc:1882)
==31050==    by 0x61F638: rocksdb::DBImpl::GetMergeOperands(rocksdb::ReadOptions const&, rocksdb::ColumnFamilyHandle*, rocksdb::Slice const&, rocksdb::PinnableSlice*, rocksdb::GetMergeOperandsOptions*, int*) (db_impl.h:192)

And this is a read from a flushed memtable (reproduced on RocksDB 6.5.2; I couldn't reproduce it on 6.25.3):

==6741== Invalid read of size 8
==6741==    at 0x4C2E8AF: memcpy@@GLIBC_2.14 (vg_replace_strmem.c:1035)
==6741==    by 0x573B54A: std::string::_M_replace_safe(unsigned long, unsigned long, char const*, unsigned long) (in /usr/lib64/libstdc++.so.6.0.19)
==6741==    by 0x5A1FF3: rocksdb::DBImpl::GetImpl(rocksdb::ReadOptions const&, rocksdb::Slice const&, rocksdb::DBImpl::GetImplOptions) (in  rocksdb/test)
==6741==    by 0x5B9E69: rocksdb::DBImpl::GetMergeOperands(rocksdb::ReadOptions const&, rocksdb::ColumnFamilyHandle*, rocksdb::Slice const&, rocksdb::PinnableSlice*, rocksdb::GetMergeOperandsOptions*, int*) (in  rocksdb/test)
==6741==  Address 0xa46bb58 is 72 bytes inside a block of size 9,424 free'd
==6741==    at 0x4C2BB8F: operator delete[](void*) (vg_replace_malloc.c:651)
==6741==    by 0x4C707C: rocksdb::Arena::~Arena() (in  rocksdb/test)
==6741==    by 0x467378: rocksdb::MemTable::~MemTable() (in  rocksdb/test)
==6741==    by 0x58106C: rocksdb::SuperVersion::~SuperVersion() (in  rocksdb/test)
==6741==    by 0x5E427C: rocksdb::DBImpl::BackgroundCallFlush(rocksdb::Env::Priority) (in  rocksdb/test)
==6741==    by 0x675B63: rocksdb::ThreadPoolImpl::Impl::BGThread(unsigned long) (in  rocksdb/test)
==6741==    by 0x675CDD: rocksdb::ThreadPoolImpl::Impl::BGThreadWrapper(void*) (in  rocksdb/test)
==6741==  Block was alloc'd at
==6741==    at 0x4C2AC38: operator new[](unsigned long) (vg_replace_malloc.c:433)
==6741==    by 0x4C74D2: rocksdb::Arena::AllocateNewBlock(unsigned long) (in  rocksdb/test)
==6741==    by 0x469B73: rocksdb::ConcurrentArena::AllocateAligned(unsigned long, unsigned long, rocksdb::Logger*) (in  rocksdb/test)
==6741==    by 0x4C7EBD: rocksdb::(anonymous namespace)::SkipListRep::Allocate(unsigned long, char**) (in  rocksdb/test)
==6741==    by 0x465CCA: rocksdb::MemTable::Add(unsigned long, rocksdb::ValueType, rocksdb::Slice const&, rocksdb::Slice const&, bool, rocksdb::MemTablePostProcessInfo*, void**) (in  rocksdb/test)
==6741==    by 0x4B08F2: rocksdb::MemTableInserter::MergeCF(unsigned int, rocksdb::Slice const&, rocksdb::Slice const&) (in  rocksdb/test)
==6741==    by 0x4A9617: rocksdb::WriteBatchInternal::Iterate(rocksdb::WriteBatch const*, rocksdb::WriteBatch::Handler*, unsigned long, unsigned long) (in  rocksdb/test)
==6741==    by 0x4A9DB9: rocksdb::WriteBatch::Iterate(rocksdb::WriteBatch::Handler*) const (in  rocksdb/test)
==6741==    by 0x4ACFF8: rocksdb::WriteBatchInternal::InsertInto(rocksdb::WriteThread::WriteGroup&, unsigned long, rocksdb::ColumnFamilyMemTables*, rocksdb::FlushScheduler*, rocksdb::TrimHistoryScheduler*, bool, unsigned long, rocksdb::DB*, bool, bool, bool) (in  rocksdb/test)
==6741==    by 0x5CFCE3: rocksdb::DBImpl::WriteImpl(rocksdb::WriteOptions const&, rocksdb::WriteBatch*, rocksdb::WriteCallback*, unsigned long*, unsigned long, bool, unsigned long*, unsigned long, rocksdb::PreReleaseCallback*) (in  rocksdb/test)
==6741==    by 0x5D0921: rocksdb::DBImpl::Write(rocksdb::WriteOptions const&, rocksdb::WriteBatch*) (in  rocksdb/test)
==6741==    by 0x5D0E50: rocksdb::DB::Merge(rocksdb::WriteOptions const&, rocksdb::ColumnFamilyHandle*, rocksdb::Slice const&, rocksdb::Slice const&) (in  rocksdb/test)

This is how I build the test (test.cc is in the rocksdb directory):

g++ --std=c++11 -Iinclude test.cc librocksdb.a -lzstd -lsnappy -lz -ldl -lbz2 -lpthread -o test

The source:

#include <atomic>
#include <cassert>
#include <cstdlib>
#include <string>
#include <thread>
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/merge_operator.h"

// a simple string appending merge operator
class StringAppendOperator : public rocksdb::AssociativeMergeOperator {
 public:
  virtual bool Merge(
    const rocksdb::Slice& key,
    const rocksdb::Slice* existing_value,
    const rocksdb::Slice& value,
    std::string* new_value,
    rocksdb::Logger* logger) const override {

    if (existing_value)
        new_value->append(existing_value->ToString());
    return true;
  }

  virtual const char* Name() const override {
    return "StringAppendOperator";
   }
};

int main()
{
    rocksdb::DB* db;
    rocksdb::Options options;
    options.create_if_missing = true;
    options.merge_operator.reset(new StringAppendOperator);
    options.write_buffer_size = 128 * 1024;
    options.max_write_buffer_number = 16;
    options.min_write_buffer_number_to_merge = 4;
    rocksdb::Status status = rocksdb::DB::Open(options, "/tmp/testdb", &db);

    assert(status.ok());

    //
    // Let's crash it!
    //

    const size_t KEYS = 100;
    const size_t COMMITS = 1000;
    const size_t READERS = 5;
    std::atomic<bool> finished{false};

    // writer thread
    std::thread writer([&]()
    {
        for (size_t i = 0; i < COMMITS; i++)
        {
            rocksdb::WriteOptions opts;
            size_t SEED = rand();
            for (unsigned long num = 0; num < 10000; num++)
                db->Merge(opts, std::string("KEY") + std::to_string(num % KEYS), std::string((SEED % 10000) + 10, 'x'));
            //db->Flush(rocksdb::FlushOptions());
        }
        // tell the readers to stop
        finished = true;
    });

    // single read
    auto one_read = [KEYS, db](const std::string & key) -> std::string
    {
        rocksdb::GetMergeOperandsOptions opts;
        rocksdb::ReadOptions ropts;

        int number_of_operands = 10;
        std::vector<rocksdb::PinnableSlice> pinned(number_of_operands);

        rocksdb::Status s;

        for (int i = 0; i < 10; i++)    // guard against an endless loop: at most 10 attempts
        {
            opts.expected_max_number_of_operands = pinned.size();
            s = db->GetMergeOperands(ropts, db->DefaultColumnFamily(), key, pinned.data(), &opts, &number_of_operands);

            if (s.IsIncomplete() ||
                (size_t)number_of_operands > pinned.size())     // paranoia
            {
                // if given number is not enough
                // take the returned number plus some reserve
                pinned = std::vector<rocksdb::PinnableSlice>(number_of_operands + 10);
            }
            else
                break;
        }

        if (s.IsNotFound())
            return "";

        std::string res;
        for (size_t i = 0; i < (size_t)number_of_operands; i++)
            res.append(pinned[i].data(), pinned[i].size());

        return res;
    };

    // reader threads
    auto reader_func = [db, &finished, &one_read]()
    {
        while (!finished)
            for (int i = 0; i < 100; i++)
                one_read(std::string("KEY") + std::to_string(i % KEYS));
    };
    std::vector<std::thread> readers;
    for (size_t i = 0; i < READERS; i++)
        readers.emplace_back(reader_func);

    // Wait for completion
    writer.join();
    for (auto & r : readers)
        r.join();

    delete db;
} 

And this is a patch that fixes the problem, but I'm not sure it is entirely correct.

diff --git a/db/memtable.cc b/db/memtable.cc
index dd6604514..e82655d2d 100644
--- a/db/memtable.cc
+++ b/db/memtable.cc
@@ -694,13 +694,13 @@ static bool SaveValue(void* arg, const char* entry) {
             // Preserve the value with the goal of returning it as part of
             // raw merge operands to the user
             merge_context->PushOperand(
-                v, s->inplace_update_support == false /* operand_pinned */);
+                v, false /* operand_pinned */);
           }
         } else if (!s->do_merge) {
           // Preserve the value with the goal of returning it as part of
           // raw merge operands to the user
           merge_context->PushOperand(
-              v, s->inplace_update_support == false /* operand_pinned */);
+              v, false /* operand_pinned */);
         } else if (s->value != nullptr) {
           s->value->assign(v.data(), v.size());
         }
@@ -743,7 +743,7 @@ static bool SaveValue(void* arg, const char* entry) {
         Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
         *(s->merge_in_progress) = true;
         merge_context->PushOperand(
-            v, s->inplace_update_support == false /* operand_pinned */);
+            v, s->do_merge /* operand_pinned */);
         if (s->do_merge && merge_operator->ShouldMerge(
                                merge_context->GetOperandsDirectionBackward())) {
           *(s->status) = MergeHelper::TimedFullMerge(
diff --git a/db/version_set.cc b/db/version_set.cc
index 36e9c527d..45a2f221f 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -1730,7 +1730,7 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
       tracing_get_id);

   // Pin blocks that we read to hold merge operands
-  if (merge_operator_) {
+  if (merge_operator_ && do_merge) {
     pinned_iters_mgr.StartPinning();
   }
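
To illustrate what passing `false` for `operand_pinned` is meant to achieve, here is a simplified model of an operand list. It is not the real `rocksdb::MergeContext`: `View` and `OperandList` are hypothetical names. The assumption is that an unpinned operand gets copied into storage owned by the list, so the returned view stays valid after the source buffer (memtable or block) is freed.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for rocksdb::Slice.
struct View {
  const char* data;
  std::size_t size;
};

// Simplified model of an operand list (not the real rocksdb::MergeContext).
class OperandList {
 public:
  void PushOperand(const View& v, bool operand_pinned) {
    assert(v.data != nullptr || v.size == 0);
    if (operand_pinned) {
      // The caller promises that the bytes outlive this list.
      operands_.push_back(v);
    } else {
      // Copy the bytes into storage owned by the list.
      copies_.emplace_back(new std::string(v.data, v.size));
      operands_.push_back(View{copies_.back()->data(), copies_.back()->size()});
    }
  }

  const std::vector<View>& operands() const { return operands_; }

 private:
  std::vector<View> operands_;
  // Heap-allocated strings keep their addresses stable when the vector grows.
  std::vector<std::unique_ptr<std::string>> copies_;
};

// Pushes an unpinned operand, frees the source, then reads the operand back.
std::string ReadAfterSourceFreed() {
  OperandList list;
  {
    std::unique_ptr<std::string> source(new std::string("abc"));
    list.PushOperand(View{source->data(), source->size()},
                     /*operand_pinned=*/false);
  }  // source is destroyed here; the list still owns its own copy
  const View& v = list.operands()[0];
  return std::string(v.data, v.size);
}
```

With `operand_pinned` set to `true` in `ReadAfterSourceFreed`, the final read would touch freed memory, which matches the pattern in the valgrind reports above. The cost of the unpinned path is one extra copy per operand.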

Labels: bug (Confirmed RocksDB bugs)