Skip to content

Commit 97ba8a7

Browse files
committed
Revert "[Search] Search batching using bfetch (#83418)"
This reverts commit 5708c5d.
1 parent 9fcf1f0 commit 97ba8a7

31 files changed

Lines changed: 81 additions & 414 deletions

docs/development/plugins/data/public/kibana-plugin-plugins-data-public.plugin.setup.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,15 @@
77
<b>Signature:</b>
88

99
```typescript
10-
setup(core: CoreSetup<DataStartDependencies, DataPublicPluginStart>, { bfetch, expressions, uiActions, usageCollection }: DataSetupDependencies): DataPublicPluginSetup;
10+
setup(core: CoreSetup<DataStartDependencies, DataPublicPluginStart>, { expressions, uiActions, usageCollection }: DataSetupDependencies): DataPublicPluginSetup;
1111
```
1212

1313
## Parameters
1414

1515
| Parameter | Type | Description |
1616
| --- | --- | --- |
1717
| core | <code>CoreSetup&lt;DataStartDependencies, DataPublicPluginStart&gt;</code> | |
18-
| { bfetch, expressions, uiActions, usageCollection } | <code>DataSetupDependencies</code> | |
18+
| { expressions, uiActions, usageCollection } | <code>DataSetupDependencies</code> | |
1919

2020
<b>Returns:</b>
2121

docs/development/plugins/data/public/kibana-plugin-plugins-data-public.searchinterceptordeps.bfetch.md

Lines changed: 0 additions & 11 deletions
This file was deleted.

docs/development/plugins/data/public/kibana-plugin-plugins-data-public.searchinterceptordeps.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ export interface SearchInterceptorDeps
1414

1515
| Property | Type | Description |
1616
| --- | --- | --- |
17-
| [bfetch](./kibana-plugin-plugins-data-public.searchinterceptordeps.bfetch.md) | <code>BfetchPublicSetup</code> | |
1817
| [http](./kibana-plugin-plugins-data-public.searchinterceptordeps.http.md) | <code>CoreSetup['http']</code> | |
1918
| [session](./kibana-plugin-plugins-data-public.searchinterceptordeps.session.md) | <code>ISessionService</code> | |
2019
| [startServices](./kibana-plugin-plugins-data-public.searchinterceptordeps.startservices.md) | <code>Promise&lt;[CoreStart, any, unknown]&gt;</code> | |

docs/development/plugins/data/server/kibana-plugin-plugins-data-server.plugin.setup.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<b>Signature:</b>
88

