@@ -83,63 +83,64 @@ class DATBuilderThreadPool : public lib::Threads
8383 DATBuilderThreadPool ()
8484 : all_tries_(nullptr ),
8585 desc_ (nullptr ),
86- container_( nullptr ),
87- error_code_( OB_SUCCESS )
86+ error_code_( OB_SUCCESS ),
87+ handles_( nullptr )
8888 {}
8989
9090 void set_tries (ObVector<ObFTTrie<void > *, ObArenaAllocator> *tries) { all_tries_ = tries; }
9191 void set_desc (const ObFTDictDesc *desc) { desc_ = desc; }
92- void set_container (ObFTCacheRangeContainer *container) { container_ = container; }
9392 int64_t get_range_count () const { return all_tries_ ? all_tries_->size () : 0 ; }
9493 int get_error_code () const { return error_code_.load (); }
95-
94+ void set_handles (ObArray<ObFTCacheRangeHandle *> *handles) { handles_ = handles; }
9695 void run1 () override
9796 {
9897 int ret = OB_SUCCESS ;
9998 int64_t idx = get_thread_idx ();
10099
101- if (OB_ISNULL (all_tries_) || idx >= static_cast <int64_t >(all_tries_->size ())) {
102- return ;
103- }
104-
105- ObFTTrie<void > *trie = (*all_tries_)[idx];
106- ObArenaAllocator dat_alloc (lib::ObMemAttr (OB_SERVER_TENANT_ID , " DATBuild" ));
107- ObFTDATBuilder<void > builder (dat_alloc);
108-
109- ObFTDAT *dat_buff = nullptr ;
110- size_t buffer_size = 0 ;
111- ObFTCacheRangeHandle *info = nullptr ;
100+ if (OB_ISNULL (all_tries_) || idx >= static_cast <int64_t >(all_tries_->size ()) || OB_ISNULL (all_tries_->at (idx))) {
101+ ret = OB_ARRAY_OUT_OF_RANGE ;
102+ LOG_WARN (" all_tries_ is null or idx is out of range" , K (idx), K (all_tries_->size ()));
103+ } else if (OB_ISNULL (handles_) || idx >= static_cast <int64_t >(handles_->size ()) || OB_ISNULL (handles_->at (idx))) {
104+ ret = OB_ARRAY_OUT_OF_RANGE ;
105+ LOG_WARN (" handles_ is null or idx is out of range" , K (idx), K (handles_->size ()));
106+ } else {
112107
113- if (OB_FAIL (builder.init (*trie))) {
114- LOG_WARN (" Failed to init builder." , K (ret), K (idx));
115- } else if (OB_FAIL (builder.build_from_trie (*trie))) {
116- LOG_WARN (" Failed to build datrie." , K (ret), K (idx));
117- } else if (OB_FAIL (builder.get_mem_block (dat_buff, buffer_size))) {
118- LOG_WARN (" Failed to get mem block." , K (ret), K (idx));
119- } else if (OB_FAIL (container_->fetch_info_for_dict (info))) {
120- LOG_WARN (" Failed to fetch info for dict." , K (ret), K (idx));
121- } else if (OB_FAIL (ObFTCacheDict::make_and_fetch_cache_entry (*desc_,
122- dat_buff,
123- buffer_size,
124- static_cast <int32_t >(idx),
125- info->value_ ,
126- info->handle_ ))) {
127- LOG_WARN (" Failed to put dict into kv cache" , K (ret), K (idx));
108+ ObFTTrie<void > *trie = (*all_tries_)[idx];
109+ ObArenaAllocator dat_alloc (lib::ObMemAttr (MTL_ID (), " DATBuild" ));
110+ ObFTDATBuilder<void > builder (dat_alloc);
111+
112+ ObFTDAT *dat_buff = nullptr ;
113+ size_t buffer_size = 0 ;
114+ ObFTCacheRangeHandle *info = handles_->at (idx);
115+
116+ if (OB_FAIL (builder.init (*trie))) {
117+ LOG_WARN (" Failed to init builder." , K (ret), K (idx));
118+ } else if (OB_FAIL (builder.build_from_trie (*trie))) {
119+ LOG_WARN (" Failed to build datrie." , K (ret), K (idx));
120+ } else if (OB_FAIL (builder.get_mem_block (dat_buff, buffer_size))) {
121+ LOG_WARN (" Failed to get mem block." , K (ret), K (idx));
122+ } else if (OB_FAIL (ObFTCacheDict::make_and_fetch_cache_entry (*desc_,
123+ dat_buff,
124+ buffer_size,
125+ static_cast <int32_t >(idx),
126+ info->value_ ,
127+ info->handle_ ))) {
128+ LOG_WARN (" Failed to put dict into kv cache" , K (ret), K (idx));
129+ }
130+ dat_alloc.reset ();
128131 }
129132
130133 if (OB_FAIL (ret)) {
131134 int expected = OB_SUCCESS ;
132135 error_code_.compare_exchange_strong (expected, ret);
133136 }
134-
135- dat_alloc.reset ();
136137 }
137138
138139private:
139140 ObVector<ObFTTrie<void > *, ObArenaAllocator> *all_tries_;
140141 const ObFTDictDesc *desc_;
141- ObFTCacheRangeContainer *container_;
142142 std::atomic<int > error_code_;
143+ ObArray<ObFTCacheRangeHandle *> *handles_;
143144};
144145
145146int ObFTRangeDict::build_ranges_concurrently_thread_pool (const ObFTDictDesc &desc,
@@ -209,19 +210,37 @@ int ObFTRangeDict::build_ranges_concurrently_thread_pool(const ObFTDictDesc &des
209210
210211 // Phase 2: Build DATs concurrently using DATBuilderThreadPool
211212 if (OB_SUCC (ret) && all_tries.size () > 0 ) {
212- DATBuilderThreadPool pool;
213- pool.set_tries (&all_tries);
214- pool.set_desc (&desc);
215- pool.set_container (&range_container);
216- pool.set_thread_count (static_cast <int64_t >(all_tries.size ()));
217-
218- if (OB_FAIL (pool.start ())) {
219- LOG_WARN (" Failed to start thread pool" , K (ret));
213+ ObArray<ObFTCacheRangeHandle *> handles;
214+ handles.set_attr (lib::ObMemAttr (MTL_ID (), " DATBuild" ));
215+ for (int64_t i = 0 ; OB_SUCC (ret) && i < all_tries.size (); i++) {
216+ ObFTCacheRangeHandle *handle = nullptr ;
217+ if (OB_FAIL (range_container.fetch_info_for_dict (handle))) {
218+ LOG_WARN (" Failed to fetch info for dict." , K (ret), K (i));
219+ } else if (OB_FAIL (handles.push_back (handle))) {
220+ OB_DELETEx (ObFTCacheRangeHandle, &tmp_alloc, handle);
221+ LOG_WARN (" Failed to push back handle" , K (ret), K (i));
222+ }
223+ }
224+ if (OB_FAIL (ret)) {
225+ for (int64_t i = 0 ; i < handles.size (); i++) {
226+ OB_DELETEx (ObFTCacheRangeHandle, &tmp_alloc, handles[i]);
227+ }
228+ handles.reset ();
220229 } else {
221- pool.wait ();
222- ret = pool.get_error_code ();
223- if (OB_FAIL (ret)) {
224- LOG_WARN (" Thread pool encountered error" , K (ret));
230+ DATBuilderThreadPool pool;
231+ pool.set_tries (&all_tries);
232+ pool.set_desc (&desc);
233+ pool.set_thread_count (static_cast <int64_t >(all_tries.size ()));
234+ pool.set_handles (&handles);
235+
236+ if (OB_FAIL (pool.start ())) {
237+ LOG_WARN (" Failed to start thread pool" , K (ret));
238+ } else {
239+ pool.wait ();
240+ ret = pool.get_error_code ();
241+ if (OB_FAIL (ret)) {
242+ LOG_WARN (" Thread pool encountered error" , K (ret));
243+ }
225244 }
226245 }
227246 }
0 commit comments