Skip to content

Commit bcdf60a

Browse files
committed
[Reporting] ReportingStore module (#69426)
* Add store class * fix tests * fix the createIndex bug * add reportingstore test * change function args * nits * add test for automatic index creation failure recovery # Conflicts: # x-pack/plugins/reporting/server/lib/esqueue/job.js
1 parent 35fde53 commit bcdf60a

21 files changed

Lines changed: 665 additions & 841 deletions

File tree

x-pack/plugins/reporting/server/core.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import { screenshotsObservableFactory } from './export_types/common/lib/screensh
2424
import { checkLicense, getExportTypesRegistry } from './lib';
2525
import { ESQueueInstance } from './lib/create_queue';
2626
import { EnqueueJobFn } from './lib/enqueue_job';
27+
import { ReportingStore } from './lib/store';
2728

2829
export interface ReportingInternalSetup {
2930
elasticsearch: ElasticsearchServiceSetup;
@@ -37,6 +38,7 @@ export interface ReportingInternalStart {
3738
browserDriverFactory: HeadlessChromiumDriverFactory;
3839
enqueueJob: EnqueueJobFn;
3940
esqueue: ESQueueInstance;
41+
store: ReportingStore;
4042
savedObjects: SavedObjectsServiceStart;
4143
uiSettings: UiSettingsServiceStart;
4244
}

x-pack/plugins/reporting/server/lib/create_queue.ts

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,16 @@ import { ReportingCore } from '../core';
88
import { JobSource, TaskRunResult } from '../types';
99
import { createTaggedLogger } from './create_tagged_logger'; // TODO remove createTaggedLogger once esqueue is removed
1010
import { createWorkerFactory } from './create_worker';
11-
import { Job } from './enqueue_job';
1211
// @ts-ignore
1312
import { Esqueue } from './esqueue';
1413
import { LevelLogger } from './level_logger';
14+
import { ReportingStore } from './store';
1515

1616
interface ESQueueWorker {
1717
on: (event: string, handler: any) => void;
1818
}
1919

2020
export interface ESQueueInstance {
21-
addJob: (type: string, payload: unknown, options: object) => Job;
2221
registerWorker: <JobParamsType>(
2322
pluginId: string,
2423
workerFn: GenericWorkerFn<JobParamsType>,
@@ -37,26 +36,25 @@ type GenericWorkerFn<JobParamsType> = (
3736
...workerRestArgs: any[]
3837
) => void | Promise<TaskRunResult>;
3938

40-
export async function createQueueFactory<JobParamsType, JobPayloadType>(
39+
export async function createQueueFactory(
4140
reporting: ReportingCore,
41+
store: ReportingStore,
4242
logger: LevelLogger
4343
): Promise<ESQueueInstance> {
4444
const config = reporting.getConfig();
45-
const queueIndexInterval = config.get('queue', 'indexInterval');
45+
46+
// esqueue-related
4647
const queueTimeout = config.get('queue', 'timeout');
47-
const queueIndex = config.get('index');
4848
const isPollingEnabled = config.get('queue', 'pollEnabled');
4949

50-
const elasticsearch = await reporting.getElasticsearchService();
50+
const elasticsearch = reporting.getElasticsearchService();
5151
const queueOptions = {
52-
interval: queueIndexInterval,
5352
timeout: queueTimeout,
54-
dateSeparator: '.',
5553
client: elasticsearch.legacy.client,
5654
logger: createTaggedLogger(logger, ['esqueue', 'queue-worker']),
5755
};
5856

59-
const queue: ESQueueInstance = new Esqueue(queueIndex, queueOptions);
57+
const queue: ESQueueInstance = new Esqueue(store, queueOptions);
6058

6159
if (isPollingEnabled) {
6260
// create workers to poll the index for idle jobs waiting to be claimed and executed

x-pack/plugins/reporting/server/lib/enqueue_job.ts

Lines changed: 12 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -4,39 +4,24 @@
44
* you may not use this file except in compliance with the Elastic License.
55
*/
66

7-
import { EventEmitter } from 'events';
87
import { KibanaRequest, RequestHandlerContext } from 'src/core/server';
98
import { AuthenticatedUser } from '../../../security/server';
109
import { ESQueueCreateJobFn } from '../../server/types';
1110
import { ReportingCore } from '../core';
12-
// @ts-ignore
13-
import { events as esqueueEvents } from './esqueue';
14-
import { LevelLogger } from './level_logger';
11+
import { LevelLogger } from './';
12+
import { ReportingStore, Report } from './store';
1513

16-
interface ConfirmedJob {
17-
id: string;
18-
index: string;
19-
_seq_no: number;
20-
_primary_term: number;
21-
}
22-
23-
export type Job = EventEmitter & {
24-
id: string;
25-
toJSON: () => {
26-
id: string;
27-
};
28-
};
29-
30-
export type EnqueueJobFn = <JobParamsType>(
14+
export type EnqueueJobFn = (
3115
exportTypeId: string,
32-
jobParams: JobParamsType,
16+
jobParams: unknown,
3317
user: AuthenticatedUser | null,
3418
context: RequestHandlerContext,
3519
request: KibanaRequest
36-
) => Promise<Job>;
20+
) => Promise<Report>;
3721

3822
export function enqueueJobFactory(
3923
reporting: ReportingCore,
24+
store: ReportingStore,
4025
parentLogger: LevelLogger
4126
): EnqueueJobFn {
4227
const config = reporting.getConfig();
@@ -45,16 +30,16 @@ export function enqueueJobFactory(
4530
const maxAttempts = config.get('capture', 'maxAttempts');
4631
const logger = parentLogger.clone(['queue-job']);
4732

48-
return async function enqueueJob<JobParamsType>(
33+
return async function enqueueJob(
4934
exportTypeId: string,
50-
jobParams: JobParamsType,
35+
jobParams: unknown,
5136
user: AuthenticatedUser | null,
5237
context: RequestHandlerContext,
5338
request: KibanaRequest
54-
): Promise<Job> {
55-
type ScheduleTaskFnType = ESQueueCreateJobFn<JobParamsType>;
39+
) {
40+
type ScheduleTaskFnType = ESQueueCreateJobFn<unknown>;
41+
5642
const username = user ? user.username : false;
57-
const esqueue = await reporting.getEsqueue();
5843
const exportType = reporting.getExportTypesRegistry().getById(exportTypeId);
5944

6045
if (exportType == null) {
@@ -71,16 +56,6 @@ export function enqueueJobFactory(
7156
max_attempts: maxAttempts,
7257
};
7358

74-
return new Promise((resolve, reject) => {
75-
const job = esqueue.addJob(exportType.jobType, payload, options);
76-
77-
job.on(esqueueEvents.EVENT_JOB_CREATED, (createdJob: ConfirmedJob) => {
78-
if (createdJob.id === job.id) {
79-
logger.info(`Successfully queued job: ${createdJob.id}`);
80-
resolve(job);
81-
}
82-
});
83-
job.on(esqueueEvents.EVENT_JOB_CREATE_ERROR, reject);
84-
});
59+
return await store.addReport(exportType.jobType, payload, options);
8560
};
8661
}

x-pack/plugins/reporting/server/lib/esqueue/__tests__/helpers/create_index.js

Lines changed: 0 additions & 100 deletions
This file was deleted.

x-pack/plugins/reporting/server/lib/esqueue/__tests__/helpers/index_timestamp.js

Lines changed: 0 additions & 93 deletions
This file was deleted.

0 commit comments

Comments
 (0)