2121#include " src/storage/ls/ob_ls.h"
2222#include " src/storage/tx/ob_trans_service.h"
2323#include " sql/das/ob_das_dml_vec_iter.h"
24+ #include " sql/engine/expr/ob_expr_ai/ob_ai_func_utils.h"
2425
2526namespace oceanbase
2627{
@@ -220,7 +221,7 @@ int ObHybridVectorRefreshTask::do_work()
220221 exec_finish = true ;
221222 break ;
222223 }
223- default :
224+ default :
224225 ret = OB_ERR_UNEXPECTED ;
225226 LOG_WARN (" unexpected task status" , K (ret), K (current_status ()), KPC (get_task_ctx ()));
226227 break ;
@@ -435,9 +436,9 @@ int ObHybridVectorRefreshTask::get_embedded_table_column_ids(ObPluginVectorIndex
435436 return ret;
436437}
437438
438- int ObHybridVectorRefreshTask::init_dml_param (uint64_t table_id,
439- ObDMLBaseParam &dml_param,
440- share::schema::ObTableDMLParam &table_param,
439+ int ObHybridVectorRefreshTask::init_dml_param (uint64_t table_id,
440+ ObDMLBaseParam &dml_param,
441+ share::schema::ObTableDMLParam &table_param,
441442 ObIArray<uint64_t > &dml_column_ids,
442443 transaction::ObTxDesc *tx_desc,
443444 oceanbase::transaction::ObTxReadSnapshot &snapshot,
@@ -486,6 +487,8 @@ int ObHybridVectorRefreshTask::init_dml_param(uint64_t table_id,
486487int ObHybridVectorRefreshTask::init_endpoint (ObPluginVectorIndexAdaptor &adaptor)
487488{
488489 int ret = OB_SUCCESS ;
490+ bool use_request_model_name = false ;
491+ ObAIFuncExprInfo *ai_func_info = nullptr ;
489492 omt::ObTenantAiService *ai_service = MTL (omt::ObTenantAiService *);
490493 ObHybridVectorRefreshTaskCtx *task_ctx = static_cast <ObHybridVectorRefreshTaskCtx *>(get_task_ctx ());
491494 if (OB_ISNULL (ai_service) || OB_ISNULL (task_ctx)) {
@@ -495,6 +498,16 @@ int ObHybridVectorRefreshTask::init_endpoint(ObPluginVectorIndexAdaptor &adaptor
495498 LOG_WARN (" failed to get ai service guard" , K (ret), KPC (task_ctx));
496499 } else if (OB_FAIL (task_ctx->ai_service_ .get_ai_endpoint_by_ai_model_name (adaptor.get_endpoint (), task_ctx->endpoint_ , false /* need_check*/ ))) {
497500 LOG_WARN (" failed to get endpoint info" , K (ret), K (adaptor));
501+ } else if (OB_FALSE_IT (use_request_model_name = !task_ctx->endpoint_ ->get_request_model_name ().empty ())) {
502+ } else if (use_request_model_name && OB_FAIL (ob_write_string (task_ctx->allocator_ , task_ctx->endpoint_ ->get_request_model_name (), task_ctx->request_model_name_ ))) {
503+ LOG_WARN (" failed to copy request_model_name" , K (ret));
504+ } else if (!use_request_model_name && OB_FAIL (ObAIFuncUtils::get_ai_func_info (task_ctx->allocator_ , adaptor.get_endpoint (), ai_func_info))) {
505+ LOG_WARN (" failed to get ai func info" , K (ret), K (adaptor.get_endpoint ()));
506+ } else if (!use_request_model_name && OB_ISNULL (ai_func_info)) {
507+ ret = OB_ERR_UNEXPECTED ;
508+ LOG_WARN (" ai func info is null" , K (ret));
509+ } else if (!use_request_model_name && OB_FAIL (ob_write_string (task_ctx->allocator_ , ai_func_info->model_ , task_ctx->request_model_name_ ))) {
510+ LOG_WARN (" failed to copy model_name from ai func info" , K (ret));
498511 }
499512 return ret;
500513}
@@ -544,7 +557,7 @@ int ObHybridVectorRefreshTask::prepare_for_embedding(ObPluginVectorIndexAdaptor
544557 } else if (FALSE_IT (table_param = new (table_param)schema::ObTableParam (task_ctx->allocator_ ))) {
545558 } else if (FALSE_IT (ctx_->task_status_ .target_scn_ .convert_from_ts (ObTimeUtility::current_time ()))) {
546559 } else if (OB_FAIL (ObPluginVectorIndexUtils::read_local_tablet (ls_id_,
547- &adaptor,
560+ &adaptor,
548561 ctx_->task_status_ .target_scn_ ,
549562 INDEX_TYPE_VEC_DELTA_BUFFER_LOCAL ,
550563 task_ctx->allocator_ ,
@@ -570,7 +583,7 @@ int ObHybridVectorRefreshTask::prepare_for_embedding(ObPluginVectorIndexAdaptor
570583 task_ctx->batch_cnt_ = MAX (task_ctx->batch_cnt_ / 4 , ObHybridVectorRefreshTaskCtx::MIN_BATCH_CNT );
571584 }
572585 }
573-
586+
574587 int cur_row_count = 0 ;
575588 ObSEArray<ObString, 4 > chunk_array;
576589 ObSEArray<ObString, 4 > tmp_chunk_array;
@@ -663,7 +676,7 @@ int ObHybridVectorRefreshTask::prepare_for_embedding(ObPluginVectorIndexAdaptor
663676 LOG_WARN (" failed to get access key" , K (ret));
664677 } else if (OB_FAIL (ob_write_string (task_ctx->allocator_ , endpoint->get_url (), url, true ))) {
665678 LOG_WARN (" fail to write string" , K (ret));
666- } else if (OB_FAIL (task_ctx->embedding_task_ ->init (url, endpoint-> get_request_model_name () ,
679+ } else if (OB_FAIL (task_ctx->embedding_task_ ->init (url, task_ctx-> request_model_name_ ,
667680 endpoint->get_provider (), access_key, chunk_array, dim, http_timeout_us, http_max_retries))) {
668681 LOG_WARN (" failed to init embedding task" , K (ret), KPC (endpoint));
669682 } else {
@@ -703,10 +716,10 @@ int ObHybridVectorRefreshTask::check_embedding_finish(bool &finish)
703716}
704717
705718int ObHybridVectorRefreshTask::do_refresh_only (
706- ObPluginVectorIndexAdaptor &adaptor,
707- transaction::ObTxDesc *tx_desc,
708- oceanbase::transaction::ObTxReadSnapshot &snapshot,
709- storage::ObStoreCtxGuard &store_ctx_guard,
719+ ObPluginVectorIndexAdaptor &adaptor,
720+ transaction::ObTxDesc *tx_desc,
721+ oceanbase::transaction::ObTxReadSnapshot &snapshot,
722+ storage::ObStoreCtxGuard &store_ctx_guard,
710723 storage::ObValueRowIterator &index_id_iter,
711724 storage::ObValueRowIterator &delta_delete_iter)
712725{
@@ -734,7 +747,7 @@ int ObHybridVectorRefreshTask::do_refresh_only(
734747 LOG_WARN (" failed to insert rows to index id table" , K (ret), K (adaptor.get_vbitmap_table_id ()));
735748 }
736749 store_ctx_guard.reset ();
737-
750+
738751 // delete from 3 table.
739752 affected_rows = 0 ;
740753 if (OB_FAIL (ret)) {
@@ -843,7 +856,7 @@ int ObHybridVectorRefreshTask::delete_embedded_table(ObPluginVectorIndexAdaptor
843856 common::ObNewRowIterator *scan_iter = nullptr ;
844857 ObStorageDatumUtils util;
845858 ObArenaAllocator scan_allocator (" VecEmbedding" , OB_MALLOC_NORMAL_BLOCK_SIZE , MTL_ID ());
846- if (OB_FAIL (ObPluginVectorIndexUtils::read_local_tablet (ls_id_,
859+ if (OB_FAIL (ObPluginVectorIndexUtils::read_local_tablet (ls_id_,
847860 &adaptor,
848861 snapshot.version (),
849862 INDEX_TYPE_HYBRID_INDEX_EMBEDDED_LOCAL ,
@@ -975,7 +988,7 @@ int ObHybridVectorRefreshTask::after_embedding(ObPluginVectorIndexAdaptor &adapt
975988 int64_t loop_cnt = 0 ;
976989 if (OB_FAIL (new_row.init (task_ctx->embedded_table_column_ids_ .count ()))) {
977990 LOG_WARN (" fail to init datum row" , K (ret), K (task_ctx->embedded_table_column_ids_ ), K (new_row));
978- } else if (adaptor.get_is_need_vid () && OB_FAIL (ObPluginVectorIndexUtils::read_local_tablet (ls_id_,
991+ } else if (adaptor.get_is_need_vid () && OB_FAIL (ObPluginVectorIndexUtils::read_local_tablet (ls_id_,
979992 &adaptor,
980993 ctx_->task_status_ .target_scn_ ,
981994 INDEX_TYPE_VEC_VID_ROWKEY_LOCAL ,
@@ -1053,7 +1066,7 @@ int ObHybridVectorRefreshTask::after_embedding(ObPluginVectorIndexAdaptor &adapt
10531066 ObTableScanIterator *embedded_table_scan_iter = nullptr ;
10541067 ObArenaAllocator embedde_scan_allocator (" VecEmbedding" , OB_MALLOC_NORMAL_BLOCK_SIZE , MTL_ID ());
10551068 ObRowkey rowkey (obj_ptr, embedded_rowkey_count);
1056- if (OB_FAIL (ObPluginVectorIndexUtils::read_local_tablet (ls_id_,
1069+ if (OB_FAIL (ObPluginVectorIndexUtils::read_local_tablet (ls_id_,
10571070 &adaptor,
10581071 snapshot.version (),
10591072 INDEX_TYPE_HYBRID_INDEX_EMBEDDED_LOCAL ,
@@ -1098,7 +1111,7 @@ int ObHybridVectorRefreshTask::after_embedding(ObPluginVectorIndexAdaptor &adapt
10981111 }
10991112 }
11001113 }
1101-
1114+
11021115 CHECK_TASK_CANCELLED_IN_PROCESS (ret, loop_cnt, ctx_);
11031116 }
11041117 if (OB_NOT_NULL (tsc_service)) {
0 commit comments