Skip to content

Commit b0465e3

Browse files
Merge branch 'main' into pipelines-cli-binary-format
2 parents 3f2809e + a02ef9c commit b0465e3

40 files changed

Lines changed: 2745 additions & 413 deletions

File tree

packages/@aws-cdk/aws-lambda-event-sources/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,13 +261,17 @@ const topic = 'some-cool-topic';
261261
// The secret that allows access to your self hosted Kafka cluster
262262
declare const secret: Secret;
263263

264+
// (Optional) The secret containing the root CA certificate that your Kafka brokers use for TLS encryption
265+
declare const encryption: Secret;
266+
264267
declare const myFunction: lambda.Function;
265268
myFunction.addEventSource(new SelfManagedKafkaEventSource({
266269
bootstrapServers: bootstrapServers,
267270
topic: topic,
268271
secret: secret,
269272
batchSize: 100, // default
270273
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
274+
encryption: encryption // optional
271275
}));
272276
```
273277

packages/@aws-cdk/aws-lambda-event-sources/lib/kafka.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import { StreamEventSource, BaseStreamEventSourceProps } from './stream';
1010
/**
1111
* Properties for a Kafka event source
1212
*/
13-
export interface KafkaEventSourceProps extends BaseStreamEventSourceProps{
13+
export interface KafkaEventSourceProps extends BaseStreamEventSourceProps {
1414
/**
1515
* The Kafka topic to subscribe to
1616
*/
@@ -94,6 +94,14 @@ export interface SelfManagedKafkaEventSourceProps extends KafkaEventSourceProps
9494
* @default AuthenticationMethod.SASL_SCRAM_512_AUTH
9595
*/
9696
readonly authenticationMethod?: AuthenticationMethod
97+
98+
/**
99+
* The secret with the root CA certificate used by your Kafka brokers for TLS encryption
100+
* This field is required if your Kafka brokers use certificates signed by a private CA
101+
*
102+
* @default - none
103+
*/
104+
readonly rootCACertificate?: secretsmanager.Secret;
97105
}
98106

99107
/**
@@ -231,6 +239,13 @@ export class SelfManagedKafkaEventSource extends StreamEventSource {
231239
sourceAccessConfigurations.push({ type: authType, uri: this.innerProps.secret.secretArn });
232240
}
233241

242+
if (this.innerProps.rootCACertificate !== undefined) {
243+
sourceAccessConfigurations.push({
244+
type: lambda.SourceAccessConfigurationType.SERVER_ROOT_CA_CERTIFICATE,
245+
uri: this.innerProps.rootCACertificate.secretArn,
246+
});
247+
}
248+
234249
if (this.innerProps.vpcSubnets !== undefined && this.innerProps.securityGroup !== undefined) {
235250
sourceAccessConfigurations.push({
236251
type: lambda.SourceAccessConfigurationType.VPC_SECURITY_GROUP,
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import * as lambda from '@aws-cdk/aws-lambda';
2+
import * as secretsmanager from '@aws-cdk/aws-secretsmanager';
3+
import * as cdk from '@aws-cdk/core';
4+
import * as integ from '@aws-cdk/integ-tests';
5+
import { AuthenticationMethod, SelfManagedKafkaEventSource } from '../lib';
6+
import { TestFunction } from './test-function';
7+
8+
class KafkaSelfManagedEventSourceTest extends cdk.Stack {
9+
constructor(scope: cdk.App, id: string) {
10+
super(scope, id);
11+
12+
const dummyCertString = `-----BEGIN CERTIFICATE-----
13+
MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw
14+
cmUuiAii9R0=
15+
-----END CERTIFICATE-----
16+
-----BEGIN CERTIFICATE-----
17+
MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb
18+
c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==
19+
-----END CERTIFICATE-----"
20+
`;
21+
22+
const dummyPrivateKey = `-----BEGIN ENCRYPTED PRIVATE KEY-----
23+
zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA==
24+
-----END ENCRYPTED PRIVATE KEY-----`;
25+
26+
const fn = new TestFunction(this, 'F');
27+
const rootCASecret = new secretsmanager.Secret(this, 'S', {
28+
secretObjectValue: {
29+
certificate: cdk.SecretValue.unsafePlainText(dummyCertString),
30+
},
31+
});
32+
const clientCertificatesSecret = new secretsmanager.Secret(this, 'SC', {
33+
secretObjectValue: {
34+
certificate: cdk.SecretValue.unsafePlainText(dummyCertString),
35+
privateKey: cdk.SecretValue.unsafePlainText(dummyPrivateKey),
36+
},
37+
});
38+
rootCASecret.grantRead(fn);
39+
clientCertificatesSecret.grantRead(fn);
40+
41+
const bootstrapServers = [
42+
'my-self-hosted-kafka-broker-1:9092',
43+
'my-self-hosted-kafka-broker-2:9092',
44+
'my-self-hosted-kafka-broker-3:9092',
45+
];
46+
47+
fn.addEventSource(
48+
new SelfManagedKafkaEventSource({
49+
bootstrapServers,
50+
topic: 'my-test-topic',
51+
secret: clientCertificatesSecret,
52+
authenticationMethod: AuthenticationMethod.CLIENT_CERTIFICATE_TLS_AUTH,
53+
rootCACertificate: rootCASecret,
54+
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
55+
}),
56+
);
57+
}
58+
}
59+
60+
const app = new cdk.App();
61+
const stack = new KafkaSelfManagedEventSourceTest(
62+
app,
63+
'lambda-event-source-kafka-self-managed',
64+
);
65+
new integ.IntegTest(app, 'LambdaEventSourceKafkaSelfManagedTest', {
66+
testCases: [stack],
67+
});
68+
app.synth();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"version":"20.0.0"}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
{
2+
"version": "20.0.0",
3+
"testCases": {
4+
"LambdaEventSourceKafkaSelfManagedTest/DefaultTest": {
5+
"stacks": [
6+
"lambda-event-source-kafka-self-managed"
7+
],
8+
"assertionStack": "LambdaEventSourceKafkaSelfManagedTestDefaultTestDeployAssertAF78BD0F"
9+
}
10+
}
11+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
{
2+
"Resources": {
3+
"FServiceRole3AC82EE1": {
4+
"Type": "AWS::IAM::Role",
5+
"Properties": {
6+
"AssumeRolePolicyDocument": {
7+
"Statement": [
8+
{
9+
"Action": "sts:AssumeRole",
10+
"Effect": "Allow",
11+
"Principal": {
12+
"Service": "lambda.amazonaws.com"
13+
}
14+
}
15+
],
16+
"Version": "2012-10-17"
17+
},
18+
"ManagedPolicyArns": [
19+
{
20+
"Fn::Join": [
21+
"",
22+
[
23+
"arn:",
24+
{
25+
"Ref": "AWS::Partition"
26+
},
27+
":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
28+
]
29+
]
30+
}
31+
]
32+
}
33+
},
34+
"FServiceRoleDefaultPolicy17A19BFA": {
35+
"Type": "AWS::IAM::Policy",
36+
"Properties": {
37+
"PolicyDocument": {
38+
"Statement": [
39+
{
40+
"Action": [
41+
"secretsmanager:DescribeSecret",
42+
"secretsmanager:GetSecretValue"
43+
],
44+
"Effect": "Allow",
45+
"Resource": [
46+
{
47+
"Ref": "S509448A1"
48+
},
49+
{
50+
"Ref": "SC0855C491"
51+
}
52+
]
53+
}
54+
],
55+
"Version": "2012-10-17"
56+
},
57+
"PolicyName": "FServiceRoleDefaultPolicy17A19BFA",
58+
"Roles": [
59+
{
60+
"Ref": "FServiceRole3AC82EE1"
61+
}
62+
]
63+
}
64+
},
65+
"FC4345940": {
66+
"Type": "AWS::Lambda::Function",
67+
"Properties": {
68+
"Code": {
69+
"ZipFile": "exports.handler = async function handler(event) {\n console.log('event:', JSON.stringify(event, undefined, 2));\n return { event };\n}"
70+
},
71+
"Role": {
72+
"Fn::GetAtt": [
73+
"FServiceRole3AC82EE1",
74+
"Arn"
75+
]
76+
},
77+
"Handler": "index.handler",
78+
"Runtime": "nodejs14.x"
79+
},
80+
"DependsOn": [
81+
"FServiceRoleDefaultPolicy17A19BFA",
82+
"FServiceRole3AC82EE1"
83+
]
84+
},
85+
"FKafkaEventSource838c4d5ff3c99c1a617120adfca83e5bmytesttopic1E7A7798": {
86+
"Type": "AWS::Lambda::EventSourceMapping",
87+
"Properties": {
88+
"FunctionName": {
89+
"Ref": "FC4345940"
90+
},
91+
"BatchSize": 100,
92+
"SelfManagedEventSource": {
93+
"Endpoints": {
94+
"KafkaBootstrapServers": [
95+
"my-self-hosted-kafka-broker-1:9092",
96+
"my-self-hosted-kafka-broker-2:9092",
97+
"my-self-hosted-kafka-broker-3:9092"
98+
]
99+
}
100+
},
101+
"SourceAccessConfigurations": [
102+
{
103+
"Type": "CLIENT_CERTIFICATE_TLS_AUTH",
104+
"URI": {
105+
"Ref": "SC0855C491"
106+
}
107+
},
108+
{
109+
"Type": "SERVER_ROOT_CA_CERTIFICATE",
110+
"URI": {
111+
"Ref": "S509448A1"
112+
}
113+
}
114+
],
115+
"StartingPosition": "TRIM_HORIZON",
116+
"Topics": [
117+
"my-test-topic"
118+
]
119+
}
120+
},
121+
"S509448A1": {
122+
"Type": "AWS::SecretsManager::Secret",
123+
"Properties": {
124+
"SecretString": "{\"certificate\":\"-----BEGIN CERTIFICATE-----\\nMIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw\\ncmUuiAii9R0=\\n-----END CERTIFICATE-----\\n-----BEGIN CERTIFICATE-----\\nMIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb\\nc8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==\\n-----END CERTIFICATE-----\\\"\\n\"}"
125+
},
126+
"UpdateReplacePolicy": "Delete",
127+
"DeletionPolicy": "Delete"
128+
},
129+
"SC0855C491": {
130+
"Type": "AWS::SecretsManager::Secret",
131+
"Properties": {
132+
"SecretString": "{\"certificate\":\"-----BEGIN CERTIFICATE-----\\nMIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw\\ncmUuiAii9R0=\\n-----END CERTIFICATE-----\\n-----BEGIN CERTIFICATE-----\\nMIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb\\nc8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==\\n-----END CERTIFICATE-----\\\"\\n\",\"privateKey\":\"-----BEGIN ENCRYPTED PRIVATE KEY-----\\nzp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA==\\n-----END ENCRYPTED PRIVATE KEY-----\"}"
133+
},
134+
"UpdateReplacePolicy": "Delete",
135+
"DeletionPolicy": "Delete"
136+
}
137+
}
138+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
{
2+
"version": "20.0.0",
3+
"artifacts": {
4+
"Tree": {
5+
"type": "cdk:tree",
6+
"properties": {
7+
"file": "tree.json"
8+
}
9+
},
10+
"lambda-event-source-kafka-self-managed": {
11+
"type": "aws:cloudformation:stack",
12+
"environment": "aws://unknown-account/unknown-region",
13+
"properties": {
14+
"templateFile": "lambda-event-source-kafka-self-managed.template.json",
15+
"validateOnSynth": false
16+
},
17+
"metadata": {
18+
"/lambda-event-source-kafka-self-managed/F/ServiceRole/Resource": [
19+
{
20+
"type": "aws:cdk:logicalId",
21+
"data": "FServiceRole3AC82EE1"
22+
}
23+
],
24+
"/lambda-event-source-kafka-self-managed/F/ServiceRole/DefaultPolicy/Resource": [
25+
{
26+
"type": "aws:cdk:logicalId",
27+
"data": "FServiceRoleDefaultPolicy17A19BFA"
28+
}
29+
],
30+
"/lambda-event-source-kafka-self-managed/F/Resource": [
31+
{
32+
"type": "aws:cdk:logicalId",
33+
"data": "FC4345940"
34+
}
35+
],
36+
"/lambda-event-source-kafka-self-managed/F/KafkaEventSource:838c4d5ff3c99c1a617120adfca83e5b:my-test-topic/Resource": [
37+
{
38+
"type": "aws:cdk:logicalId",
39+
"data": "FKafkaEventSource838c4d5ff3c99c1a617120adfca83e5bmytesttopic1E7A7798"
40+
}
41+
],
42+
"/lambda-event-source-kafka-self-managed/S/Resource": [
43+
{
44+
"type": "aws:cdk:logicalId",
45+
"data": "S509448A1"
46+
}
47+
],
48+
"/lambda-event-source-kafka-self-managed/SC/Resource": [
49+
{
50+
"type": "aws:cdk:logicalId",
51+
"data": "SC0855C491"
52+
}
53+
]
54+
},
55+
"displayName": "lambda-event-source-kafka-self-managed"
56+
},
57+
"LambdaEventSourceKafkaSelfManagedTestDefaultTestDeployAssertAF78BD0F": {
58+
"type": "aws:cloudformation:stack",
59+
"environment": "aws://unknown-account/unknown-region",
60+
"properties": {
61+
"templateFile": "LambdaEventSourceKafkaSelfManagedTestDefaultTestDeployAssertAF78BD0F.template.json",
62+
"validateOnSynth": false
63+
},
64+
"displayName": "LambdaEventSourceKafkaSelfManagedTest/DefaultTest/DeployAssert"
65+
}
66+
}
67+
}

0 commit comments

Comments
 (0)