Add option to process only metadata of objects in S3 scan mode #5470
dlvenable merged 6 commits into opensearch-project:main
Conversation
Signed-off-by: Kondaka <krishkdk@amazon.com>
dlvenable
left a comment
This can be an interesting feature. We can make improvements to make it more generic and move the S3 source in a better direction for similar use cases.
Also, we should change some other configurations when we are not loading data. For example, we don't need a codec and shouldn't even set one (it would be misleading). We can also disable compression. There might be a few others as well.
@JsonProperty("end_time")
private LocalDateTime endTime;

@JsonProperty("metadata_only")
I think we should make this more generic by use of an enum. There may be different configurations that users want:
- Object data only (what we have now)
- Object data and object metadata
- Metadata only
Also, this option should exist on the high level configuration for s3. It is applicable for both SQS and S3 scan.
We do not have an "object data only" mode: today we already emit object data plus partial metadata (bucket and key), but not the complete metadata. So I'm not sure how we want to handle that.
Also, "metadata only" doesn't really make sense for s3-sqs when we have the SQS source. I guess we could still support it.
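The enum suggested above might be sketched like this. This is purely illustrative: the constant names and option strings are assumptions, not the merged implementation, and in the real configuration class the enum would be deserialized from the pipeline YAML via Jackson.

```java
// Illustrative sketch of the suggested data-selection enum.
// Names and values are assumptions, not the merged code.
enum S3DataSelection {
    DATA_ONLY("data_only"),                 // object data only (current behavior)
    DATA_AND_METADATA("data_and_metadata"), // object data plus full object metadata
    METADATA_ONLY("metadata_only");         // metadata only; the object is never downloaded

    private final String optionValue;

    S3DataSelection(final String optionValue) {
        this.optionValue = optionValue;
    }

    String getOptionValue() {
        return optionValue;
    }
}
```

Placing this on the high-level s3 configuration, as suggested, would let both the SQS path and the scan path branch on the same value.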
 *
 * @throws IOException exception is thrown every time because this is not supported
 */
default void processS3ObjectMetadata(final S3ObjectReference s3ObjectReference,
Rather than adding another method and having more conditionals in all the code, we can simplify this by implementing a new S3ObjectHandler which handles metadata-only.
Even better, I think we could have a different abstraction within the S3ObjectWorker to handle this differently. This would help us share metric reporting.
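One way to read this suggestion: keep a single entry point and vary only the step that produces events, so buffering and metric reporting stay in one shared code path inside S3ObjectWorker. A rough sketch with invented names (the real S3ObjectHandler signature differs):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a per-mode strategy stands in for the processing
// step inside S3ObjectWorker, so metrics and buffering stay shared.
// All names here are assumptions, not Data Prepper's actual API.
interface ObjectProcessingStrategy {
    // Produces the event payload for a single S3 object.
    Map<String, Object> process(String bucket, String key, Map<String, Object> headMetadata);
}

class MetadataOnlyStrategy implements ObjectProcessingStrategy {
    @Override
    public Map<String, Object> process(final String bucket, final String key,
                                       final Map<String, Object> headMetadata) {
        // Metadata-only mode: no GetObject call, just bucket, key, and HEAD metadata.
        final Map<String, Object> event = new HashMap<>(headMetadata);
        event.put("bucket", bucket);
        event.put("key", key);
        return event;
    }
}
```

A data-only strategy would implement the same interface, so the worker never needs mode conditionals.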
acknowledgementSet.add(event);
}
AtomicLong lastCheckpointTime = new AtomicLong(System.currentTimeMillis());
final AtomicInteger saveStateCounter = new AtomicInteger();
LOG.warn("Failed to get metadata for S3 object: s3ObjectReference={}.", s3ObjectReference);
s3ObjectPluginMetrics.getS3ObjectNoRecordsFound().increment();
}
s3ObjectPluginMetrics.getS3ObjectSizeSummary().record(s3ObjectSize);
I'm not sure it makes sense to include size metrics for metadata. We aren't processing the actual data.
final SourceCoordinator<S3SourceProgressState> sourceCoordinator,
final String partitionKey) throws IOException {
final S3InputFile inputFile = new S3InputFile(s3Client, s3ObjectReference, bucketOwnerProvider, s3ObjectPluginMetrics);
final String BUCKET = "bucket";
These should all be private static fields.
Also, you can clarify the names with _KEY. e.g. BUCKET_KEY.
I thought about it and felt KEY_KEY would not read well, so I decided not to use the _KEY suffix.
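For illustration, the suggested shape might look like the sketch below, with a _FIELD suffix as one way around the awkward KEY_KEY name. The class and constant names here are invented, not the merged code.

```java
import java.util.Map;

// Illustrative only: constants as private static fields rather than
// method locals. A "_FIELD" suffix is one way to avoid the awkward
// KEY_KEY that a blanket "_KEY" suffix would produce for "key".
class S3ObjectEventFields {
    private static final String BUCKET_FIELD = "bucket";
    private static final String KEY_FIELD = "key";

    static Map<String, String> asMap(final String bucket, final String key) {
        return Map.of(BUCKET_FIELD, bucket, KEY_FIELD, key);
    }
}
```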
} catch (final Exception e) {
    LOG.error("Failed writing S3 objects to buffer.", e);
}
if (acknowledgementSet != null && sourceCoordinator != null && partitionKey != null &&
This seems like a very leaky abstraction. Why does this code need to care about all these details? I see we have this in the current code, and it is producing duplicate code and unclear responsibilities. We can refactor this with a simpler Consumer of some sort to finish the record. Let the S3 scan take care of this.
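The Consumer-based refactor might be sketched as follows: the S3 scan owns checkpointing and acknowledgements, and hands the worker a single callback to invoke per finished object. All names here are invented for illustration.

```java
import java.util.function.Consumer;

// Hypothetical sketch: the worker only knows "call this when an object
// is done"; the scan decides what completion means (checkpointing,
// acknowledgements, partition state). Names are assumptions.
class ScanDrivenObjectWorker {
    private final Consumer<String> onObjectComplete;

    ScanDrivenObjectWorker(final Consumer<String> onObjectComplete) {
        this.onObjectComplete = onObjectComplete;
    }

    void processObject(final String objectKey) {
        // ... read (or HEAD) the object and write events to the buffer ...
        onObjectComplete.accept(objectKey); // the scan supplies the completion logic
    }
}
```

The scan would then pass a lambda that checkpoints the partition and releases acknowledgements, keeping coordinator and partition-key details out of the worker entirely.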
private boolean metadata_only = false;

@JsonProperty("buckets")
@Valid
private List<S3ScanBucketOptions> buckets;
This "metadata_only" is at the S3 scan level, so it applies to all the buckets under the scan. Doesn't that mean it can't support listing several S3 buckets with different metadata_only options under the same scan, as discussed earlier? For example, the first bucket scans only for metadata, but the second bucket scans for content.
So is it possible to define "metadata_only" at the bucket level?
buckets: # scans two buckets
  - bucket:
      metadata_only: true
      name: "ml-input-bucket"
      filter:
        include_prefix:
          - sagemaker/sagemaker_djl_batch_input
  - bucket:
      name: "ml-output-bucket"
      filter:
        include_prefix:
          - sagemaker/output/sagemaker_djl_batch_input
@Zhangxunmt Good point. I will look into this.
@dlvenable The current approach will not work for a bucket-level option to support metadata_only. I think I have to go back to the previous implementation (but with no code duplication). Let me know what you think.
We can probably move the new configuration into the bucket and keep it on the scan for the time being. But I'd still have the code operate in such a way that it could work with SQS.
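A bucket-level option with a scan-level fallback might look like the sketch below. The Jackson annotation is shown as a comment so the snippet stays dependency-free, and the class name is an assumption, not the merged configuration class.

```java
// Sketch of a bucket-level metadata_only option that inherits a
// scan-level default when unset. Illustrative names only.
class S3ScanBucketOption {
    // @JsonProperty("metadata_only") in the real configuration class;
    // null means "inherit the scan-level setting".
    private Boolean metadataOnly;

    void setMetadataOnly(final Boolean metadataOnly) {
        this.metadataOnly = metadataOnly;
    }

    boolean isMetadataOnly(final boolean scanLevelDefault) {
        return metadataOnly != null ? metadataOnly : scanLevelDefault;
    }
}
```

This shape supports the two-bucket YAML above: the first bucket sets metadata_only: true explicitly while the second falls back to the scan default.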
when(pluginMetrics.counter(S3_OBJECTS_DELETE_FAILED_METRIC_NAME)).thenReturn(s3DeleteFailedCounter);
S3ObjectDeleteWorker s3ObjectDeleteWorker = new S3ObjectDeleteWorker(s3Client, pluginMetrics);

//when(s3ScanScanOptions.getBuckets()).thenReturn(List.of(s3ScanBucketOptions));
buffer = mock(Buffer.class);
recordsReceived = 0;

//s3ScanBucketOptions = mock(S3ScanBucketOptions.class);
@JsonProperty("end_time")
private LocalDateTime endTime;

@JsonProperty("metadata_only")
Is this actually getting used anywhere in the code?
classpath = sourceSets.integrationTest.runtimeClasspath
systemProperty 'tests.s3source.bucket', System.getProperty('tests.s3source.bucket')
systemProperty 'tests.s3source.bucket2', System.getProperty('tests.s3source.bucket2')
Why do we need another bucket? Let's use paths within the bucket to avoid multiple resources. In order to use another bucket, we'd need to create new resources in AWS for the testing account.
@JsonCreator
public static S3DataSelection fromOptionValue(final String name) {
    return S3_DATA_SELECTION_MAP.get(name.toLowerCase());
Don't force lowercase, since that allows variable casing. Just accept the expected strings only, e.g. data_only:
return S3_DATA_SELECTION_MAP.get(name);
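Sketched in full, the suggestion looks like this: build the lookup map from the exact option strings and do no case folding, so only the documented spellings are accepted. This mirrors the snippet above but is illustrative, not the merged code; the @JsonCreator annotation is noted as a comment to keep the snippet dependency-free.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch of the reviewer's suggestion: exact-string lookup, no toLowerCase().
enum S3DataSelection {
    DATA_ONLY("data_only"),
    METADATA_ONLY("metadata_only");

    // Enum constants initialize before static fields, so values() is safe here.
    private static final Map<String, S3DataSelection> S3_DATA_SELECTION_MAP =
            Arrays.stream(values())
                  .collect(Collectors.toMap(v -> v.optionValue, v -> v));

    private final String optionValue;

    S3DataSelection(final String optionValue) {
        this.optionValue = optionValue;
    }

    // @JsonCreator would go here in the real code.
    static S3DataSelection fromOptionValue(final String name) {
        return S3_DATA_SELECTION_MAP.get(name); // "DATA_ONLY" or "Data_Only" -> null
    }
}
```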
…earch-project#5470) Add option to process only metadata of objects in S3 scan mode Signed-off-by: Kondaka <krishkdk@amazon.com>
Description
Add option to process only metadata of objects in S3 scan mode
Issues Resolved
Resolves #5433
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.