diff --git a/s3/src/main/java/com/instaclustr/kafka/connect/s3/TransferManagerProvider.java b/s3/src/main/java/com/instaclustr/kafka/connect/s3/TransferManagerProvider.java index b4e2c77..f6c5a6d 100644 --- a/s3/src/main/java/com/instaclustr/kafka/connect/s3/TransferManagerProvider.java +++ b/s3/src/main/java/com/instaclustr/kafka/connect/s3/TransferManagerProvider.java @@ -1,6 +1,7 @@ package com.instaclustr.kafka.connect.s3; import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; @@ -38,17 +39,16 @@ public static AmazonS3ClientBuilder getS3ClientBuilderWithRegionAndCredentials(f String region = getFromConfigOrEnvironment(config, AwsStorageConnectorCommonConfig.AWS_REGION); String roleArn = getFromConfigOrEnvironment(config, AwsStorageConnectorCommonConfig.AWS_IAM_ROLE_ARN); - AmazonS3ClientBuilder clientBuilder; + AWSStaticCredentialsProvider awsStaticCredentialsProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secret)); + AWSCredentialsProvider awsCredentialsProvider; if (roleArn == null || StringUtils.isBlank(roleArn)) { - // authenticate with access key/secret - AWSStaticCredentialsProvider awsStaticCredentialsProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secret)); - clientBuilder = AmazonS3ClientBuilder.standard() - .withCredentials(awsStaticCredentialsProvider); + // when IAM user has direct access to the S3 bucket + awsCredentialsProvider = awsStaticCredentialsProvider; } else { - // authenticate with access key/secret, then assume role + // when the IAM user needs to assume the role to access the S3 bucket AWSSecurityTokenService awsSecurityTokenService = AWSSecurityTokenServiceClientBuilder.standard() - .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secret))) + .withCredentials(awsStaticCredentialsProvider) .build(); STSAssumeRoleSessionCredentialsProvider.Builder assumeRoleBuilder = @@ -56,15 +56,15 @@ public static AmazonS3ClientBuilder getS3ClientBuilderWithRegionAndCredentials(f roleArn, UUID.randomUUID().toString().substring(0, 32)); - STSAssumeRoleSessionCredentialsProvider credentialsProvider = assumeRoleBuilder + awsCredentialsProvider = assumeRoleBuilder .withStsClient(awsSecurityTokenService) .withRoleSessionDurationSeconds((int) TimeUnit.HOURS.toSeconds(1)) .build(); - - clientBuilder = AmazonS3ClientBuilder.standard() - .withCredentials(credentialsProvider); } + AmazonS3ClientBuilder clientBuilder = AmazonS3ClientBuilder.standard() + .withCredentials(awsCredentialsProvider); + if (region == null) { region = AwsStorageConnectorCommonConfig.DEFAULT_AWS_REGION; clientBuilder.enableForceGlobalBucketAccess();