Skip to content

Commit c42e5f7

Browse files
obdevfootkagaopy3
authored andcommitted
using ObLinkQueueThreadPool refactor io callback threads
Co-authored-by: footka <672528926@qq.com> Co-authored-by: gaopy3 <gao.panyu@qq.com>
1 parent 9e2b6ba commit c42e5f7

6 files changed

Lines changed: 90 additions & 447 deletions

File tree

src/share/io/ob_io_define.cpp

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -418,12 +418,6 @@ bool ObIOFlag::is_need_close_dev_and_fd() const
418418
return need_close_dev_and_fd_;
419419
}
420420

421-
422-
bool oceanbase::common::is_atomic_write_callback(const ObIOCallbackType type)
423-
{
424-
return (ObIOCallbackType::ATOMIC_WRITE_CALLBACK == type);
425-
}
426-
427421
/****************** IOCallback **********************/
428422
ObIOCallback::ObIOCallback(const ObIOCallbackType type)
429423
: type_(type), compat_mode_(static_cast<lib::Worker::CompatMode>(lib::get_compat_mode()))
@@ -2265,10 +2259,9 @@ int ObTenantIOConfig::get_group_config(const uint64_t index, int64_t &min, int64
22652259

22662260
int64_t ObTenantIOConfig::get_callback_thread_count() const
22672261
{
2268-
int64_t memory_benchmark = 4L * 1024L * 1024L * 1024L; //4G memory
2269-
//Based on 4G memory, one thread will be added for each additional 4G of memory, and the maximum number of callback_thread_count is 16
2270-
int64_t callback_thread_num = 0 == param_config_.callback_thread_count_? min(16, (param_config_.memory_limit_ / memory_benchmark) + 1) : param_config_.callback_thread_count_;
2271-
LOG_INFO("get callback thread by memory success", K(param_config_.memory_limit_), K(callback_thread_num));
2262+
const int64_t DEFAULT_CALLBACK_THREAD_COUNT = 16;
2263+
int64_t callback_thread_num = 0 == param_config_.callback_thread_count_? DEFAULT_CALLBACK_THREAD_COUNT : param_config_.callback_thread_count_;
2264+
LOG_INFO("get callback thread success", K(callback_thread_num));
22722265
return callback_thread_num;
22732266
}
22742267

src/share/io/ob_io_define.h

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "lib/profile/ob_trace_id.h"
2828
#include "lib/restore/ob_storage.h"
2929
#include "lib/tc/ob_tc.h"
30+
#include "lib/thread/thread_mgr_interface.h"
3031
#include "lib/worker.h"
3132
#include "share/resource_manager/ob_resource_plan_info.h"
3233
#include "storage/ob_storage_checked_object_base.h"
@@ -50,6 +51,7 @@ namespace common
5051
{
5152

5253
class ObObjectDevice;
54+
class ObIOCallbackManager;
5355

5456
// the timestamp adjustment will not adjust until the queue is idle for more than this time
5557
static constexpr int64_t CLOCK_IDLE_THRESHOLD_US = 100 * 1000L; // 100ms
@@ -215,25 +217,22 @@ struct ObIOFlag final
215217

216218
// different io callback types enqueue different io callback thread queue
217219
enum class ObIOCallbackType : uint8_t {
218-
ATOMIC_WRITE_CALLBACK = 0,
219-
ASYNC_SINGLE_MICRO_BLOCK_CALLBACK = 1,
220-
MULTI_DATA_BLOCK_CALLBACK = 2,
221-
SYNC_SINGLE_MICRO_BLOCK_CALLBACK = 3,
222-
SS_CACHE_LOAD_FROM_REMOTE_CALLBACK = 4,
223-
SS_CACHE_LOAD_FROM_LOCAL_CALLBACK = 5,
224-
SS_MC_PREWARM_CALLBACK = 6,
225-
STORAGE_META_CALLBACK = 7,
226-
TMP_PAGE_CALLBACK = 8,
227-
TMP_MULTI_PAGE_CALLBACK = 9,
228-
TMP_DIRECT_READ_PAGE_CALLBACK = 10,
229-
TEST_CALLBACK = 11, // just for unittest
230-
SS_TMP_FILE_CALLBACK = 12,
231-
TMP_CACHED_READ_CALLBACK = 13,
232-
MAX_CALLBACK_TYPE = 14
220+
ASYNC_SINGLE_MICRO_BLOCK_CALLBACK = 0,
221+
MULTI_DATA_BLOCK_CALLBACK = 1,
222+
SYNC_SINGLE_MICRO_BLOCK_CALLBACK = 2,
223+
SS_CACHE_LOAD_FROM_REMOTE_CALLBACK = 3,
224+
SS_CACHE_LOAD_FROM_LOCAL_CALLBACK = 4,
225+
SS_MC_PREWARM_CALLBACK = 5,
226+
STORAGE_META_CALLBACK = 6,
227+
TMP_PAGE_CALLBACK = 7,
228+
TMP_MULTI_PAGE_CALLBACK = 8,
229+
TMP_DIRECT_READ_PAGE_CALLBACK = 9,
230+
TEST_CALLBACK = 10, // just for unittest
231+
SS_TMP_FILE_CALLBACK = 11,
232+
TMP_CACHED_READ_CALLBACK = 12,
233+
MAX_CALLBACK_TYPE = 13
233234
};
234235

235-
bool is_atomic_write_callback(const ObIOCallbackType type);
236-
237236
class ObIOCallback
238237
{
239238
public:
@@ -528,7 +527,7 @@ class ObIOResult final
528527
friend class ObTenantIOManager;
529528
friend class ObAsyncIOChannel;
530529
friend class ObSyncIOChannel;
531-
friend class ObIORunner;
530+
friend class ObIOCallbackManager;
532531
friend class backup::ObBackupDeviceHelper;
533532
bool is_inited_;
534533
bool is_finished_;
@@ -602,7 +601,7 @@ class ObIORequest : public common::ObDLinkBase<ObIORequest>
602601
friend class ObDeviceChannel;
603602
friend class ObIOManager;
604603
friend class ObIOResult;
605-
friend class ObIORunner;
604+
friend class ObIOCallbackManager;
606605
friend class ObMClockQueue;
607606
friend class ObAsyncIOChannel;
608607
friend class ObSyncIOChannel;
@@ -618,6 +617,7 @@ class ObIORequest : public common::ObDLinkBase<ObIORequest>
618617
int hold_storage_accesser(const ObIOFd &fd, ObObjectDevice &object_device);
619618
int calc_io_offset_and_size_();
620619
public:
620+
common::LinkTask req_node_;
621621
ObIOResult *io_result_;
622622
TCRequest qsched_req_;
623623
protected:

src/share/io/ob_io_manager.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1366,7 +1366,8 @@ int ObTenantIOManager::start()
13661366
LOG_WARN("not init", K(ret), K(is_inited_));
13671367
} else if (is_working()) {
13681368
// do nothing
1369-
} else if (OB_FAIL(callback_mgr_.init(tenant_id_, callback_thread_count, DEFAULT_QUEUE_DEPTH, &io_allocator_))) {
1369+
} else if (OB_FAIL(callback_mgr_.init(tenant_id_, callback_thread_count,
1370+
callback_thread_count * DEFAULT_QUEUE_DEPTH))) {
13701371
LOG_WARN("init callback manager failed", K(ret), K(tenant_id_), K(callback_thread_count));
13711372
} else {
13721373
is_working_ = true;

0 commit comments

Comments
 (0)