diff --git a/src/Common/MemoryTracker.cpp b/src/Common/MemoryTracker.cpp
index 03bd8be94f..445109f8d0 100644
--- a/src/Common/MemoryTracker.cpp
+++ b/src/Common/MemoryTracker.cpp
@@ -11,6 +11,12 @@
 #include
 #include
 #include
+#include
+#include
+#include
+#include
+#include
+#include
 
 
 namespace DB
@@ -59,12 +65,67 @@ void MemoryTracker::logMemoryUsage(Int64 current) const
     LOG_DEBUG(&Poco::Logger::get("MemoryTracker"), "Current memory usage{}: {}.", (description ? " " + std::string(description) : ""), ReadableSize(current));
 }
 
+
+/// Appends one record about a memory operation of `size` bytes to the server's trace log,
+/// filled with the frame pointers of a StackTrace constructed inside this function.
+/// In this file alloc() passes `size` and free() passes `-size`.
+/// Does nothing if the current thread is not initialized, if the running application
+/// cannot be cast to the server type, or if the context returns no trace log.
+static void profileMemoryUsage(Int64 size)
+{
+    /// Checked first: it needs neither the dynamic_cast nor the trace log lookup below.
+    if (!DB::CurrentThread::isInitialized())
+    {
+        return;
+    }
+
+    auto * server = dynamic_cast(&Poco::Util::Application::instance());
+    if (!server)
+    {
+        return;
+    }
+
+    /// getTraceLog() may return an empty pointer; there is nowhere to write the record then.
+    auto trace_log = server->context().getTraceLog();
+    if (!trace_log)
+    {
+        return;
+    }
+
+    DB::TraceLogElement element;
+    element.event_time = std::time(nullptr);
+    element.trace_type = DB::TraceType::Memory;
+    element.thread_id = DB::CurrentThread::get().thread_id;
+    element.query_id = DB::CurrentThread::getQueryId().toString();
+    element.size = size;
+
+    /// Read the frame array and its size once instead of on every loop iteration.
+    const StackTrace trace;
+    const auto & frame_pointers = trace.getFramePointers();
+    const size_t frame_count = trace.getSize();
+    element.trace.reserve(frame_count);
+    for (size_t i = 0; i < frame_count; ++i)
+    {
+        element.trace.emplace_back(frame_pointers[i]);
+    }
+
+    trace_log->add(element);
+}
+
 
 void MemoryTracker::alloc(Int64 size)
 {
     if (blocker.isCancelled())
         return;
 
+    /// Only a tracker with trace_memory set at the Global level is profiled.
+    /// NOTE(review): the blocker is presumably cancelled so that alloc()/free() called while profiling return at the check above - confirm.
+    if (trace_memory && level == VariableContext::Global)
+    {
+        auto untrack_lock = blocker.cancel();
+        profileMemoryUsage(size);
+    }
+
     /** Using memory_order_relaxed means that if allocations are done simultaneously,
       * we allow exception about memory limit exceeded to be thrown only on next allocation.
       * So, we allow over-allocations.
@@ -154,6 +215,12 @@ void MemoryTracker::free(Int64 size) if (blocker.isCancelled()) return; + if (trace_memory && level == VariableContext::Global) + { + auto untrack_lock = blocker.cancel(); + profileMemoryUsage(-size); + } + std::bernoulli_distribution sample(sample_probability); if (unlikely(sample_probability && sample(thread_local_rng))) { diff --git a/src/Common/MemoryTracker.h b/src/Common/MemoryTracker.h index 8af683ae79..92f25b824b 100644 --- a/src/Common/MemoryTracker.h +++ b/src/Common/MemoryTracker.h @@ -41,6 +41,8 @@ private: void logMemoryUsage(Int64 current) const; public: + bool trace_memory = false; + MemoryTracker(VariableContext level_ = VariableContext::Thread); MemoryTracker(MemoryTracker * parent_, VariableContext level_ = VariableContext::Thread); diff --git a/src/Common/StackTrace.h b/src/Common/StackTrace.h index 3ae4b96483..a9aea819fe 100644 --- a/src/Common/StackTrace.h +++ b/src/Common/StackTrace.h @@ -34,7 +34,7 @@ public: std::optional file; std::optional line; }; - static constexpr size_t capacity = 32; + static constexpr size_t capacity = 96; using FramePointers = std::array; using Frames = std::array; diff --git a/src/Common/ThreadStatus.h b/src/Common/ThreadStatus.h index a3394dc745..b478d9ec3a 100644 --- a/src/Common/ThreadStatus.h +++ b/src/Common/ThreadStatus.h @@ -100,7 +100,7 @@ public: /// Small amount of untracked memory (per thread atomic-less counter) Int64 untracked_memory = 0; /// Each thread could new/delete memory in range of (-untracked_memory_limit, untracked_memory_limit) without access to common counters. 
- Int64 untracked_memory_limit = 4 * 1024 * 1024; + Int64 untracked_memory_limit = 0; //4 * 1024 * 1024; /// Statistics of read and write rows/bytes Progress progress_in; diff --git a/src/Common/TraceCollector.h b/src/Common/TraceCollector.h index 86e9d659d0..f63f47147d 100644 --- a/src/Common/TraceCollector.h +++ b/src/Common/TraceCollector.h @@ -34,7 +34,6 @@ public: /// size - for memory tracing is the amount of memory allocated; for other trace types it is 0. static void collect(TraceType trace_type, const StackTrace & stack_trace, Int64 size); -private: std::shared_ptr trace_log; ThreadFromGlobalPool thread; diff --git a/src/Core/Field.h b/src/Core/Field.h index 152ae29bd1..2b244092d9 100644 --- a/src/Core/Field.h +++ b/src/Core/Field.h @@ -187,6 +187,7 @@ template <> struct NearestFieldTypeImpl> { using Type = template <> struct NearestFieldTypeImpl { using Type = Float64; }; template <> struct NearestFieldTypeImpl { using Type = Float64; }; template <> struct NearestFieldTypeImpl { using Type = String; }; +template <> struct NearestFieldTypeImpl { using Type = NearestFieldType; }; template <> struct NearestFieldTypeImpl { using Type = String; }; template <> struct NearestFieldTypeImpl { using Type = Array; }; template <> struct NearestFieldTypeImpl { using Type = Tuple; }; diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 9e8f96aa52..3c15204f12 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -304,6 +304,7 @@ struct Settings : public SettingsCollection M(SettingUInt64, max_bytes_in_distinct, 0, "Maximum total size of state (in uncompressed bytes) in memory for the execution of DISTINCT.", 0) \ M(SettingOverflowMode, distinct_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \ \ + M(SettingBool, trace_memory, false, "Trace memory allocations.", 0) \ M(SettingUInt64, max_memory_usage, 0, "Maximum memory usage for processing of single query. 
Zero means unlimited.", 0) \ M(SettingUInt64, max_memory_usage_for_user, 0, "Maximum memory usage for processing all concurrently running queries for the user. Zero means unlimited.", 0) \ M(SettingUInt64, max_untracked_memory, (4 * 1024 * 1024), "Small allocations and deallocations are grouped in thread local variable and tracked or profiled only when amount (in absolute value) becomes larger than specified value. If the value is higher than 'memory_profiler_step' it will be effectively lowered to 'memory_profiler_step'.", 0) \ diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 2400e37441..cfbd3eab14 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -981,6 +982,11 @@ void Context::setSetting(const StringRef & name, const Field & value) if (name == "readonly" || name == "allow_ddl" || name == "allow_introspection_functions") calculateAccessRights(); + + if (name == "trace_memory") + { + total_memory_tracker.trace_memory = settings.get(name).get(); + } } @@ -1660,7 +1666,7 @@ std::shared_ptr Context::getPartLog(const String & part_database) std::shared_ptr Context::getTraceLog() { - auto lock = getLock(); + //auto lock = getLock(); if (!shared->system_logs) return {}; diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 5a4e959229..f29959bb95 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -524,7 +525,7 @@ public: /// Nullptr if the query log is not ready for this moment. 
std::shared_ptr getQueryLog(); std::shared_ptr getQueryThreadLog(); - std::shared_ptr getTraceLog(); + __attribute__((weak)) std::shared_ptr getTraceLog(); std::shared_ptr getTextLog(); std::shared_ptr getMetricLog(); std::shared_ptr getAsynchronousMetricLog(); diff --git a/src/Interpreters/ThreadStatusExt.cpp b/src/Interpreters/ThreadStatusExt.cpp index e3e695f80f..e254be7213 100644 --- a/src/Interpreters/ThreadStatusExt.cpp +++ b/src/Interpreters/ThreadStatusExt.cpp @@ -90,6 +90,7 @@ void ThreadStatus::setupState(const ThreadGroupStatusPtr & thread_group_) untracked_memory_limit = settings.max_untracked_memory; if (settings.memory_profiler_step && settings.memory_profiler_step < UInt64(untracked_memory_limit)) untracked_memory_limit = settings.memory_profiler_step; + untracked_memory_limit = 0; #if defined(OS_LINUX) /// Set "nice" value if required.