Skip to content

Commit b5d96ef

Browse files
obdevhnwyllmm
authored andcommitted
fix loading ik words concurrency bug
Co-authored-by: hnwyllmm <hnwyllmm@126.com>
1 parent 0e6a73a commit b5d96ef

3 files changed

Lines changed: 66 additions & 45 deletions

File tree

src/storage/fts/dict/ob_ft_cache.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ class ObDictCacheKey : public common::ObIKVCacheKey
8989
return ret;
9090
}
9191

92+
TO_STRING_KV(K_(name), K_(tenant), K_(dict_type), K_(range_id));
9293
private:
9394
// to change to name
9495
uint64_t name_; // when build dict

src/storage/fts/dict/ob_ft_cache_container.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ struct ObFTCacheRangeHandle
3636
const ObDictCacheKey *key_;
3737
const ObDictCacheValue *value_;
3838

39+
TO_STRING_KV(K_(type), K_(handle), KPC_(key), KP_(value));
3940
public:
4041
ObFTCacheRangeHandle()
4142
: type_(ObFTDictType::DICT_TYPE_INVALID), handle_(), key_(nullptr), value_(nullptr)

src/storage/fts/dict/ob_ft_range_dict.cpp

Lines changed: 64 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -83,63 +83,64 @@ class DATBuilderThreadPool : public lib::Threads
8383
DATBuilderThreadPool()
8484
: all_tries_(nullptr),
8585
desc_(nullptr),
86-
container_(nullptr),
87-
error_code_(OB_SUCCESS)
86+
error_code_(OB_SUCCESS),
87+
handles_(nullptr)
8888
{}
8989

9090
void set_tries(ObVector<ObFTTrie<void> *, ObArenaAllocator> *tries) { all_tries_ = tries; }
9191
void set_desc(const ObFTDictDesc *desc) { desc_ = desc; }
92-
void set_container(ObFTCacheRangeContainer *container) { container_ = container; }
9392
int64_t get_range_count() const { return all_tries_ ? all_tries_->size() : 0; }
9493
int get_error_code() const { return error_code_.load(); }
95-
94+
void set_handles(ObArray<ObFTCacheRangeHandle *> *handles) { handles_ = handles; }
9695
void run1() override
9796
{
9897
int ret = OB_SUCCESS;
9998
int64_t idx = get_thread_idx();
10099

101-
if (OB_ISNULL(all_tries_) || idx >= static_cast<int64_t>(all_tries_->size())) {
102-
return;
103-
}
104-
105-
ObFTTrie<void> *trie = (*all_tries_)[idx];
106-
ObArenaAllocator dat_alloc(lib::ObMemAttr(OB_SERVER_TENANT_ID, "DATBuild"));
107-
ObFTDATBuilder<void> builder(dat_alloc);
108-
109-
ObFTDAT *dat_buff = nullptr;
110-
size_t buffer_size = 0;
111-
ObFTCacheRangeHandle *info = nullptr;
100+
if (OB_ISNULL(all_tries_) || idx >= static_cast<int64_t>(all_tries_->size()) || OB_ISNULL(all_tries_->at(idx))) {
101+
ret = OB_ARRAY_OUT_OF_RANGE;
102+
LOG_WARN("all_tries_ is null or idx is out of range", K(idx), K(all_tries_->size()));
103+
} else if (OB_ISNULL(handles_) || idx >= static_cast<int64_t>(handles_->size()) || OB_ISNULL(handles_->at(idx))) {
104+
ret = OB_ARRAY_OUT_OF_RANGE;
105+
LOG_WARN("handles_ is null or idx is out of range", K(idx), K(handles_->size()));
106+
} else {
112107

113-
if (OB_FAIL(builder.init(*trie))) {
114-
LOG_WARN("Failed to init builder.", K(ret), K(idx));
115-
} else if (OB_FAIL(builder.build_from_trie(*trie))) {
116-
LOG_WARN("Failed to build datrie.", K(ret), K(idx));
117-
} else if (OB_FAIL(builder.get_mem_block(dat_buff, buffer_size))) {
118-
LOG_WARN("Failed to get mem block.", K(ret), K(idx));
119-
} else if (OB_FAIL(container_->fetch_info_for_dict(info))) {
120-
LOG_WARN("Failed to fetch info for dict.", K(ret), K(idx));
121-
} else if (OB_FAIL(ObFTCacheDict::make_and_fetch_cache_entry(*desc_,
122-
dat_buff,
123-
buffer_size,
124-
static_cast<int32_t>(idx),
125-
info->value_,
126-
info->handle_))) {
127-
LOG_WARN("Failed to put dict into kv cache", K(ret), K(idx));
108+
ObFTTrie<void> *trie = (*all_tries_)[idx];
109+
ObArenaAllocator dat_alloc(lib::ObMemAttr(MTL_ID(), "DATBuild"));
110+
ObFTDATBuilder<void> builder(dat_alloc);
111+
112+
ObFTDAT *dat_buff = nullptr;
113+
size_t buffer_size = 0;
114+
ObFTCacheRangeHandle *info = handles_->at(idx);
115+
116+
if (OB_FAIL(builder.init(*trie))) {
117+
LOG_WARN("Failed to init builder.", K(ret), K(idx));
118+
} else if (OB_FAIL(builder.build_from_trie(*trie))) {
119+
LOG_WARN("Failed to build datrie.", K(ret), K(idx));
120+
} else if (OB_FAIL(builder.get_mem_block(dat_buff, buffer_size))) {
121+
LOG_WARN("Failed to get mem block.", K(ret), K(idx));
122+
} else if (OB_FAIL(ObFTCacheDict::make_and_fetch_cache_entry(*desc_,
123+
dat_buff,
124+
buffer_size,
125+
static_cast<int32_t>(idx),
126+
info->value_,
127+
info->handle_))) {
128+
LOG_WARN("Failed to put dict into kv cache", K(ret), K(idx));
129+
}
130+
dat_alloc.reset();
128131
}
129132

130133
if (OB_FAIL(ret)) {
131134
int expected = OB_SUCCESS;
132135
error_code_.compare_exchange_strong(expected, ret);
133136
}
134-
135-
dat_alloc.reset();
136137
}
137138

138139
private:
139140
ObVector<ObFTTrie<void> *, ObArenaAllocator> *all_tries_;
140141
const ObFTDictDesc *desc_;
141-
ObFTCacheRangeContainer *container_;
142142
std::atomic<int> error_code_;
143+
ObArray<ObFTCacheRangeHandle *> *handles_;
143144
};
144145

