[One workflow] event-driven scheduling and trigger pagination #257283
yngrdyn merged 10 commits into elastic:main
…to avoid overloading ES/Task Manager when many workflows match
.../plugins/shared/workflows_management/server/workflows_management/workflows_management_api.ts
    let searchAfter: estypes.SearchHit['sort'] | undefined;
    let hasMore = true;

    while (hasMore) {
Would it make sense to put an upper bound on this loop?
Something like `while (hasMore && pageCount < MAX_PAGES) {`, to avoid unbounded sequential ES requests.
Alternatively, `pageCount` could be folded into a more precise `hasMore` check (see below).
Addressed in 8b987ae, although I'm not sure about the number I used. It is arbitrary and will be confirmed at our next sync, but I don't want this PR to be blocked on it.
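For readers following the thread, the bounded loop discussed above might look roughly like the sketch below. It is not the code from 8b987ae: `collectAllPages`, `FetchPage`, `Hit`, and the `truncated` flag are hypothetical names, and `fetchPage` stands in for an Elasticsearch search that uses a PIT with `search_after`.

```typescript
// Hypothetical shapes for illustration; not the plugin's real types.
type SortValues = ReadonlyArray<string | number>;

interface Hit<T> {
  source: T;
  sort: SortValues;
}

// Stand-in for an Elasticsearch search using a PIT and `search_after`.
type FetchPage<T> = (
  searchAfter: SortValues | undefined,
  size: number
) => Promise<Array<Hit<T>>>;

interface PagedResult<T> {
  items: T[];
  // true when the page cap was reached while more results may remain
  truncated: boolean;
}

async function collectAllPages<T>(
  fetchPage: FetchPage<T>,
  pageSize: number,
  maxPages: number
): Promise<PagedResult<T>> {
  const items: T[] = [];
  let searchAfter: SortValues | undefined;
  let hasMore = true;
  let pageCount = 0;

  // The page cap bounds the number of sequential requests.
  while (hasMore && pageCount < maxPages) {
    const hits = await fetchPage(searchAfter, pageSize);
    pageCount++;
    for (const hit of hits) {
      items.push(hit.source);
    }
    // A short (or empty) page means the result set is exhausted.
    const last = hits[hits.length - 1];
    hasMore = hits.length === pageSize && last !== undefined;
    if (last !== undefined) {
      searchAfter = last.sort;
    }
  }

  return { items, truncated: hasMore };
}
```

Returning a `truncated` flag is one possible design choice: it lets the caller log or report that the cap was hit instead of silently dropping the remaining matches.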
...gins/shared/workflows_management/server/workflows_management/workflows_management_service.ts
    const pageSize = 1000;
    const MAX_PAGES = 100;
    const keepAlive = '1m';
1000 * 100 workflows in 1 minute sounds like quite a challenging number.
Perhaps the number of workflows could be lowered a bit here?
For the moment this is an arbitrary number. I'd like to take this discussion to our next sync.
      expect(maxConcurrent).toBeLessThanOrEqual(20);
    });

    it('should resolve after all scheduleWorkflow calls are invoked (fire-and-forget scheduling)', async () => {
Is it really fire-and-forget? The code is waiting on `allSettled` inside.
It's waiting for the scheduling of those workflows, not for their runs.
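To illustrate the distinction, here is a minimal sketch of capped scheduling. The PR uses p-limit; this example uses a small hand-rolled worker pool as a stand-in so that it stays self-contained. `scheduleWithLimit`, `ScheduleFn`, and `ScheduleSummary` are hypothetical names, and `schedule` stands in for whatever hands a run over to Task Manager.

```typescript
// Hypothetical stand-in for the real scheduling call; it resolves once the
// run has been handed off, not once the workflow has finished running.
type ScheduleFn<T> = (item: T) => Promise<void>;

interface ScheduleSummary {
  scheduled: number;
  failed: number;
}

async function scheduleWithLimit<T>(
  items: readonly T[],
  schedule: ScheduleFn<T>,
  concurrency: number
): Promise<ScheduleSummary> {
  const summary: ScheduleSummary = { scheduled: 0, failed: 0 };
  let next = 0;

  // Each worker claims the next unprocessed item until none remain,
  // so at most `concurrency` schedule calls are in flight at once.
  const worker = async (): Promise<void> => {
    while (next < items.length) {
      const item = items[next++] as T;
      try {
        await schedule(item);
        summary.scheduled++;
      } catch {
        // One failed scheduling call must not abort the others.
        summary.failed++;
      }
    }
  };

  const workerCount = Math.max(1, Math.min(concurrency, items.length));
  const workers: Array<Promise<void>> = [];
  for (let i = 0; i < workerCount; i++) {
    workers.push(worker());
  }

  // Resolves when every scheduling call has settled.
  await Promise.all(workers);
  return summary;
}
```

With a limit of 20, as asserted in the test above, the returned promise settles after every scheduling call has completed or failed. The scheduled workflows themselves run later and are never awaited here, which is the sense in which the scheduling is fire-and-forget.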
💛 Build succeeded, but was flaky
cc @yngrdyn
Addresses open review comments on [event-driven workflow execution](#254964): concurrency/back pressure, pagination and related tests.

## Summary

When many workflows are subscribed to a trigger, emitting an event can overload the node if we run them all inline. This PR **schedules** event-driven workflow runs via **TM** (same model as alerts) and caps concurrent **scheduling** with **p-limit** so `es` and `TM` are not flooded.

It also fixes **silent truncation** when more than 1000 workflows match a trigger by implementing **PIT + search_after** in `getWorkflowsSubscribedToTrigger`, so all matching workflows are returned.