Skip to content

Commit a91acdd

Browse files
committed
add tests for workflow composition
1 parent 07ceb1b commit a91acdd

1 file changed

Lines changed: 220 additions & 0 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
import { tags } from '@kbn/scout';
11+
import { expect } from '@kbn/scout/api';
12+
import { isTerminalStatus } from '@kbn/workflows';
13+
import { ExecutionStatus } from '@kbn/workflows/types/latest';
14+
import type { WorkflowsApiService } from '../../../common/apis/workflows';
15+
import { waitForConditionOrThrow } from '../../../common/utils/wait_for_condition';
16+
import { spaceTest } from '../../fixtures';
17+
18+
// ---------------------------------------------------------------------------
19+
// Workflow YAML templates — "System Health Monitoring" scenario
20+
// ---------------------------------------------------------------------------
21+
22+
const CHILD_HEALTH_CHECK_YAML = `
23+
name: Check Service Health
24+
enabled: true
25+
description: Validates health of a given service and returns diagnostic output
26+
triggers:
27+
- type: manual
28+
29+
inputs:
30+
- name: service_name
31+
type: string
32+
default: "api-gateway"
33+
34+
steps:
35+
- name: health_check
36+
type: console
37+
with:
38+
message: "Health check passed for {{ inputs.service_name }}"
39+
`;
40+
41+
const getAsyncParentYaml = (childWorkflowId: string) => `
42+
name: Monitor Services Async
43+
enabled: true
44+
description: Dispatches health checks asynchronously without waiting for results
45+
triggers:
46+
- type: manual
47+
48+
inputs:
49+
- name: service_name
50+
type: string
51+
default: "api-gateway"
52+
53+
steps:
54+
- name: log_start
55+
type: console
56+
with:
57+
message: "Initiating async health check for {{ inputs.service_name }}"
58+
59+
- name: dispatch_health_check
60+
type: workflow.executeAsync
61+
with:
62+
workflow-id: ${childWorkflowId}
63+
inputs:
64+
service_name: "{{ inputs.service_name }}"
65+
66+
- name: log_dispatch
67+
type: console
68+
with:
69+
message: "{{steps.dispatch_health_check.output | json}}"
70+
`;
71+
72+
const getSyncParentYaml = (childWorkflowId: string) => `
73+
name: Monitor Services Sync
74+
enabled: true
75+
description: Runs health check synchronously and waits for the result
76+
triggers:
77+
- type: manual
78+
79+
inputs:
80+
- name: service_name
81+
type: string
82+
default: "api-gateway"
83+
84+
steps:
85+
- name: log_start
86+
type: console
87+
with:
88+
message: "Initiating sync health check for {{ inputs.service_name }}"
89+
90+
- name: run_health_check
91+
type: workflow.execute
92+
with:
93+
workflow-id: ${childWorkflowId}
94+
inputs:
95+
service_name: "{{ inputs.service_name }}"
96+
97+
- name: log_result
98+
type: console
99+
with:
100+
message: "{{steps.run_health_check.output | json}}"
101+
`;
102+
103+
const FAILING_CHILD_YAML = `
104+
name: Notify via Slack
105+
enabled: true
106+
description: Sends Slack notification (fails due to non-existing connector)
107+
triggers:
108+
- type: manual
109+
110+
steps:
111+
- name: send_notification
112+
type: slack
113+
connector-id: "non-existing-slack-connector"
114+
with:
115+
message: "Health check alert triggered"
116+
`;
117+
118+
// ---------------------------------------------------------------------------
119+
// Helpers
120+
// ---------------------------------------------------------------------------
121+
122+
const SYNC_POLL_TIMEOUT = 30_000;
123+
124+
async function waitForExecution(workflowsApi: WorkflowsApiService, executionId: string) {
125+
return waitForConditionOrThrow({
126+
action: () => workflowsApi.getExecution(executionId),
127+
condition: (exec) => !!exec && isTerminalStatus(exec.status ?? ''),
128+
interval: 1000,
129+
timeout: SYNC_POLL_TIMEOUT,
130+
errorMessage: (exec) =>
131+
`Execution ${executionId} did not terminate within ${SYNC_POLL_TIMEOUT}ms (last status: ${exec?.status})`,
132+
});
133+
}
134+
135+
// ---------------------------------------------------------------------------
136+
// Tests
137+
// ---------------------------------------------------------------------------
138+
139+
spaceTest.describe('Workflow composition', { tag: tags.deploymentAgnostic }, () => {
140+
let workflowsApi: WorkflowsApiService;
141+
let childWorkflowId: string;
142+
let asyncParentId: string;
143+
let syncParentId: string;
144+
145+
spaceTest.beforeAll(async ({ apiServices }) => {
146+
spaceTest.setTimeout(60_000);
147+
workflowsApi = apiServices.workflowsApi;
148+
149+
const child = await workflowsApi.create(CHILD_HEALTH_CHECK_YAML);
150+
childWorkflowId = child.id;
151+
152+
const asyncParent = await workflowsApi.create(getAsyncParentYaml(childWorkflowId));
153+
asyncParentId = asyncParent.id;
154+
155+
const syncParent = await workflowsApi.create(getSyncParentYaml(childWorkflowId));
156+
syncParentId = syncParent.id;
157+
});
158+
159+
spaceTest.afterAll(async () => {
160+
await workflowsApi.deleteAll();
161+
});
162+
163+
// -- Async strategy -------------------------------------------------------
164+
165+
spaceTest('async: dispatches child workflow and receives execution metadata', async () => {
166+
const { workflowExecutionId } = await workflowsApi.run(asyncParentId, {
167+
service_name: 'payments-service',
168+
});
169+
170+
const parentExecution = await workflowsApi.waitForTermination({ workflowExecutionId });
171+
expect(parentExecution?.status).toBe(ExecutionStatus.COMPLETED);
172+
expect(parentExecution?.stepExecutions).toHaveLength(3);
173+
174+
const executionWithOutputs = await workflowsApi.getExecution(workflowExecutionId, {
175+
includeOutput: true,
176+
});
177+
const dispatchStep = executionWithOutputs?.stepExecutions.find(
178+
(s) => s.stepId === 'dispatch_health_check'
179+
);
180+
const output = dispatchStep?.output as Record<string, unknown>;
181+
182+
expect(output?.workflowId).toBe(childWorkflowId);
183+
expect(typeof output?.executionId).toBe('string');
184+
expect(output?.awaited).toBe(false);
185+
expect(output?.status).toBe('pending');
186+
187+
const childExecution = await workflowsApi.getExecution(output?.executionId as string);
188+
expect(childExecution).toBeDefined();
189+
});
190+
191+
// -- Sync strategy ---------------------------------------------------------
192+
193+
spaceTest('sync: waits for child workflow and completes with correct step count', async () => {
194+
const { workflowExecutionId } = await workflowsApi.run(syncParentId, {
195+
service_name: 'auth-service',
196+
});
197+
198+
const parentExecution = await waitForExecution(workflowsApi, workflowExecutionId);
199+
200+
expect(parentExecution?.status).toBe(ExecutionStatus.COMPLETED);
201+
expect(parentExecution?.stepExecutions).toHaveLength(3);
202+
203+
const { results: childExecutions } = await workflowsApi.getExecutions(childWorkflowId);
204+
const completedChildren = childExecutions.filter((e) => e.status === ExecutionStatus.COMPLETED);
205+
expect(completedChildren.length).toBeGreaterThan(0);
206+
});
207+
208+
spaceTest('sync: fails when child workflow fails', async () => {
209+
const failingChild = await workflowsApi.create(FAILING_CHILD_YAML);
210+
const failingParent = await workflowsApi.create(getSyncParentYaml(failingChild.id));
211+
212+
const { workflowExecutionId } = await workflowsApi.run(failingParent.id, {
213+
service_name: 'broken-service',
214+
});
215+
216+
const parentExecution = await waitForExecution(workflowsApi, workflowExecutionId);
217+
218+
expect(parentExecution?.status).toBe(ExecutionStatus.FAILED);
219+
});
220+
});

0 commit comments

Comments
 (0)