Skip to content

Commit

Permalink
implement IAM role support for S3 connector (#9)
Browse files Browse the repository at this point in the history
* implement IAM role support for S3 connector
  • Loading branch information
amrutha-shanbhag authored Oct 12, 2021
1 parent 1611f45 commit 11e8c58
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 8 deletions.
4 changes: 2 additions & 2 deletions distribution.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@

<artifactId>distribution</artifactId>
<packaging>pom</packaging>
<version>0.1.3</version>
<version>0.1.4</version>

<dependencies>
<dependency>
<groupId>com.instaclustr.kafkaconnect</groupId>
<artifactId>instaclustr-s3-connector</artifactId>
<version>0.1.3</version>
<version>0.1.4</version>
</dependency>
<dependency>
<groupId>com.instaclustr.kafkaconnect</groupId>
Expand Down
11 changes: 8 additions & 3 deletions s3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,19 @@
<modelVersion>4.0.0</modelVersion>

<artifactId>instaclustr-s3-connector</artifactId>
<version>0.1.3</version>
<version>0.1.4</version>
<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>1.11.725</version>
<version>1.12.39</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
<version>1.12.39</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
Expand All @@ -32,7 +37,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>24.1.1-jre</version>
<version>29.0-jre</version>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.Region;
import com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceException;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
Expand All @@ -27,6 +28,8 @@ public class AwsStorageConnectorCommonConfig {

public static final String AWS_ACCESS_KEY_ID = "aws.accessKeyId";

public static final String AWS_IAM_ROLE_ARN = "aws.role.arn";

public static final String DEFAULT_AWS_REGION = Regions.DEFAULT_REGION.getName();

private AwsStorageConnectorCommonConfig() {}
Expand All @@ -38,7 +41,8 @@ public static ConfigDef conf() {
ConfigDef.Importance.HIGH, "Path prefix for the objects written into S3")
.define(AWS_ACCESS_KEY_ID, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "AWS access key id")
.define(AWS_SECRET_KEY, ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "AWS access secret key")
.define(AWS_REGION, ConfigDef.Type.STRING, DEFAULT_AWS_REGION, ConfigDef.Importance.MEDIUM, String.format("AWS client region, if not set will use %s", DEFAULT_AWS_REGION));
.define(AWS_REGION, ConfigDef.Type.STRING, DEFAULT_AWS_REGION, ConfigDef.Importance.MEDIUM, String.format("AWS client region, if not set will use %s", DEFAULT_AWS_REGION))
.define(AWS_IAM_ROLE_ARN, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "");
return configDef;
}

Expand Down Expand Up @@ -67,7 +71,7 @@ public static void verifyS3CredentialsAndBucketInfo(final Map<String, String> se
addErrorMessageToConfigObject(configObject, BUCKET, "The defined bucket name does not exist");
}
s3Client.shutdown();
} catch (AmazonS3Exception e) {
} catch (AmazonS3Exception | AWSSecurityTokenServiceException e) {
switch (e.getErrorCode()) {
case "InvalidAccessKeyId":
addErrorMessageToConfigObject(configObject, AWS_ACCESS_KEY_ID, "The defined aws.accessKeyId is invalid");
Expand All @@ -81,6 +85,12 @@ public static void verifyS3CredentialsAndBucketInfo(final Map<String, String> se
case "IllegalLocationConstraintException":
addErrorMessageToConfigObject(configObject, AWS_REGION, String.format("Defined region(%s) is not the same as the bucket region", sentConfigMap.get(AWS_REGION)));
break;
case "AccessDenied":
addErrorMessageToConfigObject(configObject, AWS_IAM_ROLE_ARN, "The user and/or role hasn't been setup correctly with the required permissions");
break;
case "ValidationError":
addErrorMessageToConfigObject(configObject, AWS_IAM_ROLE_ARN, "The defined aws.role.arn is invalid");
break;
default:
throw new ConnectException(String.format("Unknown Amazon S3 exception while validating config, %s", e.getErrorCode()), e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
package com.instaclustr.kafka.connect.s3;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

public class TransferManagerProvider {
private TransferManager transferManager;
Expand All @@ -30,9 +37,32 @@ public static AmazonS3ClientBuilder getS3ClientBuilderWithRegionAndCredentials(f
String accessKey = getFromConfigOrEnvironment(config, AwsStorageConnectorCommonConfig.AWS_ACCESS_KEY_ID);
String secret = getFromConfigOrEnvironment(config, AwsStorageConnectorCommonConfig.AWS_SECRET_KEY);
String region = getFromConfigOrEnvironment(config, AwsStorageConnectorCommonConfig.AWS_REGION);
String roleArn = getFromConfigOrEnvironment(config, AwsStorageConnectorCommonConfig.AWS_IAM_ROLE_ARN);

AWSStaticCredentialsProvider awsStaticCredentialsProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secret));
AWSCredentialsProvider awsCredentialsProvider;

if (StringUtils.isBlank(roleArn)) {
// when IAM user has direct access to the S3 bucket
awsCredentialsProvider = awsStaticCredentialsProvider;
} else {
// when the IAM user needs to assume the role to access the S3 bucket
AWSSecurityTokenService awsSecurityTokenService = AWSSecurityTokenServiceClientBuilder.standard()
.withCredentials(awsStaticCredentialsProvider)
.build();

STSAssumeRoleSessionCredentialsProvider.Builder assumeRoleBuilder =
new STSAssumeRoleSessionCredentialsProvider.Builder(
roleArn,
UUID.randomUUID().toString().substring(0, 32));

awsCredentialsProvider = assumeRoleBuilder
.withStsClient(awsSecurityTokenService)
.build();
}

AmazonS3ClientBuilder clientBuilder = AmazonS3ClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secret)));
.withCredentials(awsCredentialsProvider);

if (region == null) {
region = AwsStorageConnectorCommonConfig.DEFAULT_AWS_REGION;
Expand Down

0 comments on commit 11e8c58

Please sign in to comment.