Skip to content

Commit 87ab54c

Browse files
obdevhnwyllmm
authored andcommitted
Reduce IK Parser Load Time
Co-authored-by: hnwyllmm <hnwyllmm@126.com>
1 parent 5226ac4 commit 87ab54c

4 files changed

Lines changed: 197 additions & 30 deletions

File tree

src/storage/fts/dict/ob_dic_loader.cpp

Lines changed: 1 addition & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -174,36 +174,8 @@ int ObTenantDicLoader::try_load_dictionary_in_trans(const uint64_t tenant_id)
174174
int ObTenantDicLoader::check_need_load_dic(const uint64_t tenant_id, bool &is_need_load_dic)
175175
{
176176
int ret = OB_SUCCESS;
177-
ObSqlString sql;
177+
// we keep the code here even though we don't load data into system table anymore.
178178
is_need_load_dic = false;
179-
if (OB_UNLIKELY(!is_inited_)) {
180-
ret = OB_NOT_INIT;
181-
LOG_WARN("the dic loader is not initialized", K(ret), K(tenant_id));
182-
} else if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) {
183-
ret = OB_INVALID_ARGUMENT;
184-
LOG_WARN("invalid tenant id", K(ret), K(tenant_id));
185-
} else {
186-
SMART_VAR(ObMySQLProxy::MySQLResult, res)
187-
{
188-
if (OB_FAIL(sql.assign_fmt("SELECT * FROM %s LIMIT 1", dic_tables_info_.at(0).table_name_))) {
189-
LOG_WARN("fail to append sql", KR(ret), K(tenant_id));
190-
} else if (OB_FAIL(GCTX.sql_proxy_->read(res, tenant_id, sql.ptr()))) {
191-
LOG_WARN("fail to execute sql", KR(ret), K(sql), K(tenant_id));
192-
} else if (OB_ISNULL(res.get_result())) {
193-
ret = OB_ERR_UNEXPECTED;
194-
LOG_WARN("fail to get sql result", KR(ret), K(sql), K(tenant_id));
195-
} else if (OB_FAIL(res.get_result()->next())) {
196-
if (OB_ITER_END != ret) {
197-
LOG_WARN("fail to get next row", KR(ret), K(tenant_id));
198-
} else {
199-
ret = OB_SUCCESS;
200-
is_need_load_dic = true;
201-
}
202-
} else {
203-
is_need_load_dic = false;
204-
}
205-
}
206-
}
207179
return ret;
208180
}
209181

