Skip to content

Commit 8252547

Browse files
obdevfootka
authored andcommitted
[CP] dag task错误码被ob cancel覆盖
Co-authored-by: footka <672528926@qq.com>
1 parent e9dd7c5 commit 8252547

15 files changed

Lines changed: 352 additions & 61 deletions

deps/oblib/src/lib/utility/ob_tracepoint_def.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,9 @@ GLOBAL_ERRSIM_POINT_DEF(543, EN_FTS_INDEX_BUILD_DOC_WORD_FAILED, "");
385385
GLOBAL_ERRSIM_POINT_DEF(544, FTS_INDEX_SUBTASK_FAILED, "");
386386
GLOBAL_ERRSIM_POINT_DEF(545, FTS_INDEX_SUBTASK_BUILD_SSTABLE_FAILED, "");
387387
GLOBAL_ERRSIM_POINT_DEF(546, EN_FTS_INDEX_BUILD_PREPARE_FAILED, "");
388+
GLOBAL_ERRSIM_POINT_DEF(547, EN_CLUSTERING_KEY_BIOLD_SSTABLE_FAILED, "");
389+
// vec index
390+
GLOBAL_ERRSIM_POINT_DEF(550, EN_POST_VEC_INDEX_BUILD_ROWKEY_VID_TBL_ERR, "");
388391
// SQL Optimizer related 551-599
389392
GLOBAL_ERRSIM_POINT_DEF(551, EN_EXPLAIN_GENERATE_PLAN_WITH_OUTLINE, "Used to enable outline validity check for explain query");
390393
GLOBAL_ERRSIM_POINT_DEF(552, EN_ENABLE_AUTO_DOP_FORCE_PARALLEL_PLAN, "Used to generate parallel plan with random dop");