145146
int ObFTRangeDict::build_ranges_concurrently_thread_pool(const ObFTDictDesc &desc,
@@ -209,19 +210,37 @@ int ObFTRangeDict::build_ranges_concurrently_thread_pool(const ObFTDictDesc &des
209210

210211
// Phase 2: Build DATs concurrently using DATBuilderThreadPool
211212
if (OB_SUCC(ret) && all_tries.size() > 0) {
212-
DATBuilderThreadPool pool;
213-
pool.set_tries(&all_tries);
214-
pool.set_desc(&desc);
215-
pool.set_container(&range_container);
216-
pool.set_thread_count(static_cast<int64_t>(all_tries.size()));
217-
218-
if (OB_FAIL(pool.start())) {
219-
LOG_WARN("Failed to start thread pool", K(ret));
213+
ObArray<ObFTCacheRangeHandle *> handles;
214+
handles.set_attr(lib::ObMemAttr(MTL_ID(), "DATBuild"));
215+
for (int64_t i = 0; OB_SUCC(ret) && i < all_tries.size(); i++) {
216+
ObFTCacheRangeHandle *handle = nullptr;
217+
if (OB_FAIL(range_container.fetch_info_for_dict(handle))) {
218+
LOG_WARN("Failed to fetch info for dict.", K(ret), K(i));
219+
} else if (OB_FAIL(handles.push_back(handle))) {
220+
OB_DELETEx(ObFTCacheRangeHandle, &tmp_alloc, handle);
221+
LOG_WARN("Failed to push back handle", K(ret), K(i));
222+
}
223+
}
224+
if (OB_FAIL(ret)) {
225+
for (int64_t i = 0; i < handles.size(); i++) {
226+
OB_DELETEx(ObFTCacheRangeHandle, &tmp_alloc, handles[i]);
227+
}
228+
handles.reset();
220229
} else {
221-
pool.wait();
222-
ret = pool.get_error_code();
223-
if (OB_FAIL(ret)) {
224-
LOG_WARN("Thread pool encountered error", K(ret));
230+
DATBuilderThreadPool pool;
231+
pool.set_tries(&all_tries);
232+
pool.set_desc(&desc);
233+
pool.set_thread_count(static_cast<int64_t>(all_tries.size()));
234+
pool.set_handles(&handles);
235+
236+
if (OB_FAIL(pool.start())) {
237+
LOG_WARN("Failed to start thread pool", K(ret));
238+
} else {
239+
pool.wait();
240+
ret = pool.get_error_code();
241+
if (OB_FAIL(ret)) {
242+
LOG_WARN("Thread pool encountered error", K(ret));
243+
}
225244
}
226245
}
227246
}

0 commit comments

Comments
 (0)