@@ -71,14 +71,14 @@ const ObString ObEmbeddingTask::USER_KEY_NAME = "user_key";
7171const ObString ObEmbeddingTask::INPUT_NAME = " input" ;
7272const ObString ObEmbeddingTask::DIMENSIONS_NAME = " dimensions" ;
7373
74- const int64_t ObEmbeddingTask::HTTP_REQUEST_TIMEOUT = 20 * 1000 * 1000 ; // default http request timeout. 20 seconds
74+ const int64_t ObEmbeddingTask::HTTP_REQUEST_TIMEOUT = 60 * 1000 * 1000 ; // default http request timeout. 60 seconds
7575
7676// Reschedule related constants
7777const int64_t ObEmbeddingTask::MAX_RESCHEDULE_RETRY_CNT = 3 ;
7878const int64_t ObEmbeddingTask::RESCHEDULE_RETRY_INTERVAL_US = 100 * 1000 ; // 100ms
7979
8080// HTTP retry related constants
81- const int64_t ObEmbeddingTask::MAX_HTTP_RETRY_CNT = 3 ;
81+ const int64_t ObEmbeddingTask::MAX_HTTP_RETRY_CNT = 2 ;
8282const int64_t ObEmbeddingTask::HTTP_RETRY_BASE_INTERVAL_US = 1 * 1000 * 1000 ; // 1 second
8383const int64_t ObEmbeddingTask::HTTP_RETRY_MAX_INTERVAL_US = 10 * 1000 * 1000 ; // 10 seconds
8484const int64_t ObEmbeddingTask::HTTP_RETRY_MULTIPLIER = 2 ;
@@ -175,7 +175,7 @@ ObEmbeddingTask::ObEmbeddingTask() : local_allocator_("EmbeddingTask", OB_MALLOC
175175 internal_error_message_(),
176176 task_lock_(),
177177 batch_size_(10 ),
178- current_batch_idx_(),
178+ current_batch_idx_(0 ),
179179 http_send_time_us_(0 ),
180180 http_response_data_size_(0 ),
181181 http_response_data_(nullptr ),
@@ -188,6 +188,8 @@ ObEmbeddingTask::ObEmbeddingTask() : local_allocator_("EmbeddingTask", OB_MALLOC
188188 http_total_retry_count_(0 ),
189189 http_retry_start_time_us_(0 ),
190190 http_last_retry_time_us_(0 ),
191+ http_max_retry_count_(0 ),
192+ wait_for_completion_timeout_us_(0 ),
191193 need_retry_flag_(false ),
192194 original_batch_size_(batch_size_),
193195 batch_size_adjusted_(false ),
@@ -222,7 +224,7 @@ ObEmbeddingTask::ObEmbeddingTask(ObArenaAllocator &allocator) : local_allocator_
222224 internal_error_message_(),
223225 task_lock_(),
224226 batch_size_(10 ),
225- current_batch_idx_(),
227+ current_batch_idx_(0 ),
226228 http_send_time_us_(0 ),
227229 http_response_data_size_(0 ),
228230 http_response_data_(nullptr ),
@@ -235,6 +237,8 @@ ObEmbeddingTask::ObEmbeddingTask(ObArenaAllocator &allocator) : local_allocator_
235237 http_total_retry_count_(0 ),
236238 http_retry_start_time_us_(0 ),
237239 http_last_retry_time_us_(0 ),
240+ http_max_retry_count_(0 ),
241+ wait_for_completion_timeout_us_(0 ),
238242 need_retry_flag_(false ),
239243 original_batch_size_(batch_size_),
240244 batch_size_adjusted_(false ),
@@ -254,15 +258,19 @@ int ObEmbeddingTask::init(const ObString &model_url,
254258 const ObIArray<ObString> &input_chunks,
255259 int64_t dimension,
256260 int64_t http_timeout_us,
261+ int64_t http_max_retries,
257262 storage::ObEmbeddingIOCallbackHandle *cb_handle)
258263{
259264 int ret = OB_SUCCESS ;
260265 if (is_inited_) {
261266 ret = OB_INIT_TWICE ;
262267 LOG_WARN (" ObEmbeddingTask already inited" , K (ret), K (model_url), K (model_name), K (user_key), K (input_chunks));
268+ } else if (http_timeout_us <= 0 || http_max_retries <= 0 ) {
269+ ret = OB_INVALID_ARGUMENT ;
270+ LOG_WARN (" invalid http timeout" , K (ret), K (http_timeout_us), K (http_max_retries));
263271 } else if (OB_FAIL (input_chunks_.assign (input_chunks))) {
264272 LOG_WARN (" failed to assign input chunks" , K (ret), K (input_chunks));
265- } else if (OB_FAIL (init_curl_handler (model_url, user_key))) {
273+ } else if (OB_FAIL (init_curl_handler (model_url, user_key, http_timeout_us ))) {
266274 LOG_WARN (" failed to init curl handler" , K (ret), K (model_url), K (user_key));
267275 } else if (OB_FAIL (task_cond_.init (ObWaitEventIds::DEFAULT_COND_WAIT ))) {
268276 LOG_WARN (" failed to init completion cond" , K (ret));
@@ -298,13 +306,15 @@ int ObEmbeddingTask::init(const ObString &model_url,
298306 }
299307
300308 output_vectors_.prepare_allocate_and_keep_count (total_chunks_);
301- if (http_timeout_us > 0 ) {
302- http_timeout_us_ = http_timeout_us;
303- } else {
304- http_timeout_us_ = HTTP_REQUEST_TIMEOUT ;
305- }
306309
307310 // Initialize retry related variables
311+ http_timeout_us_ = http_timeout_us;
312+ http_max_retry_count_ = http_max_retries;
313+ // task total timeout for all retries
314+ wait_for_completion_timeout_us_ = (http_max_retry_count_ + 1 ) * http_timeout_us_ * input_chunks.count () / batch_size_ +
315+ (http_max_retry_count_ + 1 ) * HTTP_RETRY_MAX_INTERVAL_US ;
316+
317+
308318 http_retry_start_time_us_ = 0 ;
309319 http_last_retry_time_us_ = 0 ;
310320 http_total_retry_count_ = 0 ;
@@ -313,7 +323,7 @@ int ObEmbeddingTask::init(const ObString &model_url,
313323 current_batch_size_ = batch_size_;
314324 successful_requests_count_ = 0 ;
315325
316- LOG_DEBUG (" task initialized successfully" , K (user_key_), K (task_id_), K (dimension_));
326+ LOG_DEBUG (" task initialized successfully" , K (user_key_), K (task_id_), K (dimension_), K (http_max_retry_count_), K (http_timeout_us_) );
317327 }
318328
319329 return ret;
@@ -579,6 +589,7 @@ int ObEmbeddingTask::check_async_progress()
579589 LOG_WARN (" task not started yet" , K (ret));
580590 } else if (current_phase == OB_EMBEDDING_TASK_DONE ) {
581591 ret = OB_SUCCESS ;
592+ LOG_DEBUG (" check_async_progress" , K (current_phase), K (curl_request_in_progress_));
582593 } else if (current_phase == OB_EMBEDDING_TASK_HTTP_SENT ) {
583594 if (OB_FAIL (check_http_progress ())) {
584595 if (ret == OB_NEED_RETRY ) {
@@ -599,6 +610,10 @@ int ObEmbeddingTask::check_async_progress()
599610 LOG_WARN (" failed to handle retry failure" , K (ret));
600611 }
601612 }
613+ // reset flag
614+ if (OB_SUCC (ret)) {
615+ need_retry_flag_ = false ;
616+ }
602617 } else {
603618 // not time to retry yet, continue waiting
604619 ret = OB_SUCCESS ;
@@ -616,14 +631,9 @@ int ObEmbeddingTask::check_async_progress()
616631 if (OB_FAIL (set_phase (OB_EMBEDDING_TASK_HTTP_COMPLETED ))) {
617632 LOG_WARN (" failed to set phase to HTTP_COMPLETED" , K (ret));
618633 }
619- } else {
620- int64_t current_time = ObTimeUtility::current_time ();
621- int64_t elapsed_time = current_time - http_send_time_us_;
622- if (elapsed_time > http_timeout_us_) {
623- if (OB_FAIL (complete_task (OB_EMBEDDING_TASK_DONE , OB_TIMEOUT , true ))) {
624- LOG_WARN (" failed to handle task failure" , K (ret));
625- }
626- LOG_WARN (" HTTP request timeout" , K (elapsed_time), K (http_timeout_us_), K (*this ));
634+ } else { // http not response
635+ if (REACH_TIME_INTERVAL (10L * 1000L * 1000L )) { // 10s
636+ LOG_INFO (" wait http response" , K (ret), K (*this ));
627637 }
628638 }
629639 } else if (current_phase == OB_EMBEDDING_TASK_HTTP_COMPLETED ) {
@@ -812,7 +822,18 @@ int ObEmbeddingTask::check_http_progress()
812822 }
813823 http_last_retry_time_us_ = ObTimeUtility::current_time ();
814824 }
825+ LOG_WARN (" curl request error, need retry" , K (ret), K (need_retry_flag_), K (http_retry_count_), K (http_max_retry_count_));
826+ }
827+ } else if (res == CURLE_OPERATION_TIMEDOUT ) { // curl timeout
828+ if (++http_retry_count_ < http_max_retry_count_) {
829+ need_retry_flag_ = true ;
830+ http_total_retry_count_++;
831+ http_last_retry_time_us_ = ObTimeUtility::current_time ();
832+ ret = OB_NEED_RETRY ;
833+ } else {
834+ ret = OB_TIMEOUT ;
815835 }
836+ LOG_WARN (" curl request timeot, need retry" , K (ret), K (need_retry_flag_), K (http_retry_count_), K (http_max_retry_count_));
816837 } else {
817838 ret = OB_CURL_ERROR ;
818839 LOG_WARN (" curl request failed" , K (ret), K (res), K (*this ));
@@ -1011,7 +1032,6 @@ int ObEmbeddingTask::do_work(ThreadPoolType *thread_pool)
10111032 }
10121033 }
10131034 }
1014-
10151035 return ret;
10161036}
10171037
@@ -1451,7 +1471,7 @@ bool ObEmbeddingTask::should_retry_http_request(int64_t http_error_code) const
14511471 case 502 :
14521472 case 503 :
14531473 case 504 :
1454- return http_retry_count_ < MAX_HTTP_RETRY_CNT ;
1474+ return http_retry_count_ < http_max_retry_count_ ;
14551475 default :
14561476 return false ;
14571477 }
@@ -1605,13 +1625,16 @@ int ObEmbeddingTask::maybe_callback()
16051625 return ret;
16061626}
16071627
1608- int ObEmbeddingTask::init_curl_handler (const ObString &model_url, const ObString &user_key)
1628+ int ObEmbeddingTask::init_curl_handler (const ObString &model_url, const ObString &user_key, const int64_t http_timeout_us )
16091629{
16101630 int ret = OB_SUCCESS ;
16111631 if (OB_UNLIKELY (curl_multi_handle_ || curl_easy_handle_)) {
16121632 ret = OB_INIT_TWICE ;
16131633 LOG_WARN (" curl handles already initialized" , K (ret), KPC (this ));
1614- }else if (OB_ISNULL (curl_multi_handle_ = curl_multi_init ())) {
1634+ } else if (http_timeout_us <=0 ) {
1635+ ret = OB_INVALID_ARGUMENT ;
1636+ LOG_WARN (" invalid http_timeout_us" , K (ret), K (http_timeout_us));
1637+ } else if (OB_ISNULL (curl_multi_handle_ = curl_multi_init ())) {
16151638 ret = OB_CURL_ERROR ;
16161639 LOG_WARN (" failed to init curl multi handle" , K (ret));
16171640 } else if (OB_ISNULL (curl_easy_handle_ = curl_easy_init ())) {
@@ -1643,8 +1666,8 @@ int ObEmbeddingTask::init_curl_handler(const ObString &model_url, const ObString
16431666 } else {
16441667 curl_easy_setopt (curl_easy_handle_, CURLOPT_WRITEDATA , (void *)curl_response_data_);
16451668
1646- curl_easy_setopt (curl_easy_handle_, CURLOPT_TIMEOUT , HTTP_REQUEST_TIMEOUT / 1000 );
1647- curl_easy_setopt (curl_easy_handle_, CURLOPT_CONNECTTIMEOUT , HTTP_REQUEST_TIMEOUT / 1000 );
1669+ curl_easy_setopt (curl_easy_handle_, CURLOPT_TIMEOUT_MS , http_timeout_us / 1000 );
1670+ curl_easy_setopt (curl_easy_handle_, CURLOPT_CONNECTTIMEOUT_MS , http_timeout_us / 1000 );
16481671
16491672 CURLMcode multi_res = curl_multi_add_handle (curl_multi_handle_, curl_easy_handle_);
16501673 if (multi_res != CURLM_OK ) {
@@ -1654,11 +1677,12 @@ int ObEmbeddingTask::init_curl_handler(const ObString &model_url, const ObString
16541677 }
16551678 }
16561679 }
1680+
16571681 return ret;
16581682
16591683}
16601684
1661- int ObEmbeddingTask::wait_for_completion (const int64_t timeout_ms )
1685+ int ObEmbeddingTask::wait_for_completion ()
16621686{
16631687 int ret = OB_SUCCESS ;
16641688 if (OB_UNLIKELY (!is_inited_)) {
@@ -1669,7 +1693,7 @@ int ObEmbeddingTask::wait_for_completion(const int64_t timeout_ms)
16691693 if (callback_done_) {
16701694 // do nothing
16711695 } else {
1672- if (OB_FAIL (task_cond_.wait (timeout_ms ))) {
1696+ if (OB_FAIL (task_cond_.wait (wait_for_completion_timeout_us_ / 1000 ))) {
16731697 LOG_WARN (" failed to wait for completion" , K (ret));
16741698 }
16751699 }
0 commit comments