Skip to content

Commit 9c5ebf5

Browse files
[wrangler] Only allow "worker" queue consumer type in wrangler config (#13018)
Co-authored-by: Pete Bacon Darwin <pbacondarwin@cloudflare.com>
1 parent 535582d commit 9c5ebf5

9 files changed

Lines changed: 108 additions & 206 deletions

File tree

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
---
2+
"wrangler": patch
3+
"@cloudflare/workers-utils": patch
4+
---
5+
6+
Validate that queue consumers in wrangler config only use the "worker" type
7+
8+
Previously, non-worker consumer types (e.g. `http_pull`) could be specified in the `queues.consumers` config. Now, wrangler will error if a consumer `type` other than `"worker"` is specified in the config file.
9+
10+
To configure non-worker consumer types, use the `wrangler queues consumer` CLI commands instead (e.g. `wrangler queues consumer http-pull add`).

packages/workers-utils/src/config/environment.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -863,8 +863,8 @@ export interface EnvironmentNonInheritable {
863863
/** The name of the queue from which this consumer should consume. */
864864
queue: string;
865865

866-
/** The consumer type, e.g., worker, http-pull, r2-bucket, etc. Default is worker. */
867-
type?: string;
866+
/** The consumer type. Only "worker" is supported in wrangler config. Default is "worker". */
867+
type?: "worker";
868868

869869
/** The maximum number of messages per batch */
870870
max_batch_size?: number;

packages/workers-utils/src/config/validation.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4425,6 +4425,16 @@ const validateConsumer: ValidatorFn = (diagnostics, field, value, _config) => {
44254425
);
44264426
}
44274427

4428+
// Validate that consumer type, if specified, is "worker".
4429+
// Non-worker consumer types (e.g., "http_pull") cannot be configured via
4430+
// wrangler config. Use `wrangler queues consumer http add` instead.
4431+
if ("type" in value && value.type !== undefined && value.type !== "worker") {
4432+
diagnostics.errors.push(
4433+
`"${field}.type" has an invalid value "${value.type}". Only "worker" consumers can be configured in your Wrangler configuration.`
4434+
);
4435+
isValid = false;
4436+
}
4437+
44284438
const options: {
44294439
key: string;
44304440
type: "number" | "string" | "boolean";

packages/workers-utils/src/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ export type Trigger =
276276
| ({ type: "route" } & ZoneNameRoute)
277277
| ({ type: "route" } & CustomDomainRoute)
278278
| { type: "cron"; cron: string }
279-
| ({ type: "queue-consumer" } & QueueConsumer);
279+
| ({ type: "queue-consumer" } & Omit<QueueConsumer, "type">);
280280

281281
type BindingOmit<T> = Omit<T, "binding">;
282282
type NameOmit<T> = Omit<T, "name">;

packages/workers-utils/tests/config/validation/normalize-and-validate-config.test.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3432,6 +3432,47 @@ describe("normalizeAndValidateConfig()", () => {
34323432
`);
34333433
});
34343434

3435+
it("should error if queues consumer type is not 'worker'", ({
3436+
expect,
3437+
}) => {
3438+
const { diagnostics } = normalizeAndValidateConfig(
3439+
{
3440+
queues: {
3441+
consumers: [
3442+
{ queue: "myQueue", type: "http_pull" },
3443+
{ queue: "myQueue2", type: "r2_bucket" },
3444+
],
3445+
},
3446+
} as unknown as RawConfig,
3447+
undefined,
3448+
undefined,
3449+
{ env: undefined }
3450+
);
3451+
3452+
expect(diagnostics.renderErrors()).toMatchInlineSnapshot(`
3453+
"Processing wrangler configuration:
3454+
- "queues.consumers[0].type" has an invalid value "http_pull". Only "worker" consumers can be configured in your Wrangler configuration.
3455+
- "queues.consumers[1].type" has an invalid value "r2_bucket". Only "worker" consumers can be configured in your Wrangler configuration."
3456+
`);
3457+
});
3458+
3459+
it("should allow queues consumer type 'worker' explicitly", ({
3460+
expect,
3461+
}) => {
3462+
const { diagnostics } = normalizeAndValidateConfig(
3463+
{
3464+
queues: {
3465+
consumers: [{ queue: "myQueue", type: "worker" }],
3466+
},
3467+
} as unknown as RawConfig,
3468+
undefined,
3469+
undefined,
3470+
{ env: undefined }
3471+
);
3472+
3473+
expect(diagnostics.hasErrors()).toBe(false);
3474+
});
3475+
34353476
it("should error if queues consumers are not valid", ({ expect }) => {
34363477
const { config, diagnostics } = normalizeAndValidateConfig(
34373478
{

packages/wrangler/src/__tests__/deploy/helpers.ts

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -657,32 +657,6 @@ export function mockPostConsumerById(
657657
return requests;
658658
}
659659

660-
export function mockPostQueueHTTPConsumer(
661-
expectedQueueId: string,
662-
expectedBody: PostTypedConsumerBody
663-
) {
664-
const requests = { count: 0 };
665-
msw.use(
666-
http.post(
667-
`*/accounts/:accountId/queues/:queueId/consumers`,
668-
async ({ request, params }) => {
669-
const body = await request.json();
670-
expect(params.queueId).toEqual(expectedQueueId);
671-
expect(params.accountId).toEqual("some-account-id");
672-
expect(body).toEqual(expectedBody);
673-
requests.count += 1;
674-
return HttpResponse.json({
675-
success: true,
676-
errors: [],
677-
messages: [],
678-
result: {},
679-
});
680-
}
681-
)
682-
);
683-
return requests;
684-
}
685-
686660
export const mockAUSRequest = async (
687661
bodies?: AssetManifest[],
688662
buckets: string[][] = [[]],

packages/wrangler/src/__tests__/deploy/queues.test.ts

Lines changed: 5 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import {
2323
mockLastDeploymentRequest,
2424
mockPatchScriptSettings,
2525
mockPostConsumerById,
26-
mockPostQueueHTTPConsumer,
2726
mockPutQueueConsumerById,
2827
} from "./helpers";
2928
import type { QueueResponse } from "../../queues/client";
@@ -433,124 +432,24 @@ describe("deploy", () => {
433432
`);
434433
});
435434

436-
it("should post queue http consumers on deploy", async ({ expect }) => {
437-
writeWranglerConfig({
438-
queues: {
439-
consumers: [
440-
{
441-
queue: queueName,
442-
type: "http_pull",
443-
dead_letter_queue: "myDLQ",
444-
max_batch_size: 5,
445-
visibility_timeout_ms: 4000,
446-
max_retries: 10,
447-
retry_delay: 1,
448-
},
449-
],
450-
},
451-
});
452-
await fs.promises.writeFile("index.js", `export default {};`);
453-
mockSubDomainRequest();
454-
mockUploadWorkerRequest();
455-
const existingQueue: QueueResponse = {
456-
queue_id: queueId,
457-
queue_name: queueName,
458-
created_on: "",
459-
producers: [],
460-
consumers: [],
461-
producers_total_count: 0,
462-
consumers_total_count: 0,
463-
modified_on: "",
464-
};
465-
mockGetQueueByName(queueName, existingQueue);
466-
mockPostQueueHTTPConsumer(queueId, {
467-
type: "http_pull",
468-
dead_letter_queue: "myDLQ",
469-
settings: {
470-
batch_size: 5,
471-
max_retries: 10,
472-
visibility_timeout_ms: 4000,
473-
retry_delay: 1,
474-
},
475-
});
476-
await runWrangler("deploy index.js");
477-
expect(std.out).toMatchInlineSnapshot(`
478-
"
479-
⛅️ wrangler x.x.x
480-
──────────────────
481-
Total Upload: xx KiB / gzip: xx KiB
482-
Worker Startup Time: 100 ms
483-
Uploaded test-name (TIMINGS)
484-
Deployed test-name triggers (TIMINGS)
485-
https://test-name.test-sub-domain.workers.dev
486-
Consumer for queue1
487-
Current Version ID: Galaxy-Class"
488-
`);
489-
});
490-
491-
it("should update queue http consumers when one already exists for queue", async ({
435+
it("should reject http_pull consumer type in config", async ({
492436
expect,
493437
}) => {
494438
writeWranglerConfig({
495439
queues: {
496440
consumers: [
497441
{
498442
queue: queueName,
499-
type: "http_pull",
443+
// Cast needed to simulate invalid user input that bypasses static type checking; runtime validation is what this test exercises
444+
type: "http_pull" as "worker",
500445
},
501446
],
502447
},
503448
});
504449
await fs.promises.writeFile("index.js", `export default {};`);
505-
mockSubDomainRequest();
506-
mockUploadWorkerRequest();
507-
const existingQueue: QueueResponse = {
508-
queue_id: queueId,
509-
queue_name: queueName,
510-
created_on: "",
511-
producers: [],
512-
consumers: [
513-
{
514-
type: "http_pull",
515-
consumer_id: "queue1-consumer-id",
516-
settings: {},
517-
},
518-
],
519-
producers_total_count: 0,
520-
consumers_total_count: 0,
521-
modified_on: "",
522-
};
523-
mockGetQueueByName(queueName, existingQueue);
524-
525-
msw.use(
526-
http.put(
527-
`*/accounts/:accountId/queues/:queueId/consumers/:consumerId`,
528-
async ({ params }) => {
529-
expect(params.queueId).toEqual(queueId);
530-
expect(params.consumerId).toEqual("queue1-consumer-id");
531-
expect(params.accountId).toEqual("some-account-id");
532-
return HttpResponse.json({
533-
success: true,
534-
errors: [],
535-
messages: [],
536-
result: null,
537-
});
538-
}
539-
)
450+
await expect(runWrangler("deploy index.js")).rejects.toThrowError(
451+
/Only "worker" consumers can be configured in your Wrangler configuration/
540452
);
541-
await runWrangler("deploy index.js");
542-
expect(std.out).toMatchInlineSnapshot(`
543-
"
544-
⛅️ wrangler x.x.x
545-
──────────────────
546-
Total Upload: xx KiB / gzip: xx KiB
547-
Worker Startup Time: 100 ms
548-
Uploaded test-name (TIMINGS)
549-
Deployed test-name triggers (TIMINGS)
550-
https://test-name.test-sub-domain.workers.dev
551-
Consumer for queue1
552-
Current Version ID: Galaxy-Class"
553-
`);
554453
});
555454

556455
it("should support queue consumer concurrency with a max concurrency specified", async ({

packages/wrangler/src/api/startDevWorker/LocalRuntimeController.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ export async function convertToConfigBundle(
5555
if (trigger.type === "cron") {
5656
crons.push(trigger.cron);
5757
} else if (trigger.type === "queue-consumer") {
58-
queueConsumers.push(trigger);
58+
const { type: _, ...consumer } = trigger;
59+
queueConsumers.push(consumer);
5960
}
6061
}
6162
if (event.bundle.entry.format === "service-worker") {

0 commit comments

Comments
 (0)