src/storage/fts/dict/ob_ft_dict_hub.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ int ObFTDictHub::build_cache(const ObFTDictDesc &desc, ObFTCacheRangeContainer &
7878

7979
if (OB_FAIL(ret)) {
8080
if (OB_ENTRY_NOT_EXIST == ret) {
81-
if (OB_FAIL(ObFTRangeDict::build_cache(desc, container))) {
81+
if (OB_FAIL(ObFTRangeDict::build_cache_from_ik_dict(desc, container))) {
8282
LOG_WARN("Failed to build cache", K(ret));
8383
} else if (FALSE_IT(info.range_count_ = container.get_handles().size())) {
8484
} else if (OB_FAIL(put_dict_info(key, info))) {

src/storage/fts/dict/ob_ft_range_dict.cpp

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#include "storage/fts/dict/ob_ft_range_dict.h"
1818

19+
#include "storage/fts/dict/ob_ik_dic.h"
1920
#include "lib/allocator/page_arena.h"
2021
#include "lib/charset/ob_charset.h"
2122
#include "lib/mysqlclient/ob_isql_client.h"
@@ -41,6 +42,194 @@ namespace oceanbase
4142
{
4243
namespace storage
4344
{
45+
46+
47+
int ObFTRangeDict::build_cache_from_ik_dict(const ObFTDictDesc &desc, ObFTCacheRangeContainer &range_container)
48+
{
49+
int ret = OB_SUCCESS;
50+
51+
ObIKDictLoader::RawDict raw_dict;
52+
switch (desc.type_) {
53+
case ObFTDictType::DICT_IK_MAIN: {
54+
raw_dict = ObIKDictLoader::dict_text();
55+
} break;
56+
case ObFTDictType::DICT_IK_QUAN: {
57+
raw_dict = ObIKDictLoader::dict_quen_text();
58+
} break;
59+
case ObFTDictType::DICT_IK_STOP: {
60+
raw_dict = ObIKDictLoader::dict_stop();
61+
} break;
62+
default:
63+
ret = OB_NOT_SUPPORTED;
64+
LOG_WARN("Not supported dict type.", K(ret));
65+
}
66+
67+
if (OB_SUCC(ret)) {
68+
ObIKDictIterator iter(raw_dict);
69+
if (OB_FAIL(iter.init())) {
70+
LOG_WARN("Failed to init iterator.", K(ret));
71+
} else if (OB_FAIL(ObFTRangeDict::build_ranges_concurrently_thread_pool(desc, iter, range_container))) {
72+
LOG_WARN("Failed to build ranges.", K(ret));
73+
}
74+
}
75+
76+
return ret;
77+
}
78+
79+
// Thread pool for building DATs concurrently
80+
class DATBuilderThreadPool : public lib::Threads
81+
{
82+
public:
83+
DATBuilderThreadPool()
84+
: all_tries_(nullptr),
85+
desc_(nullptr),
86+
container_(nullptr),
87+
error_code_(OB_SUCCESS)
88+
{}
89+
90+
void set_tries(ObVector<ObFTTrie<void> *, ObArenaAllocator> *tries) { all_tries_ = tries; }
91+
void set_desc(const ObFTDictDesc *desc) { desc_ = desc; }
92+
void set_container(ObFTCacheRangeContainer *container) { container_ = container; }
93+
int64_t get_range_count() const { return all_tries_ ? all_tries_->size() : 0; }
94+
int get_error_code() const { return error_code_.load(); }
95+
96+
void run1() override
97+
{
98+
int ret = OB_SUCCESS;
99+
int64_t idx = get_thread_idx();
100+
101+
if (OB_ISNULL(all_tries_) || idx >= static_cast<int64_t>(all_tries_->size())) {
102+
return;
103+
}
104+
105+
ObFTTrie<void> *trie = (*all_tries_)[idx];
106+
ObArenaAllocator dat_alloc(lib::ObMemAttr(OB_SERVER_TENANT_ID, "DATBuild"));
107+
ObFTDATBuilder<void> builder(dat_alloc);
108+
109+
ObFTDAT *dat_buff = nullptr;
110+
size_t buffer_size = 0;
111+
ObFTCacheRangeHandle *info = nullptr;
112+
113+
if (OB_FAIL(builder.init(*trie))) {
114+
LOG_WARN("Failed to init builder.", K(ret), K(idx));
115+
} else if (OB_FAIL(builder.build_from_trie(*trie))) {
116+
LOG_WARN("Failed to build datrie.", K(ret), K(idx));
117+
} else if (OB_FAIL(builder.get_mem_block(dat_buff, buffer_size))) {
118+
LOG_WARN("Failed to get mem block.", K(ret), K(idx));
119+
} else if (OB_FAIL(container_->fetch_info_for_dict(info))) {
120+
LOG_WARN("Failed to fetch info for dict.", K(ret), K(idx));
121+
} else if (OB_FAIL(ObFTCacheDict::make_and_fetch_cache_entry(*desc_,
122+
dat_buff,
123+
buffer_size,
124+
static_cast<int32_t>(idx),
125+
info->value_,
126+
info->handle_))) {
127+
LOG_WARN("Failed to put dict into kv cache", K(ret), K(idx));
128+
}
129+
130+
if (OB_FAIL(ret)) {
131+
int expected = OB_SUCCESS;
132+
error_code_.compare_exchange_strong(expected, ret);
133+
}
134+
135+
dat_alloc.reset();
136+
}
137+
138+
private:
139+
ObVector<ObFTTrie<void> *, ObArenaAllocator> *all_tries_;
140+
const ObFTDictDesc *desc_;
141+
ObFTCacheRangeContainer *container_;
142+
std::atomic<int> error_code_;
143+
};
144+
145+
int ObFTRangeDict::build_ranges_concurrently_thread_pool(const ObFTDictDesc &desc,
146+
ObIFTDictIterator &iter,
147+
ObFTCacheRangeContainer &range_container)
148+
{
149+
int ret = OB_SUCCESS;
150+
151+
// Phase 1: Collect words into tries range by range
152+
ObArenaAllocator tmp_alloc(lib::ObMemAttr(MTL_ID(), "Tmp Allocator"));
153+
ObVector<ObFTTrie<void> *, ObArenaAllocator> all_tries(&tmp_alloc);
154+
155+
bool build_next_range = true;
156+
while (OB_SUCC(ret) && build_next_range) {
157+
ObFTTrie<void> *trie = OB_NEWx(ObFTTrie<void>, &tmp_alloc, tmp_alloc, desc.coll_type_);
158+
if (OB_ISNULL(trie)) {
159+
ret = OB_ALLOCATE_MEMORY_FAILED;
160+
LOG_WARN("Failed to allocate ObFTTrie", K(ret));
161+
break;
162+
}
163+
164+
int count = 0;
165+
int64_t first_char_len = 0;
166+
ObFTSingleWord end_char;
167+
bool range_end = false;
168+
169+
while (OB_SUCC(ret) && !range_end) {
170+
ObString key;
171+
if (OB_FAIL(iter.get_key(key))) {
172+
LOG_WARN("Failed to get key", K(ret));
173+
} else {
174+
++count;
175+
176+
if (count >= DEFAULT_KEY_PER_RANGE
177+
&& OB_FAIL(ObCharset::first_valid_char(desc.coll_type_,
178+
key.ptr(),
179+
key.length(),
180+
first_char_len))) {
181+
LOG_WARN("First char is not valid.");
182+
} else if (DEFAULT_KEY_PER_RANGE == count
183+
&& OB_FAIL(end_char.set_word(key.ptr(), first_char_len))) {
184+
LOG_WARN("Failed to record first char.", K(ret));
185+
} else if (count > DEFAULT_KEY_PER_RANGE
186+
&& (end_char.get_word() != ObString(first_char_len, key.ptr()))) {
187+
range_end = true;
188+
} else {
189+
if (OB_FAIL(trie->insert(key, {}))) {
190+
LOG_WARN("Failed to insert key to trie", K(ret));
191+
} else if (OB_FAIL(iter.next()) && OB_ITER_END != ret) {
192+
LOG_WARN("Failed to step to next word entry.", K(ret));
193+
}
194+
}
195+
}
196+
}
197+
198+
if (OB_ITER_END == ret) {
199+
build_next_range = false;
200+
ret = OB_SUCCESS;
201+
}
202+
203+
if (OB_SUCC(ret) && trie->node_num() > 0) {
204+
if (OB_FAIL(all_tries.push_back(trie))) {
205+
LOG_WARN("Failed to push back trie", K(ret));
206+
}
207+
}
208+
}
209+
210+
// Phase 2: Build DATs concurrently using DATBuilderThreadPool
211+
if (OB_SUCC(ret) && all_tries.size() > 0) {
212+
DATBuilderThreadPool pool;
213+
pool.set_tries(&all_tries);
214+
pool.set_desc(&desc);
215+
pool.set_container(&range_container);
216+
pool.set_thread_count(static_cast<int64_t>(all_tries.size()));
217+
218+
if (OB_FAIL(pool.start())) {
219+
LOG_WARN("Failed to start thread pool", K(ret));
220+
} else {
221+
pool.wait();
222+
ret = pool.get_error_code();
223+
if (OB_FAIL(ret)) {
224+
LOG_WARN("Thread pool encountered error", K(ret));
225+
}
226+
}
227+
}
228+
229+
LOG_INFO("build_ranges_concurrently_thread_pool completed", K(ret), K(all_tries.size()));
230+
return ret;
231+
}
232+
44233
int ObFTRangeDict::build_one_range(const ObFTDictDesc &desc,
45234
const int32_t range_id,
46235
ObIFTDictIterator &iter,

src/storage/fts/dict/ob_ft_range_dict.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ class ObFTRangeDict final : public ObIFTDict
6868
ObFTCacheRangeContainer &range_container);
6969
static int build_cache(const ObFTDictDesc &desc, ObFTCacheRangeContainer &range_container);
7070

71+
static int build_cache_from_ik_dict(const ObFTDictDesc &desc, ObFTCacheRangeContainer &range_container);
72+
7173
private:
7274
// build cache
7375
static int build_ranges(const ObFTDictDesc &desc,
@@ -81,6 +83,10 @@ class ObFTRangeDict final : public ObIFTDict
8183
ObFTCacheRangeContainer &container,
8284
bool &build_next_range);
8385

86+
static int build_ranges_concurrently_thread_pool(const ObFTDictDesc &desc,
87+
ObIFTDictIterator &iter,
88+
ObFTCacheRangeContainer &range_container);
89+
8490
private:
8591
void destroy()
8692
{

0 commit comments

Comments
 (0)