Skip to content

Commit

Permalink
Support WASB scheme in ADLSFileIO (#11504)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrcnc committed Dec 19, 2024
1 parent 91a1505 commit f9e6059
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,17 @@ public Optional<Long> adlsWriteBlockSize() {
return Optional.ofNullable(adlsWriteBlockSize);
}

/**
* Applies configuration to the {@link DataLakeFileSystemClientBuilder} to provide the endpoint
* and credentials required to create an instance of the client.
*
* <p>The default endpoint is constructed in the form {@code
* https://{account}.dfs.core.windows.net} and default credentials are provided via the {@link
* com.azure.identity.DefaultAzureCredential}.
*
* @param account the service account name
* @param builder the builder instance
*/
public void applyClientConfiguration(String account, DataLakeFileSystemClientBuilder builder) {
String sasToken = adlsSasTokens.get(account);
if (sasToken != null && !sasToken.isEmpty()) {
Expand All @@ -93,7 +104,7 @@ public void applyClientConfiguration(String account, DataLakeFileSystemClientBui
if (connectionString != null && !connectionString.isEmpty()) {
builder.endpoint(connectionString);
} else {
builder.endpoint("https://" + account);
builder.endpoint("https://" + account + ".dfs.core.windows.net");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,21 @@
*
* <p>Locations follow a URI like structure to identify resources
*
* <pre>{@code abfs[s]://[<container>@]<storage account host>/<file path>}</pre>
* <pre>{@code abfs[s]://[<container>@]<storageAccount>.dfs.core.windows.net/<path>}</pre>
*
* or
*
* <pre>{@code wasb[s]://<container>@<storageAccount>.blob.core.windows.net/<path>}</pre>
*
* For compatibility, locations using the wasb scheme are also accepted but will use the Azure Data
* Lake Storage Gen2 REST APIs instead of the Blob Storage REST APIs.
*
* <p>See <a
* href="https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-introduction-abfs-uri#uri-syntax">Azure
* Data Lake Storage URI</a>
*/
class ADLSLocation {
private static final Pattern URI_PATTERN = Pattern.compile("^abfss?://([^/?#]+)(.*)?$");
private static final Pattern URI_PATTERN = Pattern.compile("^(abfss?|wasbs?)://([^/?#]+)(.*)?$");

private final String storageAccount;
private final String container;
Expand All @@ -55,17 +62,18 @@ class ADLSLocation {

ValidationException.check(matcher.matches(), "Invalid ADLS URI: %s", location);

String authority = matcher.group(1);
String authority = matcher.group(2);
String[] parts = authority.split("@", -1);
if (parts.length > 1) {
this.container = parts[0];
this.storageAccount = parts[1];
String host = parts[1];
this.storageAccount = host.split("\\.", -1)[0];
} else {
this.container = null;
this.storageAccount = authority;
this.storageAccount = authority.split("\\.", -1)[0];
}

String uriPath = matcher.group(2);
String uriPath = matcher.group(3);
this.path = uriPath == null ? "" : uriPath.startsWith("/") ? uriPath.substring(1) : uriPath;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,13 @@ public void testNoSasToken() {
@Test
public void testWithConnectionString() {
AzureProperties props =
new AzureProperties(ImmutableMap.of("adls.connection-string.account1", "http://endpoint"));
new AzureProperties(
ImmutableMap.of(
"adls.connection-string.account1", "https://account1.dfs.core.usgovcloudapi.net"));

DataLakeFileSystemClientBuilder clientBuilder = mock(DataLakeFileSystemClientBuilder.class);
props.applyClientConfiguration("account1", clientBuilder);
verify(clientBuilder).endpoint("http://endpoint");
verify(clientBuilder).endpoint("https://account1.dfs.core.usgovcloudapi.net");
}

@Test
Expand All @@ -111,7 +113,7 @@ public void testNoMatchingConnectionString() {

DataLakeFileSystemClientBuilder clientBuilder = mock(DataLakeFileSystemClientBuilder.class);
props.applyClientConfiguration("account1", clientBuilder);
verify(clientBuilder).endpoint("https://account1");
verify(clientBuilder).endpoint("https://account1.dfs.core.windows.net");
}

@Test
Expand All @@ -120,7 +122,7 @@ public void testNoConnectionString() {

DataLakeFileSystemClientBuilder clientBuilder = mock(DataLakeFileSystemClientBuilder.class);
props.applyClientConfiguration("account", clientBuilder);
verify(clientBuilder).endpoint("https://account");
verify(clientBuilder).endpoint("https://account.dfs.core.windows.net");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,18 @@ public void testLocationParsing(String scheme) {
String p1 = scheme + "://[email protected]/path/to/file";
ADLSLocation location = new ADLSLocation(p1);

assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net");
assertThat(location.storageAccount()).isEqualTo("account");
assertThat(location.container().get()).isEqualTo("container");
assertThat(location.path()).isEqualTo("path/to/file");
}

@ParameterizedTest
@ValueSource(strings = {"wasb", "wasbs"})
public void testWasbLocatonParsing(String scheme) {
String p1 = scheme + "://[email protected]/path/to/file";
ADLSLocation location = new ADLSLocation(p1);

assertThat(location.storageAccount()).isEqualTo("account");
assertThat(location.container().get()).isEqualTo("container");
assertThat(location.path()).isEqualTo("path/to/file");
}
Expand All @@ -43,7 +54,7 @@ public void testEncodedString() {
String p1 = "abfs://[email protected]/path%20to%20file";
ADLSLocation location = new ADLSLocation(p1);

assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net");
assertThat(location.storageAccount()).isEqualTo("account");
assertThat(location.container().get()).isEqualTo("container");
assertThat(location.path()).isEqualTo("path%20to%20file");
}
Expand All @@ -67,7 +78,7 @@ public void testNoContainer() {
String p1 = "abfs://account.dfs.core.windows.net/path/to/file";
ADLSLocation location = new ADLSLocation(p1);

assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net");
assertThat(location.storageAccount()).isEqualTo("account");
assertThat(location.container().isPresent()).isFalse();
assertThat(location.path()).isEqualTo("path/to/file");
}
Expand All @@ -77,7 +88,7 @@ public void testNoPath() {
String p1 = "abfs://[email protected]";
ADLSLocation location = new ADLSLocation(p1);

assertThat(location.storageAccount()).isEqualTo("account.dfs.core.windows.net");
assertThat(location.storageAccount()).isEqualTo("account");
assertThat(location.container().get()).isEqualTo("container");
assertThat(location.path()).isEqualTo("");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ public class ResolvingFileIO implements HadoopConfigurable, DelegateFileIO {
"s3n", S3_FILE_IO_IMPL,
"gs", GCS_FILE_IO_IMPL,
"abfs", ADLS_FILE_IO_IMPL,
"abfss", ADLS_FILE_IO_IMPL);
"abfss", ADLS_FILE_IO_IMPL,
"wasb", ADLS_FILE_IO_IMPL,
"wasbs", ADLS_FILE_IO_IMPL);

private final Map<String, DelegateFileIO> ioInstances = Maps.newConcurrentMap();
private final AtomicBoolean isClosed = new AtomicBoolean(false);
Expand Down

0 comments on commit f9e6059

Please sign in to comment.