@@ -977,7 +977,7 @@ int ObTableLoadCoordinator::add_check_begin_result_task()
977977 LOG_WARN (" ObTableLoadCoordinator not init" , KR (ret), KP (this ));
978978 } else {
979979 ObTableLoadTask *task = nullptr ;
980- // 1. assign task
980+ // 1. Alloc task
981981 if (OB_FAIL (ctx_->alloc_task (task))) {
982982 LOG_WARN (" fail to alloc task" , KR (ret));
983983 }
@@ -1242,7 +1242,7 @@ int ObTableLoadCoordinator::add_check_merge_result_task()
12421242 LOG_WARN (" ObTableLoadCoordinator not init" , KR (ret), KP (this ));
12431243 } else {
12441244 ObTableLoadTask *task = nullptr ;
1245- // 1. assign task
1245+ // 1. Alloc task
12461246 if (OB_FAIL (ctx_->alloc_task (task))) {
12471247 LOG_WARN (" fail to alloc task" , KR (ret));
12481248 }
@@ -1889,20 +1889,17 @@ class ObTableLoadCoordinator::WriteTaskProcessor : public ObITableLoadTaskProces
18891889{
18901890public:
18911891 WriteTaskProcessor (ObTableLoadTask &task, ObTableLoadTableCtx *ctx, ObTableLoadCoordinatorTrans *trans,
1892- ObTableLoadTransBucketWriter *bucket_writer, int32_t session_id)
1892+ int32_t session_id)
18931893 : ObITableLoadTaskProcessor(task),
18941894 ctx_ (ctx),
18951895 trans_(trans),
1896- bucket_writer_(bucket_writer),
18971896 session_id_(session_id)
18981897 {
18991898 ctx_->inc_ref_count ();
19001899 trans_->inc_ref_count ();
1901- bucket_writer_->inc_ref_count ();
19021900 }
19031901 virtual ~WriteTaskProcessor ()
19041902 {
1905- trans_->put_bucket_writer (bucket_writer_);
19061903 ctx_->coordinator_ctx_ ->put_trans (trans_);
19071904 ObTableLoadService::put_ctx (ctx_);
19081905 }
@@ -1936,36 +1933,32 @@ class ObTableLoadCoordinator::WriteTaskProcessor : public ObITableLoadTaskProces
19361933 {
19371934 OB_TABLE_LOAD_STATISTICS_TIME_COST (INFO , coordinator_write_time_us);
19381935 int ret = OB_SUCCESS ;
1939- if (OB_SUCC (trans_->check_trans_status (ObTableLoadTransStatusType::RUNNING )) ||
1940- OB_SUCC (trans_-> check_trans_status ( ObTableLoadTransStatusType::FROZEN ))) {
1941- if ( OB_FAIL (bucket_writer_-> write (session_id_, obj_rows_))) {
1942- LOG_WARN ( " fail to write bucket pool " , KR (ret));
1943- }
1936+ if (OB_FAIL (trans_->check_trans_status (ObTableLoadTransStatusType::RUNNING ,
1937+ ObTableLoadTransStatusType::FROZEN ))) {
1938+ LOG_WARN ( " fail to check trans status " , KR (ret));
1939+ } else if ( OB_FAIL (trans_-> get_bucket_writer ()-> write (session_id_, obj_rows_))) {
1940+ LOG_WARN ( " fail to write bucket pool " , KR (ret));
19441941 }
19451942 return ret;
19461943 }
19471944private:
19481945 ObTableLoadTableCtx * const ctx_;
19491946 ObTableLoadCoordinatorTrans * const trans_;
1950- ObTableLoadTransBucketWriter * const bucket_writer_;
19511947 const int32_t session_id_;
19521948 ObTableLoadObjRowArray obj_rows_;
19531949};
19541950
19551951class ObTableLoadCoordinator ::WriteTaskCallback : public ObITableLoadTaskCallback
19561952{
19571953public:
1958- WriteTaskCallback (ObTableLoadTableCtx *ctx, ObTableLoadCoordinatorTrans *trans,
1959- ObTableLoadTransBucketWriter *bucket_writer)
1960- : ctx_(ctx), trans_(trans), bucket_writer_(bucket_writer)
1954+ WriteTaskCallback (ObTableLoadTableCtx *ctx, ObTableLoadCoordinatorTrans *trans)
1955+ : ctx_(ctx), trans_(trans)
19611956 {
19621957 ctx_->inc_ref_count ();
19631958 trans_->inc_ref_count ();
1964- bucket_writer_->inc_ref_count ();
19651959 }
19661960 virtual ~WriteTaskCallback ()
19671961 {
1968- trans_->put_bucket_writer (bucket_writer_);
19691962 ctx_->coordinator_ctx_ ->put_trans (trans_);
19701963 ObTableLoadService::put_ctx (ctx_);
19711964 }
@@ -1980,7 +1973,6 @@ class ObTableLoadCoordinator::WriteTaskCallback : public ObITableLoadTaskCallbac
19801973private:
19811974 ObTableLoadTableCtx * const ctx_;
19821975 ObTableLoadCoordinatorTrans * const trans_;
1983- ObTableLoadTransBucketWriter * const bucket_writer_; // To ensure that the reference to bucket_writer is reset only after receiving the result of this write operation
19841976};
19851977
19861978int ObTableLoadCoordinator::write (const ObTableLoadTransId &trans_id, int32_t session_id,
@@ -1995,15 +1987,10 @@ int ObTableLoadCoordinator::write(const ObTableLoadTransId &trans_id, int32_t se
19951987 } else {
19961988 LOG_DEBUG (" coordinator write" );
19971989 ObTableLoadCoordinatorTrans *trans = nullptr ;
1998- ObTableLoadTransBucketWriter *bucket_writer = nullptr ;
19991990 ObTableLoadMutexGuard guard;
20001991 if (OB_FAIL (coordinator_ctx_->get_trans (trans_id, trans))) {
20011992 LOG_WARN (" fail to get trans" , KR (ret));
20021993 } else if (session_id == 0 && FALSE_IT (session_id = trans->get_default_session_id ())) {
2003- }
2004- // retrieve bucket_writer
2005- else if (OB_FAIL (trans->get_bucket_writer_for_write (bucket_writer))) {
2006- LOG_WARN (" fail to get bucket writer" , KR (ret));
20071994 // } else if (OB_FAIL(bucket_writer->advance_sequence_no(session_id, sequence_no, guard))) {
20081995 // if (OB_UNLIKELY(OB_ENTRY_EXIST != ret)) {
20091996 // LOG_WARN("fail to advance sequence no", KR(ret), K(session_id));
@@ -2013,12 +2000,12 @@ int ObTableLoadCoordinator::write(const ObTableLoadTransId &trans_id, int32_t se
20132000 } else {
20142001 ObTableLoadTask *task = nullptr ;
20152002 WriteTaskProcessor *processor = nullptr ;
2016- // 1. assign task
2003+ // 1. Alloc task
20172004 if (OB_FAIL (ctx_->alloc_task (task))) {
20182005 LOG_WARN (" fail to alloc task" , KR (ret));
20192006 }
20202007 // 2. Set processor
2021- else if (OB_FAIL (task->set_processor <WriteTaskProcessor>(ctx_, trans, bucket_writer, session_id))) {
2008+ else if (OB_FAIL (task->set_processor <WriteTaskProcessor>(ctx_, trans, session_id))) {
20222009 LOG_WARN (" fail to set write task processor" , KR (ret));
20232010 } else if (OB_ISNULL (processor = dynamic_cast <WriteTaskProcessor *>(task->get_processor ()))) {
20242011 ret = OB_ERR_UNEXPECTED ;
@@ -2027,7 +2014,7 @@ int ObTableLoadCoordinator::write(const ObTableLoadTransId &trans_id, int32_t se
20272014 LOG_WARN (" fail to set objs" , KR (ret), K (coordinator_ctx_->idx_array_ ));
20282015 }
20292016 // 3. Set callback
2030- else if (OB_FAIL (task->set_callback <WriteTaskCallback>(ctx_, trans, bucket_writer ))) {
2017+ else if (OB_FAIL (task->set_callback <WriteTaskCallback>(ctx_, trans))) {
20312018 LOG_WARN (" fail to set write task callback" , KR (ret));
20322019 }
20332020 // 4. Put task into scheduler
@@ -2041,18 +2028,13 @@ int ObTableLoadCoordinator::write(const ObTableLoadTransId &trans_id, int32_t se
20412028 }
20422029 }
20432030 if (OB_NOT_NULL (trans)) {
2044- if (OB_NOT_NULL (bucket_writer)) {
2045- trans->put_bucket_writer (bucket_writer);
2046- bucket_writer = nullptr ;
2047- }
20482031 coordinator_ctx_->put_trans (trans);
20492032 trans = nullptr ;
20502033 }
20512034 }
20522035 return ret;
20532036}
20542037
2055-
20562038/* *
20572039 * flush
20582040 */
@@ -2061,20 +2043,17 @@ class ObTableLoadCoordinator::FlushTaskProcessor : public ObITableLoadTaskProces
20612043{
20622044public:
20632045 FlushTaskProcessor (ObTableLoadTask &task, ObTableLoadTableCtx *ctx, ObTableLoadCoordinatorTrans *trans,
2064- ObTableLoadTransBucketWriter *bucket_writer, int32_t session_id)
2046+ int32_t session_id)
20652047 : ObITableLoadTaskProcessor(task),
20662048 ctx_ (ctx),
20672049 trans_(trans),
2068- bucket_writer_(bucket_writer),
20692050 session_id_(session_id)
20702051 {
20712052 ctx_->inc_ref_count ();
20722053 trans_->inc_ref_count ();
2073- bucket_writer_->inc_ref_count ();
20742054 }
20752055 virtual ~FlushTaskProcessor ()
20762056 {
2077- trans_->put_bucket_writer (bucket_writer_);
20782057 ctx_->coordinator_ctx_ ->put_trans (trans_);
20792058 ObTableLoadService::put_ctx (ctx_);
20802059 }
@@ -2083,33 +2062,39 @@ class ObTableLoadCoordinator::FlushTaskProcessor : public ObITableLoadTaskProces
20832062 OB_TABLE_LOAD_STATISTICS_TIME_COST (INFO , coordinator_flush_time_us);
20842063 int ret = OB_SUCCESS ;
20852064 if (OB_SUCC (trans_->check_trans_status (ObTableLoadTransStatusType::FROZEN ))) {
2086- if (OB_FAIL (bucket_writer_->flush (session_id_))) {
2065+ bool is_finished = false ;
2066+ if (OB_FAIL (trans_->get_bucket_writer ()->flush (session_id_, is_finished))) {
20872067 LOG_WARN (" fail to flush bucket" , KR (ret));
2068+ } else if (!is_finished) {
2069+ // do nothing
2070+ } else {
2071+ ObTableLoadCoordinator coordinator (ctx_);
2072+ if (OB_FAIL (coordinator.init ())) {
2073+ LOG_WARN (" fail to init coordinator" , KR (ret));
2074+ } else if (OB_FAIL (coordinator.finish_trans_peers (trans_))) {
2075+ LOG_WARN (" fail to finish trans peers" , KR (ret));
2076+ }
20882077 }
20892078 }
20902079 return ret;
20912080 }
20922081private:
20932082 ObTableLoadTableCtx * const ctx_;
20942083 ObTableLoadCoordinatorTrans * const trans_;
2095- ObTableLoadTransBucketWriter * const bucket_writer_;
20962084 const int32_t session_id_;
20972085};
20982086
20992087class ObTableLoadCoordinator ::FlushTaskCallback : public ObITableLoadTaskCallback
21002088{
21012089public:
2102- FlushTaskCallback (ObTableLoadTableCtx *ctx, ObTableLoadCoordinatorTrans *trans,
2103- ObTableLoadTransBucketWriter *bucket_writer)
2104- : ctx_(ctx), trans_(trans), bucket_writer_(bucket_writer)
2090+ FlushTaskCallback (ObTableLoadTableCtx *ctx, ObTableLoadCoordinatorTrans *trans)
2091+ : ctx_(ctx), trans_(trans)
21052092 {
21062093 ctx_->inc_ref_count ();
21072094 trans_->inc_ref_count ();
2108- bucket_writer_->inc_ref_count ();
21092095 }
21102096 virtual ~FlushTaskCallback ()
21112097 {
2112- trans_->put_bucket_writer (bucket_writer_);
21132098 ctx_->coordinator_ctx_ ->put_trans (trans_);
21142099 ObTableLoadService::put_ctx (ctx_);
21152100 }
@@ -2125,7 +2110,6 @@ class ObTableLoadCoordinator::FlushTaskCallback : public ObITableLoadTaskCallbac
21252110private:
21262111 ObTableLoadTableCtx * const ctx_;
21272112 ObTableLoadCoordinatorTrans * const trans_;
2128- ObTableLoadTransBucketWriter * const bucket_writer_; // To ensure that the reference to bucket_writer is reset only after receiving the result of this write operation
21292113};
21302114
21312115int ObTableLoadCoordinator::flush (ObTableLoadCoordinatorTrans *trans)
@@ -2136,26 +2120,21 @@ int ObTableLoadCoordinator::flush(ObTableLoadCoordinatorTrans *trans)
21362120 LOG_WARN (" ObTableLoadCoordinator not init" , KR (ret), KP (this ));
21372121 } else {
21382122 LOG_DEBUG (" coordinator flush" );
2139- ObTableLoadTransBucketWriter *bucket_writer = nullptr ;
2140- // retrieve bucket_writer
2141- if (OB_FAIL (trans->get_bucket_writer_for_flush (bucket_writer))) {
2142- LOG_WARN (" fail to get bucket writer" , KR (ret));
2143- } else if (OB_FAIL (trans->set_trans_status_frozen ())) {
2123+ if (OB_FAIL (trans->set_trans_status_frozen ())) {
21442124 LOG_WARN (" fail to freeze trans" , KR (ret));
21452125 } else {
21462126 for (int32_t session_id = 1 ; OB_SUCC (ret) && session_id <= param_.write_session_count_ ; ++session_id) {
21472127 ObTableLoadTask *task = nullptr ;
2148- // 1. assign task
2128+ // 1. Alloc task
21492129 if (OB_FAIL (ctx_->alloc_task (task))) {
21502130 LOG_WARN (" fail to alloc task" , KR (ret));
21512131 }
21522132 // 2. Set processor
2153- else if (OB_FAIL (task->set_processor <FlushTaskProcessor>(ctx_, trans, bucket_writer,
2154- session_id))) {
2133+ else if (OB_FAIL (task->set_processor <FlushTaskProcessor>(ctx_, trans, session_id))) {
21552134 LOG_WARN (" fail to set flush task processor" , KR (ret));
21562135 }
21572136 // 3. Set callback
2158- else if (OB_FAIL (task->set_callback <FlushTaskCallback>(ctx_, trans, bucket_writer ))) {
2137+ else if (OB_FAIL (task->set_callback <FlushTaskCallback>(ctx_, trans))) {
21592138 LOG_WARN (" fail to set flush task callback" , KR (ret));
21602139 }
21612140 // 4. put task into scheduler
@@ -2169,10 +2148,6 @@ int ObTableLoadCoordinator::flush(ObTableLoadCoordinatorTrans *trans)
21692148 }
21702149 }
21712150 }
2172- if (OB_NOT_NULL (bucket_writer)) {
2173- trans->put_bucket_writer (bucket_writer);
2174- bucket_writer = nullptr ;
2175- }
21762151 }
21772152 return ret;
21782153}
0 commit comments