Skip to content

Commit 222f653

Browse files
obdevfootka
authored andcommitted
[CP] [vector index] handle hybird vector index calling LLM error
Co-authored-by: footka <672528926@qq.com>
1 parent deb98fb commit 222f653

10 files changed

Lines changed: 124 additions & 64 deletions

File tree

src/share/parameter/ob_parameter_attr.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ namespace common {
2424
#define DECL_EVAL_MACRO(macro, args...) macro(args)
2525
#define DECL_ATTR_LIST(M) \
2626
DECL_EVAL_MACRO(M, Section, ROOT_SERVICE, LOAD_BALANCE, DAILY_MERGE, LOCATION_CACHE, \
27-
SSTABLE, LOGSERVICE, CACHE, TRANS, TENANT, RPC, OBPROXY, OBSERVER, RESOURCE_LIMIT); \
27+
SSTABLE, LOGSERVICE, CACHE, TRANS, TENANT, RPC, OBPROXY, OBSERVER, \
28+
RESOURCE_LIMIT, AI); \
2829
DECL_EVAL_MACRO(M, Scope, CLUSTER, TENANT); \
2930
DECL_EVAL_MACRO(M, Source, DEFAULT, FILE, OBADMIN, CMDLINE, CLUSTER, TENANT); \
3031
DECL_EVAL_MACRO(M, Session, NO, YES); \

