Skip to content

Commit 3ed64ed

Browse files
committed
refactor(task-manager): made TM constructor a little more legible
1 parent 418b946 commit 3ed64ed

3 files changed

Lines changed: 39 additions & 34 deletions

File tree

x-pack/legacy/plugins/task_manager/lib/fill_pool.test.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ describe('fillPool', () => {
2020
const run = sinon.spy(async () => TaskPoolRunResult.RunningAllClaimedTasks);
2121
const converter = _.identity;
2222

23-
await fillPool(run, fetchAvailableTasks, converter);
23+
await fillPool(fetchAvailableTasks, converter, run);
2424

2525
expect(_.flattenDeep(run.args)).toEqual([1, 2, 3, 4, 5]);
2626
});
@@ -35,7 +35,7 @@ describe('fillPool', () => {
3535
const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity);
3636
const converter = _.identity;
3737

38-
await fillPool(run, fetchAvailableTasks, converter);
38+
await fillPool(fetchAvailableTasks, converter, run);
3939

4040
expect(_.flattenDeep(run.args)).toEqual([1, 2, 3]);
4141
});
@@ -50,7 +50,7 @@ describe('fillPool', () => {
5050
const run = sinon.spy(async () => TaskPoolRunResult.RanOutOfCapacity);
5151
const converter = (x: number) => x.toString();
5252

53-
await fillPool(run, fetchAvailableTasks, converter);
53+
await fillPool(fetchAvailableTasks, converter, run);
5454

5555
expect(_.flattenDeep(run.args)).toEqual(['1', '2', '3']);
5656
});
@@ -63,7 +63,7 @@ describe('fillPool', () => {
6363
try {
6464
const fetchAvailableTasks = async () => Promise.reject('fetch is not working');
6565

66-
await fillPool(run, fetchAvailableTasks, converter);
66+
await fillPool(fetchAvailableTasks, converter, run);
6767
} catch (err) {
6868
expect(err.toString()).toBe('fetch is not working');
6969
expect(run.called).toBe(false);
@@ -82,7 +82,7 @@ describe('fillPool', () => {
8282
let index = 0;
8383
const fetchAvailableTasks = async () => tasks[index++] || [];
8484

85-
await fillPool(run, fetchAvailableTasks, converter);
85+
await fillPool(fetchAvailableTasks, converter, run);
8686
} catch (err) {
8787
expect(err.toString()).toBe('run is not working');
8888
}
@@ -101,7 +101,7 @@ describe('fillPool', () => {
101101
throw new Error(`can not convert ${x}`);
102102
};
103103

104-
await fillPool(run, fetchAvailableTasks, converter);
104+
await fillPool(fetchAvailableTasks, converter, run);
105105
} catch (err) {
106106
expect(err.toString()).toBe('Error: can not convert 1');
107107
}

x-pack/legacy/plugins/task_manager/lib/fill_pool.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ type Converter<T1, T2> = (t: T1) => T2;
2929
* @param converter - a function that converts task records to the appropriate task runner
3030
*/
3131
export async function fillPool<TRecord, TRunner>(
32-
run: BatchRun<TRunner>,
3332
fetchAvailableTasks: Fetcher<TRecord>,
34-
converter: Converter<TRecord, TRunner>
33+
converter: Converter<TRecord, TRunner>,
34+
run: BatchRun<TRunner>
3535
): Promise<FillPoolResult> {
3636
performance.mark('fillPool.start');
3737
while (true) {

x-pack/legacy/plugins/task_manager/task_manager.ts

Lines changed: 31 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ export class TaskManager {
9090
this.logger.info(`TaskManager is identified by the Kibana UUID: ${taskManagerId}`);
9191
}
9292

93-
const store = new TaskStore({
93+
this.store = new TaskStore({
9494
serializer: opts.serializer,
9595
savedObjectsRepository: opts.savedObjectsRepository,
9696
callCluster: opts.callWithInternalUser,
@@ -100,38 +100,43 @@ export class TaskManager {
100100
taskManagerId: `kibana:${taskManagerId}`,
101101
});
102102

103-
const pool = new TaskPool({
103+
this.pool = new TaskPool({
104104
logger: this.logger,
105105
maxWorkers: this.maxWorkers,
106106
});
107-
const createRunner = (instance: ConcreteTaskInstance) =>
108-
new TaskManagerRunner({
109-
logger: this.logger,
110-
instance,
111-
store,
112-
definitions: this.definitions,
113-
beforeRun: this.middleware.beforeRun,
114-
beforeMarkRunning: this.middleware.beforeMarkRunning,
115-
});
116-
const poller = new TaskPoller<FillPoolResult>({
107+
108+
this.poller = new TaskPoller<FillPoolResult>({
117109
logger: this.logger,
118110
pollInterval: opts.config.get('xpack.task_manager.poll_interval'),
119-
work: (): Promise<FillPoolResult> =>
120-
fillPool(
121-
async tasks => await pool.run(tasks),
122-
() =>
123-
claimAvailableTasks(
124-
this.store.claimAvailableTasks.bind(this.store),
125-
this.pool.availableWorkers,
126-
this.logger
127-
),
128-
createRunner
129-
),
111+
work: this.work.bind(this),
130112
});
113+
}
131114

132-
this.pool = pool;
133-
this.store = store;
134-
this.poller = poller;
115+
private work(): Promise<FillPoolResult> {
116+
return fillPool(
117+
// claim available tasks
118+
() =>
119+
claimAvailableTasks(
120+
this.store.claimAvailableTasks.bind(this.store),
121+
this.pool.availableWorkers,
122+
this.logger
123+
),
124+
// wrap each task in a Task Runner
125+
this.createTaskRunnerForTask.bind(this),
126+
// place tasks in the Task Pool
127+
async tasks => await this.pool.run(tasks)
128+
);
129+
}
130+
131+
private createTaskRunnerForTask(instance: ConcreteTaskInstance) {
132+
return new TaskManagerRunner({
133+
logger: this.logger,
134+
instance,
135+
store: this.store,
136+
definitions: this.definitions,
137+
beforeRun: this.middleware.beforeRun,
138+
beforeMarkRunning: this.middleware.beforeMarkRunning,
139+
});
135140
}
136141

137142
/**

0 commit comments

Comments
 (0)