
aws-kinesisanalyticsv2: L2 construct for Flink applications #12407

@mitchlloyd

Description

Following #7535, add an L2 construct for building AWS::KinesisAnalyticsV2::Application resources that use Flink. A README follows to start the design discussion:


The @aws-cdk/aws-kinesisanalyticsv2 package provides constructs for creating Kinesis Data Analytics applications.

For more information, see the AWS documentation for Kinesis Data Analytics.

Creating a Flink Kinesis Analytics Application

To create a new Flink application, use the FlinkApplication construct.

import * as ka from '@aws-cdk/aws-kinesisanalyticsv2';

// `bucket` is an s3.IBucket defined elsewhere in the stack
const flinkApp = new ka.FlinkApplication(this, 'Application', {
  code: ka.ApplicationCode.fromBucket(bucket, 'my-app.jar'),
  runtime: ka.FlinkRuntime.FLINK_1_11,
});

The code property can use fromBucket as shown above to reference a JAR file in S3, or fromAsset to reference a local file.

import * as ka from '@aws-cdk/aws-kinesisanalyticsv2';
import * as path from 'path';

const flinkApp = new ka.FlinkApplication(this, 'Application', {
  code: ka.ApplicationCode.fromAsset(path.join(__dirname, 'my-app.jar')),
  // ...
});

The propertyGroups property provides a way of passing arbitrary runtime configuration to your Flink application. You can use the aws-kinesisanalytics-runtime library to retrieve these properties.

import * as ka from '@aws-cdk/aws-kinesisanalyticsv2';
import * as path from 'path';

const flinkApp = new ka.FlinkApplication(this, 'Application', {
  // ...
  propertyGroups: [
    new ka.PropertyGroup('FlinkApplicationProperties', {
      inputStreamName: 'my-input-kinesis-stream',
      outputStreamName: 'my-output-kinesis-stream',
    }),
  ],
});
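To illustrate what this configuration represents: each PropertyGroup would presumably render as a PropertyGroups entry under EnvironmentProperties in the underlying AWS::KinesisAnalyticsV2::Application resource. The property names below follow the CloudFormation resource reference; the exact synthesized output is an assumption of this sketch.

```yaml
# Hypothetical CloudFormation output for the propertyGroups example above
Resources:
  Application:
    Type: AWS::KinesisAnalyticsV2::Application
    Properties:
      ApplicationConfiguration:
        EnvironmentProperties:
          PropertyGroups:
            - PropertyGroupId: FlinkApplicationProperties
              PropertyMap:
                inputStreamName: my-input-kinesis-stream
                outputStreamName: my-output-kinesis-stream
```

Inside the running Flink job, these key-value pairs are the ones retrieved via the aws-kinesisanalytics-runtime library mentioned above.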

Flink applications also have specific properties for passing parameters when the Flink job starts. These include parameters for checkpointing, snapshotting, monitoring, and parallelism.

import * as cdk from '@aws-cdk/core';
import * as ka from '@aws-cdk/aws-kinesisanalyticsv2';

const flinkApp = new ka.FlinkApplication(this, 'Application', {
  code: ka.ApplicationCode.fromBucket(bucket, 'my-app.jar'),
  runtime: ka.FlinkRuntime.FLINK_1_11,
  checkpointingEnabled: true, // default is true
  checkpointInterval: cdk.Duration.seconds(30), // default is 1 minute
  minPauseBetweenCheckpoints: cdk.Duration.seconds(10), // default is 5 seconds
  logLevel: ka.LogLevel.ERROR, // default is INFO
  metricsLevel: ka.MetricsLevel.PARALLELISM, // default is APPLICATION
  autoScalingEnabled: false, // default is true
  parallelism: 32, // default is 1
  parallelismPerKpu: 2, // default is 1
  snapshotsEnabled: false, // default is true
});
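As a point of reference for the design discussion, these flat construct props would map onto the nested CloudFormation blocks under FlinkApplicationConfiguration. The property names below come from the CloudFormation resource reference; the CUSTOM ConfigurationType values and the millisecond conversion of the Durations are assumptions about how the construct would synthesize them.

```yaml
# Hypothetical CloudFormation output for the example above
ApplicationConfiguration:
  FlinkApplicationConfiguration:
    CheckpointConfiguration:
      ConfigurationType: CUSTOM
      CheckpointingEnabled: true
      CheckpointInterval: 30000          # milliseconds
      MinPauseBetweenCheckpoints: 10000  # milliseconds
    MonitoringConfiguration:
      ConfigurationType: CUSTOM
      LogLevel: ERROR
      MetricsLevel: PARALLELISM
    ParallelismConfiguration:
      ConfigurationType: CUSTOM
      AutoScalingEnabled: false
      Parallelism: 32
      ParallelismPerKPU: 2
  ApplicationSnapshotConfiguration:
    SnapshotsEnabled: false
```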

Creating an SQL Kinesis Analytics Application

Constructs for SQL applications are not implemented yet.


Notes for review:

Decisions:

  1. Flink and SQL applications share almost no properties, so having separate ka.FlinkApplication and ka.SqlApplication constructs seems correct. I can't see why they would even need to share an abstract base class.
  2. I'm focusing on shipping the Flink construct before SQL, since the two are so different and I haven't used an SQL application.
  3. I unnested much of the configuration for discoverability. The Cfn naming is verbose (usually with prefixes), so collisions are unlikely.

This is a pretty good example of using CDK today to build a Flink app.

  • 👋 I may be able to implement this feature request
  • ⚠️ This feature might incur a breaking change

cc @iliapolo


This is a 🚀 Feature Request

Metadata

Labels

@aws-cdk/aws-kinesisanalytics (Related to Amazon Kinesis Data Analytics), effort/large (Large work item – several weeks of effort), feature-request (A feature should be added or improved), in-progress (This issue is being actively worked on), p1