src/share/parameter/ob_parameter_seed.ipp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2133,6 +2133,16 @@ DEF_BOOL(_enable_persistent_compiled_routine, OB_CLUSTER_PARAMETER, "true",
21332133
"The default value is TRUE. Value: TRUE: turned on FALSE: turned off",
21342134
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
21352135

2136+
// AI / LLM
2137+
DEF_TIME(model_request_timeout, OB_CLUSTER_PARAMETER, "60s", "[1s,)",
2138+
"Used to control the HTTP timeout for accessing the model. Especially, the default value is 60s.",
2139+
ObParameterAttr(Section::AI, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
2140+
2141+
DEF_INT(model_max_retries, OB_CLUSTER_PARAMETER, "2", "[1,)",
2142+
"Used to control the retry times after a failed model interaction. Especially, the default value is 2",
2143+
ObParameterAttr(Section::AI, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
2144+
2145+
21362146
DEF_STR_WITH_CHECKER(sql_protocol_min_tls_version, OB_CLUSTER_PARAMETER, "none",
21372147
common::ObConfigSQLTlsVersionChecker,
21382148
"SQL SSL control options, used to specify the minimum SSL/TLS version number. "

src/share/vector_index/ob_hybrid_vector_refresh_task.cpp

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,19 @@ int ObHybridVectorRefreshTask::prepare_for_embedding(ObPluginVectorIndexAdaptor
510510
storage::ObValueRowIterator &delta_delete_iter = task_ctx->delta_delete_iter_;
511511
int64_t dim = 0;
512512
int64_t loop_cnt = 0;
513-
uint64_t timeout_us = ObTimeUtility::current_time() + ObInsertLobColumnHelper::LOB_TX_TIMEOUT;
513+
int64_t http_timeout_us = 0;
514+
int64_t http_max_retries = 0;
515+
516+
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
517+
if (tenant_config.is_valid()) {
518+
http_timeout_us = tenant_config->model_request_timeout;
519+
http_max_retries = tenant_config->model_max_retries;
520+
} else {
521+
SHARE_LOG_RET(WARN, OB_INVALID_CONFIG, "init model request timeout and max retries config with default value");
522+
http_timeout_us = 60 * 1000 * 1000; // 60 seconds
523+
http_max_retries = 2;
524+
}
525+
514526
if (OB_ISNULL(task_ctx)) {
515527
ret = OB_ERR_UNEXPECTED;
516528
LOG_WARN("unexpected error", K(ret), KPC(task_ctx));
@@ -652,7 +664,7 @@ int ObHybridVectorRefreshTask::prepare_for_embedding(ObPluginVectorIndexAdaptor
652664
} else if (OB_FAIL(ob_write_string(task_ctx->allocator_, endpoint->get_url(), url, true))) {
653665
LOG_WARN("fail to write string", K(ret));
654666
} else if (OB_FAIL(task_ctx->embedding_task_->init(url, endpoint->get_request_model_name(),
655-
endpoint->get_provider(), access_key, chunk_array, dim, timeout_us))) {
667+
endpoint->get_provider(), access_key, chunk_array, dim, http_timeout_us, http_max_retries))) {
656668
LOG_WARN("failed to init embedding task", K(ret), KPC(endpoint));
657669
} else {
658670
ObEmbeddingTaskHandler *embedding_handler = nullptr;

src/share/vector_index/ob_vector_embedding_handler.cpp

Lines changed: 51 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -71,14 +71,14 @@ const ObString ObEmbeddingTask::USER_KEY_NAME = "user_key";
7171
const ObString ObEmbeddingTask::INPUT_NAME = "input";
7272
const ObString ObEmbeddingTask::DIMENSIONS_NAME = "dimensions";
7373

74-
const int64_t ObEmbeddingTask::HTTP_REQUEST_TIMEOUT = 20 * 1000 * 1000; // default http request timeout. 20 seconds
74+
const int64_t ObEmbeddingTask::HTTP_REQUEST_TIMEOUT = 60 * 1000 * 1000; // default http request timeout. 60 seconds
7575

7676
// Reschedule related constants
7777
const int64_t ObEmbeddingTask::MAX_RESCHEDULE_RETRY_CNT = 3;
7878
const int64_t ObEmbeddingTask::RESCHEDULE_RETRY_INTERVAL_US = 100 * 1000; // 100ms
7979

8080
// HTTP retry related constants
81-
const int64_t ObEmbeddingTask::MAX_HTTP_RETRY_CNT = 3;
81+
const int64_t ObEmbeddingTask::MAX_HTTP_RETRY_CNT = 2;
8282
const int64_t ObEmbeddingTask::HTTP_RETRY_BASE_INTERVAL_US = 1 * 1000 * 1000; // 1 second
8383
const int64_t ObEmbeddingTask::HTTP_RETRY_MAX_INTERVAL_US = 10 * 1000 * 1000; // 10 seconds
8484
const int64_t ObEmbeddingTask::HTTP_RETRY_MULTIPLIER = 2;
@@ -175,7 +175,7 @@ ObEmbeddingTask::ObEmbeddingTask() : local_allocator_("EmbeddingTask", OB_MALLOC
175175
internal_error_message_(),
176176
task_lock_(),
177177
batch_size_(10),
178-
current_batch_idx_(),
178+
current_batch_idx_(0),
179179
http_send_time_us_(0),
180180
http_response_data_size_(0),
181181
http_response_data_(nullptr),
@@ -188,6 +188,8 @@ ObEmbeddingTask::ObEmbeddingTask() : local_allocator_("EmbeddingTask", OB_MALLOC
188188
http_total_retry_count_(0),
189189
http_retry_start_time_us_(0),
190190
http_last_retry_time_us_(0),
191+
http_max_retry_count_(0),
192+
wait_for_completion_timeout_us_(0),
191193
need_retry_flag_(false),
192194
original_batch_size_(batch_size_),
193195
batch_size_adjusted_(false),
@@ -222,7 +224,7 @@ ObEmbeddingTask::ObEmbeddingTask(ObArenaAllocator &allocator) : local_allocator_
222224
internal_error_message_(),
223225
task_lock_(),
224226
batch_size_(10),
225-
current_batch_idx_(),
227+
current_batch_idx_(0),
226228
http_send_time_us_(0),
227229
http_response_data_size_(0),
228230
http_response_data_(nullptr),
@@ -235,6 +237,8 @@ ObEmbeddingTask::ObEmbeddingTask(ObArenaAllocator &allocator) : local_allocator_
235237
http_total_retry_count_(0),
236238
http_retry_start_time_us_(0),
237239
http_last_retry_time_us_(0),
240+
http_max_retry_count_(0),
241+
wait_for_completion_timeout_us_(0),
238242
need_retry_flag_(false),
239243
original_batch_size_(batch_size_),
240244
batch_size_adjusted_(false),
@@ -254,15 +258,19 @@ int ObEmbeddingTask::init(const ObString &model_url,
254258
const ObIArray<ObString> &input_chunks,
255259
int64_t dimension,
256260
int64_t http_timeout_us,
261+
int64_t http_max_retries,
257262
storage::ObEmbeddingIOCallbackHandle *cb_handle)
258263
{
259264
int ret = OB_SUCCESS;
260265
if (is_inited_) {
261266
ret = OB_INIT_TWICE;
262267
LOG_WARN("ObEmbeddingTask already inited", K(ret), K(model_url), K(model_name), K(user_key), K(input_chunks));
268+
} else if (http_timeout_us <= 0 || http_max_retries <= 0) {
269+
ret = OB_INVALID_ARGUMENT;
270+
LOG_WARN("invalid http timeout", K(ret), K(http_timeout_us), K(http_max_retries));
263271
} else if (OB_FAIL(input_chunks_.assign(input_chunks))) {
264272
LOG_WARN("failed to assign input chunks", K(ret), K(input_chunks));
265-
} else if (OB_FAIL(init_curl_handler(model_url, user_key))) {
273+
} else if (OB_FAIL(init_curl_handler(model_url, user_key, http_timeout_us))) {
266274
LOG_WARN("failed to init curl handler", K(ret), K(model_url), K(user_key));
267275
} else if (OB_FAIL(task_cond_.init(ObWaitEventIds::DEFAULT_COND_WAIT))) {
268276
LOG_WARN("failed to init completion cond", K(ret));
@@ -298,13 +306,15 @@ int ObEmbeddingTask::init(const ObString &model_url,
298306
}
299307

300308
output_vectors_.prepare_allocate_and_keep_count(total_chunks_);
301-
if (http_timeout_us > 0) {
302-
http_timeout_us_ = http_timeout_us;
303-
} else {
304-
http_timeout_us_ = HTTP_REQUEST_TIMEOUT;
305-
}
306309

307310
// Initialize retry related variables
311+
http_timeout_us_ = http_timeout_us;
312+
http_max_retry_count_ = http_max_retries;
313+
// task total timeout for all retries
314+
wait_for_completion_timeout_us_ = (http_max_retry_count_ + 1) * http_timeout_us_ * input_chunks.count() / batch_size_ +
315+
(http_max_retry_count_ + 1) * HTTP_RETRY_MAX_INTERVAL_US;
316+
317+
308318
http_retry_start_time_us_ = 0;
309319
http_last_retry_time_us_ = 0;
310320
http_total_retry_count_ = 0;
@@ -313,7 +323,7 @@ int ObEmbeddingTask::init(const ObString &model_url,
313323
current_batch_size_ = batch_size_;
314324
successful_requests_count_ = 0;
315325

316-
LOG_DEBUG("task initialized successfully", K(user_key_), K(task_id_), K(dimension_));
326+
LOG_DEBUG("task initialized successfully", K(user_key_), K(task_id_), K(dimension_), K(http_max_retry_count_), K(http_timeout_us_));
317327
}
318328

319329
return ret;
@@ -579,6 +589,7 @@ int ObEmbeddingTask::check_async_progress()
579589
LOG_WARN("task not started yet", K(ret));
580590
} else if (current_phase == OB_EMBEDDING_TASK_DONE) {
581591
ret = OB_SUCCESS;
592+
LOG_DEBUG("check_async_progress", K(current_phase), K(curl_request_in_progress_));
582593
} else if (current_phase == OB_EMBEDDING_TASK_HTTP_SENT) {
583594
if (OB_FAIL(check_http_progress())) {
584595
if (ret == OB_NEED_RETRY) {
@@ -599,6 +610,10 @@ int ObEmbeddingTask::check_async_progress()
599610
LOG_WARN("failed to handle retry failure", K(ret));
600611
}
601612
}
613+
// reset flag
614+
if (OB_SUCC(ret)) {
615+
need_retry_flag_ = false;
616+
}
602617
} else {
603618
// not time to retry yet, continue waiting
604619
ret = OB_SUCCESS;
@@ -616,14 +631,9 @@ int ObEmbeddingTask::check_async_progress()
616631
if (OB_FAIL(set_phase(OB_EMBEDDING_TASK_HTTP_COMPLETED))) {
617632
LOG_WARN("failed to set phase to HTTP_COMPLETED", K(ret));
618633
}
619-
} else {
620-
int64_t current_time = ObTimeUtility::current_time();
621-
int64_t elapsed_time = current_time - http_send_time_us_;
622-
if (elapsed_time > http_timeout_us_) {
623-
if (OB_FAIL(complete_task(OB_EMBEDDING_TASK_DONE, OB_TIMEOUT, true))) {
624-
LOG_WARN("failed to handle task failure", K(ret));
625-
}
626-
LOG_WARN("HTTP request timeout", K(elapsed_time), K(http_timeout_us_), K(*this));
634+
} else { // http not response
635+
if (REACH_TIME_INTERVAL(10L * 1000L * 1000L)) { //10s
636+
LOG_INFO("wait http response", K(ret), K(*this));
627637
}
628638
}
629639
} else if (current_phase == OB_EMBEDDING_TASK_HTTP_COMPLETED) {
@@ -812,7 +822,18 @@ int ObEmbeddingTask::check_http_progress()
812822
}
813823
http_last_retry_time_us_ = ObTimeUtility::current_time();
814824
}
825+
LOG_WARN("curl request error, need retry", K(ret), K(need_retry_flag_), K(http_retry_count_), K(http_max_retry_count_));
826+
}
827+
} else if (res == CURLE_OPERATION_TIMEDOUT) { // curl timeout
828+
if (++http_retry_count_ < http_max_retry_count_) {
829+
need_retry_flag_ = true;
830+
http_total_retry_count_++;
831+
http_last_retry_time_us_ = ObTimeUtility::current_time();
832+
ret = OB_NEED_RETRY;
833+
} else {
834+
ret = OB_TIMEOUT;
815835
}
836+
LOG_WARN("curl request timeot, need retry", K(ret), K(need_retry_flag_), K(http_retry_count_), K(http_max_retry_count_));
816837
} else {
817838
ret = OB_CURL_ERROR;
818839
LOG_WARN("curl request failed", K(ret), K(res), K(*this));
@@ -1011,7 +1032,6 @@ int ObEmbeddingTask::do_work(ThreadPoolType *thread_pool)
10111032
}
10121033
}
10131034
}
1014-
10151035
return ret;
10161036
}
10171037

@@ -1451,7 +1471,7 @@ bool ObEmbeddingTask::should_retry_http_request(int64_t http_error_code) const
14511471
case 502:
14521472
case 503:
14531473
case 504:
1454-
return http_retry_count_ < MAX_HTTP_RETRY_CNT;
1474+
return http_retry_count_ < http_max_retry_count_;
14551475
default:
14561476
return false;
14571477
}
@@ -1605,13 +1625,16 @@ int ObEmbeddingTask::maybe_callback()
16051625
return ret;
16061626
}
16071627

1608-
int ObEmbeddingTask::init_curl_handler(const ObString &model_url, const ObString &user_key)
1628+
int ObEmbeddingTask::init_curl_handler(const ObString &model_url, const ObString &user_key, const int64_t http_timeout_us)
16091629
{
16101630
int ret = OB_SUCCESS;
16111631
if (OB_UNLIKELY(curl_multi_handle_ || curl_easy_handle_)) {
16121632
ret = OB_INIT_TWICE;
16131633
LOG_WARN("curl handles already initialized", K(ret), KPC(this));
1614-
}else if (OB_ISNULL(curl_multi_handle_ = curl_multi_init())) {
1634+
} else if (http_timeout_us <=0 ) {
1635+
ret = OB_INVALID_ARGUMENT;
1636+
LOG_WARN("invalid http_timeout_us", K(ret), K(http_timeout_us));
1637+
} else if (OB_ISNULL(curl_multi_handle_ = curl_multi_init())) {
16151638
ret = OB_CURL_ERROR;
16161639
LOG_WARN("failed to init curl multi handle", K(ret));
16171640
} else if (OB_ISNULL(curl_easy_handle_ = curl_easy_init())) {
@@ -1643,8 +1666,8 @@ int ObEmbeddingTask::init_curl_handler(const ObString &model_url, const ObString
16431666
} else {
16441667
curl_easy_setopt(curl_easy_handle_, CURLOPT_WRITEDATA, (void *)curl_response_data_);
16451668

1646-
curl_easy_setopt(curl_easy_handle_, CURLOPT_TIMEOUT, HTTP_REQUEST_TIMEOUT / 1000);
1647-
curl_easy_setopt(curl_easy_handle_, CURLOPT_CONNECTTIMEOUT, HTTP_REQUEST_TIMEOUT / 1000);
1669+
curl_easy_setopt(curl_easy_handle_, CURLOPT_TIMEOUT_MS, http_timeout_us / 1000);
1670+
curl_easy_setopt(curl_easy_handle_, CURLOPT_CONNECTTIMEOUT_MS, http_timeout_us / 1000);
16481671

16491672
CURLMcode multi_res = curl_multi_add_handle(curl_multi_handle_, curl_easy_handle_);
16501673
if (multi_res != CURLM_OK) {
@@ -1654,11 +1677,12 @@ int ObEmbeddingTask::init_curl_handler(const ObString &model_url, const ObString
16541677
}
16551678
}
16561679
}
1680+
16571681
return ret;
16581682

16591683
}
16601684

1661-
int ObEmbeddingTask::wait_for_completion(const int64_t timeout_ms)
1685+
int ObEmbeddingTask::wait_for_completion()
16621686
{
16631687
int ret = OB_SUCCESS;
16641688
if (OB_UNLIKELY(!is_inited_)) {
@@ -1669,7 +1693,7 @@ int ObEmbeddingTask::wait_for_completion(const int64_t timeout_ms)
16691693
if (callback_done_) {
16701694
// do nothing
16711695
} else {
1672-
if (OB_FAIL(task_cond_.wait(timeout_ms))) {
1696+
if (OB_FAIL(task_cond_.wait(wait_for_completion_timeout_us_ / 1000))) {
16731697
LOG_WARN("failed to wait for completion", K(ret));
16741698
}
16751699
}

src/share/vector_index/ob_vector_embedding_handler.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ class ObEmbeddingTask
171171
const ObIArray<ObString> &input_chunks,
172172
int64_t dimension,
173173
int64_t http_timeout_us,
174+
int64_t http_max_retries,
174175
storage::ObEmbeddingIOCallbackHandle *cb_handle = nullptr);
175176
template <typename ThreadPoolType>
176177
int do_work(ThreadPoolType *thread_pool);
@@ -197,7 +198,7 @@ class ObEmbeddingTask
197198
// 公共方法用于外部设置任务失败
198199
int mark_task_failed(int error_code);
199200
int maybe_callback();
200-
int wait_for_completion(const int64_t timeout_ms = 0);
201+
int wait_for_completion();
201202
int wake_up();
202203
void disable_callback();
203204
void set_callback_done();
@@ -262,7 +263,7 @@ class ObEmbeddingTask
262263
void reset_retry_state();
263264
int map_http_error_to_internal_error(int64_t http_error_code) const;
264265
void try_increase_batch_size();
265-
int init_curl_handler(const ObString &model_url, const ObString &user_key);
266+
int init_curl_handler(const ObString &model_url, const ObString &user_key, const int64_t http_timeout_us);
266267

267268
struct HttpResponseData {
268269
HttpResponseData(ObIAllocator &allocator) : data(nullptr), size(0), allocator(allocator) {}
@@ -360,6 +361,9 @@ class ObEmbeddingTask
360361
int64_t http_total_retry_count_;
361362
int64_t http_retry_start_time_us_;
362363
int64_t http_last_retry_time_us_;
364+
int64_t http_max_retry_count_;
365+
int64_t wait_for_completion_timeout_us_; // For controlling the maximum timeout of waiting for completion
366+
363367
bool need_retry_flag_;
364368

365369
// Batch size adjustment for retry

src/storage/ddl/ob_ddl_pipeline.cpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1684,13 +1684,13 @@ int ObHNSWEmbeddingOperator::init(const ObTabletID &tablet_id)
16841684
}
16851685

16861686
if (OB_SUCC(ret)) {
1687-
if (OB_FAIL(embedmgr_->init(model_id_, http_timeout_us_))) {
1687+
if (OB_FAIL(embedmgr_->init(model_id_))) {
16881688
embedmgr_->~ObEmbeddingTaskMgr();
16891689
op_allocator_.free(embedmgr_);
16901690
embedmgr_ = nullptr;
16911691
LOG_WARN("failed to init embedding task manager", K(ret));
16921692
} else {
1693-
batch_size_ = 64; // TODO(fanfangyao.ffy):待调参
1693+
batch_size_ = 64; // TODO(fanfangyao.ffy): To be tuned
16941694
void *batch_buf = ob_malloc(sizeof(ObTaskBatchInfo), ObMemAttr(MTL_ID(), "TaskBatch"));
16951695
if (OB_ISNULL(batch_buf)) {
16961696
ret = OB_ALLOCATE_MEMORY_FAILED;
@@ -1725,7 +1725,6 @@ int ObHNSWEmbeddingOperator::execute(const ObChunk &input_chunk,
17251725
int ret = OB_SUCCESS;
17261726
output_chunk.reset();
17271727
result_state = ObPipelineOperator::NEED_MORE_INPUT;
1728-
int64_t wait_timeout_us = http_timeout_us_;
17291728
if (OB_UNLIKELY(!is_inited_)) {
17301729
ret = OB_NOT_INIT;
17311730
LOG_WARN("not init", K(ret));
@@ -1756,7 +1755,7 @@ int ObHNSWEmbeddingOperator::execute(const ObChunk &input_chunk,
17561755

17571756
//wait for task completion
17581757
if (OB_SUCC(ret)) {
1759-
if (OB_FAIL(embedmgr_->wait_for_completion(wait_timeout_us))) {
1758+
if (OB_FAIL(embedmgr_->wait_for_completion())) {
17601759
LOG_WARN("wait for completion failed", K(ret));
17611760
} else if (OB_FAIL(get_ready_results(output_chunk, result_state))) {
17621761
LOG_WARN("get ready results failed", K(ret));

src/storage/ddl/ob_ddl_pipeline.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -725,7 +725,7 @@ class ObHNSWEmbeddingOperator : public ObVectorIndexBaseOperator
725725
explicit ObHNSWEmbeddingOperator(ObPipeline *pipeline)
726726
: ObVectorIndexBaseOperator(pipeline), embedmgr_(nullptr), vec_dim_(-1), rowkey_cnt_(-1),
727727
text_col_idx_(-1), is_inited_(false), error_ret_code_(OB_SUCCESS),
728-
batch_size_(0), current_batch_(nullptr), http_timeout_us_(20 * 1000 * 1000) /* 20s */
728+
batch_size_(0), current_batch_(nullptr)
729729
{}
730730
~ObHNSWEmbeddingOperator();
731731
int init(const ObTabletID &tablet_id);
@@ -771,7 +771,6 @@ class ObHNSWEmbeddingOperator : public ObVectorIndexBaseOperator
771771
blocksstable::ObBatchDatumRows *cur_datum_rows_;
772772
int64_t cur_row_in_batch_;
773773
bool chunk_exhausted_;
774-
int64_t http_timeout_us_;
775774
DISALLOW_COPY_AND_ASSIGN(ObHNSWEmbeddingOperator);
776775
};
777776

0 commit comments

Comments
 (0)