ddl: initial support for parallel DDL #6955
Conversation
if err != nil {
	return nil, errors.Trace(err)
}
return append(generalJobs, addIdxJobs...), nil
The returned jobs may not be sorted by job ID. Should we return sorted jobs? The older function naturally returned sorted jobs.

I think it can be done in the next PR. I added a "TODO" for now.
ddl/ddl.go
Outdated
// TODO: Add the type of DDL worker.
metrics.DDLCounter.WithLabelValues(metrics.CreateDDLWorker).Inc()

// For every start, we will send a fake job to let worker
It's the original comment. I think it means "for each call to the start function".
ddl/ddl.go
Outdated
}

func (d *ddl) asyncNotifyWorker(jobTp model.ActionType) {
	// If the workers don't run, we needn't notify workers.
	return errors.Trace(err)
}

func isDependencyJobDone(t *meta.Meta, job *model.Job) (bool, error) {
Will there be multiple DDL job dependencies?

We only record the maximum job ID among multiple dependent jobs.
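A sketch of why recording only the maximum job ID is enough to encode multiple dependencies: jobs in a queue are handled in job-ID order, so waiting for the largest dependent job ID implies all smaller ones are already done. The types below are simplified stand-ins for the real tidb ones, not the PR's actual code.

```go
package main

import "fmt"

// Job is a simplified stand-in for model.Job.
type Job struct {
	ID           int64
	TableID      int64
	DependencyID int64 // 0 means no dependency
}

// maxDependentJobID returns the largest job ID among pending jobs in the
// other worker's queue that touch the same table. Because that queue is
// processed in ID order, waiting on the max ID covers the smaller ones too.
func maxDependentJobID(pending []*Job, tableID int64) int64 {
	var maxID int64
	for _, j := range pending {
		if j.TableID == tableID && j.ID > maxID {
			maxID = j.ID
		}
	}
	return maxID
}

func main() {
	otherQueue := []*Job{{ID: 3, TableID: 1}, {ID: 7, TableID: 1}, {ID: 9, TableID: 2}}
	fmt.Println(maxDependentJobID(otherQueue, 1)) // 7
}
```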
/run-all-tests

1 similar comment

/run-all-tests

/run-common-test

/run-common-test
ddl/ddl.go
Outdated
ddl/ddl_worker.go
Outdated
ddlJobCh used to be a member of ddl, but now we have two kinds of workers and need a ddlJobCh for each kind of worker, so ddlJobCh moves from ddl to worker.
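A minimal sketch of the refactor just described: each worker owns its own ddlJobCh instead of sharing one on the ddl struct, and notifications are routed by worker kind. All names and types here are illustrative stand-ins, not the PR's real definitions.

```go
package main

import "fmt"

type workerType int

const (
	generalWorker workerType = iota // handles all DDL except "add index"
	addIdxWorker                    // handles the "add index" queue
)

type worker struct {
	tp       workerType
	ddlJobCh chan struct{} // moved here from the ddl struct
}

type ddl struct {
	workers map[workerType]*worker
}

// asyncNotifyWorker wakes only the worker responsible for the job's queue.
func (d *ddl) asyncNotifyWorker(tp workerType) {
	select {
	case d.workers[tp].ddlJobCh <- struct{}{}:
	default: // a wake-up is already pending; dropping this one is fine
	}
}

func main() {
	d := &ddl{workers: map[workerType]*worker{
		generalWorker: {tp: generalWorker, ddlJobCh: make(chan struct{}, 1)},
		addIdxWorker:  {tp: addIdxWorker, ddlJobCh: make(chan struct{}, 1)},
	}}
	d.asyncNotifyWorker(addIdxWorker)
	fmt.Println(len(d.workers[addIdxWorker].ddlJobCh), len(d.workers[generalWorker].ddlJobCh)) // 1 0
}
```

The point of the move: with one shared channel, a notification for an "add index" job could wake the general worker (or vice versa); per-worker channels make the wake-up targeted.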
util/admin/admin.go
Outdated
No need to mention the error.

Other functions also mention the error.

This comment provides nothing more than the function name.
ddl/ddl.go
Outdated
Is the worker id always 0?
ddl/ddl_worker.go
Outdated
Why does it need to be make(chan struct{}, 1) rather than make(chan struct{})?

We need to be able to push info to the channel.
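A small sketch of why capacity 1 matters here, under the assumption (not stated explicitly in the thread) that the sender uses a non-blocking send: with a buffer of one, a notification can be parked even when the worker is busy; with an unbuffered channel, the signal is lost whenever no worker happens to be receiving at that instant.

```go
package main

import "fmt"

// tryNotify performs a non-blocking send and reports whether the
// notification was delivered (or parked in the buffer).
func tryNotify(ch chan struct{}) bool {
	select {
	case ch <- struct{}{}:
		return true
	default:
		return false
	}
}

func main() {
	buffered := make(chan struct{}, 1)
	fmt.Println(tryNotify(buffered)) // true: the slot holds the pending signal
	fmt.Println(tryNotify(buffered)) // false: one pending signal is enough

	unbuffered := make(chan struct{})
	fmt.Println(tryNotify(unbuffered)) // false: no receiver ready, signal dropped
}
```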
ddl/ddl_worker.go
Outdated
%s to print a time? What does the result look like?

Why not print the worker type?

It's the old code; I will handle it.
ddl/ddl_worker.go
Outdated
It's hard to understand here.

It's also hard to understand for me.

I don't think adding a comment helps @shenli.
meta.Meta should not store information about the queue key.
If the caller has to modify status in meta.Meta before calling its method, why not provide that status as an argument?
ddl/ddl_worker.go
Outdated
I suggest renaming GetAllDDLJobs to GetDDLJobsInQueue and passing the queue, rather than changing meta's jobListKey status; that is very tricky.

GetAllDDLJobs definitely gets jobs from mDDLJobListKey? Is that correct?

mDDLJobListKey will change between the two queues, and I don't know which one; that's the problem.
So I suggest:
GetDDLJobsInQueue(general)
GetDDLJobsInQueue(addindex)

Please address the comment.

Done, I changed the name of the function.
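A sketch of the API shape being suggested: the caller names the queue explicitly instead of first flipping meta's internal jobListKey. The Meta type below is a simplified in-memory stand-in (the real one wraps a kv transaction), so all fields and constants here are illustrative.

```go
package main

import "fmt"

type JobListKeyType string

const (
	DefaultJobListKey  JobListKeyType = "DDLJobList"       // general DDL queue
	AddIndexJobListKey JobListKeyType = "DDLJobAddIdxList" // "add index" queue
)

type Job struct{ ID int64 }

// Meta is a simplified stand-in for meta.Meta.
type Meta struct {
	queues map[JobListKeyType][]*Job
}

// GetDDLJobsInQueue reads jobs from the explicitly named queue, so there is
// no hidden jobListKey state for the caller to get wrong.
func (m *Meta) GetDDLJobsInQueue(key JobListKeyType) []*Job {
	return m.queues[key]
}

func main() {
	m := &Meta{queues: map[JobListKeyType][]*Job{
		DefaultJobListKey:  {{ID: 1}},
		AddIndexJobListKey: {{ID: 2}, {ID: 3}},
	}}
	fmt.Println(len(m.GetDDLJobsInQueue(DefaultJobListKey)))  // 1
	fmt.Println(len(m.GetDDLJobsInQueue(AddIndexJobListKey))) // 2
}
```

The design win is that the queue choice becomes visible at every call site rather than being ambient mutable state on the meta object.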
ddl/ddl_worker.go
Outdated
Will t.GetHistoryDDLJob select the right job queue?

It is better to find the job in the waiting job list, because the history job list may be long.
ddl/ddl_worker.go
Outdated
There will be a read/write conflict in GetAllDDLJobs, because each worker checks the other worker's queue for dependencies.
Is the error retryable and properly handled?

It will be retried.
ddl/ddl_worker_test.go
Outdated
s/for use by other tests/for other tests
ddl/ddl_worker_test.go
Outdated
I found a strange character here.
util/admin/admin.go
Outdated
Add an else branch so that the code is more robust, without the assumption about job.Type's default value.
ddl/ddl.go
Outdated
ddl/ddl_test.go
Outdated
ddl/ddl_worker.go
Outdated
ddl/ddl_worker.go
Outdated
How about "current DDL job %v depends on job %v"?
ddl/ddl_worker.go
Outdated
If waitDependencyJob is true, it is not an error.

Yes, but we'd better wait a moment. I will add a comment for it.
/run-all-tests
	store:        store,
	lease:        lease,
	ddlJobCh:     make(chan struct{}, 1),
	ddlJobDoneCh: make(chan struct{}, 1),

Could we move ddlJobDoneCh into the individual workers?
ddl/ddl_worker.go
Outdated
if historyJob == nil {
	return false, nil
}
log.Infof("[ddl] DDL job %v isn't dependent on job ID %d", job, job.DependencyID)

"DDL job %v isn't dependent on job ID %d"? What about "DDL job %v's dependent job ID %d is finished"?
}
err = t.UpdateDDLJob(int64(j), job, true)
if job.Type == model.ActionAddIndex {
	err = t.UpdateDDLJob(int64(j), job, true, meta.AddIndexJobListKey)

Why not just create a new meta with meta.AddIndexJobListKey?

We have a meta here; I think it's OK.

I mean we can use a new meta to avoid adding a param to the function.
ddl/ddl.go
Outdated
@@ -413,11 +411,9 @@ func (d *ddl) genGlobalID() (int64, error) {

// generalWorker returns the first worker. The ddl structure has only one worker before we implement the parallel worker.

@zimulala Please resolve the conflicts.
ddl/ddl_worker.go
Outdated
}

func isDependencyJobDone(t *meta.Meta, job *model.Job) (bool, error) {
	if job.DependencyID == 0 {

Please create a constant for 0 or add a comment for the if statement.

	return true, nil
}

func newMetaWithQueueTp(txn kv.Transaction, tp string) *meta.Meta {
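A sketch combining the two points above: a named constant instead of a bare 0 for "no dependency", and a newMetaWithQueueTp-style helper that picks the queue key from the job type. The names mirror the PR discussion but the types (and the "add index" type string) are simplified assumptions, not the merged code.

```go
package main

import "fmt"

type JobListKeyType string

const (
	DefaultJobListKey  JobListKeyType = "DDLJobList"
	AddIndexJobListKey JobListKeyType = "DDLJobAddIdxList"
)

// noneDependentJob marks a job that depends on no other job.
const noneDependentJob = 0

// Job is a simplified stand-in for model.Job.
type Job struct {
	Type         string
	DependencyID int64
}

// isDependencyJobDone reports whether the job's dependency has finished;
// historyHas is a stand-in for looking the job up in the history list.
func isDependencyJobDone(historyHas func(int64) bool, job *Job) bool {
	if job.DependencyID == noneDependentJob {
		return true
	}
	return historyHas(job.DependencyID)
}

// queueKeyForType maps a job type to its queue key, keeping the type check
// inside one helper instead of at every call site.
func queueKeyForType(tp string) JobListKeyType {
	if tp == "add index" {
		return AddIndexJobListKey
	}
	return DefaultJobListKey
}

func main() {
	fmt.Println(isDependencyJobDone(nil, &Job{DependencyID: noneDependentJob})) // true
	fmt.Println(queueKeyForType("add index"))
}
```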
meta/meta.go
Outdated
// NewMeta creates a Meta in transaction txn.
func NewMeta(txn kv.Transaction) *Meta {
// If the current Meta needs to handle a job, jobListKey is the type of the job's list.
// We don't change the value of the jobListKey in a Meta.
ddl/db_change_test.go
Outdated
kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error {
	m := meta.NewMeta(txn)
	// Get the number of jobs from the adding index queue.
	addIdxLen, err1 := m.DDLJobQueueLen(meta.AddIndexJobListKey)

Can we use GetDDLJobs and take the length of the return value?
job.Query, _ = ctx.Value(sessionctx.QueryString).(string)
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
	t := meta.NewMeta(txn)
	t := newMetaWithQueueTp(txn, job.Type.String())

Why not use meta.AddIndexJobListKey as the second parameter?

The job may not be "add index", so we have to create the meta according to the job type.

Almost. I want to put the check of the job type or worker type into newMetaWithQueueTp.
// No job now, return and retry getting later.
return nil
}
w.waitDependencyJobFinished(job, &waitDependencyJobCnt)

If its dependency job is not done yet, it would return at line 357. So why do we need to wait here?

The return at line 357 is inside the txn func, not a return from handleDDLJobQueue.

As @crazycs520 said. And if we put it at line 357, we would need to wait for 200ms inside the txn; I am afraid that txn would then conflict easily. So I put it here.
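A sketch of the trade-off discussed here: sleeping inside the transaction closure would keep the txn open for the whole wait and make read/write conflicts likely, so the wait happens in the outer loop after the txn returns. The function body, interval, and logging cadence below are illustrative assumptions, not the PR's exact code.

```go
package main

import (
	"fmt"
	"time"
)

// waitDependencyJobInterval is an assumed wait between dependency checks.
const waitDependencyJobInterval = 200 * time.Millisecond

// waitDependencyJobFinished runs in the worker loop, outside any txn: it
// sleeps when the job still depends on an unfinished job, and resets the
// round counter once the dependency is cleared.
func waitDependencyJobFinished(dependencyID int64, cnt *int) {
	if dependencyID == 0 {
		*cnt = 0 // no (remaining) dependency; nothing to wait for
		return
	}
	if *cnt%10 == 0 { // log occasionally, not on every round
		fmt.Printf("[ddl] job depends on unfinished job %d, waiting\n", dependencyID)
	}
	time.Sleep(waitDependencyJobInterval)
	*cnt++
}

func main() {
	cnt := 0
	waitDependencyJobFinished(42, &cnt) // sleeps outside any transaction
	fmt.Println(cnt)                    // 1
	waitDependencyJobFinished(0, &cnt)  // dependency cleared; counter resets
	fmt.Println(cnt)                    // 0
}
```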
// NewMeta creates a Meta in transaction txn.
func NewMeta(txn kv.Transaction) *Meta {
// If the current Meta needs to handle a job, jobListKey is the type of the job's list.
func NewMeta(txn kv.Transaction, jobListKeys ...JobListKeyType) *Meta {

Can we always specify the JobListKey?

A lot of places use this function, so I handle it this way.
And in other packages, I think we needn't distinguish the types of jobListKeys.
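A sketch of how the variadic signature keeps the many existing call sites working: callers that don't care about the queue omit the argument and get the default key. The Meta type here is a simplified stand-in without the kv transaction.

```go
package main

import "fmt"

type JobListKeyType string

const (
	DefaultJobListKey  JobListKeyType = "DDLJobList"
	AddIndexJobListKey JobListKeyType = "DDLJobAddIdxList"
)

// Meta is a simplified stand-in for meta.Meta.
type Meta struct {
	jobListKey JobListKeyType
}

// NewMeta binds the meta to a job queue. Omitting the variadic argument
// selects the default queue, so old call sites compile unchanged.
func NewMeta(jobListKeys ...JobListKeyType) *Meta {
	listKey := DefaultJobListKey
	if len(jobListKeys) != 0 {
		listKey = jobListKeys[0]
	}
	return &Meta{jobListKey: listKey}
}

func main() {
	fmt.Println(NewMeta().jobListKey)                   // DDLJobList
	fmt.Println(NewMeta(AddIndexJobListKey).jobListKey) // DDLJobAddIdxList
}
```

The variadic default is the compromise between "always specify the key" (reviewer's ask) and not touching every existing caller (author's reply).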
/run-all-tests

/run-common-test

/run-common-test -tidb-test=pr/592
What have you changed? (mandatory)
ddl: remove cleanAddIndexQueueJobs and initial support for parallel DDL. Initial support for parallel DDL is as follows:
"Add index" DDL and the other types of DDL can be executed in parallel when they are on different tables. We use two queues in storage, one for the "add index" jobs and one for the other DDLs, and two workers to handle these DDL jobs: the "add index" worker handles the "add index" queue, and the other worker handles the other queue.
If an "add index" DDL and another type of DDL are on the same table, we need to perform the two operations serially.
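The dispatch rule in the description above can be sketched roughly as follows: "add index" jobs go to their own queue, and a new job that touches the same table as a pending job in the other queue records that job's ID as its DependencyID so the two still execute serially. All types here are simplified stand-ins for the real TiDB ones.

```go
package main

import "fmt"

// Job is a simplified stand-in for model.Job.
type Job struct {
	ID           int64
	TableID      int64
	AddIndex     bool
	DependencyID int64 // 0: no dependency
}

type queues struct {
	general, addIdx []*Job
}

// enqueue routes the job to its queue, first recording a dependency on any
// same-table job already waiting in the other worker's queue.
func (q *queues) enqueue(job *Job) {
	other := q.general
	if !job.AddIndex {
		other = q.addIdx
	}
	for _, j := range other {
		if j.TableID == job.TableID && j.ID > job.DependencyID {
			job.DependencyID = j.ID
		}
	}
	if job.AddIndex {
		q.addIdx = append(q.addIdx, job)
	} else {
		q.general = append(q.general, job)
	}
}

func main() {
	q := &queues{}
	q.enqueue(&Job{ID: 1, TableID: 10, AddIndex: true})

	addCol := &Job{ID: 2, TableID: 10} // same table: must run after job 1
	q.enqueue(addCol)
	fmt.Println(addCol.DependencyID) // 1

	other := &Job{ID: 3, TableID: 20, AddIndex: true} // different table: parallel
	q.enqueue(other)
	fmt.Println(other.DependencyID) // 0
}
```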
What are the types of the changes (mandatory)?
The currently defined types are listed below; please pick one of the types for this PR by removing the others:
How has this PR been tested (mandatory)?
unit test
Does this PR affect documentation (docs/docs-cn) update? (optional)
Yes.