src/rootserver/ddl_task/ob_ddl_scheduler.cpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1341,7 +1341,8 @@ int ObDDLScheduler::create_ddl_task(const ObCreateDDLTaskParam &param,
13411341
param.tenant_data_version_,
13421342
*param.allocator_,
13431343
task_record,
1344-
param.new_snapshot_version_))) {
1344+
param.new_snapshot_version_,
1345+
param.ddl_need_retry_at_executor_))) {
13451346
LOG_WARN("fail to create build vec index task", K(ret));
13461347
}
13471348
break;
@@ -2215,7 +2216,8 @@ int ObDDLScheduler::create_build_vec_index_task(
22152216
const uint64_t tenant_data_version,
22162217
ObIAllocator &allocator,
22172218
ObDDLTaskRecord &task_record,
2218-
int64_t snapshot_version)
2219+
int64_t snapshot_version,
2220+
const bool ddl_need_retry_at_executor)
22192221
{
22202222
int ret = OB_SUCCESS;
22212223
int64_t task_id = 0;
@@ -2245,7 +2247,8 @@ int ObDDLScheduler::create_build_vec_index_task(
22452247
tenant_data_version,
22462248
parent_task_id,
22472249
share::ObDDLTaskStatus::PREPARE,
2248-
snapshot_version))) {
2250+
snapshot_version,
2251+
!ddl_need_retry_at_executor))) {
22492252
LOG_WARN("init fts index task failed", K(ret), K(data_table_schema), K(index_schema));
22502253
} else if (OB_FAIL(index_task.set_trace_id(*ObCurTraceId::get_trace_id()))) {
22512254
LOG_WARN("set trace id failed", K(ret));

src/rootserver/ddl_task/ob_ddl_scheduler.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,8 @@ class ObDDLScheduler : public rootserver::ObTenantThreadHelper,
460460
const uint64_t tenant_data_version,
461461
ObIAllocator &allocator,
462462
ObDDLTaskRecord &task_record,
463-
const int64_t snapshot_version);
463+
const int64_t snapshot_version,
464+
const bool ddl_need_retry_at_executor = false);
464465
int create_constraint_task(
465466
common::ObISQLClient &proxy,
466467
const share::schema::ObTableSchema *table_schema,

src/rootserver/ddl_task/ob_ddl_tablet_scheduler.cpp

Lines changed: 162 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616
#define USING_LOG_PREFIX RS
1717

1818
#include "ob_ddl_tablet_scheduler.h"
19+
#include "ob_index_build_task.h"
1920
#include "rootserver/ob_ddl_service_launcher.h" // for ObDDLServiceLauncher
2021
#include "rootserver/ob_root_service.h"
2122
#include "share/ob_ddl_checksum.h"
2223
#include "share/tablet/ob_tablet_to_ls_operator.h"
2324
#include "src/observer/ob_inner_sql_connection.h"
2425
#include "share/vector_index/ob_vector_index_util.h"
26+
#include "lib/utility/serialization.h"
2527

2628
using namespace oceanbase::rootserver;
2729
using namespace oceanbase::common;
@@ -110,6 +112,8 @@ int ObDDLTabletScheduler::init(const uint64_t tenant_id,
110112
LOG_WARN("fail to create column checksum map", K(ret), K(ref_data_table_tablets.count()));
111113
} else if (OB_FAIL(tablet_scheduled_times_statistic_.create(ref_data_table_tablets.count(), ObModIds::OB_SSTABLE_CREATE_INDEX))) {
112114
LOG_WARN("fail to create tablet scheduled count statistic map", K(ret), K(ref_data_table_tablets.count()));
115+
} else if (!tablet_id_to_execution_id_map_.created() && OB_FAIL(tablet_id_to_execution_id_map_.create(tablets.count(), ObModIds::OB_SSTABLE_CREATE_INDEX))) {
116+
LOG_WARN("fail to create tablet id to execution id map", K(ret), K(tablets.count()));
113117
} else if (OB_FAIL(ObDDLChecksumOperator::get_local_index_tablet_finish_status(
114118
tenant_id,
115119
ref_data_table_id,
@@ -255,11 +259,9 @@ int ObDDLTabletScheduler::get_next_batch_tablets(const bool is_ddl_retryable,
255259
if (OB_SUCC(ret)) {
256260
ret = OB_EAGAIN;
257261
}
258-
} else if (OB_FAIL(ObDDLTask::push_execution_id(tenant_id_, task_id_, task_type, is_ddl_retryable, new_execution_id))) {
259-
LOG_WARN("failed to fetch new execution id", K(ret), K(tenant_id_), K(task_id_), K(new_execution_id));
260262
} else if (OB_FAIL(get_next_parallelism(parallelism))) {
261263
LOG_WARN("fail to get next parallelism", K(ret), K(parallelism));
262-
} else if (OB_FAIL(get_unfinished_tablets(new_execution_id, ls_id, leader_addr, tablets))) {
264+
} else if (OB_FAIL(get_unfinished_tablets(task_type, is_ddl_retryable, new_execution_id, ls_id, leader_addr, tablets))) {
263265
LOG_WARN("failed to get unfinished tablets", K(ret), K(new_execution_id), K(tablets));
264266
}
265267
return ret;
@@ -309,6 +311,157 @@ int ObDDLTabletScheduler::confirm_batch_tablets_status(const int64_t execution_i
309311
return ret;
310312
}
311313

314+
// in (idempotent_mode && ddl can not retry) case, every tablet's execution id cannot be pushed more than once
315+
// so push_tablet_execution_id does the following:
316+
// 1.push tablet execution id, and check if the tablet execution id is pushed more than once
317+
// 2.push task execution id anyway, it is no need to check task execution id, we always push task execution id high to avoid different inner sql has same task execution id.
318+
int ObDDLTabletScheduler::push_tablet_execution_id(share::ObDDLType task_type,
319+
const bool ddl_can_retry,
320+
const common::ObIArray<common::ObTabletID> &tablets,
321+
int64_t &new_task_execution_id)
322+
{
323+
int ret = OB_SUCCESS;
324+
new_task_execution_id = 0;
325+
if (OB_UNLIKELY(!is_inited_)) {
326+
ret = OB_NOT_INIT;
327+
LOG_WARN("scheduler not init", K(ret));
328+
} else if (OB_UNLIKELY(tablets.count() <= 0)) {
329+
ret = OB_INVALID_ARGUMENT;
330+
LOG_WARN("invalid tablets", K(ret), K(tablets.count()));
331+
} else {
332+
int64_t next_tablet_execution_id = DEFAULT_EXECUTION_ID;
333+
for (int64_t i = 0; OB_SUCC(ret) && i < tablets.count(); ++i) {
334+
const ObTabletID &tablet = tablets.at(i);
335+
int64_t tablet_execution_id = -1;
336+
if (OB_FAIL(tablet_id_to_execution_id_map_.get_refactored(tablet, tablet_execution_id))) {
337+
if (OB_HASH_NOT_EXIST != ret) {
338+
LOG_WARN("failed to get tablet execution id", K(ret), K(tablet));
339+
} else {
340+
tablet_execution_id = -1;
341+
ret = OB_SUCCESS;
342+
}
343+
} else if (OB_FAIL(ObDDLTask::calc_next_execution_id(tablet_execution_id, task_type, ddl_can_retry, next_tablet_execution_id))) {
344+
LOG_WARN("calc next execution id failed", K(ret), K(tablet_execution_id), K(task_type), K(ddl_can_retry));
345+
} else if (OB_FAIL(tablet_id_to_execution_id_map_.set_refactored(tablet, next_tablet_execution_id, true))) {
346+
LOG_WARN("set tablet execution id failed", K(ret), K(tablet), K(next_tablet_execution_id));
347+
}
348+
}
349+
350+
if (OB_SUCC(ret)) {
351+
if (OB_FAIL(push_task_execution_id(new_task_execution_id))) {
352+
LOG_WARN("failed to push execution id", K(ret), K(new_task_execution_id));
353+
}
354+
}
355+
}
356+
return ret;
357+
}
358+
359+
// push task execution id high anyway, cause retry check is done in push_tablet_execution_id
360+
int ObDDLTabletScheduler::push_task_execution_id(int64_t &new_task_execution_id)
361+
{
362+
int ret = OB_SUCCESS;
363+
ObMySQLTransaction trans;
364+
int64_t task_status = 0;
365+
int64_t task_execution_id = 0;
366+
int64_t ret_code = OB_SUCCESS;
367+
int64_t unused_snapshot_ver = OB_INVALID_VERSION;
368+
if (OB_UNLIKELY(!is_inited_)) {
369+
ret = OB_NOT_INIT;
370+
LOG_WARN("scheduler not init", K(ret));
371+
} else if (OB_ISNULL(GCTX.sql_proxy_)) {
372+
ret = OB_INVALID_ARGUMENT;
373+
LOG_WARN("invalid argument", KR(ret), KP(GCTX.sql_proxy_));
374+
} else if (OB_FAIL(trans.start(GCTX.sql_proxy_, tenant_id_))) {
375+
LOG_WARN("start transaction failed", K(ret));
376+
} else if (OB_FAIL(ObDDLTaskRecordOperator::select_for_update(trans, tenant_id_, task_id_, task_status, task_execution_id, ret_code, unused_snapshot_ver))) {
377+
LOG_WARN("select for update failed", K(ret), K(tenant_id_), K(task_id_));
378+
} else if (task_execution_id == -1) {
379+
task_execution_id = 0;
380+
}
381+
382+
if (OB_SUCC(ret)) {
383+
if (OB_FAIL(ObDDLTaskRecordOperator::update_execution_id(trans, tenant_id_, task_id_, task_execution_id + 1))) {
384+
LOG_WARN("update execution id failed", K(ret), K(tenant_id_), K(task_id_), K(task_execution_id + 1));
385+
} else {
386+
new_task_execution_id = task_execution_id + 1;
387+
}
388+
}
389+
390+
bool commit = (OB_SUCCESS == ret);
391+
int tmp_ret = trans.end(commit);
392+
if (OB_SUCCESS != tmp_ret) {
393+
LOG_WARN("fail to end trans", K(tmp_ret));
394+
ret = (OB_SUCCESS == ret) ? tmp_ret : ret;
395+
}
396+
return ret;
397+
}
398+
399+
OB_DEF_SERIALIZE(ObDDLTabletScheduler)
400+
{
401+
int ret = OB_SUCCESS;
402+
common::ObSArray<ObTabletExecutionIdPair> pairs;
403+
if (tablet_id_to_execution_id_map_.created()) {
404+
for (hash::ObHashMap<common::ObTabletID, int64_t>::const_iterator iter = tablet_id_to_execution_id_map_.begin();
405+
OB_SUCC(ret) && iter != tablet_id_to_execution_id_map_.end();
406+
++iter) {
407+
ObTabletExecutionIdPair pair(iter->first, iter->second);
408+
if (OB_FAIL(pairs.push_back(pair))) {
409+
LOG_WARN("failed to push tablet execution id pair", K(ret), K(pair));
410+
}
411+
}
412+
}
413+
if (OB_SUCC(ret)) {
414+
if (OB_FAIL(pairs.serialize(buf, buf_len, pos))) {
415+
LOG_WARN("failed to serialize tablet execution id pairs", K(ret));
416+
}
417+
}
418+
return ret;
419+
}
420+
421+
OB_DEF_DESERIALIZE(ObDDLTabletScheduler)
422+
{
423+
int ret = OB_SUCCESS;
424+
common::ObSArray<ObTabletExecutionIdPair> pairs;
425+
if (OB_FAIL(pairs.deserialize(buf, data_len, pos))) {
426+
LOG_WARN("failed to deserialize tablet execution id pairs", K(ret));
427+
} else if (pairs.count() > 0) {
428+
if (!tablet_id_to_execution_id_map_.created()) {
429+
int64_t bucket_num = pairs.count();
430+
if (OB_FAIL(tablet_id_to_execution_id_map_.create(bucket_num, ObModIds::OB_SSTABLE_CREATE_INDEX))) {
431+
LOG_WARN("failed to create tablet execution id map", K(ret), K(pairs.count()));
432+
}
433+
} else if (OB_FAIL(tablet_id_to_execution_id_map_.reuse())) {
434+
LOG_WARN("failed to reuse tablet execution id map", K(ret));
435+
}
436+
437+
for (int64_t i = 0; OB_SUCC(ret) && i < pairs.count(); ++i) {
438+
const ObTabletExecutionIdPair &pair = pairs.at(i);
439+
if (OB_FAIL(tablet_id_to_execution_id_map_.set_refactored(pair.tablet_id_, pair.execution_id_, true))) {
440+
LOG_WARN("failed to set tablet execution id pair", K(ret), K(pair));
441+
}
442+
}
443+
}
444+
return ret;
445+
}
446+
447+
OB_DEF_SERIALIZE_SIZE(ObDDLTabletScheduler)
448+
{
449+
int64_t len = 0;
450+
if (tablet_id_to_execution_id_map_.created()) {
451+
const int64_t count = tablet_id_to_execution_id_map_.size();
452+
len = serialization::encoded_length_vi64(count);
453+
for (hash::ObHashMap<common::ObTabletID, int64_t>::const_iterator iter = tablet_id_to_execution_id_map_.begin();
454+
iter != tablet_id_to_execution_id_map_.end();
455+
++iter) {
456+
ObTabletExecutionIdPair pair(iter->first, iter->second);
457+
len += pair.get_serialize_size();
458+
}
459+
} else {
460+
len = serialization::encoded_length_vi64(0);
461+
}
462+
return len;
463+
}
464+
312465
int ObDDLTabletScheduler::refresh_ls_location_map() {
313466
int ret = OB_SUCCESS;
314467
TCRLockGuard guard(lock_);
@@ -353,9 +506,10 @@ int ObDDLTabletScheduler::get_next_parallelism(int64_t &parallelism)
353506
}
354507

355508

356-
int ObDDLTabletScheduler::get_unfinished_tablets(const int64_t execution_id, share::ObLSID &ls_id, common::ObAddr &leader_addr, ObIArray<ObTabletID> &tablets)
509+
int ObDDLTabletScheduler::get_unfinished_tablets(const share::ObDDLType task_type,const bool ddl_can_retry, int64_t &new_execution_id, share::ObLSID &ls_id, common::ObAddr &leader_addr, ObIArray<ObTabletID> &tablets)
357510
{
358511
int ret = OB_SUCCESS;
512+
new_execution_id = 0;
359513
tablets.reset();
360514
ls_id.reset();
361515
leader_addr.reset();
@@ -376,10 +530,12 @@ int ObDDLTabletScheduler::get_unfinished_tablets(const int64_t execution_id, sha
376530
LOG_WARN("fail to get ls host left disk space", K(ret), K(tenant_id_), K(ls_id), K(leader_addr), K(left_space_size));
377531
} else if (OB_FAIL(calculate_candidate_tablets(left_space_size, tablet_queue, tablets))) {
378532
LOG_WARN("fail to use strategy to get tablets", K(ret), K(left_space_size), K(tablet_queue), K(tablets));
533+
} else if (OB_FAIL(push_tablet_execution_id(task_type, ddl_can_retry, tablets, new_execution_id))) {
534+
LOG_WARN("failed to push tablet execution id", K(ret), K(task_type), K(ddl_can_retry), K(tablets), K(new_execution_id));
379535
} else {
380536
TCWLockGuard guard(lock_);
381-
if (OB_FAIL(running_ls_to_execution_id_.set_refactored(ls_id, execution_id, true /* overwrite */))) {
382-
LOG_WARN("running ls to execution id map set fail", K(ret), K(ls_id), K(execution_id));
537+
if (OB_FAIL(running_ls_to_execution_id_.set_refactored(ls_id, new_execution_id, true /* overwrite */))) {
538+
LOG_WARN("running ls to execution id map set fail", K(ret), K(ls_id), K(new_execution_id));
383539
} else {
384540
ObArray<ObTabletID> running_tablet_queue;
385541
if (OB_FAIL(running_tablet_queue.assign(tablets))) {

src/rootserver/ddl_task/ob_ddl_tablet_scheduler.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ namespace rootserver
2525
{
2626
class ObDDLTabletScheduler final
2727
{
28+
OB_UNIS_VERSION(1);
2829
public:
2930
ObDDLTabletScheduler();
3031
~ObDDLTabletScheduler();
@@ -47,7 +48,7 @@ class ObDDLTabletScheduler final
4748
K_(task_id), K_(parallelism), K_(snapshot_version), K_(trace_id), K_(all_tablets), K_(running_task_ls_ids_before));
4849
private:
4950
int get_next_parallelism(int64_t &parallelism);
50-
int get_unfinished_tablets(const int64_t execution_id, share::ObLSID &ls_id, common::ObAddr &leader_addr, ObIArray<ObTabletID> &tablets);
51+
int get_unfinished_tablets(const share::ObDDLType task_type, const bool ddl_can_retry, int64_t &new_execution_id, share::ObLSID &ls_id, common::ObAddr &leader_addr, ObIArray<ObTabletID> &tablets);
5152
int get_to_be_scheduled_tablets(share::ObLSID &ls_id, common::ObAddr &leader_addr, ObIArray<ObTabletID> &tablets);
5253
int calculate_candidate_tablets(const uint64_t left_space_size, const ObIArray<ObTabletID> &in_tablets, ObIArray<ObTabletID> &out_tablets);
5354
int get_session_running_lsid(ObIArray<share::ObLSID> &running_ls_ids);
@@ -58,6 +59,11 @@ class ObDDLTabletScheduler final
5859
bool is_all_tasks_finished();
5960
bool is_running_tasks_before_finished();
6061
int refresh_ls_location_map();
62+
int push_tablet_execution_id(share::ObDDLType task_type,
63+
const bool ddl_can_retry,
64+
const common::ObIArray<common::ObTabletID> &tablets,
65+
int64_t &new_task_execution_id);
66+
int push_task_execution_id(int64_t &new_task_execution_id);
6167
void destroy();
6268
private:
6369
bool is_inited_;
@@ -79,6 +85,7 @@ class ObDDLTabletScheduler final
7985
common::hash::ObHashMap<int64_t, int64_t> tablet_id_to_data_size_;
8086
common::hash::ObHashMap<int64_t, int64_t> tablet_id_to_data_row_cnt_;
8187
common::hash::ObHashMap<int64_t, int64_t> tablet_scheduled_times_statistic_;
88+
common::hash::ObHashMap<common::ObTabletID, int64_t> tablet_id_to_execution_id_map_;
8289
};
8390

8491
class ObTabletIdUpdater final

0 commit comments

Comments
 (0)