Skip to content

Commit b8fafa0

Browse files
address review: use data clients, thin route, split modules
1 parent 014ecff commit b8fafa0

22 files changed

Lines changed: 1356 additions & 869 deletions

x-pack/solutions/security/plugins/security_solution/server/lib/entity_analytics/entity_store/entity_store_data_client.ts

Lines changed: 44 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,7 @@ import type {
1818
} from '@kbn/core/server';
1919
import type { SecurityPluginStart } from '@kbn/security-plugin/server';
2020
import { EntityClient } from '@kbn/entityManager-plugin/server/lib/entity_client';
21-
import type { HealthStatus, SortOrder } from '@elastic/elasticsearch/lib/api/types';
21+
import type { FieldValue, HealthStatus, SortOrder } from '@elastic/elasticsearch/lib/api/types';
2222
import type { TaskManagerStartContract } from '@kbn/task-manager-plugin/server';
2323
import type { DataViewsService } from '@kbn/data-views-plugin/common';
2424
import { isEqual } from 'lodash/fp';
@@ -28,6 +28,7 @@ import type { EntityStoreCapability, EntityDefinition } from '@kbn/entities-sche
2828
import type { estypes } from '@elastic/elasticsearch';
2929
import { SO_ENTITY_DEFINITION_TYPE } from '@kbn/entityManager-plugin/server/saved_objects';
3030
import { SECURITY_SOLUTION_ENABLE_ASSET_INVENTORY_SETTING } from '@kbn/management-settings-ids';
31+
import { getLatestEntitiesIndexName } from '@kbn/entity-store/server';
3132
import { RISK_SCORE_INDEX_PATTERN } from '../../../../common/constants';
3233
import {
3334
ENTITY_STORE_INDEX_PATTERN,
@@ -946,6 +947,48 @@ export class EntityStoreDataClient {
946947
return { records, total, inspect };
947948
}
948949

950+
/**
 * Fetch all entities from the V2 unified latest index with search_after pagination.
 * Suitable for batch processing pipelines that need the full entity population.
 *
 * Pagination runs against a point in time (PIT). `@timestamp` alone is not a unique
 * sort key, so plain `search_after` would silently skip every hit that shares the
 * timestamp of the last hit on a page. Searching through a PIT makes Elasticsearch
 * append the implicit `_shard_doc` tiebreaker and gives every page the same
 * consistent view of the index even while entities are being written.
 *
 * This is a best-effort read: if a request fails, a warning is logged and the
 * entities collected so far (possibly none) are returned instead of throwing.
 *
 * @param params.sourceFields Optional `_source` filter applied to every page.
 * @param params.pageSize Hits per page. Defaults to 1000; values that are not a
 *   positive integer fall back to the default.
 * @returns Every `_source` document read from the index, in `@timestamp` descending order.
 */
public async fetchAllUnifiedLatestEntities(params?: {
  sourceFields?: string[];
  pageSize?: number;
}): Promise<Entity[]> {
  const { namespace, logger } = this.options;
  const index = getLatestEntitiesIndexName(namespace);

  const defaultPageSize = 1000;
  const requestedSize = params?.pageSize;
  // A zero, negative or fractional page size can never make progress; use the default.
  const size =
    requestedSize !== undefined && Number.isInteger(requestedSize) && requestedSize > 0
      ? requestedSize
      : defaultPageSize;

  // Each request re-extends the PIT, so this only needs to outlive a single page fetch.
  const keepAlive = '1m';
  const results: Entity[] = [];
  let pitId: string | undefined;

  try {
    // NOTE(review): ignore_unavailable keeps the "index not created yet" case quiet,
    // mirroring the previous search-level flag — confirm against the target ES version.
    const pit = await this.esClient.openPointInTime({
      index,
      keep_alive: keepAlive,
      ignore_unavailable: true,
    });
    pitId = pit.id;

    let searchAfter: FieldValue[] | undefined;

    while (true) {
      // `index` and indices options must not be sent together with `pit`.
      const resp = await this.esClient.search<Entity>({
        pit: { id: pitId, keep_alive: keepAlive },
        size,
        ...(params?.sourceFields ? { _source: params.sourceFields } : {}),
        sort: [{ '@timestamp': { order: 'desc' as const } }],
        ...(searchAfter ? { search_after: searchAfter } : {}),
        query: { match_all: {} },
      });

      // Elasticsearch may hand back a refreshed PIT id; always continue with the latest one.
      pitId = resp.pit_id ?? pitId;

      const { hits } = resp.hits;
      for (const hit of hits) {
        if (hit._source) results.push(hit._source);
      }

      // A short page means the index is exhausted.
      if (hits.length < size) break;

      const lastSort = hits[hits.length - 1]?.sort;
      if (!lastSort || lastSort.length === 0) {
        // Without sort values there is no cursor to continue from; stop rather than loop forever.
        break;
      }
      searchAfter = lastSort as FieldValue[];
    }
  } catch (error) {
    logger.warn(`[EntityStoreDataClient] Failed to fetch entities from "${index}": ${error}`);
  } finally {
    if (pitId) {
      try {
        await this.esClient.closePointInTime({ id: pitId });
      } catch (closeError) {
        // The PIT expires on its own after keep_alive, so a failed close is not fatal.
        logger.debug(
          `[EntityStoreDataClient] Failed to close point in time for "${index}": ${closeError}`
        );
      }
    }
  }

  return results;
}
991+
949992
public async applyDataViewIndices(): Promise<{
950993
successes: EngineDataviewUpdateResult[];
951994
errors: Error[];

x-pack/solutions/security/plugins/security_solution/server/lib/entity_analytics/lead_generation/engine/lead_generation_engine.ts

Lines changed: 33 additions & 84 deletions
Original file line number | Diff line number | Diff line change
@@ -12,17 +12,13 @@ import type {
1212
Lead,
1313
LeadEntity,
1414
LeadGenerationEngineConfig,
15-
LeadStaleness,
1615
Observation,
1716
ObservationModule,
1817
} from '../types';
19-
import { DEFAULT_ENGINE_CONFIG } from '../types';
18+
import { DEFAULT_ENGINE_CONFIG, computeStaleness } from '../types';
19+
import { entityToKey } from '../observation_modules/utils';
2020
import { llmSynthesizeLeadContent } from './llm_synthesize';
2121

22-
// ---------------------------------------------------------------------------
23-
// Engine
24-
// ---------------------------------------------------------------------------
25-
2622
interface LeadGenerationEngineDeps {
2723
readonly logger: Logger;
2824
readonly config?: Partial<LeadGenerationEngineConfig>;
@@ -67,21 +63,21 @@ export const createLeadGenerationEngine = ({
6763
const collectStart = Date.now();
6864
const observations = await collectAllObservations(modules, entities, logger);
6965
const collectMs = Date.now() - collectStart;
70-
logger.info(
71-
`[LeadGenerationEngine][Telemetry] Observation collection: ${collectMs}ms (${observations.length} observations from ${modules.length} modules)`
66+
logger.debug(
67+
`[LeadGenerationEngine] Observation collection: ${collectMs}ms (${observations.length} observations from ${modules.length} modules)`
7268
);
7369

7470
if (observations.length === 0) {
75-
logger.info('[LeadGenerationEngine] No observations collected - no leads to generate');
71+
logger.debug('[LeadGenerationEngine] No observations collected - no leads to generate');
7672
return [];
7773
}
7874

7975
// 2. Score entities based on their observations
8076
const scoreStart = Date.now();
8177
const scoredEntities = scoreEntities(observations, entities, config);
8278
const scoreMs = Date.now() - scoreStart;
83-
logger.info(
84-
`[LeadGenerationEngine][Telemetry] Entity scoring: ${scoreMs}ms (${scoredEntities.length} entities scored)`
79+
logger.debug(
80+
`[LeadGenerationEngine] Entity scoring: ${scoreMs}ms (${scoredEntities.length} entities scored)`
8581
);
8682

8783
// 3. Filter entities below threshold
@@ -90,32 +86,28 @@ export const createLeadGenerationEngine = ({
9086
);
9187

9288
if (qualifyingEntities.length === 0) {
93-
logger.info('[LeadGenerationEngine] No entities met the threshold - no leads to generate');
89+
logger.debug('[LeadGenerationEngine] No entities met the threshold - no leads to generate');
9490
return [];
9591
}
9692

9793
// 4. Group related entities into leads
9894
const groupStart = Date.now();
9995
const leads = await groupIntoLeads(qualifyingEntities, config, logger, options?.chatModel);
10096
const groupMs = Date.now() - groupStart;
101-
logger.info(
102-
`[LeadGenerationEngine][Telemetry] Lead grouping & synthesis: ${groupMs}ms (${leads.length} leads)`
97+
logger.debug(
98+
`[LeadGenerationEngine] Lead grouping & synthesis: ${groupMs}ms (${leads.length} leads)`
10399
);
104100

105101
const totalMs = Date.now() - pipelineStart;
106-
logger.info(
107-
`[LeadGenerationEngine][Telemetry] Total pipeline: ${totalMs}ms | Collection: ${collectMs}ms | Scoring: ${scoreMs}ms | Synthesis: ${groupMs}ms | Entities: ${entities.length} | Observations: ${observations.length} | Leads: ${leads.length}`
102+
logger.debug(
103+
`[LeadGenerationEngine] Total pipeline: ${totalMs}ms | Collection: ${collectMs}ms | Scoring: ${scoreMs}ms | Synthesis: ${groupMs}ms | Entities: ${entities.length} | Observations: ${observations.length} | Leads: ${leads.length}`
108104
);
109105

110106
return leads.slice(0, config.maxLeads);
111107
},
112108
};
113109
};
114110

115-
// ---------------------------------------------------------------------------
116-
// Step 1: Observation collection
117-
// ---------------------------------------------------------------------------
118-
119111
const collectAllObservations = async (
120112
modules: ObservationModule[],
121113
entities: LeadEntity[],
@@ -129,8 +121,8 @@ const collectAllObservations = async (
129121
const moduleStart = Date.now();
130122
const moduleObservations = await module.collect(entities);
131123
const moduleMs = Date.now() - moduleStart;
132-
logger.info(
133-
`[LeadGenerationEngine][Telemetry] Module "${module.config.name}": ${moduleMs}ms (${moduleObservations.length} observations from ${entities.length} entities)`
124+
logger.debug(
125+
`[LeadGenerationEngine] Module "${module.config.name}": ${moduleMs}ms (${moduleObservations.length} observations from ${entities.length} entities)`
134126
);
135127
allObservations.push(...moduleObservations);
136128
} catch (error) {
@@ -144,18 +136,10 @@ const collectAllObservations = async (
144136
return allObservations;
145137
};
146138

147-
// ---------------------------------------------------------------------------
148-
// Step 2: Entity scoring
149-
//
150-
// Simplified formula:
151-
// priority = max(severity_rank) + clamp(observation_count - 1, 0, 4)
152-
//
153-
// severity_rank: critical=7, high=5, medium=3, low=1
154-
// This gives a natural 1-10 scale:
155-
// - 1 low observation = 1
156-
// - 1 critical + 4 more = 7 + min(4, 4) = 10 (max possible, capped at 10)
157-
// ---------------------------------------------------------------------------
158-
139+
/**
140+
* Scoring formula: priority = max(severity_rank) + clamp(observation_count - 1, 0, 4)
141+
* severity_rank: critical=7, high=5, medium=3, low=1 → natural 1–10 scale
142+
*/
159143
const SEVERITY_RANK: Record<string, number> = {
160144
critical: 7,
161145
high: 5,
@@ -210,17 +194,13 @@ const calculatePriority = (observations: Observation[]): number => {
210194
return Math.min(10, maxSeverityRank + countBonus);
211195
};
212196

213-
// ---------------------------------------------------------------------------
214-
// Step 3: Grouping into leads
215-
// ---------------------------------------------------------------------------
216-
217197
const groupIntoLeads = async (
218198
scoredEntities: ScoredEntity[],
219199
_config: LeadGenerationEngineConfig,
220200
logger: Logger,
221201
chatModel?: InferenceChatModel
222202
): Promise<Lead[]> => {
223-
resetUsedTitles();
203+
const usedTitleTracker = new Map<string, number>();
224204
const groups = groupByObservationPattern(scoredEntities);
225205
const leads: Lead[] = [];
226206
const now = new Date();
@@ -236,11 +216,12 @@ const groupIntoLeads = async (
236216
group,
237217
allObservations,
238218
logger,
219+
usedTitleTracker,
239220
chatModel
240221
);
241222
const synthMs = Date.now() - synthStart;
242-
logger.info(
243-
`[LeadGenerationEngine][Telemetry] Lead ${i + 1}/${
223+
logger.debug(
224+
`[LeadGenerationEngine] Lead ${i + 1}/${
244225
groups.length
245226
} synthesis for [${entityLabel}]: ${synthMs}ms (${chatModel ? 'LLM' : 'rule-based'})`
246227
);
@@ -255,7 +236,7 @@ const groupIntoLeads = async (
255236
priority: maxPriority,
256237
chatRecommendations: recommendations,
257238
timestamp: now.toISOString(),
258-
staleness: calculateStaleness(now, now),
239+
staleness: computeStaleness(now, now),
259240
observations: allObservations,
260241
});
261242
}
@@ -275,39 +256,11 @@ const groupByObservationPattern = (scoredEntities: ScoredEntity[]): ScoredEntity
275256
return scoredEntities.map((entity) => [entity]);
276257
};
277258

278-
// ---------------------------------------------------------------------------
279-
// Staleness model
280-
//
281-
// Fresh: 0-24 hours
282-
// Stale: 24-72 hours
283-
// Expired: >72 hours
284-
// ---------------------------------------------------------------------------
285-
286-
const STALENESS_THRESHOLDS = {
287-
fresh: 24 * 60 * 60 * 1000,
288-
stale: 72 * 60 * 60 * 1000,
289-
};
290-
291-
const calculateStaleness = (generatedAt: Date, now: Date): LeadStaleness => {
292-
const ageMs = now.getTime() - generatedAt.getTime();
293-
294-
if (ageMs <= STALENESS_THRESHOLDS.fresh) {
295-
return 'fresh';
296-
}
297-
if (ageMs <= STALENESS_THRESHOLDS.stale) {
298-
return 'stale';
299-
}
300-
return 'expired';
301-
};
302-
303-
// ---------------------------------------------------------------------------
304-
// Lead content synthesis (LLM-powered with rule-based fallback)
305-
// ---------------------------------------------------------------------------
306-
307259
const synthesizeLeadContent = async (
308260
group: ScoredEntity[],
309261
observations: Observation[],
310262
logger: Logger,
263+
usedTitleTracker: Map<string, number>,
311264
chatModel?: InferenceChatModel
312265
): Promise<{
313266
title: string;
@@ -319,7 +272,7 @@ const synthesizeLeadContent = async (
319272
if (chatModel) {
320273
try {
321274
const llmResult = await llmSynthesizeLeadContent(chatModel, group, observations, logger);
322-
const dominantPattern = selectDominantPattern(observations);
275+
const dominantPattern = selectDominantPattern(observations, usedTitleTracker);
323276
const byline = buildByline(group, observations, dominantPattern);
324277

325278
return {
@@ -336,12 +289,13 @@ const synthesizeLeadContent = async (
336289
}
337290
}
338291

339-
return ruleSynthesizeLeadContent(group, observations);
292+
return ruleSynthesizeLeadContent(group, observations, usedTitleTracker);
340293
};
341294

342295
const ruleSynthesizeLeadContent = (
343296
group: ScoredEntity[],
344-
observations: Observation[]
297+
observations: Observation[],
298+
usedTitleTracker: Map<string, number>
345299
): {
346300
title: string;
347301
byline: string;
@@ -351,7 +305,7 @@ const ruleSynthesizeLeadContent = (
351305
} => {
352306
const observationTypes = [...new Set(observations.map((o) => o.type))];
353307

354-
const dominantPattern = selectDominantPattern(observations);
308+
const dominantPattern = selectDominantPattern(observations, usedTitleTracker);
355309

356310
const title = buildRuleBasedTitle(group, dominantPattern);
357311
const byline = buildByline(group, observations, dominantPattern);
@@ -501,13 +455,10 @@ const PATTERN_CATALOG: Record<string, { labels: string[]; distinctiveness: numbe
501455
},
502456
};
503457

504-
const usedTitleTracker = new Map<string, number>();
505-
506-
const resetUsedTitles = (): void => {
507-
usedTitleTracker.clear();
508-
};
509-
510-
const selectDominantPattern = (observations: Observation[]): DominantPattern => {
458+
const selectDominantPattern = (
459+
observations: Observation[],
460+
usedTitleTracker: Map<string, number>
461+
): DominantPattern => {
511462
const bestByType = new Map<string, { score: number; confidence: number }>();
512463

513464
for (const obs of observations) {
@@ -722,5 +673,3 @@ const buildRecommendations = (group: ScoredEntity[], observations: Observation[]
722673

723674
return recommendations.slice(0, 5);
724675
};
725-
726-
const entityToKey = (entity: LeadEntity): string => `${entity.type}:${entity.name}`;

0 commit comments

Comments
 (0)