Description
Following #7535, add an L2 construct for building AWS::KinesisAnalyticsV2::Application resources that use Flink. A README follows to start the design discussion:
The @aws-cdk/aws-kinesisanalyticsv2 package provides constructs for creating Kinesis Data Analytics applications.
For more information, see the AWS documentation for Kinesis Data Analytics.
Creating a Flink Kinesis Analytics Application
To create a new Flink application, use the FlinkApplication construct.
import * as ka from '@aws-cdk/aws-kinesisanalyticsv2';
const flinkApp = new ka.FlinkApplication(this, 'Application', {
code: ka.ApplicationCode.fromBucket(bucket, 'my-app.jar'),
runtime: ka.FlinkRuntime.FLINK_1_11,
});
The code property can use fromBucket, as shown above, to reference a jar file in S3, or fromAsset to reference a local file.
import * as ka from '@aws-cdk/aws-kinesisanalyticsv2';
import * as path from 'path';
const flinkApp = new ka.FlinkApplication(this, 'Application', {
code: ka.ApplicationCode.fromAsset(path.join(__dirname, 'my-app.jar')),
// ...
});
The propertyGroups property provides a way of passing arbitrary runtime configuration to your Flink application. You can use the aws-kinesisanalytics-runtime library to retrieve these properties.
import * as ka from '@aws-cdk/aws-kinesisanalyticsv2';
const flinkApp = new ka.FlinkApplication(this, 'Application', {
// ...
propertyGroups: [
new ka.PropertyGroup('FlinkApplicationProperties', {
inputStreamName: 'my-input-kinesis-stream',
outputStreamName: 'my-output-kinesis-stream',
}),
],
});
Flink applications also have specific properties for passing parameters when the Flink job starts. These include parameters for checkpointing, snapshotting, monitoring, and parallelism.
import * as cdk from '@aws-cdk/core';
import * as ka from '@aws-cdk/aws-kinesisanalyticsv2';
const flinkApp = new ka.FlinkApplication(this, 'Application', {
code: ka.ApplicationCode.fromBucket(bucket, 'my-app.jar'),
runtime: ka.FlinkRuntime.FLINK_1_11,
checkpointingEnabled: true, // default is true
checkpointInterval: cdk.Duration.seconds(30), // default is 1 minute
minPausesBetweenCheckpoints: cdk.Duration.seconds(10), // default is 5 seconds
logLevel: ka.LogLevel.ERROR, // default is INFO
metricsLevel: ka.MetricsLevel.PARALLELISM, // default is APPLICATION
autoScalingEnabled: false, // default is true
parallelism: 32, // default is 1
parallelismPerKpu: 2, // default is 1
snapshotsEnabled: false, // default is true
});
Creating an SQL Kinesis Analytics Application
Constructs for SQL applications are not implemented yet.
Notes for review:
Decisions:
- Flink and SQL applications share almost no properties so having separate ka.FlinkApplication and ka.SqlApplication constructs seems correct. I can't see why these would even share an abstract base class.
- I'm trying to focus on shipping the Flink construct before SQL since they are so different and I haven't used an SQL application.
- I unnested lots of the configuration for discoverability. The Cfn naming is verbose (usually with prefixes) so collisions are unlikely.
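To make the unnesting decision concrete, here is a rough sketch of how the flat props from the README could be folded back into a nested structure at synth time. Everything in it is an assumption for illustration: FlatFlinkProps, NestedFlinkConfig, and toNestedFlinkConfig are hypothetical names, durations are plain milliseconds rather than cdk.Duration so the snippet has no CDK dependency, and the nested field names follow my reading of the FlinkApplicationConfiguration shape in CloudFormation and should be checked against the resource spec. Defaults are the ones listed in the README comments.

```typescript
type LogLevel = 'DEBUG' | 'INFO' | 'WARN' | 'ERROR';
type MetricsLevel = 'APPLICATION' | 'TASK' | 'OPERATOR' | 'PARALLELISM';

// Hypothetical flat props, mirroring the README example.
// Durations are milliseconds here (an assumption, to avoid CDK imports).
interface FlatFlinkProps {
  checkpointingEnabled?: boolean;
  checkpointIntervalMs?: number;
  minPauseBetweenCheckpointsMs?: number;
  logLevel?: LogLevel;
  metricsLevel?: MetricsLevel;
  autoScalingEnabled?: boolean;
  parallelism?: number;
  parallelismPerKpu?: number;
}

// Assumed nested shape, modelled on FlinkApplicationConfiguration.
interface NestedFlinkConfig {
  checkpointConfiguration: {
    configurationType: 'CUSTOM';
    checkpointingEnabled: boolean;
    checkpointInterval: number;
    minPauseBetweenCheckpoints: number;
  };
  monitoringConfiguration: {
    configurationType: 'CUSTOM';
    logLevel: LogLevel;
    metricsLevel: MetricsLevel;
  };
  parallelismConfiguration: {
    configurationType: 'CUSTOM';
    autoScalingEnabled: boolean;
    parallelism: number;
    parallelismPerKpu: number;
  };
}

// Group the flat props under their nested parents, filling in README defaults.
// Simplification: always emits 'CUSTOM'; a real construct would probably emit
// 'DEFAULT' when none of a group's props are set.
function toNestedFlinkConfig(props: FlatFlinkProps = {}): NestedFlinkConfig {
  return {
    checkpointConfiguration: {
      configurationType: 'CUSTOM',
      checkpointingEnabled: props.checkpointingEnabled ?? true,
      checkpointInterval: props.checkpointIntervalMs ?? 60_000,
      minPauseBetweenCheckpoints: props.minPauseBetweenCheckpointsMs ?? 5_000,
    },
    monitoringConfiguration: {
      configurationType: 'CUSTOM',
      logLevel: props.logLevel ?? 'INFO',
      metricsLevel: props.metricsLevel ?? 'APPLICATION',
    },
    parallelismConfiguration: {
      configurationType: 'CUSTOM',
      autoScalingEnabled: props.autoScalingEnabled ?? true,
      parallelism: props.parallelism ?? 1,
      parallelismPerKpu: props.parallelismPerKpu ?? 1,
    },
  };
}

// Usage: only the props that differ from the defaults need to be passed.
const nested = toNestedFlinkConfig({ logLevel: 'ERROR', parallelism: 32, parallelismPerKpu: 2 });
console.log(JSON.stringify(nested, null, 2));
```

The point of the sketch is that each flat name lands in exactly one nested group, which is why I don't expect collisions. snapshotsEnabled is left out because it would live outside the Flink-specific configuration.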
This is a pretty good example of using CDK today to build a Flink app.
- 👋 I may be able to implement this feature request
- ⚠️ This feature might incur a breaking change
cc @iliapolo
This is a 🚀 Feature Request