1616#define USING_LOG_PREFIX RS
1717
1818#include " ob_ddl_tablet_scheduler.h"
19+ #include " ob_index_build_task.h"
1920#include " rootserver/ob_ddl_service_launcher.h" // for ObDDLServiceLauncher
2021#include " rootserver/ob_root_service.h"
2122#include " share/ob_ddl_checksum.h"
2223#include " share/tablet/ob_tablet_to_ls_operator.h"
2324#include " src/observer/ob_inner_sql_connection.h"
2425#include " share/vector_index/ob_vector_index_util.h"
26+ #include " lib/utility/serialization.h"
2527
2628using namespace oceanbase ::rootserver;
2729using namespace oceanbase ::common;
@@ -110,6 +112,8 @@ int ObDDLTabletScheduler::init(const uint64_t tenant_id,
110112 LOG_WARN (" fail to create column checksum map" , K (ret), K (ref_data_table_tablets.count ()));
111113 } else if (OB_FAIL (tablet_scheduled_times_statistic_.create (ref_data_table_tablets.count (), ObModIds::OB_SSTABLE_CREATE_INDEX ))) {
112114 LOG_WARN (" fail to create tablet scheduled count statistic map" , K (ret), K (ref_data_table_tablets.count ()));
115+ } else if (!tablet_id_to_execution_id_map_.created () && OB_FAIL (tablet_id_to_execution_id_map_.create (tablets.count (), ObModIds::OB_SSTABLE_CREATE_INDEX ))) {
116+ LOG_WARN (" fail to create tablet id to execution id map" , K (ret), K (tablets.count ()));
113117 } else if (OB_FAIL (ObDDLChecksumOperator::get_local_index_tablet_finish_status (
114118 tenant_id,
115119 ref_data_table_id,
@@ -255,11 +259,9 @@ int ObDDLTabletScheduler::get_next_batch_tablets(const bool is_ddl_retryable,
255259 if (OB_SUCC (ret)) {
256260 ret = OB_EAGAIN ;
257261 }
258- } else if (OB_FAIL (ObDDLTask::push_execution_id (tenant_id_, task_id_, task_type, is_ddl_retryable, new_execution_id))) {
259- LOG_WARN (" failed to fetch new execution id" , K (ret), K (tenant_id_), K (task_id_), K (new_execution_id));
260262 } else if (OB_FAIL (get_next_parallelism (parallelism))) {
261263 LOG_WARN (" fail to get next parallelism" , K (ret), K (parallelism));
262- } else if (OB_FAIL (get_unfinished_tablets (new_execution_id, ls_id, leader_addr, tablets))) {
264+ } else if (OB_FAIL (get_unfinished_tablets (task_type, is_ddl_retryable, new_execution_id, ls_id, leader_addr, tablets))) {
263265 LOG_WARN (" failed to get unfinished tablets" , K (ret), K (new_execution_id), K (tablets));
264266 }
265267 return ret;
@@ -309,6 +311,157 @@ int ObDDLTabletScheduler::confirm_batch_tablets_status(const int64_t execution_i
309311 return ret;
310312}
311313
314+ // in (idempotent_mode && ddl can not retry) case, every tablet's execution id cannot be pushed more than once
315+ // so push_tablet_execution_id does the following:
316+ // 1.push tablet execution id, and check if the tablet execution id is pushed more than once
317+ // 2.push task execution id anyway, it is no need to check task execution id, we always push task execution id high to avoid different inner sql has same task execution id.
318+ int ObDDLTabletScheduler::push_tablet_execution_id (share::ObDDLType task_type,
319+ const bool ddl_can_retry,
320+ const common::ObIArray<common::ObTabletID> &tablets,
321+ int64_t &new_task_execution_id)
322+ {
323+ int ret = OB_SUCCESS ;
324+ new_task_execution_id = 0 ;
325+ if (OB_UNLIKELY (!is_inited_)) {
326+ ret = OB_NOT_INIT ;
327+ LOG_WARN (" scheduler not init" , K (ret));
328+ } else if (OB_UNLIKELY (tablets.count () <= 0 )) {
329+ ret = OB_INVALID_ARGUMENT ;
330+ LOG_WARN (" invalid tablets" , K (ret), K (tablets.count ()));
331+ } else {
332+ int64_t next_tablet_execution_id = DEFAULT_EXECUTION_ID ;
333+ for (int64_t i = 0 ; OB_SUCC (ret) && i < tablets.count (); ++i) {
334+ const ObTabletID &tablet = tablets.at (i);
335+ int64_t tablet_execution_id = -1 ;
336+ if (OB_FAIL (tablet_id_to_execution_id_map_.get_refactored (tablet, tablet_execution_id))) {
337+ if (OB_HASH_NOT_EXIST != ret) {
338+ LOG_WARN (" failed to get tablet execution id" , K (ret), K (tablet));
339+ } else {
340+ tablet_execution_id = -1 ;
341+ ret = OB_SUCCESS ;
342+ }
343+ } else if (OB_FAIL (ObDDLTask::calc_next_execution_id (tablet_execution_id, task_type, ddl_can_retry, next_tablet_execution_id))) {
344+ LOG_WARN (" calc next execution id failed" , K (ret), K (tablet_execution_id), K (task_type), K (ddl_can_retry));
345+ } else if (OB_FAIL (tablet_id_to_execution_id_map_.set_refactored (tablet, next_tablet_execution_id, true ))) {
346+ LOG_WARN (" set tablet execution id failed" , K (ret), K (tablet), K (next_tablet_execution_id));
347+ }
348+ }
349+
350+ if (OB_SUCC (ret)) {
351+ if (OB_FAIL (push_task_execution_id (new_task_execution_id))) {
352+ LOG_WARN (" failed to push execution id" , K (ret), K (new_task_execution_id));
353+ }
354+ }
355+ }
356+ return ret;
357+ }
358+
359+ // push task execution id high anyway, cause retry check is done in push_tablet_execution_id
360+ int ObDDLTabletScheduler::push_task_execution_id (int64_t &new_task_execution_id)
361+ {
362+ int ret = OB_SUCCESS ;
363+ ObMySQLTransaction trans;
364+ int64_t task_status = 0 ;
365+ int64_t task_execution_id = 0 ;
366+ int64_t ret_code = OB_SUCCESS ;
367+ int64_t unused_snapshot_ver = OB_INVALID_VERSION ;
368+ if (OB_UNLIKELY (!is_inited_)) {
369+ ret = OB_NOT_INIT ;
370+ LOG_WARN (" scheduler not init" , K (ret));
371+ } else if (OB_ISNULL (GCTX .sql_proxy_ )) {
372+ ret = OB_INVALID_ARGUMENT ;
373+ LOG_WARN (" invalid argument" , KR (ret), KP (GCTX .sql_proxy_ ));
374+ } else if (OB_FAIL (trans.start (GCTX .sql_proxy_ , tenant_id_))) {
375+ LOG_WARN (" start transaction failed" , K (ret));
376+ } else if (OB_FAIL (ObDDLTaskRecordOperator::select_for_update (trans, tenant_id_, task_id_, task_status, task_execution_id, ret_code, unused_snapshot_ver))) {
377+ LOG_WARN (" select for update failed" , K (ret), K (tenant_id_), K (task_id_));
378+ } else if (task_execution_id == -1 ) {
379+ task_execution_id = 0 ;
380+ }
381+
382+ if (OB_SUCC (ret)) {
383+ if (OB_FAIL (ObDDLTaskRecordOperator::update_execution_id (trans, tenant_id_, task_id_, task_execution_id + 1 ))) {
384+ LOG_WARN (" update execution id failed" , K (ret), K (tenant_id_), K (task_id_), K (task_execution_id + 1 ));
385+ } else {
386+ new_task_execution_id = task_execution_id + 1 ;
387+ }
388+ }
389+
390+ bool commit = (OB_SUCCESS == ret);
391+ int tmp_ret = trans.end (commit);
392+ if (OB_SUCCESS != tmp_ret) {
393+ LOG_WARN (" fail to end trans" , K (tmp_ret));
394+ ret = (OB_SUCCESS == ret) ? tmp_ret : ret;
395+ }
396+ return ret;
397+ }
398+
399+ OB_DEF_SERIALIZE (ObDDLTabletScheduler)
400+ {
401+ int ret = OB_SUCCESS ;
402+ common::ObSArray<ObTabletExecutionIdPair> pairs;
403+ if (tablet_id_to_execution_id_map_.created ()) {
404+ for (hash::ObHashMap<common::ObTabletID, int64_t >::const_iterator iter = tablet_id_to_execution_id_map_.begin ();
405+ OB_SUCC (ret) && iter != tablet_id_to_execution_id_map_.end ();
406+ ++iter) {
407+ ObTabletExecutionIdPair pair (iter->first , iter->second );
408+ if (OB_FAIL (pairs.push_back (pair))) {
409+ LOG_WARN (" failed to push tablet execution id pair" , K (ret), K (pair));
410+ }
411+ }
412+ }
413+ if (OB_SUCC (ret)) {
414+ if (OB_FAIL (pairs.serialize (buf, buf_len, pos))) {
415+ LOG_WARN (" failed to serialize tablet execution id pairs" , K (ret));
416+ }
417+ }
418+ return ret;
419+ }
420+
421+ OB_DEF_DESERIALIZE (ObDDLTabletScheduler)
422+ {
423+ int ret = OB_SUCCESS ;
424+ common::ObSArray<ObTabletExecutionIdPair> pairs;
425+ if (OB_FAIL (pairs.deserialize (buf, data_len, pos))) {
426+ LOG_WARN (" failed to deserialize tablet execution id pairs" , K (ret));
427+ } else if (pairs.count () > 0 ) {
428+ if (!tablet_id_to_execution_id_map_.created ()) {
429+ int64_t bucket_num = pairs.count ();
430+ if (OB_FAIL (tablet_id_to_execution_id_map_.create (bucket_num, ObModIds::OB_SSTABLE_CREATE_INDEX ))) {
431+ LOG_WARN (" failed to create tablet execution id map" , K (ret), K (pairs.count ()));
432+ }
433+ } else if (OB_FAIL (tablet_id_to_execution_id_map_.reuse ())) {
434+ LOG_WARN (" failed to reuse tablet execution id map" , K (ret));
435+ }
436+
437+ for (int64_t i = 0 ; OB_SUCC (ret) && i < pairs.count (); ++i) {
438+ const ObTabletExecutionIdPair &pair = pairs.at (i);
439+ if (OB_FAIL (tablet_id_to_execution_id_map_.set_refactored (pair.tablet_id_ , pair.execution_id_ , true ))) {
440+ LOG_WARN (" failed to set tablet execution id pair" , K (ret), K (pair));
441+ }
442+ }
443+ }
444+ return ret;
445+ }
446+
447+ OB_DEF_SERIALIZE_SIZE (ObDDLTabletScheduler)
448+ {
449+ int64_t len = 0 ;
450+ if (tablet_id_to_execution_id_map_.created ()) {
451+ const int64_t count = tablet_id_to_execution_id_map_.size ();
452+ len = serialization::encoded_length_vi64 (count);
453+ for (hash::ObHashMap<common::ObTabletID, int64_t >::const_iterator iter = tablet_id_to_execution_id_map_.begin ();
454+ iter != tablet_id_to_execution_id_map_.end ();
455+ ++iter) {
456+ ObTabletExecutionIdPair pair (iter->first , iter->second );
457+ len += pair.get_serialize_size ();
458+ }
459+ } else {
460+ len = serialization::encoded_length_vi64 (0 );
461+ }
462+ return len;
463+ }
464+
312465int ObDDLTabletScheduler::refresh_ls_location_map () {
313466 int ret = OB_SUCCESS ;
314467 TCRLockGuard guard (lock_);
@@ -353,9 +506,10 @@ int ObDDLTabletScheduler::get_next_parallelism(int64_t ¶llelism)
353506}
354507
355508
356- int ObDDLTabletScheduler::get_unfinished_tablets (const int64_t execution_id , share::ObLSID &ls_id, common::ObAddr &leader_addr, ObIArray<ObTabletID> &tablets)
509+ int ObDDLTabletScheduler::get_unfinished_tablets (const share::ObDDLType task_type, const bool ddl_can_retry, int64_t &new_execution_id , share::ObLSID &ls_id, common::ObAddr &leader_addr, ObIArray<ObTabletID> &tablets)
357510{
358511 int ret = OB_SUCCESS ;
512+ new_execution_id = 0 ;
359513 tablets.reset ();
360514 ls_id.reset ();
361515 leader_addr.reset ();
@@ -376,10 +530,12 @@ int ObDDLTabletScheduler::get_unfinished_tablets(const int64_t execution_id, sha
376530 LOG_WARN (" fail to get ls host left disk space" , K (ret), K (tenant_id_), K (ls_id), K (leader_addr), K (left_space_size));
377531 } else if (OB_FAIL (calculate_candidate_tablets (left_space_size, tablet_queue, tablets))) {
378532 LOG_WARN (" fail to use strategy to get tablets" , K (ret), K (left_space_size), K (tablet_queue), K (tablets));
533+ } else if (OB_FAIL (push_tablet_execution_id (task_type, ddl_can_retry, tablets, new_execution_id))) {
534+ LOG_WARN (" failed to push tablet execution id" , K (ret), K (task_type), K (ddl_can_retry), K (tablets), K (new_execution_id));
379535 } else {
380536 TCWLockGuard guard (lock_);
381- if (OB_FAIL (running_ls_to_execution_id_.set_refactored (ls_id, execution_id , true /* overwrite */ ))) {
382- LOG_WARN (" running ls to execution id map set fail" , K (ret), K (ls_id), K (execution_id ));
537+ if (OB_FAIL (running_ls_to_execution_id_.set_refactored (ls_id, new_execution_id , true /* overwrite */ ))) {
538+ LOG_WARN (" running ls to execution id map set fail" , K (ret), K (ls_id), K (new_execution_id ));
383539 } else {
384540 ObArray<ObTabletID> running_tablet_queue;
385541 if (OB_FAIL (running_tablet_queue.assign (tablets))) {
0 commit comments