fix(plugin-delta): Fix delta file path resolution by preserving full S3 URI#26826
Closed
ShahimSharafudeen wants to merge 1 commit into
Closed
fix(plugin-delta): Fix delta file path resolution by preserving full S3 URI#26826ShahimSharafudeen wants to merge 1 commit into
ShahimSharafudeen wants to merge 1 commit into
Conversation
Contributor
Reviewer's guide (collapsed on small PRs)Reviewer's GuideThis PR fixes incorrect handling of Delta Lake file paths for S3-backed tables by preserving the full S3 URI instead of converting it to a path, ensuring Presto passes valid s3a:// URIs through partition pruning and split generation. Sequence diagram for Delta split generation with preserved S3 URIsequenceDiagram
actor UserQuery
participant PrestoCoordinator
participant DeltaSplitManager
participant DeltaExpressionUtils
participant HadoopFileSystem
UserQuery->>PrestoCoordinator: submit SQL query on Delta table
PrestoCoordinator->>DeltaSplitManager: getNextBatch(partitionHandle, table, splitCount)
DeltaSplitManager->>DeltaSplitManager: read addFileStatus.getPath()
DeltaSplitManager->>DeltaSplitManager: filePath = URI.create(path).toString()
DeltaSplitManager->>HadoopFileSystem: open(filePath s3a_uri)
HadoopFileSystem-->>DeltaSplitManager: input stream to Parquet file
PrestoCoordinator->>DeltaExpressionUtils: evaluatePartitionPredicate(row, partitionPredicate)
DeltaExpressionUtils->>DeltaExpressionUtils: addFileStatus = InternalScanFileUtils.getAddFileStatus(row)
DeltaExpressionUtils->>DeltaExpressionUtils: filePath = URI.create(addFileStatus.getPath()).toString()
DeltaExpressionUtils-->>PrestoCoordinator: domain for partition pruning using full s3a_uri
PrestoCoordinator-->>UserQuery: return query results without FileNotFoundException
Updated class diagram for Delta path handling in DeltaSplitManager and DeltaExpressionUtilsclassDiagram
class DeltaSplitManager {
+CompletableFuture~ConnectorSplitBatch~ getNextBatch(ConnectorPartitionHandle partitionHandle, ConnectorTableLayoutHandle layoutHandle, List~ConnectorSplit~ splits, int maxSize)
-ConnectorId connectorId
-DeltaTable deltaTable
-DeltaMetadata deltaMetadata
}
class DeltaExpressionUtils {
<<utility>>
-static boolean evaluatePartitionPredicate(DeltaColumnHandle partitionColumn, TupleDomain~DeltaColumnHandle~ partitionPredicate, TypeManager typeManager, Object row)
}
class InternalScanFileUtils {
<<utility>>
+Map~String,String~ getPartitionValues(Object row)
+AddFileStatus getAddFileStatus(Object row)
}
class AddFileStatus {
+String getPath()
+long getSize()
}
class Domain {
+static Domain getDomain(DeltaColumnHandle column, String partitionValue, TypeManager typeManager, String filePath)
}
class DeltaColumnHandle {
+String getName()
}
class TypeManager
class ConnectorSplitBatch
class ConnectorSplit
class ConnectorPartitionHandle
class ConnectorTableLayoutHandle
class ConnectorId
class DeltaTable {
+String getSchemaName()
+String getTableName()
}
class DeltaMetadata
class String
DeltaSplitManager --> DeltaTable : uses
DeltaSplitManager --> DeltaMetadata : uses
DeltaSplitManager --> AddFileStatus : uses getPath and getSize
DeltaSplitManager --> ConnectorSplitBatch : returns
DeltaSplitManager --> ConnectorSplit : creates
DeltaSplitManager --> ConnectorId : uses
DeltaExpressionUtils --> InternalScanFileUtils : uses
DeltaExpressionUtils --> AddFileStatus : uses getPath
DeltaExpressionUtils --> DeltaColumnHandle : uses
DeltaExpressionUtils --> Domain : computes
DeltaExpressionUtils --> TypeManager : uses
InternalScanFileUtils --> AddFileStatus : returns
AddFileStatus --> String : path preserved as full s3a_uri
Domain --> String : filePath parameter is full s3a_uri
DeltaSplitManager --> String : filePath parameter is full s3a_uri for splits
DeltaExpressionUtils --> String : filePath parameter is full s3a_uri for domains
Flow diagram for Delta file path handling before and after fixflowchart LR
A["Delta addFileStatus.getPath() returns s3a://bucket/path/file.parquet"] --> B{Old vs new handling}
B -->|Old behavior| C["URI.create(path).getPath()"]
C --> D["Produces /path/file.parquet (scheme and bucket stripped)"]
D --> E["Hadoop LocalFileSystem attempts to open /path/file.parquet"]
E --> F["FileNotFoundException: File does not exist"]
B -->|New behavior| G["URI.create(path).toString()"]
G --> H["Preserves s3a://bucket/path/file.parquet"]
H --> I["S3 filesystem opens s3a://bucket/path/file.parquet"]
I --> J["Presto reads Parquet file successfully"]
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
daf55ae to
1b99a3c
Compare
1b99a3c to
f9e190d
Compare
Member
Contributor
Author
Thanks @agrawalreetika for the information. So closing this PR. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Description
Problem
Presto Delta queries were failing with File does not exist exceptions even though the Parquet files were present in S3. The root cause was incorrect URI handling:
URI.create(...).getPath() stripped the scheme (s3a) and bucket name, resulting in invalid file paths.
Root Cause
Object-store URIs (S3) must retain their scheme and authority. Using URI.getPath() converts a fully qualified URI into a relative filesystem path, which is not valid for Presto’s S3 filesystem.
Root Cause OSS PR : #26397
Fix
The code now preserves the full URI by using URI.toString() (or by passing the original path directly), ensuring that the correct s3a://bucket/... path is passed to the filesystem layer and enabling support for reading tables with spaces in S3 locations or partition values.
Motivation and Context
Impact
Test Plan
Contributor checklist
Release Notes
Please follow release notes guidelines and fill in the release notes below.