feat(step-functions): add item reader path #30836
ChakshuGupta13 wants to merge 7 commits into aws:main from ChakshuGupta13:issue_29409
Conversation
aws-cdk-automation
left a comment
The pull request linter has failed. See the aws-cdk-automation comment below for failure reasons. If you believe this pull request should receive an exemption, please comment and provide a justification.
A comment requesting an exemption should contain the text Exemption Request. Additionally, if clarification is needed add Clarification Request to a comment.
✅ Updated pull request passes all PRLinter validations. Dismissing previous PRLinter review.
GavinZZ
left a comment
Generally looks good. Just some questions to help me understand.
/**
 * State input shall include fields which are given to `S3ObjectItemReaderPath`:
 * {
 *   ...
 *   bucketName: 'my-bucket',
 *   prefix: 'objectNamePrefix',
 *   ...
 * }
 */
There was a problem hiding this comment.
Is this comment needed and correct? Shouldn't it be `bucketNamePath`?
There was a problem hiding this comment.
- About the necessity of the comment: yes. I thought this construct's usage is not very straightforward, so without this comment developers may need trial and error with the state input. To save them that time, the comment conveys what the state input should look like.
- `bucketNamePath` is a parameter of `S3ObjectsItemReaderPath` that holds the path to the state input field (here `bucketName`) which contains the actual value (`'my-bucket'`). So no, the field shouldn't be `bucketNamePath` unless `bucketNamePath: '$.bucketNamePath'` is passed to `S3ObjectsItemReaderPath`; the comment is correct. (Note: the same is verified via the integration test implementation.)
new sfn.S3ObjectsItemReaderPath({
bucketNamePath: '$.bucketName', /** ~ bucketNamePath: '$.X' */
...
}),
/**
* State Input:
* {
* bucketName: 'my-bucket', // ~ X: 'my-bucket'
* ...
* }
*/
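The mapping between the path parameter and the state input field can be illustrated with a small, self-contained sketch. It is a simplified model only: `resolvePath` is a hypothetical helper that handles plain `$.a.b` paths, whereas the JSONPath support in Step Functions is considerably richer.

```typescript
// Simplified model: resolve a dotted path such as '$.bucketName' against state input.
// Only '$' and '$.a.b' forms are handled; this is not a full JSONPath implementation.
type Json = string | number | boolean | null | Json[] | { [key: string]: Json };

function resolvePath(stateInput: Json, path: string): Json | undefined {
  if (path === '$') return stateInput;
  if (!path.startsWith('$.')) {
    throw new Error(`Unsupported path: ${path}`);
  }
  let current: Json | undefined = stateInput;
  for (const key of path.slice(2).split('.')) {
    // Only plain objects can be descended into.
    if (current === null || typeof current !== 'object' || Array.isArray(current)) {
      return undefined;
    }
    current = current[key];
  }
  return current;
}

const stateInput: Json = { bucketName: 'my-bucket', prefix: 'item' };

// bucketNamePath: '$.bucketName' points at the field that holds the value.
const bucket = resolvePath(stateInput, '$.bucketName');
// A path naming a field that is absent from the input resolves to nothing.
const missing = resolvePath(stateInput, '$.bucketNamePath');
console.log(bucket, missing);
```

In this model the path names the field and the state input supplies the value, which is why the documented state input uses `bucketName` rather than `bucketNamePath`.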
There was a problem hiding this comment.
That's good to know. I also deployed this template and you're right. I think this is an important piece of information. Could you make the README a bit clearer? Can we split this part into subheaders, one for a static bucket and one for a dynamic one? In the dynamic section, add a complete example with the stack definition and state input.
There was a problem hiding this comment.
Is the following content for the README okay?
- Sub-headers at heading level 4 were too small to make a difference, and the text of each heading would have to be a construct name like `DistributedMap`. Hence I didn't add further sub-headers and instead explained usage sequentially.
- Ask: if this format seems alright, I will revise the PR, and in upcoming PRs I will add examples for the remaining input source types, as they are not currently present for the dynamic use-case.
`DistributedMap` supports various input source types to determine an array to iterate over:
- JSON array from the state input
  - By default, `DistributedMap` assumes whole state input is a JSON array and iterates over it:
    [ "item1", "item2" ]
    const distributedMap = new sfn.DistributedMap(this, 'DistributedMap');
    distributedMap.itemProcessor(new sfn.Pass(this, 'Pass'));
  - When input source is present at a specific path in state input, then `itemsPath` can be utilised to configure the iterator source.
    { "distributedMapItemList": [ "item1", "item2" ] }
    const distributedMap = new sfn.DistributedMap(this, 'DistributedMap', {
      itemsPath: '$.distributedMapItemList',
    });
    distributedMap.itemProcessor(new sfn.Pass(this, 'Pass'));
- Objects in a S3 bucket and optional prefix
  - When `DistributedMap` is required to iterate over objects stored in a S3 bucket, then if required parameters like `bucket` are known while creating `StateMachine` (statically or at compile time), then an object of `S3ObjectsItemReader` (implementing `IItemReader`) can be passed to `itemReader` to configure the iterator source.
    my-bucket
    |
    +--item1
    |
    +--otherItem
    |
    +--item2
    |
    ...
    import * as s3 from 'aws-cdk-lib/aws-s3';
    const bucket = new s3.Bucket(this, 'Bucket', {
      bucketName: 'my-bucket',
    });
    const distributedMap = new sfn.DistributedMap(this, 'DistributedMap', {
      itemReader: new sfn.S3ObjectsItemReader({
        bucket,
        prefix: 'item',
      }),
    });
    distributedMap.itemProcessor(new sfn.Pass(this, 'Pass'));
  - But if required parameters like `bucketName` are only known while starting execution of `StateMachine` (dynamically or at run-time) via state input, then an object of `S3ObjectsItemReaderPath` (implementing `IItemReaderPath`) can be passed to `itemReaderPath` to configure the iterator source.
    { "bucketName": "my-bucket", "prefix": "item" }
    const distributedMap = new sfn.DistributedMap(this, 'DistributedMap', {
      itemReaderPath: new sfn.S3ObjectsItemReaderPath({
        bucketNamePath: '$.bucketName',
        prefixPath: '$.prefix',
      }),
    });
    distributedMap.itemProcessor(new sfn.Pass(this, 'Pass'));
  - Both `itemReader` and `itemReaderPath` are mutually exclusive. For example, if `bucket` is known at compile time but `prefix` is only known at run-time, then both cannot be used simultaneously.
- JSON array in a JSON file stored in S3
  - When `DistributedMap` is required to iterate over a JSON array stored in a JSON file in a S3 bucket, then if required parameters like `bucket` are known while creating `StateMachine` (statically or at compile time), then an object of `S3JsonItemReader` (implementing `IItemReader`) can be passed to `itemReader` to configure the iterator source.
    my-bucket
    |
    +--input.json
    |
    ...
    [ "item1", "item2" ]
    import * as s3 from 'aws-cdk-lib/aws-s3';
    const bucket = new s3.Bucket(this, 'Bucket', {
      bucketName: 'my-bucket',
    });
    const distributedMap = new sfn.DistributedMap(this, 'DistributedMap', {
      itemReader: new sfn.S3JsonItemReader({
        bucket,
        key: 'input.json',
      }),
    });
    distributedMap.itemProcessor(new sfn.Pass(this, 'Pass'));
- CSV file stored in S3
- S3 inventory manifest stored in S3
Map states in Distributed mode also support writing results of the iterator to an S3 bucket and optional prefix. Use a `ResultWriter` object provided via the optional `resultWriter` property to configure the S3 location to which iterator results will be written. The default behavior if `resultWriter` is omitted is to use the state output payload. However, if the iterator results are larger than the 256 KB limit for Step Functions payloads, then the State Machine will fail.
import * as s3 from 'aws-cdk-lib/aws-s3';
// create a bucket
const bucket = new s3.Bucket(this, 'Bucket');
const distributedMap = new sfn.DistributedMap(this, 'Distributed Map State', {
resultWriter: new sfn.ResultWriter({
bucket: bucket,
prefix: 'my-prefix',
})
});
distributedMap.itemProcessor(new sfn.Pass(this, 'Pass State'));
...-cdk-testing/framework-integ/test/aws-stepfunctions/test/integ.item-reader-path-s3-object.ts
...-cdk-testing/framework-integ/test/aws-stepfunctions/test/integ.item-reader-path-s3-object.ts
I think the code changes are good to go. Could you make the README clearer with subheaders and full examples? I'm happy to approve once it's updated.
packages/aws-cdk-lib/aws-stepfunctions/lib/states/distributed-map.ts
 *
 * @default - No itemReaderPath
 */
readonly itemReaderPath?: IItemReaderPath;
There was a problem hiding this comment.
I find this approach very confusing for customers. We are also duplicating everything in two versions of almost the same classes, which means that any future update to these classes duplicates the work. I see this as a likely source of errors.
The main difference I see between ItemReader and ItemReaderPath is that for the first one we automatically add the policy to access the S3 bucket, while for the other we cannot add this policy. We need to think about a better design approach.
There was a problem hiding this comment.
One option is to deprecate the current ItemReader class and add a new one, but I also do not like the idea of repeatedly deprecating existing properties in favor of new ones and asking customers to update their code.
There was a problem hiding this comment.
The following is intentional: with IItemReaderPath we have no information about the bucket at compile time, so we can't add any permission at compile time. This action therefore needs to be taken separately by the caller who starts the StateMachine execution with bucketName. Even if we considered adding a `*` policy statement by default here, it would violate the least-privilege guideline.
> The main difference I see between the ItemReader and ItemReaderPath is for the first one, we automatically add the policy to access the S3 bucket, and the other one is we could not add this policy.
The following is precisely the issue with the current codebase: it is not extensible enough to allow such changes. Most of these options have already been considered to a decent extent (refer to the PR description), and among them this approach was the least troublesome.
> We need to think about a better design approach.
> one of the options is to deprecate the current ItemReader class, and add a new one, but I also do not like the idea of keep deprecating existing properties for new properties and ask customers to update their code.
If you have any other alternatives, please feel free to share; I am happy to consider and use them. This is exactly why this PR is raised with a minimum number of constructs, i.e. to gain consensus.
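For the dynamic case described above, the permission has to be granted outside the reader. A minimal sketch of what such a grant might look like as a plain IAM policy statement, assuming the bucket name is known to whoever sets up the state machine's role. `listBucketStatement` is a hypothetical helper, not a CDK API, and the `aws` partition default is an assumption.

```typescript
// Plain JSON shape of an IAM policy statement (only the fields used here).
interface PolicyStatementJson {
  Effect: 'Allow' | 'Deny';
  Action: string[];
  Resource: string[];
}

// Hypothetical helper: builds a statement scoped to a single bucket.
// Listing objects (ListObjectsV2) is authorized by s3:ListBucket on the bucket ARN.
function listBucketStatement(bucketName: string, partition = 'aws'): PolicyStatementJson {
  return {
    Effect: 'Allow',
    Action: ['s3:ListBucket'],
    Resource: [`arn:${partition}:s3:::${bucketName}`],
  };
}

const statement = listBucketStatement('my-bucket');
console.log(JSON.stringify(statement, null, 2));
```

Scoping the resource to one bucket ARN keeps the grant in line with the least-privilege concern raised in the comment, at the cost of requiring the bucket name when the role is configured.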
There was a problem hiding this comment.
Similar duplicate parameters are also utilised in different places (reference) - although they have simple types, not full-fledged classes (or interfaces)
/**
* Error code used to represent this failure
*
* @default - No error code
*/
readonly error?: string;
/**
* JsonPath expression to select part of the state to be the error to this state.
*
* You can also use an intrinsic function that returns a string to specify this property.
* The allowed functions include States.Format, States.JsonToString, States.ArrayGetItem, States.Base64Encode, States.Base64Decode, States.Hash, and States.UUID.
*
* @default - No error path
*/
readonly errorPath?: string;
I am finding this approach very confusing to customers. Also we are duplicating every thing in 2 versions of almost the same classes
There was a problem hiding this comment.
@moelasmar thanks for the second review and the feedback. I agree that a number of resources are duplicated between IItemReader and IItemReaderPath. This is not the best design, but as mentioned earlier, we can't directly modify IItemReader without breaking changes. Specifically, IItemReader takes a required parameter `bucket: IBucket`, and this doesn't work for dynamic parameter handling.
I agree this is not best practice, but there are many places in CDK that have similar issues. I would like to get your thoughts on the next step. One thing I can think of is to create a base interface that defines the shared parameters, and have IItemReader and IItemReaderPath extend it and add their own definitions from there.
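A rough sketch of the base-interface idea, to make the suggestion concrete. All interface names here (`IItemReaderBase`, `IStaticItemReader`, `IDynamicItemReader`) are hypothetical, and the static variant models the bucket as a plain name rather than CDK's `IBucket` so that the snippet stays self-contained.

```typescript
// Shared members: fields both reader flavours carry in this discussion.
interface IItemReaderBase {
  readonly resource: string;
  readonly maxItems?: number;
  render(): unknown;
}

// Static variant: the bucket is known at synth time (modelled as a name here).
interface IStaticItemReader extends IItemReaderBase {
  readonly bucketName: string;
}

// Dynamic variant: only a JSONPath into the state input is known.
interface IDynamicItemReader extends IItemReaderBase {
  readonly bucketNamePath: string;
}

const dynamicReader: IDynamicItemReader = {
  resource: 'arn:aws:states:::s3:listObjectsV2',
  bucketNamePath: '$.bucketName',
  render() {
    // Dynamic parameters use the '.$' key suffix.
    return { Resource: this.resource, Parameters: { 'Bucket.$': this.bucketNamePath } };
  },
};
console.log(JSON.stringify(dynamicReader.render()));
```

Only `resource`, `maxItems`, and the `render` signature end up shared; each variant still needs its own bucket field and its own `render` body.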
There was a problem hiding this comment.
I tried to think about a base interface as we discussed briefly (re: Slack & WorkLog) but got stuck on how it would be more useful. We would still need different implementations (both static and dynamic) for each reader type (object, JSON, CSV, manifest etc.).
> One thing I can think of is to create a base interface that defines the shared parameters and IItemReader and IItemReaderPath to extend the interface and has its own definitions from there.
So, can you please share whether you foresee any benefit from a base class (and, if possible, explain how with an example; that would help clarify a few thoughts for me)?
Secondly, I was thinking about examples like following:
/**
* Error code used to represent this failure
*
* @default - No error code
*/
readonly error?: string;
/**
* JsonPath expression to select part of the state to be the error to this state.
*
* You can also use an intrinsic function that returns a string to specify this property.
* The allowed functions include States.Format, States.JsonToString, States.ArrayGetItem, States.Base64Encode, States.Base64Decode, States.Hash, and States.UUID.
*
* @default - No error path
*/
readonly errorPath?: string;
In such an example, we always have two parameters (one static and one dynamic). So if IItemReader and its implementations have X static parameters (e.g. bucket, prefix, key etc.), then to offer dynamic alternatives we will need the same number of additional parameters (e.g. bucketNamePath, prefixPath, keyPath etc.).
Also:
- The `render` method will differ between the two use-cases (e.g. for the dynamic one, keys in `Parameters` will be suffixed with `.$`).
- As mentioned earlier, for the dynamic use-case we cannot define `policyStatements` statically, since the required information is unavailable at that time, so that will also differ.
Therefore, the only thing common to the static and dynamic alternatives that can be de-duplicated is `resource`, which is initialised to an ARN in the constructor. By making a base class (or trying to generalise these classes), we can at most avoid duplicating `resource`, since we still need parameters like bucketNamePath, prefixPath, keyPath etc.
Ideal solution would be to have classes like following:
export interface IItemReader {
/** bucketName and bucketNamePath mutually exclusive*/
readonly bucketName?: string;
readonly bucketNamePath?: string;
readonly resource: string;
readonly maxItems?: number;
render(): any;
}
export class S3ObjectsItemReader implements IItemReader {
readonly bucketName?: string;
readonly bucketNamePath?: string;
/** prefix and prefixPath mutually exclusive*/
readonly prefix?: string;
readonly prefixPath?: string;
readonly resource: string;
readonly maxItems?: number;
constructor(props...) {
this.bucketName = props.bucketName;
this.bucketNamePath = props.bucketNamePath;
this.prefix = props.prefix;
this.prefixPath = props.prefixPath;
this.maxItems = props.maxItems;
this.resource = Arn.format({
region: '',
account: '',
partition: Aws.PARTITION,
service: 'states',
resource: 's3',
resourceName: 'listObjectsV2',
arnFormat: ArnFormat.COLON_RESOURCE_NAME,
});
}
public render(): any {
return FieldUtils.renderObject({
Resource: this.resource,
...(this.maxItems && { ReaderConfig: { MaxItems: this.maxItems } }),
Parameters: {
... (this.bucketName && { Bucket: this.bucketName } ),
... (this.bucketNamePath && { 'Bucket.$': this.bucketNamePath } ),
...(this.prefix && { Prefix: this.prefix }),
...(this.prefixPath && { 'Prefix.$': this.prefixPath }),
},
});
}
}
This way we can even have partial static parameters and rest dynamic - example:
const distributedMap = new sfn.DistributedMap(this, 'DistributedMap', {
itemReader: new sfn.S3ObjectsItemReader({
bucketName: 'my-bucket',
prefixPath: '$.prefix',
}),
});
distributedMap.itemProcessor(new sfn.Pass(this, 'Pass'));
As much as I like this freedom to choose which parameter needs to be dynamic, this will need breaking changes.
EDIT: I forgot that validations (i.e. the mutual exclusivity above) can only be done for States, not for state-less (node-less) constructs like implementations of IItemReader. So the above solution may not be feasible either (unless these are made actual Constructs, which poses different problems altogether, like assigning unique logical IDs), leaving us with two different classes only.
There was a problem hiding this comment.
@moelasmar @GavinZZ
My colleague discovered the following way to provide dynamic parameters (except for Bucket, because it is currently of type IBucket instead of string):
const myBucket = new Bucket(stack, 'MyBucket', ...);
const distributedMap = new sfn.DistributedMap(this, 'DistributedMap', {
itemReader: new sfn.S3ObjectsItemReader({
bucket: myBucket,
prefix: JsonPath.stringAt('$.prefix'),
}),
});
distributedMap.itemProcessor(new sfn.Pass(this, 'Pass'));
This code snippet produces following cdk.out template:
...
"Parameters\":{\"Bucket\":\"",
{
"Ref": "MyBucket..."
},
"\",\"Prefix.$\":\"$.prefix\"}}}}}"
]
...
I am not sure why this works, but since it does, this PR may not be needed for that half of the issue (being able to pass a dynamic file name). The other half remains: how do we also allow Bucket to be dynamic? For that, the ideal would be a breaking change where `bucket: IBucket` becomes `bucket: string`.
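One plausible reading of the template above: `JsonPath.stringAt` yields a marked value rather than a plain literal, and the renderer emits the key with a `.$` suffix whenever it meets such a value. The sketch below is a simplified model of that behaviour, not CDK's implementation. `PathRef`, `pathAt`, and `renderParameters` are hypothetical names; as far as I understand, CDK itself encodes the path as a token string instead of a wrapper class.

```typescript
// Marker for "this value is a JSONPath, not a literal".
class PathRef {
  constructor(public readonly path: string) {}
}

function pathAt(path: string): PathRef {
  if (!path.startsWith('$')) {
    throw new Error(`JSONPath must start with '$', got '${path}'`);
  }
  return new PathRef(path);
}

type ParamValue = string | number | PathRef | undefined;

// Literal values keep their key; path values get the '.$' key suffix, which
// tells Step Functions to resolve the value from the state input at run time.
function renderParameters(params: Record<string, ParamValue>): Record<string, string | number> {
  const out: Record<string, string | number> = {};
  for (const [key, value] of Object.entries(params)) {
    if (value === undefined) continue;
    if (value instanceof PathRef) {
      out[`${key}.$`] = value.path;
    } else {
      out[key] = value;
    }
  }
  return out;
}

const rendered = renderParameters({
  Bucket: 'my-bucket',
  Prefix: pathAt('$.prefix'),
});
console.log(JSON.stringify(rendered));
```

Under this model a `string`-typed property can carry either a literal or a path, which matches the observation that `prefix` accepts a dynamic value while `bucket`, typed as `IBucket`, does not.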
There was a problem hiding this comment.
Nice, but we still need to fix the issue where it only takes IBucket. I actually don't find much value in moving the common parameters into a base interface and having IItemReader and IItemReaderPath extend it.
@moelasmar would you mind taking another look and sharing any ideas for a non-breaking change that doesn't have to duplicate the same properties?
@@ -0,0 +1,118 @@
import { Arn, ArnFormat, Aws } from '../../../../core';
There was a problem hiding this comment.
Why did you not do a similar thing for S3ManifestItemReader, S3CsvItemReader, and S3JsonItemReader?
There was a problem hiding this comment.
They will be taken up in follow-up PRs for the following reasons:
- Confirmation is first needed on whether these types of changes are acceptable.
- It is not good to raise one huge PR with all changes when it can easily be broken down into smaller chunks.
});
map.itemProcessor(new stepfunctions.Pass(stack, 'Pass State'));

//THEN
There was a problem hiding this comment.
Should we also validate that no policy is automatically added for this bucket?
There was a problem hiding this comment.
Why? There is no policyStatement parameter with IItemReaderPath, unlike IItemReader.
By this logic, we would start checking for the absence of everything everywhere just because something similar exists somewhere else.
Second, no real bucket exists for this test; we are just passing a state input path that conveys bucketName dynamically, so checking whether permissions were added is not possible anyway.
Pinging @moelasmar for a second review for any blocking feedback.
moelasmar
left a comment
@ChakshuGupta13, sorry for the late reply. My recommendation is to update the IItemReader interface in packages/aws-cdk-lib/aws-stepfunctions/lib/states/distributed-map/item-reader.ts to make the bucket property optional, add the new property bucketNamePath as another optional property, and add the required validation to make sure that exactly one of them is used.
Based on this link, changing a property from required to optional is an accepted type weakening and is not a breaking change.
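The recommended validation could be sketched roughly as follows. This is an illustration only: `ReaderProps` and `validateBucketProps` are hypothetical names, and the static bucket is modelled as a plain name instead of an `IBucket` to keep the snippet self-contained.

```typescript
// Hypothetical props shape with both the static and the dynamic option optional.
interface ReaderProps {
  readonly bucketName?: string;
  readonly bucketNamePath?: string;
}

// Returns a list of validation errors; an empty list means the props are valid.
function validateBucketProps(props: ReaderProps): string[] {
  const hasStatic = props.bucketName !== undefined;
  const hasDynamic = props.bucketNamePath !== undefined;
  if (hasStatic && hasDynamic) {
    return ['Provide either bucketName or bucketNamePath, but not both'];
  }
  if (!hasStatic && !hasDynamic) {
    return ['Provide one of bucketName or bucketNamePath'];
  }
  return [];
}

const okStatic = validateBucketProps({ bucketName: 'my-bucket' });
const okDynamic = validateBucketProps({ bucketNamePath: '$.bucketName' });
const both = validateBucketProps({ bucketName: 'my-bucket', bucketNamePath: '$.bucketName' });
const neither = validateBucketProps({});
console.log(okStatic.length, okDynamic.length, both.length, neither.length);
```

Returning error strings rather than throwing leaves it to the caller to decide where the check runs, for example in a constructor or in the state's own validation step.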
Thanks for checking @moelasmar. Before moving forward, I would also like to bring your attention to following comment:
thanks @ChakshuGupta13 for your reply. I agree on adding a new property
This PR has been in the CHANGES REQUESTED state for 3 weeks, and looks abandoned. To keep this PR from being closed, please continue work on it. If not, it will automatically be closed in a week.
**Why are these changes required?**
- For `DistributedMap` state of StepFunctions, `IItemReader` currently only allows S3 bucket as input source to be declared statically in CDK.
  - In other words, current CDK implementation only caters to static use-case where we know either `bucket` or `bucketName` (from which we can create `IBucket`) and pass it to `IItemReader`.
- Whereas via AWS Console, if we create `DistributedMap` manually, then we can also convey S3 source dynamically using State Input / JsonPath.
  - In other words, for dynamic use-case, we will neither have `bucket` nor `bucketName` i.e. we only know state input variable which will convey `bucketName` e.g. `$.bucketName`.
- So, if we want to use `IItemReader` for dynamic use case also, then we will:
  - (1) need to make `bucket: IBucket` optional (which will be breaking - how? e.g. if some dev is currently accessing `bucket` via `reader.bucket` then dev now needs to add check for `undefined`)
  - (2) then add other optional fields to convey state input variable names (e.g. `$.bucketName`, `$.key`, `$.prefix`)
- Therefore, to avoid introducing breaking change, we can follow `*Path` convention of StepFunctions and introduce `IItemReaderPath` for dynamic use-case. (closes #29409)

**What changes are being made?**
- Add `IItemReaderPath` interface (and its props interface)
- Add `S3ObjectsItemReaderPath` as one of many concrete classes of `IItemReaderPath` - this class also helps with unit-testing and integration-testing.
- Modify `index` to export new constructs
- Modify `DistributedMap` (and its props) to allow `itemReaderPath?` (which will be mutually exclusive with `itemReader?` and `itemsPath?`) and utilise it for render

**How are these changes tested?**
- Via new unit-tests for `DistributedMap` (via `yarn build+test`)
- Via new integration test for `S3ObjectsItemReaderPath` (with snapshot created)
  - Via `yarn build --directory test/aws-stepfunctions/test && yarn integ test/aws-stepfunctions/test/integ.item-reader-path-s3-object.js && yarn integ-runner --update-on-failed`
  - Verified expected step function execution result during snapshot creation
- `IItemReaderPath` is being introduced to allow using dynamic S3 source for `DistributedMap`, hence, we will need to convey about it and its usage example via README.
- Add when and how `IItemReaderPath` shall be used.
- Previewed README changes.
- In the example of `S3ObjectsItemReaderPath`, need to use `prefixPath` instead of `prefix`.
Following errors are detected by CodeBuild logs, hence, changes resolve these.
```
aws-cdk-lib.aws_stepfunctions-README-L589.ts:34:1 - error TS2304: Cannot find name 'distributedMap'.
34 distributedMap.itemProcessor(new sfn.Pass(this, 'Pass State'));
~~~~~~~~~~~~~~
aws-cdk-lib.aws_stepfunctions-README-L589.ts:46:3 - error TS2739: Type 'S3ObjectsItemReaderPath' is missing the following properties from type 'IItemReader': bucket, providePolicyStatements
46 itemReader: new sfn.S3ObjectsItemReaderPath({
~~~~~~~~~~
aws-cdk-lib.aws_stepfunctions-README-L589.ts:48:5 - error TS2353: Object literal may only specify known properties, and 'prefix' does not exist in type 'S3ObjectsItemReaderPathProps'.
48 prefix: '$.prefix',
~~~~~~
aws-cdk-lib.aws_stepfunctions-README-L589.ts:55:1 - error TS2304: Cannot find name 'distributedMap'.
55 distributedMap.itemProcessor(new sfn.Pass(this, 'Pass State'));
~~~~~~~~~~~~~~
[jsii-rosetta] [WARN] 4 diagnostics from assemblies with 'strict' mode on (and 44 more non-strict diagnostics)
```
…nation
**Reason for this change**
- current structure for distributed map's input source content does not convey properly how to configure input source as per requirements (also it lacks examples).
**Description of changes**
- re-structure distributed map's input source explanation to convey how to configure input source as per requirements with examples.
- nit: also addressed unrelated issue with README
**Description of how you validated changes**
- `yarn build+extract`

**Why**
- the first line of comment is used as the description shown in the properties table, hence, repeating the property name does not convey much
**What**
- replace first line of comment from property name to description of property
**Tested by**
- `yarn build`
Pull request has been modified.
@GavinZZ @moelasmar Created separate PR #31619 since strategy is changed. Can you please review it?
Comments on closed issues and PRs are hard for our team to see. |

Issue # (if applicable)
Fixes #29409
Reason for this change
- For `DistributedMap` state of StepFunctions, `IItemReader` currently only allows S3 bucket as input source to be declared statically in CDK.
  - In other words, current CDK implementation only caters to static use-case where we know either `bucket` or `bucketName` (from which we can create `IBucket`) and pass it to `IItemReader`.
- Whereas via AWS Console, if we create `DistributedMap` manually, then we can also convey S3 source dynamically using State Input / JsonPath.
  - In other words, for dynamic use-case, we will neither have `bucket` nor `bucketName` i.e. we only know state input variable which will convey `bucketName` e.g. `$.bucketName`.
- So, if we want to use `IItemReader` for dynamic use case also, then we will:
  - (1) need to make `bucket: IBucket` optional (which will be breaking - how? e.g. if some dev is currently accessing `bucket` via `reader.bucket` then dev now needs to add check for `undefined`)
  - (2) then add other optional fields to convey state input variable names (e.g. `$.bucketName`, `$.key`, `$.prefix`)
- Therefore, to avoid introducing breaking change, we can follow `*Path` convention of StepFunctions and introduce `IItemReaderPath` for dynamic use-case.
- `IItemReaderPath` is being introduced to allow using dynamic S3 source for `DistributedMap`, hence, we will also need to convey about it and its usage example via README.
Description of changes
- Add `IItemReaderPath` interface (and its props interface)
- Add `S3ObjectsItemReaderPath` as one of many concrete classes of `IItemReaderPath` - this class also helps with unit-testing and integration-testing.
- Modify `index` to export new constructs
- Modify `DistributedMap` (and its props) to allow `itemReaderPath?` (which will be mutually exclusive with `itemReader?` and `itemsPath?`) and utilise it for render
- Add when and how `IItemReaderPath` shall be used in README.
Description of how you validated changes
- Via new unit-tests for `DistributedMap` (via `yarn build+test`)
- Via new integration test for `S3ObjectsItemReaderPath` (with snapshot created)
  - Via `yarn build --directory test/aws-stepfunctions/test && yarn integ test/aws-stepfunctions/test/integ.item-reader-path-s3-object.js && yarn integ-runner --update-on-failed`
Checklist
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license