1616
1717#include " storage/fts/dict/ob_ft_range_dict.h"
1818
19+ #include " storage/fts/dict/ob_ik_dic.h"
1920#include " lib/allocator/page_arena.h"
2021#include " lib/charset/ob_charset.h"
2122#include " lib/mysqlclient/ob_isql_client.h"
@@ -41,6 +42,194 @@ namespace oceanbase
4142{
4243namespace storage
4344{
45+
46+
47+ int ObFTRangeDict::build_cache_from_ik_dict (const ObFTDictDesc &desc, ObFTCacheRangeContainer &range_container)
48+ {
49+ int ret = OB_SUCCESS ;
50+
51+ ObIKDictLoader::RawDict raw_dict;
52+ switch (desc.type_ ) {
53+ case ObFTDictType::DICT_IK_MAIN : {
54+ raw_dict = ObIKDictLoader::dict_text ();
55+ } break ;
56+ case ObFTDictType::DICT_IK_QUAN : {
57+ raw_dict = ObIKDictLoader::dict_quen_text ();
58+ } break ;
59+ case ObFTDictType::DICT_IK_STOP : {
60+ raw_dict = ObIKDictLoader::dict_stop ();
61+ } break ;
62+ default :
63+ ret = OB_NOT_SUPPORTED ;
64+ LOG_WARN (" Not supported dict type." , K (ret));
65+ }
66+
67+ if (OB_SUCC (ret)) {
68+ ObIKDictIterator iter (raw_dict);
69+ if (OB_FAIL (iter.init ())) {
70+ LOG_WARN (" Failed to init iterator." , K (ret));
71+ } else if (OB_FAIL (ObFTRangeDict::build_ranges_concurrently_thread_pool (desc, iter, range_container))) {
72+ LOG_WARN (" Failed to build ranges." , K (ret));
73+ }
74+ }
75+
76+ return ret;
77+ }
78+
79+ // Thread pool for building DATs concurrently
80+ class DATBuilderThreadPool : public lib ::Threads
81+ {
82+ public:
83+ DATBuilderThreadPool ()
84+ : all_tries_(nullptr ),
85+ desc_ (nullptr ),
86+ container_(nullptr ),
87+ error_code_(OB_SUCCESS )
88+ {}
89+
90+ void set_tries (ObVector<ObFTTrie<void > *, ObArenaAllocator> *tries) { all_tries_ = tries; }
91+ void set_desc (const ObFTDictDesc *desc) { desc_ = desc; }
92+ void set_container (ObFTCacheRangeContainer *container) { container_ = container; }
93+ int64_t get_range_count () const { return all_tries_ ? all_tries_->size () : 0 ; }
94+ int get_error_code () const { return error_code_.load (); }
95+
96+ void run1 () override
97+ {
98+ int ret = OB_SUCCESS ;
99+ int64_t idx = get_thread_idx ();
100+
101+ if (OB_ISNULL (all_tries_) || idx >= static_cast <int64_t >(all_tries_->size ())) {
102+ return ;
103+ }
104+
105+ ObFTTrie<void > *trie = (*all_tries_)[idx];
106+ ObArenaAllocator dat_alloc (lib::ObMemAttr (OB_SERVER_TENANT_ID , " DATBuild" ));
107+ ObFTDATBuilder<void > builder (dat_alloc);
108+
109+ ObFTDAT *dat_buff = nullptr ;
110+ size_t buffer_size = 0 ;
111+ ObFTCacheRangeHandle *info = nullptr ;
112+
113+ if (OB_FAIL (builder.init (*trie))) {
114+ LOG_WARN (" Failed to init builder." , K (ret), K (idx));
115+ } else if (OB_FAIL (builder.build_from_trie (*trie))) {
116+ LOG_WARN (" Failed to build datrie." , K (ret), K (idx));
117+ } else if (OB_FAIL (builder.get_mem_block (dat_buff, buffer_size))) {
118+ LOG_WARN (" Failed to get mem block." , K (ret), K (idx));
119+ } else if (OB_FAIL (container_->fetch_info_for_dict (info))) {
120+ LOG_WARN (" Failed to fetch info for dict." , K (ret), K (idx));
121+ } else if (OB_FAIL (ObFTCacheDict::make_and_fetch_cache_entry (*desc_,
122+ dat_buff,
123+ buffer_size,
124+ static_cast <int32_t >(idx),
125+ info->value_ ,
126+ info->handle_ ))) {
127+ LOG_WARN (" Failed to put dict into kv cache" , K (ret), K (idx));
128+ }
129+
130+ if (OB_FAIL (ret)) {
131+ int expected = OB_SUCCESS ;
132+ error_code_.compare_exchange_strong (expected, ret);
133+ }
134+
135+ dat_alloc.reset ();
136+ }
137+
138+ private:
139+ ObVector<ObFTTrie<void > *, ObArenaAllocator> *all_tries_;
140+ const ObFTDictDesc *desc_;
141+ ObFTCacheRangeContainer *container_;
142+ std::atomic<int > error_code_;
143+ };
144+
145+ int ObFTRangeDict::build_ranges_concurrently_thread_pool (const ObFTDictDesc &desc,
146+ ObIFTDictIterator &iter,
147+ ObFTCacheRangeContainer &range_container)
148+ {
149+ int ret = OB_SUCCESS ;
150+
151+ // Phase 1: Collect words into tries range by range
152+ ObArenaAllocator tmp_alloc (lib::ObMemAttr (MTL_ID (), " Tmp Allocator" ));
153+ ObVector<ObFTTrie<void > *, ObArenaAllocator> all_tries (&tmp_alloc);
154+
155+ bool build_next_range = true ;
156+ while (OB_SUCC (ret) && build_next_range) {
157+ ObFTTrie<void > *trie = OB_NEWx (ObFTTrie<void >, &tmp_alloc, tmp_alloc, desc.coll_type_ );
158+ if (OB_ISNULL (trie)) {
159+ ret = OB_ALLOCATE_MEMORY_FAILED ;
160+ LOG_WARN (" Failed to allocate ObFTTrie" , K (ret));
161+ break ;
162+ }
163+
164+ int count = 0 ;
165+ int64_t first_char_len = 0 ;
166+ ObFTSingleWord end_char;
167+ bool range_end = false ;
168+
169+ while (OB_SUCC (ret) && !range_end) {
170+ ObString key;
171+ if (OB_FAIL (iter.get_key (key))) {
172+ LOG_WARN (" Failed to get key" , K (ret));
173+ } else {
174+ ++count;
175+
176+ if (count >= DEFAULT_KEY_PER_RANGE
177+ && OB_FAIL (ObCharset::first_valid_char (desc.coll_type_ ,
178+ key.ptr (),
179+ key.length (),
180+ first_char_len))) {
181+ LOG_WARN (" First char is not valid." );
182+ } else if (DEFAULT_KEY_PER_RANGE == count
183+ && OB_FAIL (end_char.set_word (key.ptr (), first_char_len))) {
184+ LOG_WARN (" Failed to record first char." , K (ret));
185+ } else if (count > DEFAULT_KEY_PER_RANGE
186+ && (end_char.get_word () != ObString (first_char_len, key.ptr ()))) {
187+ range_end = true ;
188+ } else {
189+ if (OB_FAIL (trie->insert (key, {}))) {
190+ LOG_WARN (" Failed to insert key to trie" , K (ret));
191+ } else if (OB_FAIL (iter.next ()) && OB_ITER_END != ret) {
192+ LOG_WARN (" Failed to step to next word entry." , K (ret));
193+ }
194+ }
195+ }
196+ }
197+
198+ if (OB_ITER_END == ret) {
199+ build_next_range = false ;
200+ ret = OB_SUCCESS ;
201+ }
202+
203+ if (OB_SUCC (ret) && trie->node_num () > 0 ) {
204+ if (OB_FAIL (all_tries.push_back (trie))) {
205+ LOG_WARN (" Failed to push back trie" , K (ret));
206+ }
207+ }
208+ }
209+
210+ // Phase 2: Build DATs concurrently using DATBuilderThreadPool
211+ if (OB_SUCC (ret) && all_tries.size () > 0 ) {
212+ DATBuilderThreadPool pool;
213+ pool.set_tries (&all_tries);
214+ pool.set_desc (&desc);
215+ pool.set_container (&range_container);
216+ pool.set_thread_count (static_cast <int64_t >(all_tries.size ()));
217+
218+ if (OB_FAIL (pool.start ())) {
219+ LOG_WARN (" Failed to start thread pool" , K (ret));
220+ } else {
221+ pool.wait ();
222+ ret = pool.get_error_code ();
223+ if (OB_FAIL (ret)) {
224+ LOG_WARN (" Thread pool encountered error" , K (ret));
225+ }
226+ }
227+ }
228+
229+ LOG_INFO (" build_ranges_concurrently_thread_pool completed" , K (ret), K (all_tries.size ()));
230+ return ret;
231+ }
232+
44233int ObFTRangeDict::build_one_range (const ObFTDictDesc &desc,
45234 const int32_t range_id,
46235 ObIFTDictIterator &iter,
0 commit comments