Skip to content

Commit a0511d9

Browse files
obdevob-robot
authored andcommitted
[CP] fix direct load trans status not match
1 parent 71aed1b commit a0511d9

18 files changed

Lines changed: 236 additions & 421 deletions

src/observer/table_load/ob_table_load_coordinator.cpp

Lines changed: 31 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -977,7 +977,7 @@ int ObTableLoadCoordinator::add_check_begin_result_task()
977977
LOG_WARN("ObTableLoadCoordinator not init", KR(ret), KP(this));
978978
} else {
979979
ObTableLoadTask *task = nullptr;
980-
// 1. assign task
980+
// 1. Alloc task
981981
if (OB_FAIL(ctx_->alloc_task(task))) {
982982
LOG_WARN("fail to alloc task", KR(ret));
983983
}
@@ -1242,7 +1242,7 @@ int ObTableLoadCoordinator::add_check_merge_result_task()
12421242
LOG_WARN("ObTableLoadCoordinator not init", KR(ret), KP(this));
12431243
} else {
12441244
ObTableLoadTask *task = nullptr;
1245-
// 1. assign task
1245+
// 1. Alloc task
12461246
if (OB_FAIL(ctx_->alloc_task(task))) {
12471247
LOG_WARN("fail to alloc task", KR(ret));
12481248
}
@@ -1889,20 +1889,17 @@ class ObTableLoadCoordinator::WriteTaskProcessor : public ObITableLoadTaskProces
18891889
{
18901890
public:
18911891
WriteTaskProcessor(ObTableLoadTask &task, ObTableLoadTableCtx *ctx, ObTableLoadCoordinatorTrans *trans,
1892-
ObTableLoadTransBucketWriter *bucket_writer, int32_t session_id)
1892+
int32_t session_id)
18931893
: ObITableLoadTaskProcessor(task),
18941894
ctx_(ctx),
18951895
trans_(trans),
1896-
bucket_writer_(bucket_writer),
18971896
session_id_(session_id)
18981897
{
18991898
ctx_->inc_ref_count();
19001899
trans_->inc_ref_count();
1901-
bucket_writer_->inc_ref_count();
19021900
}
19031901
virtual ~WriteTaskProcessor()
19041902
{
1905-
trans_->put_bucket_writer(bucket_writer_);
19061903
ctx_->coordinator_ctx_->put_trans(trans_);
19071904
ObTableLoadService::put_ctx(ctx_);
19081905
}
@@ -1936,36 +1933,32 @@ class ObTableLoadCoordinator::WriteTaskProcessor : public ObITableLoadTaskProces
19361933
{
19371934
OB_TABLE_LOAD_STATISTICS_TIME_COST(INFO, coordinator_write_time_us);
19381935
int ret = OB_SUCCESS;
1939-
if (OB_SUCC(trans_->check_trans_status(ObTableLoadTransStatusType::RUNNING)) ||
1940-
OB_SUCC(trans_->check_trans_status(ObTableLoadTransStatusType::FROZEN))) {
1941-
if (OB_FAIL(bucket_writer_->write(session_id_, obj_rows_))) {
1942-
LOG_WARN("fail to write bucket pool", KR(ret));
1943-
}
1936+
if (OB_FAIL(trans_->check_trans_status(ObTableLoadTransStatusType::RUNNING,
1937+
ObTableLoadTransStatusType::FROZEN))) {
1938+
LOG_WARN("fail to check trans status", KR(ret));
1939+
} else if (OB_FAIL(trans_->get_bucket_writer()->write(session_id_, obj_rows_))) {
1940+
LOG_WARN("fail to write bucket pool", KR(ret));
19441941
}
19451942
return ret;
19461943
}
19471944
private:
19481945
ObTableLoadTableCtx * const ctx_;
19491946
ObTableLoadCoordinatorTrans * const trans_;
1950-
ObTableLoadTransBucketWriter * const bucket_writer_;
19511947
const int32_t session_id_;
19521948
ObTableLoadObjRowArray obj_rows_;
19531949
};
19541950

19551951
class ObTableLoadCoordinator::WriteTaskCallback : public ObITableLoadTaskCallback
19561952
{
19571953
public:
1958-
WriteTaskCallback(ObTableLoadTableCtx *ctx, ObTableLoadCoordinatorTrans *trans,
1959-
ObTableLoadTransBucketWriter *bucket_writer)
1960-
: ctx_(ctx), trans_(trans), bucket_writer_(bucket_writer)
1954+
WriteTaskCallback(ObTableLoadTableCtx *ctx, ObTableLoadCoordinatorTrans *trans)
1955+
: ctx_(ctx), trans_(trans)
19611956
{
19621957
ctx_->inc_ref_count();
19631958
trans_->inc_ref_count();
1964-
bucket_writer_->inc_ref_count();
19651959
}
19661960
virtual ~WriteTaskCallback()
19671961
{
1968-
trans_->put_bucket_writer(bucket_writer_);
19691962
ctx_->coordinator_ctx_->put_trans(trans_);
19701963
ObTableLoadService::put_ctx(ctx_);
19711964
}
@@ -1980,7 +1973,6 @@ class ObTableLoadCoordinator::WriteTaskCallback : public ObITableLoadTaskCallbac
19801973
private:
19811974
ObTableLoadTableCtx * const ctx_;
19821975
ObTableLoadCoordinatorTrans * const trans_;
1983-
ObTableLoadTransBucketWriter * const bucket_writer_; // To ensure that the reference to bucket_writer is reset only after receiving the result of this write operation
19841976
};
19851977

19861978
int ObTableLoadCoordinator::write(const ObTableLoadTransId &trans_id, int32_t session_id,
@@ -1995,15 +1987,10 @@ int ObTableLoadCoordinator::write(const ObTableLoadTransId &trans_id, int32_t se
19951987
} else {
19961988
LOG_DEBUG("coordinator write");
19971989
ObTableLoadCoordinatorTrans *trans = nullptr;
1998-
ObTableLoadTransBucketWriter *bucket_writer = nullptr;
19991990
ObTableLoadMutexGuard guard;
20001991
if (OB_FAIL(coordinator_ctx_->get_trans(trans_id, trans))) {
20011992
LOG_WARN("fail to get trans", KR(ret));
20021993
} else if (session_id == 0 && FALSE_IT(session_id = trans->get_default_session_id())) {
2003-
}
2004-
// retrieve bucket_writer
2005-
else if (OB_FAIL(trans->get_bucket_writer_for_write(bucket_writer))) {
2006-
LOG_WARN("fail to get bucket writer", KR(ret));
20071994
// } else if (OB_FAIL(bucket_writer->advance_sequence_no(session_id, sequence_no, guard))) {
20081995
// if (OB_UNLIKELY(OB_ENTRY_EXIST != ret)) {
20091996
// LOG_WARN("fail to advance sequence no", KR(ret), K(session_id));
@@ -2013,12 +2000,12 @@ int ObTableLoadCoordinator::write(const ObTableLoadTransId &trans_id, int32_t se
20132000
} else {
20142001
ObTableLoadTask *task = nullptr;
20152002
WriteTaskProcessor *processor = nullptr;
2016-
// 1. assign task
2003+
// 1. Alloc task
20172004
if (OB_FAIL(ctx_->alloc_task(task))) {
20182005
LOG_WARN("fail to alloc task", KR(ret));
20192006
}
20202007
// 2. Set processor
2021-
else if (OB_FAIL(task->set_processor<WriteTaskProcessor>(ctx_, trans, bucket_writer, session_id))) {
2008+
else if (OB_FAIL(task->set_processor<WriteTaskProcessor>(ctx_, trans, session_id))) {
20222009
LOG_WARN("fail to set write task processor", KR(ret));
20232010
} else if (OB_ISNULL(processor = dynamic_cast<WriteTaskProcessor *>(task->get_processor()))) {
20242011
ret = OB_ERR_UNEXPECTED;
@@ -2027,7 +2014,7 @@ int ObTableLoadCoordinator::write(const ObTableLoadTransId &trans_id, int32_t se
20272014
LOG_WARN("fail to set objs", KR(ret), K(coordinator_ctx_->idx_array_));
20282015
}
20292016
// 3. Set callback
2030-
else if (OB_FAIL(task->set_callback<WriteTaskCallback>(ctx_, trans, bucket_writer))) {
2017+
else if (OB_FAIL(task->set_callback<WriteTaskCallback>(ctx_, trans))) {
20312018
LOG_WARN("fail to set write task callback", KR(ret));
20322019
}
20332020
// 4. Put task into scheduler
@@ -2041,18 +2028,13 @@ int ObTableLoadCoordinator::write(const ObTableLoadTransId &trans_id, int32_t se
20412028
}
20422029
}
20432030
if (OB_NOT_NULL(trans)) {
2044-
if (OB_NOT_NULL(bucket_writer)) {
2045-
trans->put_bucket_writer(bucket_writer);
2046-
bucket_writer = nullptr;
2047-
}
20482031
coordinator_ctx_->put_trans(trans);
20492032
trans = nullptr;
20502033
}
20512034
}
20522035
return ret;
20532036
}
20542037

2055-
20562038
/**
20572039
* flush
20582040
*/
@@ -2061,20 +2043,17 @@ class ObTableLoadCoordinator::FlushTaskProcessor : public ObITableLoadTaskProces
20612043
{
20622044
public:
20632045
FlushTaskProcessor(ObTableLoadTask &task, ObTableLoadTableCtx *ctx, ObTableLoadCoordinatorTrans *trans,
2064-
ObTableLoadTransBucketWriter *bucket_writer, int32_t session_id)
2046+
int32_t session_id)
20652047
: ObITableLoadTaskProcessor(task),
20662048
ctx_(ctx),
20672049
trans_(trans),
2068-
bucket_writer_(bucket_writer),
20692050
session_id_(session_id)
20702051
{
20712052
ctx_->inc_ref_count();
20722053
trans_->inc_ref_count();
2073-
bucket_writer_->inc_ref_count();
20742054
}
20752055
virtual ~FlushTaskProcessor()
20762056
{
2077-
trans_->put_bucket_writer(bucket_writer_);
20782057
ctx_->coordinator_ctx_->put_trans(trans_);
20792058
ObTableLoadService::put_ctx(ctx_);
20802059
}
@@ -2083,33 +2062,39 @@ class ObTableLoadCoordinator::FlushTaskProcessor : public ObITableLoadTaskProces
20832062
OB_TABLE_LOAD_STATISTICS_TIME_COST(INFO, coordinator_flush_time_us);
20842063
int ret = OB_SUCCESS;
20852064
if (OB_SUCC(trans_->check_trans_status(ObTableLoadTransStatusType::FROZEN))) {
2086-
if (OB_FAIL(bucket_writer_->flush(session_id_))) {
2065+
bool is_finished = false;
2066+
if (OB_FAIL(trans_->get_bucket_writer()->flush(session_id_, is_finished))) {
20872067
LOG_WARN("fail to flush bucket", KR(ret));
2068+
} else if (!is_finished) {
2069+
// do nothing
2070+
} else {
2071+
ObTableLoadCoordinator coordinator(ctx_);
2072+
if (OB_FAIL(coordinator.init())) {
2073+
LOG_WARN("fail to init coordinator", KR(ret));
2074+
} else if (OB_FAIL(coordinator.finish_trans_peers(trans_))) {
2075+
LOG_WARN("fail to finish trans peers", KR(ret));
2076+
}
20882077
}
20892078
}
20902079
return ret;
20912080
}
20922081
private:
20932082
ObTableLoadTableCtx * const ctx_;
20942083
ObTableLoadCoordinatorTrans * const trans_;
2095-
ObTableLoadTransBucketWriter * const bucket_writer_;
20962084
const int32_t session_id_;
20972085
};
20982086

20992087
class ObTableLoadCoordinator::FlushTaskCallback : public ObITableLoadTaskCallback
21002088
{
21012089
public:
2102-
FlushTaskCallback(ObTableLoadTableCtx *ctx, ObTableLoadCoordinatorTrans *trans,
2103-
ObTableLoadTransBucketWriter *bucket_writer)
2104-
: ctx_(ctx), trans_(trans), bucket_writer_(bucket_writer)
2090+
FlushTaskCallback(ObTableLoadTableCtx *ctx, ObTableLoadCoordinatorTrans *trans)
2091+
: ctx_(ctx), trans_(trans)
21052092
{
21062093
ctx_->inc_ref_count();
21072094
trans_->inc_ref_count();
2108-
bucket_writer_->inc_ref_count();
21092095
}
21102096
virtual ~FlushTaskCallback()
21112097
{
2112-
trans_->put_bucket_writer(bucket_writer_);
21132098
ctx_->coordinator_ctx_->put_trans(trans_);
21142099
ObTableLoadService::put_ctx(ctx_);
21152100
}
@@ -2125,7 +2110,6 @@ class ObTableLoadCoordinator::FlushTaskCallback : public ObITableLoadTaskCallbac
21252110
private:
21262111
ObTableLoadTableCtx * const ctx_;
21272112
ObTableLoadCoordinatorTrans * const trans_;
2128-
ObTableLoadTransBucketWriter * const bucket_writer_; // To ensure that the reference to bucket_writer is reset only after receiving the result of this write operation
21292113
};
21302114

21312115
int ObTableLoadCoordinator::flush(ObTableLoadCoordinatorTrans *trans)
@@ -2136,26 +2120,21 @@ int ObTableLoadCoordinator::flush(ObTableLoadCoordinatorTrans *trans)
21362120
LOG_WARN("ObTableLoadCoordinator not init", KR(ret), KP(this));
21372121
} else {
21382122
LOG_DEBUG("coordinator flush");
2139-
ObTableLoadTransBucketWriter *bucket_writer = nullptr;
2140-
// retrieve bucket_writer
2141-
if (OB_FAIL(trans->get_bucket_writer_for_flush(bucket_writer))) {
2142-
LOG_WARN("fail to get bucket writer", KR(ret));
2143-
} else if (OB_FAIL(trans->set_trans_status_frozen())) {
2123+
if (OB_FAIL(trans->set_trans_status_frozen())) {
21442124
LOG_WARN("fail to freeze trans", KR(ret));
21452125
} else {
21462126
for (int32_t session_id = 1; OB_SUCC(ret) && session_id <= param_.write_session_count_; ++session_id) {
21472127
ObTableLoadTask *task = nullptr;
2148-
// 1. assign task
2128+
// 1. Alloc task
21492129
if (OB_FAIL(ctx_->alloc_task(task))) {
21502130
LOG_WARN("fail to alloc task", KR(ret));
21512131
}
21522132
// 2. Set processor
2153-
else if (OB_FAIL(task->set_processor<FlushTaskProcessor>(ctx_, trans, bucket_writer,
2154-
session_id))) {
2133+
else if (OB_FAIL(task->set_processor<FlushTaskProcessor>(ctx_, trans, session_id))) {
21552134
LOG_WARN("fail to set flush task processor", KR(ret));
21562135
}
21572136
// 3. Set callback
2158-
else if (OB_FAIL(task->set_callback<FlushTaskCallback>(ctx_, trans, bucket_writer))) {
2137+
else if (OB_FAIL(task->set_callback<FlushTaskCallback>(ctx_, trans))) {
21592138
LOG_WARN("fail to set flush task callback", KR(ret));
21602139
}
21612140
// 4. put task into scheduler
@@ -2169,10 +2148,6 @@ int ObTableLoadCoordinator::flush(ObTableLoadCoordinatorTrans *trans)
21692148
}
21702149
}
21712150
}
2172-
if (OB_NOT_NULL(bucket_writer)) {
2173-
trans->put_bucket_writer(bucket_writer);
2174-
bucket_writer = nullptr;
2175-
}
21762151
}
21772152
return ret;
21782153
}

src/observer/table_load/ob_table_load_coordinator_ctx.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ int ObTableLoadCoordinatorCtx::alloc_trans_ctx(const ObTableLoadTransId &trans_i
285285
int ret = OB_SUCCESS;
286286
trans_ctx = nullptr;
287287
// allocate trans_ctx
288-
if (OB_ISNULL(trans_ctx = ctx_->alloc_trans_ctx(trans_id))) {
288+
if (OB_ISNULL(trans_ctx = ctx_->alloc_trans_ctx(ObTableLoadTransCtx::COORDINATOR, trans_id))) {
289289
ret = OB_ALLOCATE_MEMORY_FAILED;
290290
LOG_WARN("fail to alloc trans ctx", KR(ret), K(trans_id));
291291
}

src/observer/table_load/ob_table_load_coordinator_trans.cpp

Lines changed: 0 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -106,93 +106,5 @@ int ObTableLoadCoordinatorTrans::set_trans_status_abort()
106106
return ret;
107107
}
108108

109-
int ObTableLoadCoordinatorTrans::get_bucket_writer_for_write(
110-
ObTableLoadTransBucketWriter *&bucket_writer) const
111-
{
112-
int ret = OB_SUCCESS;
113-
bucket_writer = nullptr;
114-
if (IS_NOT_INIT) {
115-
ret = OB_NOT_INIT;
116-
LOG_WARN("ObTableLoadCoordinatorTrans not init", KR(ret), KP(this));
117-
} else if (OB_FAIL(check_trans_status(ObTableLoadTransStatusType::RUNNING))) {
118-
LOG_WARN("fail to check trans status", KR(ret));
119-
} else {
120-
obsys::ObRLockGuard guard(trans_ctx_->rwlock_);
121-
if (OB_ISNULL(trans_bucket_writer_)) {
122-
ret = OB_ERR_UNEXPECTED;
123-
LOG_WARN("unexpected null bucket writer", KR(ret));
124-
} else if (OB_UNLIKELY(trans_bucket_writer_->is_flush())) {
125-
ret = OB_ERR_UNEXPECTED;
126-
LOG_WARN("trans bucket writer has flush", KR(ret));
127-
} else {
128-
bucket_writer = trans_bucket_writer_;
129-
bucket_writer->inc_ref_count();
130-
}
131-
}
132-
return ret;
133-
}
134-
135-
int ObTableLoadCoordinatorTrans::get_bucket_writer_for_flush(
136-
ObTableLoadTransBucketWriter *&bucket_writer) const
137-
{
138-
int ret = OB_SUCCESS;
139-
bucket_writer = nullptr;
140-
if (IS_NOT_INIT) {
141-
ret = OB_NOT_INIT;
142-
LOG_WARN("ObTableLoadCoordinatorTrans not init", KR(ret), KP(this));
143-
} else {
144-
obsys::ObRLockGuard guard(trans_ctx_->rwlock_);
145-
if (OB_ISNULL(trans_bucket_writer_)) {
146-
ret = OB_ERR_UNEXPECTED;
147-
LOG_WARN("unexpected null bucket writer", KR(ret));
148-
} else if (OB_UNLIKELY(trans_bucket_writer_->is_flush())) {
149-
ret = OB_ERR_UNEXPECTED;
150-
LOG_WARN("trans bucket writer has flush", KR(ret));
151-
} else {
152-
trans_bucket_writer_->set_is_flush();
153-
bucket_writer = trans_bucket_writer_;
154-
bucket_writer->inc_ref_count();
155-
}
156-
}
157-
return ret;
158-
}
159-
160-
void ObTableLoadCoordinatorTrans::put_bucket_writer(ObTableLoadTransBucketWriter *bucket_writer)
161-
{
162-
int ret = OB_SUCCESS;
163-
if (IS_NOT_INIT) {
164-
ret = OB_NOT_INIT;
165-
LOG_WARN("ObTableLoadCoordinatorTrans not init", KR(ret));
166-
} else if (OB_ISNULL(bucket_writer)) {
167-
ret = OB_INVALID_ARGUMENT;
168-
LOG_WARN("invalid null bucket writer", KR(ret));
169-
} else {
170-
obsys::ObRLockGuard guard(trans_ctx_->rwlock_);
171-
OB_ASSERT(trans_bucket_writer_ == bucket_writer);
172-
}
173-
if (OB_SUCC(ret)) {
174-
if (0 == bucket_writer->dec_ref_count() && OB_FAIL(handle_write_done())) {
175-
LOG_WARN("fail to handle coordinator write done", KR(ret));
176-
}
177-
}
178-
if (OB_FAIL(ret)) {
179-
set_trans_status_error(ret);
180-
}
181-
}
182-
183-
int ObTableLoadCoordinatorTrans::handle_write_done()
184-
{
185-
int ret = OB_SUCCESS;
186-
if (ObTableLoadTransStatusType::FROZEN == trans_ctx_->get_trans_status()) {
187-
ObTableLoadCoordinator coordinator(trans_ctx_->ctx_);
188-
if (OB_FAIL(coordinator.init())) {
189-
LOG_WARN("fail to init coordinator", KR(ret));
190-
} else if (OB_FAIL(coordinator.finish_trans_peers(this))) {
191-
LOG_WARN("fail to finish trans peers", KR(ret));
192-
}
193-
}
194-
return ret;
195-
}
196-
197109
} // namespace observer
198110
} // namespace oceanbase

0 commit comments

Comments
 (0)