99
```typescript
10-
setup(core: CoreSetup<DataPluginStartDependencies, DataPluginStart>, { bfetch, expressions, usageCollection }: DataPluginSetupDependencies): {
10+
setup(core: CoreSetup<DataPluginStartDependencies, DataPluginStart>, { expressions, usageCollection }: DataPluginSetupDependencies): {
1111
__enhance: (enhancements: DataEnhancements) => void;
1212
search: ISearchSetup;
1313
fieldFormats: {
@@ -21,7 +21,7 @@ setup(core: CoreSetup<DataPluginStartDependencies, DataPluginStart>, { bfetch, e
2121
| Parameter | Type | Description |
2222
| --- | --- | --- |
2323
| core | <code>CoreSetup&lt;DataPluginStartDependencies, DataPluginStart&gt;</code> | |
24-
| { bfetch, expressions, usageCollection } | <code>DataPluginSetupDependencies</code> | |
24+
| { expressions, usageCollection } | <code>DataPluginSetupDependencies</code> | |
2525

2626
<b>Returns:</b>
2727

src/plugins/bfetch/public/batching/create_streaming_batched_function.test.ts

Lines changed: 1 addition & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import { createStreamingBatchedFunction } from './create_streaming_batched_function';
2121
import { fetchStreaming as fetchStreamingReal } from '../streaming/fetch_streaming';
22-
import { AbortError, defer, of } from '../../../kibana_utils/public';
22+
import { defer, of } from '../../../kibana_utils/public';
2323
import { Subject } from 'rxjs';
2424

2525
const getPromiseState = (promise: Promise<unknown>): Promise<'resolved' | 'rejected' | 'pending'> =>
@@ -168,28 +168,6 @@ describe('createStreamingBatchedFunction()', () => {
168168
expect(fetchStreaming).toHaveBeenCalledTimes(1);
169169
});
170170

171-
test('ignores a request with an aborted signal', async () => {
172-
const { fetchStreaming } = setup();
173-
const fn = createStreamingBatchedFunction({
174-
url: '/test',
175-
fetchStreaming,
176-
maxItemAge: 5,
177-
flushOnMaxItems: 3,
178-
});
179-
180-
const abortController = new AbortController();
181-
abortController.abort();
182-
183-
of(fn({ foo: 'bar' }, abortController.signal));
184-
fn({ baz: 'quix' });
185-
186-
await new Promise((r) => setTimeout(r, 6));
187-
const { body } = fetchStreaming.mock.calls[0][0];
188-
expect(JSON.parse(body)).toEqual({
189-
batch: [{ baz: 'quix' }],
190-
});
191-
});
192-
193171
test('sends POST request to correct endpoint with items in array batched sorted in call order', async () => {
194172
const { fetchStreaming } = setup();
195173
const fn = createStreamingBatchedFunction({
@@ -445,73 +423,6 @@ describe('createStreamingBatchedFunction()', () => {
445423
expect(result3).toEqual({ b: '3' });
446424
});
447425

448-
describe('when requests are aborted', () => {
449-
test('aborts stream when all are aborted', async () => {
450-
const { fetchStreaming } = setup();
451-
const fn = createStreamingBatchedFunction({
452-
url: '/test',
453-
fetchStreaming,
454-
maxItemAge: 5,
455-
flushOnMaxItems: 3,
456-
});
457-
458-
const abortController = new AbortController();
459-
const promise = fn({ a: '1' }, abortController.signal);
460-
const promise2 = fn({ a: '2' }, abortController.signal);
461-
await new Promise((r) => setTimeout(r, 6));
462-
463-
expect(await isPending(promise)).toBe(true);
464-
expect(await isPending(promise2)).toBe(true);
465-
466-
abortController.abort();
467-
await new Promise((r) => setTimeout(r, 6));
468-
469-
expect(await isPending(promise)).toBe(false);
470-
expect(await isPending(promise2)).toBe(false);
471-
const [, error] = await of(promise);
472-
const [, error2] = await of(promise2);
473-
expect(error).toBeInstanceOf(AbortError);
474-
expect(error2).toBeInstanceOf(AbortError);
475-
expect(fetchStreaming.mock.calls[0][0].signal.aborted).toBeTruthy();
476-
});
477-
478-
test('rejects promise on abort and lets others continue', async () => {
479-
const { fetchStreaming, stream } = setup();
480-
const fn = createStreamingBatchedFunction({
481-
url: '/test',
482-
fetchStreaming,
483-
maxItemAge: 5,
484-
flushOnMaxItems: 3,
485-
});
486-
487-
const abortController = new AbortController();
488-
const promise = fn({ a: '1' }, abortController.signal);
489-
const promise2 = fn({ a: '2' });
490-
await new Promise((r) => setTimeout(r, 6));
491-
492-
expect(await isPending(promise)).toBe(true);
493-
494-
abortController.abort();
495-
await new Promise((r) => setTimeout(r, 6));
496-
497-
expect(await isPending(promise)).toBe(false);
498-
const [, error] = await of(promise);
499-
expect(error).toBeInstanceOf(AbortError);
500-
501-
stream.next(
502-
JSON.stringify({
503-
id: 1,
504-
result: { b: '2' },
505-
}) + '\n'
506-
);
507-
508-
await new Promise((r) => setTimeout(r, 1));
509-
510-
const [result2] = await of(promise2);
511-
expect(result2).toEqual({ b: '2' });
512-
});
513-
});
514-
515426
describe('when stream closes prematurely', () => {
516427
test('rejects pending promises with CONNECTION error code', async () => {
517428
const { fetchStreaming, stream } = setup();

src/plugins/bfetch/public/batching/create_streaming_batched_function.ts

Lines changed: 16 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
* under the License.
1818
*/
1919

20-
import { AbortError, abortSignalToPromise, defer } from '../../../kibana_utils/public';
20+
import { defer, Defer } from '../../../kibana_utils/public';
2121
import {
2222
ItemBufferParams,
2323
TimedItemBufferParams,
@@ -27,7 +27,13 @@ import {
2727
} from '../../common';
2828
import { fetchStreaming, split } from '../streaming';
2929
import { normalizeError } from '../../common';
30-
import { BatchedFunc, BatchItem } from './types';
30+
31+
export interface BatchItem<Payload, Result> {
32+
payload: Payload;
33+
future: Defer<Result>;
34+
}
35+
36+
export type BatchedFunc<Payload, Result> = (payload: Payload) => Promise<Result>;
3137

3238
export interface BatchedFunctionProtocolError extends ErrorLike {
3339
code: string;
@@ -76,67 +82,32 @@ export const createStreamingBatchedFunction = <Payload, Result extends object>(
7682
flushOnMaxItems = 25,
7783
maxItemAge = 10,
7884
} = params;
79-
const [fn] = createBatchedFunction({
80-
onCall: (payload: Payload, signal?: AbortSignal) => {
85+
const [fn] = createBatchedFunction<BatchedFunc<Payload, Result>, BatchItem<Payload, Result>>({
86+
onCall: (payload: Payload) => {
8187
const future = defer<Result>();
8288
const entry: BatchItem<Payload, Result> = {
8389
payload,
8490
future,
85-
signal,
8691
};
8792
return [future.promise, entry];
8893
},
8994
onBatch: async (items) => {
9095
try {
91-
// Filter out any items whose signal is already aborted
92-
items = items.filter((item) => {
93-
if (item.signal?.aborted) item.future.reject(new AbortError());
94-
return !item.signal?.aborted;
95-
});
96-
97-
const donePromises: Array<Promise<any>> = items.map((item) => {
98-
return new Promise<void>((resolve) => {
99-
const { promise: abortPromise, cleanup } = item.signal
100-
? abortSignalToPromise(item.signal)
101-
: {
102-
promise: undefined,
103-
cleanup: () => {},
104-
};
105-
106-
const onDone = () => {
107-
resolve();
108-
cleanup();
109-
};
110-
if (abortPromise)
111-
abortPromise.catch(() => {
112-
item.future.reject(new AbortError());
113-
onDone();
114-
});
115-
item.future.promise.then(onDone, onDone);
116-
});
117-
});
118-
119-
// abort when all items were either resolved, rejected or aborted
120-
const abortController = new AbortController();
121-
let isBatchDone = false;
122-
Promise.all(donePromises).then(() => {
123-
isBatchDone = true;
124-
abortController.abort();
125-
});
126-
const batch = items.map((item) => item.payload);
127-
96+
let responsesReceived = 0;
97+
const batch = items.map(({ payload }) => payload);
12898
const { stream } = fetchStreamingInjected({
12999
url,
130100
body: JSON.stringify({ batch }),
131101
method: 'POST',
132-
signal: abortController.signal,
133102
});
134103
stream.pipe(split('\n')).subscribe({
135104
next: (json: string) => {
136105
const response = JSON.parse(json) as BatchResponseItem<Result, ErrorLike>;
137106
if (response.error) {
107+
responsesReceived++;
138108
items[response.id].future.reject(response.error);
139109
} else if (response.result !== undefined) {
110+
responsesReceived++;
140111
items[response.id].future.resolve(response.result);
141112
}
142113
},
@@ -146,7 +117,8 @@ export const createStreamingBatchedFunction = <Payload, Result extends object>(
146117
for (const { future } of items) future.reject(normalizedError);
147118
},
148119
complete: () => {
149-
if (!isBatchDone) {
120+
const streamTerminatedPrematurely = responsesReceived !== items.length;
121+
if (streamTerminatedPrematurely) {
150122
const error: BatchedFunctionProtocolError = {
151123
message: 'Connection terminated prematurely.',
152124
code: 'CONNECTION',

src/plugins/bfetch/public/batching/types.ts

Lines changed: 0 additions & 31 deletions
This file was deleted.

src/plugins/bfetch/public/index.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ import { BfetchPublicPlugin } from './plugin';
2323
export { BfetchPublicSetup, BfetchPublicStart, BfetchPublicContract } from './plugin';
2424
export { split } from './streaming';
2525

26-
export { BatchedFunc } from './batching/types';
27-
2826
export function plugin(initializerContext: PluginInitializerContext) {
2927
return new BfetchPublicPlugin(initializerContext);
3028
}

src/plugins/bfetch/public/plugin.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ import { fetchStreaming as fetchStreamingStatic, FetchStreamingParams } from './
2222
import { removeLeadingSlash } from '../common';
2323
import {
2424
createStreamingBatchedFunction,
25+
BatchedFunc,
2526
StreamingBatchedFunctionParams,
2627
} from './batching/create_streaming_batched_function';
27-
import { BatchedFunc } from './batching/types';
2828

2929
// eslint-disable-next-line
3030
export interface BfetchPublicSetupDependencies {}

src/plugins/bfetch/public/streaming/fetch_streaming.test.ts

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -132,33 +132,6 @@ test('completes stream observable when request finishes', async () => {
132132
expect(spy).toHaveBeenCalledTimes(1);
133133
});
134134

135-
test('completes stream observable when aborted', async () => {
136-
const env = setup();
137-
const abort = new AbortController();
138-
const { stream } = fetchStreaming({
139-
url: 'http://example.com',
140-
signal: abort.signal,
141-
});
142-
143-
const spy = jest.fn();
144-
stream.subscribe({
145-
complete: spy,
146-
});
147-
148-
expect(spy).toHaveBeenCalledTimes(0);
149-
150-
(env.xhr as any).responseText = 'foo';
151-
env.xhr.onprogress!({} as any);
152-
153-
abort.abort();
154-
155-
(env.xhr as any).readyState = 4;
156-
(env.xhr as any).status = 200;
157-
env.xhr.onreadystatechange!({} as any);
158-
159-
expect(spy).toHaveBeenCalledTimes(1);
160-
});
161-
162135
test('promise throws when request errors', async () => {
163136
const env = setup();
164137
const { stream } = fetchStreaming({

0 commit comments

Comments
 (0)