Skip to content

Commit 0963485

Browse files
authored
Support WASB scheme in ADLSFileIO (#11504)
1 parent 071d9e2 commit 0963485

5 files changed

Lines changed: 50 additions & 16 deletions

File tree

azure/src/main/java/org/apache/iceberg/azure/AzureProperties.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,17 @@ public Optional<Long> adlsWriteBlockSize() {
7777
return Optional.ofNullable(adlsWriteBlockSize);
7878
}
7979

80+
/**
81+
* Applies configuration to the {@link DataLakeFileSystemClientBuilder} to provide the endpoint
82+
* and credentials required to create an instance of the client.
83+
*
84+
* <p>The default endpoint is constructed in the form {@code
85+
* https://{account}.dfs.core.windows.net} and default credentials are provided via the {@link
86+
* com.azure.identity.DefaultAzureCredential}.
87+
*
88+
* @param account the service account name
89+
* @param builder the builder instance
90+
*/
8091
public void applyClientConfiguration(String account, DataLakeFileSystemClientBuilder builder) {
8192
String sasToken = adlsSasTokens.get(account);
8293
if (sasToken != null && !sasToken.isEmpty()) {
@@ -93,7 +104,7 @@ public void applyClientConfiguration(String account, DataLakeFileSystemClientBui
93104
if (connectionString != null && !connectionString.isEmpty()) {
94105
builder.endpoint(connectionString);
95106
} else {
96-
builder.endpoint("https://" + account);
107+
builder.endpoint("https://" + account + ".dfs.core.windows.net");
97108
}
98109
}
99110
}

azure/src/main/java/org/apache/iceberg/azure/adlsv2/ADLSLocation.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,21 @@
3030
*
3131
* <p>Locations follow a URI like structure to identify resources
3232
*
33-
* <pre>{@code abfs[s]://[<container>@]<storage account host>/<file path>}</pre>
33+
* <pre>{@code abfs[s]://[<container>@]<storageAccount>.dfs.core.windows.net/<path>}</pre>
34+
*
35+
* or
36+
*
37+
* <pre>{@code wasb[s]://<container>@<storageAccount>.blob.core.windows.net/<path>}</pre>
38+
*
39+
* For compatibility, locations using the wasb scheme are also accepted but will use the Azure Data
40+
* Lake Storage Gen2 REST APIs instead of the Blob Storage REST APIs.
3441
*
3542
* <p>See <a
3643
* href="https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-introduction-abfs-uri#uri-syntax">Azure
3744
* Data Lake Storage URI</a>
3845
*/
3946
class ADLSLocation {
40-
private static final Pattern URI_PATTERN = Pattern.compile("^abfss?://([^/?#]+)(.*)?$");
47+
private static final Pattern URI_PATTERN = Pattern.compile("^(abfss?|wasbs?)://([^/?#]+)(.*)?$");
4148

4249
private final String storageAccount;
4350
private final String container;
@@ -55,17 +62,18 @@ class ADLSLocation {
5562

5663
ValidationException.check(matcher.matches(), "Invalid ADLS URI: %s", location);
5764

58-
String authority = matcher.group(1);
65+
String authority = matcher.group(2);
5966
String[] parts = authority.split("@", -1);
6067
if (parts.length > 1) {
6168
this.container = parts[0];
62-
this.storageAccount = parts[1];
69+
String host = parts[1];
70+
this.storageAccount = host.split("\\.", -1)[0];
6371
} else {
6472
this.container = null;
65-
this.storageAccount = authority;
73+
this.storageAccount = authority.split("\\.", -1)[0];
6674
}
6775

68-
String uriPath = matcher.group(2);
76+
String uriPath = matcher.group(3);
6977
this.path = uriPath == null ? "" : uriPath.startsWith("/") ? uriPath.substring(1) : uriPath;
7078
}
7179

azure/src/test/java/org/apache/iceberg/azure/AzurePropertiesTest.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,11 +97,13 @@ public void testNoSasToken() {
9797
@Test
9898
public void testWithConnectionString() {
9999
AzureProperties props =
100-
new AzureProperties(ImmutableMap.of("adls.connection-string.account1", "http://endpoint"));
100+
new AzureProperties(
101+
ImmutableMap.of(
102+
"adls.connection-string.account1", "https://account1.dfs.core.usgovcloudapi.net"));
101103

102104
DataLakeFileSystemClientBuilder clientBuilder = mock(DataLakeFileSystemClientBuilder.class);
103105
props.applyClientConfiguration("account1", clientBuilder);
104-
verify(clientBuilder).endpoint("http://endpoint");
106+
verify(clientBuilder).endpoint("https://account1.dfs.core.usgovcloudapi.net");
105107
}
106108

107109
@Test
@@ -111,7 +113,7 @@ public void testNoMatchingConnectionString() {
111113

112114
DataLakeFileSystemClientBuilder clientBuilder = mock(DataLakeFileSystemClientBuilder.class);
113115
props.applyClientConfiguration("account1", clientBuilder);
114-
verify(clientBuilder).endpoint("https://account1");
116+
verify(clientBuilder).endpoint("https://account1.dfs.core.windows.net");
115117
}
116118

117119
@Test
@@ -120,7 +122,7 @@ public void testNoConnectionString() {
120122

121123
DataLakeFileSystemClientBuilder clientBuilder = mock(DataLakeFileSystemClientBuilder.class);
122124
props.applyClientConfiguration("account", clientBuilder);
123-
verify(clientBuilder).endpoint("https://account");
125+
verify(clientBuilder).endpoint("https://account.dfs.core.windows.net");
124126
}
125127

126128
@Test

azure/src/test/java/org/apache/iceberg/azure/adlsv2/ADLSLocationTest.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,18 @@ public void testLocationParsing(String scheme) {
3333
String p1 = scheme + "://container@account.dfs.core.windows.net/path/to/file";
3434
ADLSLocation location = new ADLSLocation(p1);
3535

36-
assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net");
36+
assertThat(location.storageAccount()).isEqualTo("account");
37+
assertThat(location.container().get()).isEqualTo("container");
38+
assertThat(location.path()).isEqualTo("path/to/file");
39+
}
40+
41+
@ParameterizedTest
42+
@ValueSource(strings = {"wasb", "wasbs"})
43+
public void testWasbLocatonParsing(String scheme) {
44+
String p1 = scheme + "://container@account.blob.core.windows.net/path/to/file";
45+
ADLSLocation location = new ADLSLocation(p1);
46+
47+
assertThat(location.storageAccount()).isEqualTo("account");
3748
assertThat(location.container().get()).isEqualTo("container");
3849
assertThat(location.path()).isEqualTo("path/to/file");
3950
}
@@ -43,7 +54,7 @@ public void testEncodedString() {
4354
String p1 = "abfs://container@account.dfs.core.windows.net/path%20to%20file";
4455
ADLSLocation location = new ADLSLocation(p1);
4556

46-
assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net");
57+
assertThat(location.storageAccount()).isEqualTo("account");
4758
assertThat(location.container().get()).isEqualTo("container");
4859
assertThat(location.path()).isEqualTo("path%20to%20file");
4960
}
@@ -67,7 +78,7 @@ public void testNoContainer() {
6778
String p1 = "abfs://account.dfs.core.windows.net/path/to/file";
6879
ADLSLocation location = new ADLSLocation(p1);
6980

70-
assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net");
81+
assertThat(location.storageAccount()).isEqualTo("account");
7182
assertThat(location.container().isPresent()).isFalse();
7283
assertThat(location.path()).isEqualTo("path/to/file");
7384
}
@@ -77,7 +88,7 @@ public void testNoPath() {
7788
String p1 = "abfs://container@account.dfs.core.windows.net";
7889
ADLSLocation location = new ADLSLocation(p1);
7990

80-
assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net");
91+
assertThat(location.storageAccount()).isEqualTo("account");
8192
assertThat(location.container().get()).isEqualTo("container");
8293
assertThat(location.path()).isEqualTo("");
8394
}

core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,9 @@ public class ResolvingFileIO implements HadoopConfigurable, DelegateFileIO {
6262
"s3n", S3_FILE_IO_IMPL,
6363
"gs", GCS_FILE_IO_IMPL,
6464
"abfs", ADLS_FILE_IO_IMPL,
65-
"abfss", ADLS_FILE_IO_IMPL);
65+
"abfss", ADLS_FILE_IO_IMPL,
66+
"wasb", ADLS_FILE_IO_IMPL,
67+
"wasbs", ADLS_FILE_IO_IMPL);
6668

6769
private final Map<String, DelegateFileIO> ioInstances = Maps.newConcurrentMap();
6870
private final AtomicBoolean isClosed = new AtomicBoolean(false);

0 commit comments

Comments
 (0)