From 734b5747d105d6318c6cb941bec56d4f69e48518 Mon Sep 17 00:00:00 2001 From: Tianyi Wang Date: Wed, 30 Oct 2024 22:33:05 -0400 Subject: [PATCH 01/12] recommit transfer manager v2 files --- feature/s3/transfermanager/api.go | 16 + feature/s3/transfermanager/api_client.go | 51 + .../s3/transfermanager/api_op_PutObject.go | 993 ++++++++++++++++++ feature/s3/transfermanager/go.mod | 27 + feature/s3/transfermanager/go.sum | 36 + .../internal/testing/endpoint.go | 25 + .../internal/testing/upload.go | 201 ++++ feature/s3/transfermanager/options.go | 63 ++ feature/s3/transfermanager/pool.go | 63 ++ feature/s3/transfermanager/pool_test.go | 47 + feature/s3/transfermanager/putobject_test.go | 904 ++++++++++++++++ feature/s3/transfermanager/shared_test.go | 4 + feature/s3/transfermanager/types/types.go | 346 ++++++ 13 files changed, 2776 insertions(+) create mode 100644 feature/s3/transfermanager/api.go create mode 100644 feature/s3/transfermanager/api_client.go create mode 100644 feature/s3/transfermanager/api_op_PutObject.go create mode 100644 feature/s3/transfermanager/go.mod create mode 100644 feature/s3/transfermanager/go.sum create mode 100644 feature/s3/transfermanager/internal/testing/endpoint.go create mode 100644 feature/s3/transfermanager/internal/testing/upload.go create mode 100644 feature/s3/transfermanager/options.go create mode 100644 feature/s3/transfermanager/pool.go create mode 100644 feature/s3/transfermanager/pool_test.go create mode 100644 feature/s3/transfermanager/putobject_test.go create mode 100644 feature/s3/transfermanager/shared_test.go create mode 100644 feature/s3/transfermanager/types/types.go diff --git a/feature/s3/transfermanager/api.go b/feature/s3/transfermanager/api.go new file mode 100644 index 00000000000..dbe430dadb4 --- /dev/null +++ b/feature/s3/transfermanager/api.go @@ -0,0 +1,16 @@ +package transfermanager + +import ( + "context" + + "github.com/aws/aws-sdk-go-v2/service/s3" +) + +// S3APIClient defines an interface doing S3 client side operations for transfer manager +type S3APIClient interface { + PutObject(context.Context, *s3.PutObjectInput, ...func(*s3.Options)) (*s3.PutObjectOutput, error) + UploadPart(context.Context, *s3.UploadPartInput, ...func(*s3.Options)) (*s3.UploadPartOutput, error) + CreateMultipartUpload(context.Context, *s3.CreateMultipartUploadInput, ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error) + CompleteMultipartUpload(context.Context, *s3.CompleteMultipartUploadInput, ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error) + AbortMultipartUpload(context.Context, *s3.AbortMultipartUploadInput, ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error) +} diff --git a/feature/s3/transfermanager/api_client.go b/feature/s3/transfermanager/api_client.go new file mode 100644 index 00000000000..84ce16db425 --- /dev/null +++ b/feature/s3/transfermanager/api_client.go @@ -0,0 +1,51 @@ +package transfermanager + +import ( + "github.com/aws/aws-sdk-go-v2/aws" +) + +const userAgentKey = "s3-transfer" + +// defaultMaxUploadParts is the maximum allowed number of parts in a multi-part upload +// on Amazon S3. +const defaultMaxUploadParts = 10000 + +// defaultPartSizeBytes is the default part size when transferring objects to/from S3 +const minPartSizeBytes = 1024 * 1024 * 8 + +// defaultMultipartUploadThreshold is the default size threshold in bytes indicating when to use multipart upload. +const defaultMultipartUploadThreshold = 1024 * 1024 * 16 + +// defaultTransferConcurrency is the default number of goroutines to spin up when +// using PutObject(). +const defaultTransferConcurrency = 5 + +// Client provides the API client to make operations call for Amazon Simple +// Storage Service's Transfer Manager +// It is safe to call Client methods concurrently across goroutines. +type Client struct { + options Options +} + +// New returns an initialized Client from the client Options. Provide +// more functional options to further configure the Client +func New(s3Client S3APIClient, opts Options, optFns ...func(*Options)) *Client { + opts.S3 = s3Client + for _, fn := range optFns { + fn(&opts) + } + + resolveConcurrency(&opts) + resolvePartSizeBytes(&opts) + resolveChecksumAlgorithm(&opts) + resolveMultipartUploadThreshold(&opts) + + return &Client{ + options: opts, + } +} + +// NewFromConfig returns a new Client from the provided s3 config +func NewFromConfig(s3Client S3APIClient, cfg aws.Config, optFns ...func(*Options)) *Client { + return New(s3Client, Options{}, optFns...) +} diff --git a/feature/s3/transfermanager/api_op_PutObject.go b/feature/s3/transfermanager/api_op_PutObject.go new file mode 100644 index 00000000000..bba6b8036d3 --- /dev/null +++ b/feature/s3/transfermanager/api_op_PutObject.go @@ -0,0 +1,993 @@ +package transfermanager + +import ( + "bytes" + "context" + "fmt" + "io" + "sort" + "sync" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/aws/middleware" + "github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager/types" + "github.com/aws/aws-sdk-go-v2/service/s3" + s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" + smithymiddleware "github.com/aws/smithy-go/middleware" +) + +// A MultipartUploadError wraps a failed S3 multipart upload. An error returned +// will satisfy this interface when a multi part upload failed to upload all +// chucks to S3. In the case of a failure the UploadID is needed to operate on +// the chunks, if any, which were uploaded. +// +// Example: +// +// c := transfermanager.New(client, opts) +// output, err := c.PutObject(context.Background(), input) +// if err != nil { +// var multierr transfermanager.MultipartUploadError +// if errors.As(err, &multierr) { +// fmt.Printf("upload failure UploadID=%s, %s\n", multierr.UploadID(), multierr.Error()) +// } else { +// fmt.Printf("upload failure, %s\n", err.Error()) +// } +// } +type MultipartUploadError interface { + error + + // UploadID returns the upload id for the S3 multipart upload that failed. + UploadID() string +} + +// A multipartUploadError wraps the upload ID of a failed s3 multipart upload. +// Composed of BaseError for code, message, and original error +// +// Should be used for an error that occurred failing a S3 multipart upload, +// and a upload ID is available. +type multipartUploadError struct { + err error + + // ID for multipart upload which failed. + uploadID string +} + +// Error returns the string representation of the error. +// +// # See apierr.BaseError ErrorWithExtra for output format +// +// Satisfies the error interface. +func (m *multipartUploadError) Error() string { + var extra string + if m.err != nil { + extra = fmt.Sprintf(", cause: %s", m.err.Error()) + } + return fmt.Sprintf("upload multipart failed, upload id: %s%s", m.uploadID, extra) +} + +// Unwrap returns the underlying error that cause the upload failure +func (m *multipartUploadError) Unwrap() error { + return m.err +} + +// UploadID returns the id of the S3 upload which failed. +func (m *multipartUploadError) UploadID() string { + return m.uploadID +} + +// PutObjectInput represents a request to the PutObject() call. +type PutObjectInput struct { + // Bucket the object is uploaded into + Bucket string + + // Object key for which the PUT action was initiated + Key string + + // Object data + Body io.Reader + + // The canned ACL to apply to the object. For more information, see [Canned ACL] in the Amazon + // S3 User Guide. + // + // When adding a new object, you can use headers to grant ACL-based permissions to + // individual Amazon Web Services accounts or to predefined groups defined by + // Amazon S3. These permissions are then added to the ACL on the object. By + // default, all objects are private. Only the owner has full access control. For + // more information, see [Access Control List (ACL) Overview]and [Managing ACLs Using the REST API] in the Amazon S3 User Guide. + // + // If the bucket that you're uploading objects to uses the bucket owner enforced + // setting for S3 Object Ownership, ACLs are disabled and no longer affect + // permissions. Buckets that use this setting only accept PUT requests that don't + // specify an ACL or PUT requests that specify bucket owner full control ACLs, such + // as the bucket-owner-full-control canned ACL or an equivalent form of this ACL + // expressed in the XML format. PUT requests that contain other ACLs (for example, + // custom grants to certain Amazon Web Services accounts) fail and return a 400 + // error with the error code AccessControlListNotSupported . For more information, + // see [Controlling ownership of objects and disabling ACLs]in the Amazon S3 User Guide. + // + // - This functionality is not supported for directory buckets. + // + // - This functionality is not supported for Amazon S3 on Outposts. + // + // [Managing ACLs Using the REST API]: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-using-rest-api.html + // [Access Control List (ACL) Overview]: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html + // [Canned ACL]: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#CannedACL + // [Controlling ownership of objects and disabling ACLs]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/about-object-ownership.html + ACL types.ObjectCannedACL + + // Specifies whether Amazon S3 should use an S3 Bucket Key for object encryption + // with server-side encryption using Key Management Service (KMS) keys (SSE-KMS). + // Setting this header to true causes Amazon S3 to use an S3 Bucket Key for object + // encryption with SSE-KMS. + // + // Specifying this header with a PUT action doesn’t affect bucket-level settings + // for S3 Bucket Key. + // + // This functionality is not supported for directory buckets. + BucketKeyEnabled bool + + // Can be used to specify caching behavior along the request/reply chain. For more + // information, see [http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.9]. + // + // [http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.9]: http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.9 + CacheControl string + + // Indicates the algorithm used to create the checksum for the object when you use + // the SDK. This header will not provide any additional functionality if you don't + // use the SDK. When you send this header, there must be a corresponding + // x-amz-checksum-algorithm or x-amz-trailer header sent. Otherwise, Amazon S3 + // fails the request with the HTTP status code 400 Bad Request . + // + // For the x-amz-checksum-algorithm header, replace algorithm with the + // supported algorithm from the following list: + // + // - CRC32 + // + // - CRC32C + // + // - SHA1 + // + // - SHA256 + // + // For more information, see [Checking object integrity] in the Amazon S3 User Guide. + // + // If the individual checksum value you provide through x-amz-checksum-algorithm + // doesn't match the checksum algorithm you set through + // x-amz-sdk-checksum-algorithm , Amazon S3 ignores any provided ChecksumAlgorithm + // parameter and uses the checksum algorithm that matches the provided value in + // x-amz-checksum-algorithm . + // + // For directory buckets, when you use Amazon Web Services SDKs, CRC32 is the + // default checksum algorithm that's used for performance. + // + // [Checking object integrity]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html + ChecksumAlgorithm types.ChecksumAlgorithm + + // Size of the body in bytes. This parameter is useful when the size of the body + // cannot be determined automatically. For more information, see [https://www.rfc-editor.org/rfc/rfc9110.html#name-content-length]. + // + // [https://www.rfc-editor.org/rfc/rfc9110.html#name-content-length]: https://www.rfc-editor.org/rfc/rfc9110.html#name-content-length + ContentLength int64 + + // Specifies presentational information for the object. For more information, see [https://www.rfc-editor.org/rfc/rfc6266#section-4]. + // + // [https://www.rfc-editor.org/rfc/rfc6266#section-4]: https://www.rfc-editor.org/rfc/rfc6266#section-4 + ContentDisposition string + + // Specifies what content encodings have been applied to the object and thus what + // decoding mechanisms must be applied to obtain the media-type referenced by the + // Content-Type header field. For more information, see [https://www.rfc-editor.org/rfc/rfc9110.html#field.content-encoding]. + // + // [https://www.rfc-editor.org/rfc/rfc9110.html#field.content-encoding]: https://www.rfc-editor.org/rfc/rfc9110.html#field.content-encoding + ContentEncoding string + + // The language the content is in. + ContentLanguage string + + // A standard MIME type describing the format of the contents. For more + // information, see [https://www.rfc-editor.org/rfc/rfc9110.html#name-content-type]. + // + // [https://www.rfc-editor.org/rfc/rfc9110.html#name-content-type]: https://www.rfc-editor.org/rfc/rfc9110.html#name-content-type + ContentType string + + // The account ID of the expected bucket owner. If the account ID that you provide + // does not match the actual owner of the bucket, the request fails with the HTTP + // status code 403 Forbidden (access denied). + ExpectedBucketOwner string + + // The date and time at which the object is no longer cacheable. For more + // information, see [https://www.rfc-editor.org/rfc/rfc7234#section-5.3]. + // + // [https://www.rfc-editor.org/rfc/rfc7234#section-5.3]: https://www.rfc-editor.org/rfc/rfc7234#section-5.3 + Expires time.Time + + // Gives the grantee READ, READ_ACP, and WRITE_ACP permissions on the object. + // + // - This functionality is not supported for directory buckets. + // + // - This functionality is not supported for Amazon S3 on Outposts. + GrantFullControl string + + // Allows grantee to read the object data and its metadata. + // + // - This functionality is not supported for directory buckets. + // + // - This functionality is not supported for Amazon S3 on Outposts. + GrantRead string + + // Allows grantee to read the object ACL. + // + // - This functionality is not supported for directory buckets. + // + // - This functionality is not supported for Amazon S3 on Outposts. + GrantReadACP string + + // Allows grantee to write the ACL for the applicable object. + // + // - This functionality is not supported for directory buckets. + // + // - This functionality is not supported for Amazon S3 on Outposts. + GrantWriteACP string + + // A map of metadata to store with the object in S3. + Metadata map[string]string + + // Specifies whether a legal hold will be applied to this object. For more + // information about S3 Object Lock, see [Object Lock]in the Amazon S3 User Guide. + // + // This functionality is not supported for directory buckets. + // + // [Object Lock]: https://docs.aws.amazon.com/AmazonS3/latest/dev/object-lock.html + ObjectLockLegalHoldStatus types.ObjectLockLegalHoldStatus + + // The Object Lock mode that you want to apply to this object. + // + // This functionality is not supported for directory buckets. + ObjectLockMode types.ObjectLockMode + + // The date and time when you want this object's Object Lock to expire. Must be + // formatted as a timestamp parameter. + // + // This functionality is not supported for directory buckets. + ObjectLockRetainUntilDate time.Time + + // Confirms that the requester knows that they will be charged for the request. + // Bucket owners need not specify this parameter in their requests. If either the + // source or destination S3 bucket has Requester Pays enabled, the requester will + // pay for corresponding charges to copy the object. For information about + // downloading objects from Requester Pays buckets, see [Downloading Objects in Requester Pays Buckets]in the Amazon S3 User + // Guide. + // + // This functionality is not supported for directory buckets. + // + // [Downloading Objects in Requester Pays Buckets]: https://docs.aws.amazon.com/AmazonS3/latest/dev/ObjectsinRequesterPaysBuckets.html + RequestPayer types.RequestPayer + + // Specifies the algorithm to use when encrypting the object (for example, AES256 ). + // + // This functionality is not supported for directory buckets. + SSECustomerAlgorithm string + + // Specifies the customer-provided encryption key for Amazon S3 to use in + // encrypting data. This value is used to store the object and then it is + // discarded; Amazon S3 does not store the encryption key. The key must be + // appropriate for use with the algorithm specified in the + // x-amz-server-side-encryption-customer-algorithm header. + // + // This functionality is not supported for directory buckets. + SSECustomerKey string + + // Specifies the Amazon Web Services KMS Encryption Context to use for object + // encryption. The value of this header is a base64-encoded UTF-8 string holding + // JSON with the encryption context key-value pairs. This value is stored as object + // metadata and automatically gets passed on to Amazon Web Services KMS for future + // GetObject or CopyObject operations on this object. This value must be + // explicitly added during CopyObject operations. + // + // This functionality is not supported for directory buckets. + SSEKMSEncryptionContext string + + // If x-amz-server-side-encryption has a valid value of aws:kms or aws:kms:dsse , + // this header specifies the ID (Key ID, Key ARN, or Key Alias) of the Key + // Management Service (KMS) symmetric encryption customer managed key that was used + // for the object. If you specify x-amz-server-side-encryption:aws:kms or + // x-amz-server-side-encryption:aws:kms:dsse , but do not provide + // x-amz-server-side-encryption-aws-kms-key-id , Amazon S3 uses the Amazon Web + // Services managed key ( aws/s3 ) to protect the data. If the KMS key does not + // exist in the same account that's issuing the command, you must use the full ARN + // and not just the ID. + // + // This functionality is not supported for directory buckets. + SSEKMSKeyID string + + // The server-side encryption algorithm that was used when you store this object + // in Amazon S3 (for example, AES256 , aws:kms , aws:kms:dsse ). + // + // General purpose buckets - You have four mutually exclusive options to protect + // data using server-side encryption in Amazon S3, depending on how you choose to + // manage the encryption keys. Specifically, the encryption key options are Amazon + // S3 managed keys (SSE-S3), Amazon Web Services KMS keys (SSE-KMS or DSSE-KMS), + // and customer-provided keys (SSE-C). Amazon S3 encrypts data with server-side + // encryption by using Amazon S3 managed keys (SSE-S3) by default. You can + // optionally tell Amazon S3 to encrypt data at rest by using server-side + // encryption with other key options. For more information, see [Using Server-Side Encryption]in the Amazon S3 + // User Guide. + // + // Directory buckets - For directory buckets, only the server-side encryption with + // Amazon S3 managed keys (SSE-S3) ( AES256 ) value is supported. + // + // [Using Server-Side Encryption]: https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingServerSideEncryption.html + ServerSideEncryption types.ServerSideEncryption + + // By default, Amazon S3 uses the STANDARD Storage Class to store newly created + // objects. The STANDARD storage class provides high durability and high + // availability. Depending on performance needs, you can specify a different + // Storage Class. For more information, see [Storage Classes]in the Amazon S3 User Guide. + // + // - For directory buckets, only the S3 Express One Zone storage class is + // supported to store newly created objects. + // + // - Amazon S3 on Outposts only uses the OUTPOSTS Storage Class. + // + // [Storage Classes]: https://docs.aws.amazon.com/AmazonS3/latest/dev/storage-class-intro.html + StorageClass types.StorageClass + + // The tag-set for the object. The tag-set must be encoded as URL Query + // parameters. (For example, "Key1=Value1") + // + // This functionality is not supported for directory buckets. + Tagging string + + // If the bucket is configured as a website, redirects requests for this object to + // another object in the same bucket or to an external URL. Amazon S3 stores the + // value of this header in the object metadata. For information about object + // metadata, see [Object Key and Metadata]in the Amazon S3 User Guide. + // + // In the following example, the request header sets the redirect to an object + // (anotherPage.html) in the same bucket: + // + // x-amz-website-redirect-location: /anotherPage.html + // + // In the following example, the request header sets the object redirect to + // another website: + // + // x-amz-website-redirect-location: http://www.example.com/ + // + // For more information about website hosting in Amazon S3, see [Hosting Websites on Amazon S3] and [How to Configure Website Page Redirects] in the + // Amazon S3 User Guide. + // + // This functionality is not supported for directory buckets. + // + // [How to Configure Website Page Redirects]: https://docs.aws.amazon.com/AmazonS3/latest/dev/how-to-page-redirect.html + // [Hosting Websites on Amazon S3]: https://docs.aws.amazon.com/AmazonS3/latest/dev/WebsiteHosting.html + // [Object Key and Metadata]: https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html + WebsiteRedirectLocation string +} + +// map non-zero string to *string +func nzstring(v string) *string { + if v == "" { + return nil + } + return aws.String(v) +} + +// map non-zero Time to *Time +func nztime(t time.Time) *time.Time { + if t.IsZero() { + return nil + } + return aws.Time(t) +} + +func (i PutObjectInput) mapSingleUploadInput(body io.Reader, checksumAlgorithm types.ChecksumAlgorithm) *s3.PutObjectInput { + input := &s3.PutObjectInput{ + Bucket: aws.String(i.Bucket), + Key: aws.String(i.Key), + Body: body, + } + if i.ACL != "" { + input.ACL = s3types.ObjectCannedACL(i.ACL) + } + if i.ChecksumAlgorithm != "" { + input.ChecksumAlgorithm = s3types.ChecksumAlgorithm(i.ChecksumAlgorithm) + } else { + input.ChecksumAlgorithm = s3types.ChecksumAlgorithm(checksumAlgorithm) + } + if i.ObjectLockLegalHoldStatus != "" { + input.ObjectLockLegalHoldStatus = s3types.ObjectLockLegalHoldStatus(i.ObjectLockLegalHoldStatus) + } + if i.ObjectLockMode != "" { + input.ObjectLockMode = s3types.ObjectLockMode(i.ObjectLockMode) + } + if i.RequestPayer != "" { + input.RequestPayer = s3types.RequestPayer(i.RequestPayer) + } + if i.ServerSideEncryption != "" { + input.ServerSideEncryption = s3types.ServerSideEncryption(i.ServerSideEncryption) + } + if i.StorageClass != "" { + input.StorageClass = s3types.StorageClass(i.StorageClass) + } + input.BucketKeyEnabled = aws.Bool(i.BucketKeyEnabled) + input.CacheControl = nzstring(i.CacheControl) + input.ContentDisposition = nzstring(i.ContentDisposition) + input.ContentEncoding = nzstring(i.ContentEncoding) + input.ContentLanguage = nzstring(i.ContentLanguage) + input.ContentType = nzstring(i.ContentType) + input.ExpectedBucketOwner = nzstring(i.ExpectedBucketOwner) + input.GrantFullControl = nzstring(i.GrantFullControl) + input.GrantRead = nzstring(i.GrantRead) + input.GrantReadACP = nzstring(i.GrantReadACP) + input.GrantWriteACP = nzstring(i.GrantWriteACP) + input.Metadata = i.Metadata + input.SSECustomerAlgorithm = nzstring(i.SSECustomerAlgorithm) + input.SSECustomerKey = nzstring(i.SSECustomerKey) + input.SSEKMSEncryptionContext = nzstring(i.SSEKMSEncryptionContext) + input.SSEKMSKeyId = nzstring(i.SSEKMSKeyID) + input.Tagging = nzstring(i.Tagging) + input.WebsiteRedirectLocation = nzstring(i.WebsiteRedirectLocation) + input.Expires = nztime(i.Expires) + input.ObjectLockRetainUntilDate = nztime(i.ObjectLockRetainUntilDate) + return input +} + +func (i PutObjectInput) mapCreateMultipartUploadInput() *s3.CreateMultipartUploadInput { + input := &s3.CreateMultipartUploadInput{ + Bucket: aws.String(i.Bucket), + Key: aws.String(i.Key), + } + if i.ACL != "" { + input.ACL = s3types.ObjectCannedACL(i.ACL) + } + if i.ChecksumAlgorithm != "" { + input.ChecksumAlgorithm = s3types.ChecksumAlgorithm(i.ChecksumAlgorithm) + } else { + input.ChecksumAlgorithm = s3types.ChecksumAlgorithm(i.ChecksumAlgorithm) + } + if i.ObjectLockLegalHoldStatus != "" { + input.ObjectLockLegalHoldStatus = s3types.ObjectLockLegalHoldStatus(i.ObjectLockLegalHoldStatus) + } + if i.ObjectLockMode != "" { + input.ObjectLockMode = s3types.ObjectLockMode(i.ObjectLockMode) + } + if i.RequestPayer != "" { + input.RequestPayer = s3types.RequestPayer(i.RequestPayer) + } + if i.ServerSideEncryption != "" { + input.ServerSideEncryption = s3types.ServerSideEncryption(i.ServerSideEncryption) + } + if i.StorageClass != "" { + input.StorageClass = s3types.StorageClass(i.StorageClass) + } + input.BucketKeyEnabled = aws.Bool(i.BucketKeyEnabled) + input.CacheControl = nzstring(i.CacheControl) + input.ContentDisposition = nzstring(i.ContentDisposition) + input.ContentEncoding = nzstring(i.ContentEncoding) + input.ContentLanguage = nzstring(i.ContentLanguage) + input.ContentType = nzstring(i.ContentType) + input.ExpectedBucketOwner = nzstring(i.ExpectedBucketOwner) + input.GrantFullControl = nzstring(i.GrantFullControl) + input.GrantRead = nzstring(i.GrantRead) + input.GrantReadACP = nzstring(i.GrantReadACP) + input.GrantWriteACP = nzstring(i.GrantWriteACP) + input.Metadata = i.Metadata + input.SSECustomerAlgorithm = nzstring(i.SSECustomerAlgorithm) + input.SSECustomerKey = nzstring(i.SSECustomerKey) + input.SSEKMSEncryptionContext = nzstring(i.SSEKMSEncryptionContext) + input.SSEKMSKeyId = nzstring(i.SSEKMSKeyID) + input.Tagging = nzstring(i.Tagging) + input.WebsiteRedirectLocation = nzstring(i.WebsiteRedirectLocation) + input.Expires = nztime(i.Expires) + input.ObjectLockRetainUntilDate = nztime(i.ObjectLockRetainUntilDate) + return input +} + +func (i PutObjectInput) mapCompleteMultipartUploadInput(uploadID *string, completedParts completedParts) *s3.CompleteMultipartUploadInput { + input := &s3.CompleteMultipartUploadInput{ + Bucket: aws.String(i.Bucket), + Key: aws.String(i.Key), + UploadId: uploadID, + } + if i.RequestPayer != "" { + input.RequestPayer = s3types.RequestPayer(i.RequestPayer) + } + input.ExpectedBucketOwner = nzstring(i.ExpectedBucketOwner) + input.SSECustomerAlgorithm = nzstring(i.SSECustomerAlgorithm) + input.SSECustomerKey = nzstring(i.SSECustomerKey) + var parts []s3types.CompletedPart + for _, part := range completedParts { + parts = append(parts, part.MapCompletedPart()) + } + if parts != nil { + input.MultipartUpload = &s3types.CompletedMultipartUpload{Parts: parts} + } + return input +} + +func (i PutObjectInput) mapUploadPartInput(body io.Reader, partNum *int32, uploadID *string) *s3.UploadPartInput { + input := &s3.UploadPartInput{ + Bucket: aws.String(i.Bucket), + Key: aws.String(i.Key), + Body: body, + PartNumber: partNum, + UploadId: uploadID, + } + if i.ChecksumAlgorithm != "" { + input.ChecksumAlgorithm = s3types.ChecksumAlgorithm(i.ChecksumAlgorithm) + } + if i.RequestPayer != "" { + input.RequestPayer = s3types.RequestPayer(i.RequestPayer) + } + input.ExpectedBucketOwner = nzstring(i.ExpectedBucketOwner) + input.SSECustomerAlgorithm = nzstring(i.SSECustomerAlgorithm) + input.SSECustomerKey = nzstring(i.SSECustomerKey) + return input +} + +func (i *PutObjectInput) mapAbortMultipartUploadInput(uploadID *string) *s3.AbortMultipartUploadInput { + input := &s3.AbortMultipartUploadInput{ + Bucket: aws.String(i.Bucket), + Key: aws.String(i.Key), + UploadId: uploadID, + } + return input +} + +// PutObjectOutput represents a response from the Upload() call. +type PutObjectOutput struct { + // The ID for a multipart upload to S3. In the case of an error the error + // can be cast to the MultiUploadFailure interface to extract the upload ID. + // Will be empty string if multipart upload was not used, and the object + // was uploaded as a single PutObject call. + UploadID string + + // The list of parts that were uploaded and their checksums. Will be empty + // if multipart upload was not used, and the object was uploaded as a + // single PutObject call. + CompletedParts []types.CompletedPart + + // Indicates whether the uploaded object uses an S3 Bucket Key for server-side + // encryption with Amazon Web Services KMS (SSE-KMS). + BucketKeyEnabled bool + + // The base64-encoded, 32-bit CRC32 checksum of the object. + ChecksumCRC32 string + + // The base64-encoded, 32-bit CRC32C checksum of the object. + ChecksumCRC32C string + + // The base64-encoded, 160-bit SHA-1 digest of the object. + ChecksumSHA1 string + + // The base64-encoded, 256-bit SHA-256 digest of the object. + ChecksumSHA256 string + + // Entity tag for the uploaded object. + ETag string + + // If the object expiration is configured, this will contain the expiration date + // (expiry-date) and rule ID (rule-id). The value of rule-id is URL encoded. + Expiration string + + // The bucket where the newly created object is put + Bucket string + + // The object key of the newly created object. + Key string + + // If present, indicates that the requester was successfully charged for the + // request. + RequestCharged types.RequestCharged + + // If present, specifies the ID of the Amazon Web Services Key Management Service + // (Amazon Web Services KMS) symmetric customer managed customer master key (CMK) + // that was used for the object. + SSEKMSKeyID string + + // If you specified server-side encryption either with an Amazon S3-managed + // encryption key or an Amazon Web Services KMS customer master key (CMK) in your + // initiate multipart upload request, the response includes this header. It + // confirms the encryption algorithm that Amazon S3 used to encrypt the object. + ServerSideEncryption types.ServerSideEncryption + + // The version of the object that was uploaded. Will only be populated if + // the S3 Bucket is versioned. If the bucket is not versioned this field + // will not be set. + VersionID string + + // Metadata pertaining to the operation's result. + ResultMetadata smithymiddleware.Metadata +} + +func (o *PutObjectOutput) mapFromPutObjectOutput(out *s3.PutObjectOutput, bucket, key string) { + o.BucketKeyEnabled = aws.ToBool(out.BucketKeyEnabled) + o.ChecksumCRC32 = aws.ToString(out.ChecksumCRC32) + o.ChecksumCRC32C = aws.ToString(out.ChecksumCRC32C) + o.ChecksumSHA1 = aws.ToString(out.ChecksumSHA1) + o.ChecksumSHA256 = aws.ToString(out.ChecksumSHA256) + o.ETag = aws.ToString(out.ETag) + o.Expiration = aws.ToString(out.Expiration) + o.Bucket = bucket + o.Key = key + o.RequestCharged = types.RequestCharged(out.RequestCharged) + o.SSEKMSKeyID = aws.ToString(out.SSEKMSKeyId) + o.ServerSideEncryption = types.ServerSideEncryption(out.ServerSideEncryption) + o.VersionID = aws.ToString(out.VersionId) + o.ResultMetadata = out.ResultMetadata.Clone() +} + +func (o *PutObjectOutput) mapFromCompleteMultipartUploadOutput(out *s3.CompleteMultipartUploadOutput, bucket, uploadID string, completedParts completedParts) { + o.UploadID = uploadID + o.CompletedParts = completedParts + o.BucketKeyEnabled = aws.ToBool(out.BucketKeyEnabled) + o.ChecksumCRC32 = aws.ToString(out.ChecksumCRC32) + o.ChecksumCRC32C = aws.ToString(out.ChecksumCRC32C) + o.ChecksumSHA1 = aws.ToString(out.ChecksumSHA1) + o.ChecksumSHA256 = aws.ToString(out.ChecksumSHA256) + o.ETag = aws.ToString(out.ETag) + o.Expiration = aws.ToString(out.Expiration) + o.Bucket = bucket + o.Key = aws.ToString(out.Key) + o.RequestCharged = types.RequestCharged(out.RequestCharged) + o.SSEKMSKeyID = aws.ToString(out.SSEKMSKeyId) + o.ServerSideEncryption = types.ServerSideEncryption(out.ServerSideEncryption) + o.VersionID = aws.ToString(out.VersionId) + o.ResultMetadata = out.ResultMetadata +} + +// PutObject uploads an object to S3, intelligently buffering large +// files into smaller chunks and sending them in parallel across multiple +// goroutines. You can configure the chunk size and concurrency through the +// Options parameters. +// +// Additional functional options can be provided to configure the individual +// upload. These options are copies of the original Options instance, the client of which PutObject is called from. +// Modifying the options will not impact the original Client and Options instance. +func (c *Client) PutObject(ctx context.Context, input *PutObjectInput, opts ...func(*Options)) (*PutObjectOutput, error) { + i := uploader{in: input, options: c.options.Copy()} + for _, opt := range opts { + opt(&i.options) + } + + return i.upload(ctx) +} + +type uploader struct { + options Options + + in *PutObjectInput + + // PartPool allows for the re-usage of streaming payload part buffers between upload calls + partPool bytesBufferPool +} + +func (u *uploader) upload(ctx context.Context) (*PutObjectOutput, error) { + if err := u.init(); err != nil { + return nil, fmt.Errorf("unable to initialize upload: %w", err) + } + defer u.partPool.Close() + + clientOptions := []func(o *s3.Options){ + func(o *s3.Options) { + o.APIOptions = append(o.APIOptions, + middleware.AddSDKAgentKey(middleware.FeatureMetadata, userAgentKey), + addFeatureUserAgent, + ) + }} + + r, _, cleanUp, err := u.nextReader(ctx) + + if err == io.EOF { + return u.singleUpload(ctx, r, cleanUp, clientOptions...) + } else if err != nil { + cleanUp() + return nil, err + } + + mu := multiUploader{ + uploader: u, + } + return mu.upload(ctx, r, cleanUp, clientOptions...) +} + +func (u *uploader) init() error { + if err := u.initSize(); err != nil { + return err + } + u.partPool = newDefaultSlicePool(u.options.PartSizeBytes, u.options.Concurrency+1) + + return nil +} + +// initSize checks user configured partsize and up-size it if calculated part count exceeds max value +func (u *uploader) initSize() error { + if u.options.PartSizeBytes < minPartSizeBytes { + return fmt.Errorf("part size must be at least %d bytes", minPartSizeBytes) + } + + var bodySize int64 + switch r := u.in.Body.(type) { + case io.Seeker: + n, err := types.SeekerLen(r) + if err != nil { + return err + } + bodySize = n + default: + if l := u.in.ContentLength; l > 0 { + bodySize = l + } + } + + // Try to adjust partSize if it is too small and account for + // integer division truncation. + if bodySize/u.options.PartSizeBytes >= int64(defaultMaxUploadParts) { + // Add one to the part size to account for remainders + // during the size calculation. e.g odd number of bytes. + u.options.PartSizeBytes = (bodySize / int64(defaultMaxUploadParts)) + 1 + } + return nil +} + +func (u *uploader) singleUpload(ctx context.Context, r io.Reader, cleanUp func(), clientOptions ...func(*s3.Options)) (*PutObjectOutput, error) { + defer cleanUp() + + params := u.in.mapSingleUploadInput(r, u.options.ChecksumAlgorithm) + + out, err := u.options.S3.PutObject(ctx, params, clientOptions...) + if err != nil { + return nil, err + } + + var output PutObjectOutput + output.mapFromPutObjectOutput(out, u.in.Bucket, u.in.Key) + return &output, nil +} + +// nextReader reads the next chunk of data from input Body +func (u *uploader) nextReader(ctx context.Context) (io.Reader, int, func(), error) { + part, err := u.partPool.Get(ctx) + if err != nil { + return nil, 0, func() {}, err + } + + n, err := readFillBuf(u.in.Body, part) + + cleanup := func() { + u.partPool.Put(part) + } + return bytes.NewReader(part[0:n]), n, cleanup, err +} + +func readFillBuf(r io.Reader, b []byte) (offset int, err error) { + for offset < len(b) && err == nil { + var n int + n, err = r.Read(b[offset:]) + offset += n + } + return offset, err +} + +type multiUploader struct { + *uploader + wg sync.WaitGroup + m sync.Mutex + err error + uploadID *string + parts completedParts +} + +type ulChunk struct { + buf io.Reader + partNum *int32 + cleanup func() +} + +type completedParts []types.CompletedPart + +func (cp completedParts) Len() int { + return len(cp) +} + +func (cp completedParts) Less(i, j int) bool { + return aws.ToInt32(cp[i].PartNumber) < aws.ToInt32(cp[j].PartNumber) +} + +func (cp completedParts) Swap(i, j int) { + cp[i], cp[j] = cp[j], cp[i] +} + +// upload will perform a multipart upload using the firstBuf buffer containing +// the first chunk of data. +func (u *multiUploader) upload(ctx context.Context, firstBuf io.Reader, cleanup func(), clientOptions ...func(*s3.Options)) (*PutObjectOutput, error) { + params := u.uploader.in.mapCreateMultipartUploadInput() + + // Create a multipart + resp, err := u.uploader.options.S3.CreateMultipartUpload(ctx, params, clientOptions...) + if err != nil { + cleanup() + return nil, err + } + u.uploadID = resp.UploadId + + ch := make(chan ulChunk, u.options.Concurrency) + for i := 0; i < u.options.Concurrency; i++ { + // launch workers + u.wg.Add(1) + go u.readChunk(ctx, ch, clientOptions...) + } + + var partNum int32 = 1 + ch <- ulChunk{buf: firstBuf, partNum: aws.Int32(partNum), cleanup: cleanup} + for u.geterr() == nil && err == nil { + partNum++ + var ( + data io.Reader + nextChunkLen int + ok bool + ) + data, nextChunkLen, cleanup, err = u.nextReader(ctx) + ok, err = u.shouldContinue(partNum, nextChunkLen, err) + if !ok { + cleanup() + if err != nil { + u.seterr(err) + } + break + } + + ch <- ulChunk{buf: data, partNum: aws.Int32(partNum), cleanup: cleanup} + } + + // close the channel, wait for workers and complete upload + close(ch) + u.wg.Wait() + completeOut := u.complete(ctx, clientOptions...) + + if err := u.geterr(); err != nil { + return nil, &multipartUploadError{ + err: err, + uploadID: *u.uploadID, + } + } + + var out PutObjectOutput + out.mapFromCompleteMultipartUploadOutput(completeOut, aws.ToString(params.Bucket), aws.ToString(u.uploadID), u.parts) + return &out, nil +} + +func (u *multiUploader) shouldContinue(part int32, nextChunkLen int, err error) (bool, error) { + if err != nil && err != io.EOF { + return false, fmt.Errorf("read multipart upload data failed, %w", err) + } + + if nextChunkLen == 0 { + // No need to upload empty part, if file was empty to start + // with empty single part would of been created and never + // started multipart upload. + return false, nil + } + + // This upload exceeded maximum number of supported parts, error now. + if part > defaultMaxUploadParts { + return false, fmt.Errorf(fmt.Sprintf("exceeded total allowed S3 limit MaxUploadParts (%d). Adjust PartSize to fit in this limit", defaultMaxUploadParts)) + } + + return true, err +} + +// readChunk runs in worker goroutines to pull chunks off of the ch channel +// and send() them as UploadPart requests. +func (u *multiUploader) readChunk(ctx context.Context, ch chan ulChunk, clientOptions ...func(*s3.Options)) { + defer u.wg.Done() + for { + data, ok := <-ch + + if !ok { + break + } + + if u.geterr() == nil { + if err := u.send(ctx, data, clientOptions...); err != nil { + u.seterr(err) + } + } + + data.cleanup() + } +} + +// send performs an UploadPart request and keeps track of the completed +// part information. +func (u *multiUploader) send(ctx context.Context, c ulChunk, clientOptions ...func(*s3.Options)) error { + params := u.in.mapUploadPartInput(c.buf, c.partNum, u.uploadID) + resp, err := u.options.S3.UploadPart(ctx, params, clientOptions...) + if err != nil { + return err + } + + var completed types.CompletedPart + completed.MapFrom(resp, c.partNum) + + u.m.Lock() + u.parts = append(u.parts, completed) + u.m.Unlock() + + return nil +} + +// geterr is a thread-safe getter for the error object +func (u *multiUploader) geterr() error { + u.m.Lock() + defer u.m.Unlock() + + return u.err +} + +// seterr is a thread-safe setter for the error object +func (u *multiUploader) seterr(e error) { + u.m.Lock() + defer u.m.Unlock() + + u.err = e +} + +// fail will abort the multipart unless LeavePartsOnError is set to true. +func (u *multiUploader) fail(ctx context.Context, clientOptions ...func(*s3.Options)) { + params := u.in.mapAbortMultipartUploadInput(u.uploadID) + _, err := u.options.S3.AbortMultipartUpload(ctx, params, clientOptions...) + if err != nil { + u.seterr(fmt.Errorf("failed to abort multipart upload (%v), triggered after multipart upload failed: %v", err, u.geterr())) + } +} + +// complete successfully completes a multipart upload and returns the response. +func (u *multiUploader) complete(ctx context.Context, clientOptions ...func(*s3.Options)) *s3.CompleteMultipartUploadOutput { + if u.geterr() != nil { + u.fail(ctx) + return nil + } + + // Parts must be sorted in PartNumber order. + sort.Sort(u.parts) + + params := u.in.mapCompleteMultipartUploadInput(u.uploadID, u.parts) + + resp, err := u.options.S3.CompleteMultipartUpload(ctx, params, clientOptions...) + if err != nil { + u.seterr(err) + u.fail(ctx) + } + + return resp +} + +func addFeatureUserAgent(stack *smithymiddleware.Stack) error { + ua, err := getOrAddRequestUserAgent(stack) + if err != nil { + return err + } + + ua.AddUserAgentFeature(middleware.UserAgentFeatureS3Transfer) + return nil +} + +func getOrAddRequestUserAgent(stack *smithymiddleware.Stack) (*middleware.RequestUserAgent, error) { + id := (*middleware.RequestUserAgent)(nil).ID() + mw, ok := stack.Build.Get(id) + if !ok { + mw = middleware.NewRequestUserAgent() + if err := stack.Build.Add(mw, smithymiddleware.After); err != nil { + return nil, err + } + } + + ua, ok := mw.(*middleware.RequestUserAgent) + if !ok { + return nil, fmt.Errorf("%T for %s middleware did not match expected type", mw, id) + } + + return ua, nil +} diff --git a/feature/s3/transfermanager/go.mod b/feature/s3/transfermanager/go.mod new file mode 100644 index 00000000000..066daeb30ed --- /dev/null +++ b/feature/s3/transfermanager/go.mod @@ -0,0 +1,27 @@ +module github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager + +go 1.20 + +require ( + github.com/aws/aws-sdk-go-v2 v1.30.3 + github.com/aws/aws-sdk-go-v2/config v1.27.27 + github.com/aws/aws-sdk-go-v2/service/s3 v1.56.1 + github.com/aws/smithy-go v1.20.3 +) + +require ( + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.17.27 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.11 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.15 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.15 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.12 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.3 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.14 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.17 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.12 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.22.4 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.4 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.30.3 // indirect +) diff --git a/feature/s3/transfermanager/go.sum b/feature/s3/transfermanager/go.sum new file mode 100644 index 00000000000..2147a83948c --- /dev/null +++ b/feature/s3/transfermanager/go.sum @@ -0,0 +1,36 @@ +github.com/aws/aws-sdk-go-v2 v1.30.3 h1:jUeBtG0Ih+ZIFH0F4UkmL9w3cSpaMv9tYYDbzILP8dY= +github.com/aws/aws-sdk-go-v2 v1.30.3/go.mod h1:nIQjQVp5sfpQcTc9mPSr1B0PaWK5ByX9MOoDadSN4lc= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 h1:x6xsQXGSmW6frevwDA+vi/wqhp1ct18mVXYN08/93to= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2/go.mod h1:lPprDr1e6cJdyYeGXnRaJoP4Md+cDBvi2eOj00BlGmg= +github.com/aws/aws-sdk-go-v2/config v1.27.27 h1:HdqgGt1OAP0HkEDDShEl0oSYa9ZZBSOmKpdpsDMdO90= +github.com/aws/aws-sdk-go-v2/config v1.27.27/go.mod h1:MVYamCg76dFNINkZFu4n4RjDixhVr51HLj4ErWzrVwg= +github.com/aws/aws-sdk-go-v2/credentials v1.17.27 h1:2raNba6gr2IfA0eqqiP2XiQ0UVOpGPgDSi0I9iAP+UI= +github.com/aws/aws-sdk-go-v2/credentials v1.17.27/go.mod h1:gniiwbGahQByxan6YjQUMcW4Aov6bLC3m+evgcoN4r4= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.11 h1:KreluoV8FZDEtI6Co2xuNk/UqI9iwMrOx/87PBNIKqw= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.11/go.mod h1:SeSUYBLsMYFoRvHE0Tjvn7kbxaUhl75CJi1sbfhMxkU= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.15 h1:SoNJ4RlFEQEbtDcCEt+QG56MY4fm4W8rYirAmq+/DdU= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.15/go.mod h1:U9ke74k1n2bf+RIgoX1SXFed1HLs51OgUSs+Ph0KJP8= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.15 h1:C6WHdGnTDIYETAm5iErQUiVNsclNx9qbJVPIt03B6bI= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.15/go.mod h1:ZQLZqhcu+JhSrA9/NXRm8SkDvsycE+JkV3WGY41e+IM= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.12 h1:DXFWyt7ymx/l1ygdyTTS0X923e+Q2wXIxConJzrgwc0= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.12/go.mod h1:mVOr/LbvaNySK1/BTy4cBOCjhCNY2raWBwK4v+WR5J4= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.3 h1:dT3MqvGhSoaIhRseqw2I0yH81l7wiR2vjs57O51EAm8= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.3/go.mod h1:GlAeCkHwugxdHaueRr4nhPuY+WW+gR8UjlcqzPr1SPI= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.14 h1:oWccitSnByVU74rQRHac4gLfDqjB6Z1YQGOY/dXKedI= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.14/go.mod h1:8SaZBlQdCLrc/2U3CEO48rYj9uR8qRsPRkmzwNM52pM= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.17 h1:HGErhhrxZlQ044RiM+WdoZxp0p+EGM62y3L6pwA4olE= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.17/go.mod h1:RkZEx4l0EHYDJpWppMJ3nD9wZJAa8/0lq9aVC+r2UII= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.12 h1:tzha+v1SCEBpXWEuw6B/+jm4h5z8hZbTpXz0zRZqTnw= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.12/go.mod h1:n+nt2qjHGoseWeLHt1vEr6ZRCCxIN2KcNpJxBcYQSwI= +github.com/aws/aws-sdk-go-v2/service/s3 v1.56.1 h1:wsg9Z/vNnCmxWikfGIoOlnExtEU459cR+2d+iDJ8elo= +github.com/aws/aws-sdk-go-v2/service/s3 v1.56.1/go.mod h1:8rDw3mVwmvIWWX/+LWY3PPIMZuwnQdJMCt0iVFVT3qw= +github.com/aws/aws-sdk-go-v2/service/sso v1.22.4 h1:BXx0ZIxvrJdSgSvKTZ+yRBeSqqgPM89VPlulEcl37tM= +github.com/aws/aws-sdk-go-v2/service/sso v1.22.4/go.mod h1:ooyCOXjvJEsUw7x+ZDHeISPMhtwI3ZCB7ggFMcFfWLU= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.4 h1:yiwVzJW2ZxZTurVbYWA7QOrAaCYQR72t0wrSBfoesUE= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.4/go.mod h1:0oxfLkpz3rQ/CHlx5hB7H69YUpFiI1tql6Q6Ne+1bCw= +github.com/aws/aws-sdk-go-v2/service/sts v1.30.3 h1:ZsDKRLXGWHk8WdtyYMoGNO7bTudrvuKpDKgMVRlepGE= +github.com/aws/aws-sdk-go-v2/service/sts v1.30.3/go.mod h1:zwySh8fpFyXp9yOr/KVzxOl8SRqgf/IDw5aUt9UKFcQ= +github.com/aws/smithy-go v1.20.3 h1:ryHwveWzPV5BIof6fyDvor6V3iUL7nTfiTKXHiW05nE= +github.com/aws/smithy-go v1.20.3/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= diff --git a/feature/s3/transfermanager/internal/testing/endpoint.go b/feature/s3/transfermanager/internal/testing/endpoint.go new file mode 100644 index 00000000000..082aedec4f2 --- /dev/null +++ b/feature/s3/transfermanager/internal/testing/endpoint.go @@ -0,0 +1,25 @@ +package testing + +import ( + "context" + "net/url" + + "github.com/aws/aws-sdk-go-v2/service/s3" + smithyendpoints "github.com/aws/smithy-go/endpoints" +) + +// EndpointResolverV2 is a mock s3 endpoint resolver v2 for testing +type EndpointResolverV2 struct { + URL string +} + +// ResolveEndpoint returns the given endpoint url +func (r EndpointResolverV2) ResolveEndpoint(ctx context.Context, params s3.EndpointParameters) (smithyendpoints.Endpoint, error) { + u, err := url.Parse(r.URL) + if err != nil { + return smithyendpoints.Endpoint{}, err + } + return smithyendpoints.Endpoint{ + URI: *u, + }, nil +} diff --git a/feature/s3/transfermanager/internal/testing/upload.go b/feature/s3/transfermanager/internal/testing/upload.go new file mode 100644 index 00000000000..10009ccdc0f --- /dev/null +++ b/feature/s3/transfermanager/internal/testing/upload.go @@ -0,0 +1,201 @@ +package testing + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "sync" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" +) + +// UploadLoggingClient is a mock client that can be used to record and stub responses for testing the Uploader. +type UploadLoggingClient struct { + Invocations []string + Params []interface{} + + ConsumeBody bool + + ignoredOperations []string + + PartNum int + m sync.Mutex + + PutObjectFn func(*UploadLoggingClient, *s3.PutObjectInput) (*s3.PutObjectOutput, error) + UploadPartFn func(*UploadLoggingClient, *s3.UploadPartInput) (*s3.UploadPartOutput, error) + CreateMultipartUploadFn func(*UploadLoggingClient, *s3.CreateMultipartUploadInput) (*s3.CreateMultipartUploadOutput, error) + CompleteMultipartUploadFn func(*UploadLoggingClient, *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error) + AbortMultipartUploadFn func(*UploadLoggingClient, *s3.AbortMultipartUploadInput) (*s3.AbortMultipartUploadOutput, error) +} + +func (u *UploadLoggingClient) simulateHTTPClientOption(optFns ...func(*s3.Options)) error { + + o := s3.Options{ + HTTPClient: httpDoFunc(func(r *http.Request) (*http.Response, error) { + return &http.Response{ + Request: r, + }, nil + }), + } + + for _, fn := range optFns { + fn(&o) + } + + _, err := o.HTTPClient.Do(&http.Request{ + URL: &url.URL{ + Scheme: "https", + Host: "mock.amazonaws.com", + Path: "/key", + RawQuery: "foo=bar", + }, + }) + if err != nil { + return err + } + + return nil +} + +type httpDoFunc func(*http.Request) (*http.Response, error) + +func (f httpDoFunc) Do(r *http.Request) (*http.Response, error) { + return f(r) +} + +func (u *UploadLoggingClient) traceOperation(name string, params interface{}) { + if contains(u.ignoredOperations, name) { + return + } + u.Invocations = append(u.Invocations, name) + u.Params = append(u.Params, params) + +} + +// PutObject is the S3 PutObject API. +func (u *UploadLoggingClient) PutObject(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error) { + u.m.Lock() + defer u.m.Unlock() + + if u.ConsumeBody { + io.Copy(ioutil.Discard, params.Body) + } + + u.traceOperation("PutObject", params) + + if err := u.simulateHTTPClientOption(optFns...); err != nil { + return nil, err + } + + if u.PutObjectFn != nil { + return u.PutObjectFn(u, params) + } + + return &s3.PutObjectOutput{ + VersionId: aws.String("VERSION-ID"), + }, nil +} + +// UploadPart is the S3 UploadPart API. +func (u *UploadLoggingClient) UploadPart(ctx context.Context, params *s3.UploadPartInput, optFns ...func(*s3.Options)) (*s3.UploadPartOutput, error) { + u.m.Lock() + defer u.m.Unlock() + + if u.ConsumeBody { + io.Copy(ioutil.Discard, params.Body) + } + + u.traceOperation("UploadPart", params) + + if err := u.simulateHTTPClientOption(optFns...); err != nil { + return nil, err + } + + if u.UploadPartFn != nil { + return u.UploadPartFn(u, params) + } + + return &s3.UploadPartOutput{ + ETag: aws.String(fmt.Sprintf("ETAG%d", *params.PartNumber)), + }, nil +} + +// CreateMultipartUpload is the S3 CreateMultipartUpload API. +func (u *UploadLoggingClient) CreateMultipartUpload(ctx context.Context, params *s3.CreateMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error) { + u.m.Lock() + defer u.m.Unlock() + + u.traceOperation("CreateMultipartUpload", params) + + if err := u.simulateHTTPClientOption(optFns...); err != nil { + return nil, err + } + + if u.CreateMultipartUploadFn != nil { + return u.CreateMultipartUploadFn(u, params) + } + + return &s3.CreateMultipartUploadOutput{ + UploadId: aws.String("UPLOAD-ID"), + }, nil +} + +// CompleteMultipartUpload is the S3 CompleteMultipartUpload API. +func (u *UploadLoggingClient) CompleteMultipartUpload(ctx context.Context, params *s3.CompleteMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error) { + u.m.Lock() + defer u.m.Unlock() + + u.traceOperation("CompleteMultipartUpload", params) + + if err := u.simulateHTTPClientOption(optFns...); err != nil { + return nil, err + } + + if u.CompleteMultipartUploadFn != nil { + return u.CompleteMultipartUploadFn(u, params) + } + + return &s3.CompleteMultipartUploadOutput{ + Location: aws.String("http://location"), + VersionId: aws.String("VERSION-ID"), + }, nil +} + +// AbortMultipartUpload is the S3 AbortMultipartUpload API. +func (u *UploadLoggingClient) AbortMultipartUpload(ctx context.Context, params *s3.AbortMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error) { + u.m.Lock() + defer u.m.Unlock() + + u.traceOperation("AbortMultipartUpload", params) + if err := u.simulateHTTPClientOption(optFns...); err != nil { + return nil, err + } + + if u.AbortMultipartUploadFn != nil { + return u.AbortMultipartUploadFn(u, params) + } + + return &s3.AbortMultipartUploadOutput{}, nil +} + +// NewUploadLoggingClient returns a new UploadLoggingClient. +func NewUploadLoggingClient(ignoredOps []string) (*UploadLoggingClient, *[]string, *[]interface{}) { + c := &UploadLoggingClient{ + ignoredOperations: ignoredOps, + } + + return c, &c.Invocations, &c.Params +} + +func contains(src []string, s string) bool { + for _, v := range src { + if v == s { + return true + } + } + return false +} diff --git a/feature/s3/transfermanager/options.go b/feature/s3/transfermanager/options.go new file mode 100644 index 00000000000..a49e74afd64 --- /dev/null +++ b/feature/s3/transfermanager/options.go @@ -0,0 +1,63 @@ +package transfermanager + +import "github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager/types" + +// Options provides params needed for transfer api calls +type Options struct { + // The client to use when uploading to S3. + S3 S3APIClient + + // The buffer size (in bytes) to use when buffering data into chunks and + // sending them as parts to S3. The minimum allowed part size is 5MB, and + // if this value is set to zero, the DefaultUploadPartSize value will be used. + PartSizeBytes int64 + + // The threshold bytes to decide when the file should be multi-uploaded + MultipartUploadThreshold int64 + + // Option to disable checksum validation for download + DisableChecksum bool + + // Checksum algorithm to use for upload + ChecksumAlgorithm types.ChecksumAlgorithm + + // The number of goroutines to spin up in parallel per call to Upload when + // sending parts. If this is set to zero, the DefaultUploadConcurrency value + // will be used. + // + // The concurrency pool is not shared between calls to Upload. + Concurrency int +} + +func (o *Options) init() { +} + +func resolveConcurrency(o *Options) { + if o.Concurrency == 0 { + o.Concurrency = defaultTransferConcurrency + } +} + +func resolvePartSizeBytes(o *Options) { + if o.PartSizeBytes == 0 { + o.PartSizeBytes = minPartSizeBytes + } +} + +func resolveChecksumAlgorithm(o *Options) { + if o.ChecksumAlgorithm == "" { + o.ChecksumAlgorithm = types.ChecksumAlgorithmCrc32 + } +} + +func resolveMultipartUploadThreshold(o *Options) { + if o.MultipartUploadThreshold == 0 { + o.MultipartUploadThreshold = defaultMultipartUploadThreshold + } +} + +// Copy returns new copy of the Options +func (o Options) Copy() Options { + to := o + return to +} diff --git a/feature/s3/transfermanager/pool.go b/feature/s3/transfermanager/pool.go new file mode 100644 index 00000000000..d681861a121 --- /dev/null +++ b/feature/s3/transfermanager/pool.go @@ -0,0 +1,63 @@ +package transfermanager + +import ( + "context" + "fmt" +) + +type bytesBufferPool interface { + Get(context.Context) ([]byte, error) + Put([]byte) + Close() +} + +type defaultSlicePool struct { + slices chan []byte +} + +func newDefaultSlicePool(sliceSize int64, capacity int) *defaultSlicePool { + p := &defaultSlicePool{} + + slices := make(chan []byte, capacity) + for i := 0; i < capacity; i++ { + slices <- make([]byte, sliceSize) + } + + p.slices = slices + return p +} + +var errZeroCapacity = fmt.Errorf("get called on zero capacity pool") + +func (p *defaultSlicePool) Get(ctx context.Context) ([]byte, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + for { + select { + case bs, ok := <-p.slices: + if !ok { + return nil, errZeroCapacity + } + return bs, nil + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + } +} + +func (p *defaultSlicePool) Put(bs []byte) { + p.slices <- bs +} + +func (p *defaultSlicePool) Close() { + close(p.slices) + for range p.slices { + // drain channel + } + p.slices = nil +} diff --git a/feature/s3/transfermanager/pool_test.go b/feature/s3/transfermanager/pool_test.go new file mode 100644 index 00000000000..d6e107bc98c --- /dev/null +++ b/feature/s3/transfermanager/pool_test.go @@ -0,0 +1,47 @@ +package transfermanager + +import ( + "context" + "sync" + "testing" +) + +func TestDefaultSlicePool(t *testing.T) { + pool := newDefaultSlicePool(1, 2) + + var bs []byte + var err error + var wg sync.WaitGroup + + for i := 0; i < 200; i++ { + wg.Add(1) + go func() { + defer wg.Done() + pool.Put(bs) + }() + } + // wait for a slice to be put back + for i := 0; i < 200; i++ { + bs, err = pool.Get(context.Background()) + if err != nil { + t.Errorf("failed to get slice from pool: %v", err) + } + } + + wg.Wait() + + // failed to get a slice due to ctx cancelled + ctx, cancel := context.WithCancel(context.Background()) + cancel() + bs, err = pool.Get(ctx) + if err == nil { + pool.Put(bs) + t.Errorf("expectd no slice to be returned") + } + + if e, a := 2, len(pool.slices); e != a { + t.Errorf("expect pool size to be %v, got %v", e, a) + } + + pool.Close() +} diff --git a/feature/s3/transfermanager/putobject_test.go b/feature/s3/transfermanager/putobject_test.go new file mode 100644 index 00000000000..06cd0ebe149 --- /dev/null +++ b/feature/s3/transfermanager/putobject_test.go @@ -0,0 +1,904 @@ +package transfermanager + +import ( + "bytes" + "context" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/http/httptest" + "os" + "reflect" + "regexp" + "sort" + "strconv" + "strings" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/aws/retry" + s3testing "github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager/internal/testing" + "github.com/aws/aws-sdk-go-v2/internal/awstesting" + "github.com/aws/aws-sdk-go-v2/internal/sdk" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" +) + +// getReaderLength discards the bytes from reader and returns the length +func getReaderLength(r io.Reader) int64 { + n, _ := io.Copy(ioutil.Discard, r) + return n +} + +func TestUploadOrderMulti(t *testing.T) { + c, invocations, args := s3testing.NewUploadLoggingClient(nil) + mgr := New(c, Options{}) + + resp, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key - value", + Body: bytes.NewReader(buf20MB), + ServerSideEncryption: "aws:kms", + SSEKMSKeyID: "KmsId", + ContentType: "content/type", + }) + + if err != nil { + t.Errorf("Expected no error but received %v", err) + } + + if diff := cmpDiff([]string{"CreateMultipartUpload", "UploadPart", "UploadPart", "UploadPart", "CompleteMultipartUpload"}, *invocations); len(diff) > 0 { + t.Errorf(diff) + } + + if "UPLOAD-ID" != resp.UploadID { + t.Errorf("expect %q, got %q", "UPLOAD-ID", resp.UploadID) + } + + if "VERSION-ID" != resp.VersionID { + t.Errorf("expect %q, got %q", "VERSION-ID", resp.VersionID) + } + + // Validate input values + + // UploadPart + for i := 1; i < 4; i++ { + v := aws.ToString((*args)[i].(*s3.UploadPartInput).UploadId) + if "UPLOAD-ID" != v { + t.Errorf("Expected %q, but received %q", "UPLOAD-ID", v) + } + } + + // CompleteMultipartUpload + v := aws.ToString((*args)[4].(*s3.CompleteMultipartUploadInput).UploadId) + if "UPLOAD-ID" != v { + t.Errorf("Expected %q, but received %q", "UPLOAD-ID", v) + } + + parts := (*args)[4].(*s3.CompleteMultipartUploadInput).MultipartUpload.Parts + + for i := 0; i < 3; i++ { + num := parts[i].PartNumber + etag := aws.ToString(parts[i].ETag) + + if int32(i+1) != aws.ToInt32(num) { + t.Errorf("expect %d, got %d", i+1, num) + } + + if matched, err := regexp.MatchString(`^ETAG\d+$`, etag); !matched || err != nil { + t.Errorf("Failed regexp expression `^ETAG\\d+$`") + } + } + + // Custom headers + cmu := (*args)[0].(*s3.CreateMultipartUploadInput) + + if e, a := types.ServerSideEncryption("aws:kms"), cmu.ServerSideEncryption; e != a { + t.Errorf("expect %q, got %q", e, a) + } + + if e, a := "KmsId", aws.ToString(cmu.SSEKMSKeyId); e != a { + t.Errorf("expect %q, got %q", e, a) + } + + if e, a := "content/type", aws.ToString(cmu.ContentType); e != a { + t.Errorf("expect %q, got %q", e, a) + } +} + +func TestUploadOrderMultiDifferentPartSize(t *testing.T) { + c, ops, args := s3testing.NewUploadLoggingClient(nil) + mgr := New(c, Options{ + PartSizeBytes: 1024 * 1024 * 11, + Concurrency: 1, + }) + + _, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + Body: bytes.NewReader(buf20MB), + }) + + if err != nil { + t.Errorf("expect no error, got %v", err) + } + + vals := []string{"CreateMultipartUpload", "UploadPart", "UploadPart", "CompleteMultipartUpload"} + if !reflect.DeepEqual(vals, *ops) { + t.Errorf("expect %v, got %v", vals, *ops) + } + + // Part lengths + if len := getReaderLength((*args)[1].(*s3.UploadPartInput).Body); 1024*1024*11 != len { + t.Errorf("expect %d, got %d", 1024*1024*7, len) + } + if len := getReaderLength((*args)[2].(*s3.UploadPartInput).Body); 1024*1024*9 != len { + t.Errorf("expect %d, got %d", 1024*1024*5, len) + } +} + +func TestUploadFailIfPartSizeTooSmall(t *testing.T) { + mgr := New(s3.New(s3.Options{}), Options{}, + func(o *Options) { + o.PartSizeBytes = 5 + }) + resp, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + Body: bytes.NewReader(buf20MB), + }) + + if resp != nil { + t.Errorf("Expected response to be nil, but received %v", resp) + } + + if err == nil { + t.Errorf("Expected error, but received nil") + } + if e, a := "part size must be at least", err.Error(); !strings.Contains(a, e) { + t.Errorf("expect %v to be in %v", e, a) + } +} + +func TestUploadOrderSingle(t *testing.T) { + c, invocations, params := s3testing.NewUploadLoggingClient(nil) + mgr := New(c, Options{}) + resp, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key - value", + Body: bytes.NewReader(buf2MB), + ServerSideEncryption: "aws:kms", + SSEKMSKeyID: "KmsId", + ContentType: "content/type", + }) + + if err != nil { + t.Errorf("expect no error but received %v", err) + } + + if diff := cmpDiff([]string{"PutObject"}, *invocations); len(diff) > 0 { + t.Error(diff) + } + + if e := "VERSION-ID"; e != resp.VersionID { + t.Errorf("expect %q, got %q", e, resp.VersionID) + } + + if len(resp.UploadID) > 0 { + t.Errorf("expect empty string, got %q", resp.UploadID) + } + + putObjectInput := (*params)[0].(*s3.PutObjectInput) + + if e, a := types.ServerSideEncryption("aws:kms"), putObjectInput.ServerSideEncryption; e != a { + t.Errorf("expect %q, got %q", e, a) + } + + if e, a := "KmsId", aws.ToString(putObjectInput.SSEKMSKeyId); e != a { + t.Errorf("expect %q, got %q", e, a) + } + + if e, a := "content/type", aws.ToString(putObjectInput.ContentType); e != a { + t.Errorf("Expected %q, but received %q", e, a) + } +} + +func TestUploadSingleFailure(t *testing.T) { + c, invocations, _ := s3testing.NewUploadLoggingClient(nil) + + c.PutObjectFn = func(*s3testing.UploadLoggingClient, *s3.PutObjectInput) (*s3.PutObjectOutput, error) { + return nil, fmt.Errorf("put object failure") + } + + mgr := New(c, Options{}) + resp, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + Body: bytes.NewReader(buf2MB), + }) + + if err == nil { + t.Error("expect error, got nil") + } + + if diff := cmpDiff([]string{"PutObject"}, *invocations); len(diff) > 0 { + t.Error(diff) + } + + if resp != nil { + t.Errorf("expect response to be nil, got %v", resp) + } +} + +func TestUploadOrderZero(t *testing.T) { + c, invocations, params := s3testing.NewUploadLoggingClient(nil) + mgr := New(c, Options{}) + resp, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + Body: bytes.NewReader(make([]byte, 0)), + }) + + if err != nil { + t.Errorf("expect no error, got %v", err) + } + + if diff := cmpDiff([]string{"PutObject"}, *invocations); len(diff) > 0 { + t.Error(diff) + } + + if len(resp.UploadID) > 0 { + t.Errorf("expect empty string, got %q", resp.UploadID) + } + + if e, a := int64(0), getReaderLength((*params)[0].(*s3.PutObjectInput).Body); e != a { + t.Errorf("Expected %d, but received %d", e, a) + } +} + +func TestUploadOrderMultiFailure(t *testing.T) { + c, invocations, _ := s3testing.NewUploadLoggingClient(nil) + + c.UploadPartFn = func(u *s3testing.UploadLoggingClient, params *s3.UploadPartInput) (*s3.UploadPartOutput, error) { + if *params.PartNumber == 2 { + return nil, fmt.Errorf("an unexpected error") + } + return &s3.UploadPartOutput{ETag: aws.String(fmt.Sprintf("ETAG%d", u.PartNum))}, nil + } + + mgr := New(c, Options{}, func(o *Options) { + o.Concurrency = 1 + }) + _, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + Body: bytes.NewReader(buf20MB), + }) + + if err == nil { + t.Error("expect error, got nil") + } + + if diff := cmpDiff([]string{"CreateMultipartUpload", "UploadPart", "UploadPart", "AbortMultipartUpload"}, *invocations); len(diff) > 0 { + t.Error(diff) + } +} + +func TestUploadOrderMultiFailureOnComplete(t *testing.T) { + c, invocations, _ := s3testing.NewUploadLoggingClient(nil) + + c.CompleteMultipartUploadFn = func(*s3testing.UploadLoggingClient, *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error) { + return nil, fmt.Errorf("complete multipart error") + } + + mgr := New(c, Options{}, func(o *Options) { + o.Concurrency = 1 + }) + _, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + Body: bytes.NewReader(buf20MB), + }) + + if err == nil { + t.Error("expect error, got nil") + } + + if diff := cmpDiff([]string{"CreateMultipartUpload", "UploadPart", "UploadPart", "UploadPart", + "CompleteMultipartUpload", "AbortMultipartUpload"}, *invocations); len(diff) > 0 { + t.Error(diff) + } +} + +func TestUploadOrderMultiFailureOnCreate(t *testing.T) { + c, invocations, _ := s3testing.NewUploadLoggingClient(nil) + + c.CreateMultipartUploadFn = func(*s3testing.UploadLoggingClient, *s3.CreateMultipartUploadInput) (*s3.CreateMultipartUploadOutput, error) { + return nil, fmt.Errorf("create multipart upload failure") + } + + mgr := New(c, Options{}) + _, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + Body: bytes.NewReader(make([]byte, 1024*1024*12)), + }) + + if err == nil { + t.Error("expect error, got nil") + } + + if diff := cmpDiff([]string{"CreateMultipartUpload"}, *invocations); len(diff) > 0 { + t.Error(diff) + } +} + +type failreader struct { + times int + failCount int +} + +func (f *failreader) Read(b []byte) (int, error) { + f.failCount++ + if f.failCount >= f.times { + return 0, fmt.Errorf("random failure") + } + return len(b), nil +} + +func TestUploadOrderReadFail1(t *testing.T) { + c, invocations, _ := s3testing.NewUploadLoggingClient(nil) + mgr := New(c, Options{}) + _, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + Body: &failreader{times: 1}, + }) + if err == nil { + t.Fatalf("expect error to not be nil") + } + + if e, a := "random failure", err.Error(); !strings.Contains(a, e) { + t.Errorf("expect %v, got %v", e, a) + } + + if diff := cmpDiff([]string(nil), *invocations); len(diff) > 0 { + t.Error(diff) + } +} + +func TestUploadOrderReadFail2(t *testing.T) { + c, invocations, _ := s3testing.NewUploadLoggingClient([]string{"UploadPart"}) + mgr := New(c, Options{}, func(o *Options) { + o.Concurrency = 1 + }) + _, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + Body: &failreader{times: 2}, + }) + if err == nil { + t.Fatalf("expect error to not be nil") + } + + if e, a := "random failure", err.Error(); !strings.Contains(a, e) { + t.Errorf("expect %v, got %q", e, a) + } + + if diff := cmpDiff([]string{"CreateMultipartUpload", "AbortMultipartUpload"}, *invocations); len(diff) > 0 { + t.Error(diff) + } +} + +type sizedReader struct { + size int + cur int + err error +} + +func (s *sizedReader) Read(p []byte) (n int, err error) { + if s.cur >= s.size { + if s.err == nil { + s.err = io.EOF + } + return 0, s.err + } + + n = len(p) + s.cur += len(p) + if s.cur > s.size { + n -= s.cur - s.size + } + + return n, err +} + +func TestUploadOrderMultiBufferedReader(t *testing.T) { + c, invocations, params := s3testing.NewUploadLoggingClient(nil) + mgr := New(c, Options{}) + _, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + Body: &sizedReader{size: 1024 * 1024 * 21}, + }) + if err != nil { + t.Errorf("expect no error, got %v", err) + } + + if diff := cmpDiff([]string{"CreateMultipartUpload", "UploadPart", "UploadPart", + "UploadPart", "CompleteMultipartUpload"}, *invocations); len(diff) > 0 { + t.Error(diff) + } + + // Part lengths + var parts []int64 + for i := 1; i <= 3; i++ { + parts = append(parts, getReaderLength((*params)[i].(*s3.UploadPartInput).Body)) + } + sort.Slice(parts, func(i, j int) bool { + return parts[i] < parts[j] + }) + + if diff := cmpDiff([]int64{1024 * 1024 * 5, 1024 * 1024 * 8, 1024 * 1024 * 8}, parts); len(diff) > 0 { + t.Error(diff) + } +} + +func TestUploadOrderMultiBufferedReaderPartial(t *testing.T) { + c, invocations, params := s3testing.NewUploadLoggingClient(nil) + mgr := New(c, Options{}) + _, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + Body: &sizedReader{size: 1024 * 1024 * 21, err: io.EOF}, + }) + if err != nil { + t.Errorf("expect no error, got %v", err) + } + + if diff := cmpDiff([]string{"CreateMultipartUpload", "UploadPart", "UploadPart", + "UploadPart", "CompleteMultipartUpload"}, *invocations); len(diff) > 0 { + t.Error(diff) + } + + // Part lengths + var parts []int64 + for i := 1; i <= 3; i++ { + parts = append(parts, getReaderLength((*params)[i].(*s3.UploadPartInput).Body)) + } + sort.Slice(parts, func(i, j int) bool { + return parts[i] < parts[j] + }) + + if diff := cmpDiff([]int64{1024 * 1024 * 5, 1024 * 1024 * 8, 1024 * 1024 * 8}, parts); len(diff) > 0 { + t.Error(diff) + } +} + +// TestUploadOrderMultiBufferedReaderEOF tests the edge case where the +// file size is the same as part size. +func TestUploadOrderMultiBufferedReaderEOF(t *testing.T) { + c, invocations, params := s3testing.NewUploadLoggingClient(nil) + mgr := New(c, Options{}) + _, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + Body: &sizedReader{size: 1024 * 1024 * 16, err: io.EOF}, + }) + + if err != nil { + t.Errorf("expect no error, got %v", err) + } + + if diff := cmpDiff([]string{"CreateMultipartUpload", "UploadPart", "UploadPart", "CompleteMultipartUpload"}, *invocations); len(diff) > 0 { + t.Error(diff) + } + + // Part lengths + var parts []int64 + for i := 1; i <= 2; i++ { + parts = append(parts, getReaderLength((*params)[i].(*s3.UploadPartInput).Body)) + } + sort.Slice(parts, func(i, j int) bool { + return parts[i] < parts[j] + }) + + if diff := cmpDiff([]int64{1024 * 1024 * 8, 1024 * 1024 * 8}, parts); len(diff) > 0 { + t.Error(diff) + } +} + +func TestUploadOrderSingleBufferedReader(t *testing.T) { + c, invocations, _ := s3testing.NewUploadLoggingClient(nil) + mgr := New(c, Options{}) + resp, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + Body: &sizedReader{size: 1024 * 1024 * 2}, + }) + + if err != nil { + t.Errorf("expect no error, got %v", err) + } + + if diff := cmpDiff([]string{"PutObject"}, *invocations); len(diff) > 0 { + t.Error(diff) + } + + if len(resp.UploadID) > 0 { + t.Errorf("expect no value, got %q", resp.UploadID) + } +} + +func TestUploadZeroLenObject(t *testing.T) { + c, invocations, _ := s3testing.NewUploadLoggingClient(nil) + + mgr := New(c, Options{}) + resp, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + Body: strings.NewReader(""), + }) + + if err != nil { + t.Errorf("expect no error but received %v", err) + } + if diff := cmpDiff([]string{"PutObject"}, *invocations); len(diff) > 0 { + t.Errorf("expect request to have been made, but was not, %v", diff) + } + + if len(resp.UploadID) > 0 { + t.Errorf("expect empty string, but received %q", resp.UploadID) + } +} + +type testIncompleteReader struct { + Size int64 + read int64 +} + +func (r *testIncompleteReader) Read(p []byte) (n int, err error) { + r.read += int64(len(p)) + if r.read >= r.Size { + return int(r.read - r.Size), io.ErrUnexpectedEOF + } + return len(p), nil +} + +func TestUploadUnexpectedEOF(t *testing.T) { + c, invocations, _ := s3testing.NewUploadLoggingClient(nil) + mgr := New(c, Options{}, func(o *Options) { + o.Concurrency = 1 + o.PartSizeBytes = minPartSizeBytes + }) + _, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + Body: &testIncompleteReader{ + Size: minPartSizeBytes + 1, + }, + }) + if err == nil { + t.Error("expect error, got nil") + } + + // Ensure upload started. + if e, a := "CreateMultipartUpload", (*invocations)[0]; e != a { + t.Errorf("expect %q, got %q", e, a) + } + + // Part may or may not be sent because of timing of sending parts and + // reading next part in upload manager. Just check for the last abort. + if e, a := "AbortMultipartUpload", (*invocations)[len(*invocations)-1]; e != a { + t.Errorf("expect %q, got %q", e, a) + } +} + +func TestSSE(t *testing.T) { + c, _, _ := s3testing.NewUploadLoggingClient(nil) + c.UploadPartFn = func(u *s3testing.UploadLoggingClient, params *s3.UploadPartInput) (*s3.UploadPartOutput, error) { + if params.SSECustomerAlgorithm == nil { + t.Fatal("SSECustomerAlgoritm should not be nil") + } + if params.SSECustomerKey == nil { + t.Fatal("SSECustomerKey should not be nil") + } + return &s3.UploadPartOutput{ETag: aws.String(fmt.Sprintf("ETAG%d", u.PartNum))}, nil + } + + mgr := New(c, Options{}, func(o *Options) { + o.Concurrency = 5 + }) + + _, err := mgr.PutObject(context.Background(), &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + SSECustomerAlgorithm: "AES256", + SSECustomerKey: "foo", + Body: bytes.NewBuffer(make([]byte, 1024*1024*10)), + }) + + if err != nil { + t.Fatal("Expected no error, but received" + err.Error()) + } +} + +func TestUploadWithContextCanceled(t *testing.T) { + c := s3.New(s3.Options{ + UsePathStyle: true, + Region: "mock-region", + }) + u := New(c, Options{}) + + ctx := &awstesting.FakeContext{DoneCh: make(chan struct{})} + ctx.Error = fmt.Errorf("context canceled") + close(ctx.DoneCh) + + _, err := u.PutObject(ctx, &PutObjectInput{ + Bucket: "Bucket", + Key: "Key", + Body: bytes.NewReader(make([]byte, 0)), + }) + if err == nil { + t.Fatalf("expect error, got nil") + } + + if e, a := "canceled", err.Error(); !strings.Contains(a, e) { + t.Errorf("expected error message to contain %q, but did not %q", e, a) + } +} + +func TestUploadRetry(t *testing.T) { + const part, retries = 3, 10 + testFile, testFileCleanup, err := createTempFile(t, minPartSizeBytes*part) + if err != nil { + t.Fatalf("failed to create test file, %v", err) + } + defer testFileCleanup(t) + + cases := map[string]struct { + Body io.Reader + PartHandlers func(testing.TB) []http.Handler + }{ + "bytes.Buffer": { + Body: bytes.NewBuffer(make([]byte, minPartSizeBytes*part)), + PartHandlers: func(tb testing.TB) []http.Handler { + return buildFailHandlers(tb, part, retries) + }, + }, + "bytes.Reader": { + Body: bytes.NewReader(make([]byte, minPartSizeBytes*part)), + PartHandlers: func(tb testing.TB) []http.Handler { + return buildFailHandlers(tb, part, retries) + }, + }, + "os.File": { + Body: testFile, + PartHandlers: func(tb testing.TB) []http.Handler { + return buildFailHandlers(tb, part, retries) + }, + }, + } + + for name, c := range cases { + t.Run(name, func(t *testing.T) { + restoreSleep := sdk.TestingUseNopSleep() + defer restoreSleep() + + mux := newMockS3UploadServer(t, c.PartHandlers(t)) + server := httptest.NewServer(mux) + defer server.Close() + + client := s3.New(s3.Options{ + EndpointResolverV2: s3testing.EndpointResolverV2{URL: server.URL}, + UsePathStyle: true, + Retryer: retry.NewStandard(func(o *retry.StandardOptions) { + o.MaxAttempts = retries + 1 + }), + }) + + uploader := New(client, Options{}) + _, err := uploader.PutObject(context.Background(), &PutObjectInput{ + Bucket: "bucket", + Key: "key", + Body: c.Body, + }) + + if err != nil { + t.Fatalf("expect no error, got %v", err) + } + }) + } +} + +func newMockS3UploadServer(tb testing.TB, partHandler []http.Handler) *mockS3UploadServer { + s := &mockS3UploadServer{ + ServeMux: http.NewServeMux(), + partHandlers: partHandler, + tb: tb, + } + + s.HandleFunc("/", s.handleRequest) + + return s +} + +func buildFailHandlers(tb testing.TB, part, retry int) []http.Handler { + handlers := make([]http.Handler, part) + + for i := 0; i < part; i++ { + handlers[i] = &failPartHandler{ + tb: tb, + failLeft: retry, + successPartHandler: &successPartHandler{tb: tb}, + } + } + + return handlers +} + +type mockS3UploadServer struct { + *http.ServeMux + + tb testing.TB + partHandlers []http.Handler +} + +func (s mockS3UploadServer) handleRequest(w http.ResponseWriter, r *http.Request) { + defer func() { + if err := r.Body.Close(); err != nil { + failRequest(w, 0, "BodyCloseError", fmt.Sprintf("request body close error: %v", err)) + } + }() + + _, hasUploads := r.URL.Query()["uploads"] + + switch { + case r.Method == "POST" && hasUploads: + // CreateMultipartUpload request + w.Header().Set("Content-Length", strconv.Itoa(len(createUploadResp))) + w.Write([]byte(createUploadResp)) + case r.Method == "PUT": + partStr := r.URL.Query().Get("partNumber") + part, err := strconv.ParseInt(partStr, 10, 64) + if err != nil { + failRequest(w, 400, "BadRequest", fmt.Sprintf("unable to parse partNumber, %q, %v", partStr, err)) + return + } + if part <= 0 || part > int64(len(s.partHandlers)) { + failRequest(w, 400, "BadRequest", fmt.Sprintf("invalid partNumber %v", part)) + return + } + s.partHandlers[part-1].ServeHTTP(w, r) + case r.Method == "POST": + // CompleteMultipartUpload request + w.Header().Set("Content-Length", strconv.Itoa(len(completeUploadResp))) + w.Write([]byte(completeUploadResp)) + case r.Method == "DELETE": + w.Header().Set("Content-Length", strconv.Itoa(len(abortUploadResp))) + w.Write([]byte(abortUploadResp)) + w.WriteHeader(200) + default: + failRequest(w, 400, "BadRequest", fmt.Sprintf("invalid request %v %v", r.Method, r.URL)) + } +} + +func createTempFile(t *testing.T, size int64) (*os.File, func(*testing.T), error) { + file, err := ioutil.TempFile(os.TempDir(), aws.SDKName+t.Name()) + if err != nil { + return nil, nil, err + } + filename := file.Name() + if err := file.Truncate(size); err != nil { + return nil, nil, err + } + + return file, + func(t *testing.T) { + if err := file.Close(); err != nil { + t.Errorf("failed to close temp file, %s, %v", filename, err) + } + if err := os.Remove(filename); err != nil { + t.Errorf("failed to remove temp file, %s, %v", filename, err) + } + }, + nil +} + +type failPartHandler struct { + tb testing.TB + failLeft int + successPartHandler http.Handler +} + +func (h *failPartHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + defer func() { + if err := r.Body.Close(); err != nil { + failRequest(w, 0, "BodyCloseError", fmt.Sprintf("request body close error: %v", err)) + } + }() + + if h.failLeft == 0 && h.successPartHandler != nil { + h.successPartHandler.ServeHTTP(w, r) + return + } + + io.Copy(ioutil.Discard, r.Body) + failRequest(w, 500, "InternalException", fmt.Sprintf("mock error, partNumber %v", r.URL.Query().Get("partNumber"))) + h.failLeft-- +} + +func failRequest(w http.ResponseWriter, status int, code, msg string) { + msg = fmt.Sprintf(baseRequestErrorResp, code, msg) + w.Header().Set("Content-Length", strconv.Itoa(len(msg))) + w.WriteHeader(status) + w.Write([]byte(msg)) +} + +type successPartHandler struct { + tb testing.TB +} + +func (h *successPartHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + defer func() { + if err := r.Body.Close(); err != nil { + failRequest(w, 0, "BodyCloseError", fmt.Sprintf("request body close error: %v", err)) + } + }() + + n, err := io.Copy(ioutil.Discard, r.Body) + if err != nil { + failRequest(w, 400, "BadRequest", fmt.Sprintf("failed to read body, %v", err)) + return + } + contentLength := r.Header.Get("Content-Length") + expectLength, err := strconv.ParseInt(contentLength, 10, 64) + if err != nil { + h.tb.Logf("expect content-length, got %q, %v", contentLength, err) + failRequest(w, 400, "BadRequest", fmt.Sprintf("unable to get content-length %v", err)) + return + } + + if e, a := expectLength, n; e != a { + h.tb.Logf("expect content-length to be %v, got %v", e, a) + failRequest(w, 400, "BadRequest", fmt.Sprintf("content-length and body do not match, %v, %v", e, a)) + return + } + + w.Header().Set("Content-Length", strconv.Itoa(len(uploadPartResp))) + w.Write([]byte(uploadPartResp)) +} + +const createUploadResp = ` + bucket + key + abc123 +` + +const uploadPartResp = ` + key +` +const baseRequestErrorResp = ` + %s + %s + request-id + host-id +` + +const completeUploadResp = ` + bucket + key + key + https://bucket.us-west-2.amazonaws.com/key + abc123 +` + +const abortUploadResp = `` + +func cmpDiff(e, a interface{}) string { + if !reflect.DeepEqual(e, a) { + return fmt.Sprintf("%v != %v", e, a) + } + return "" +} diff --git a/feature/s3/transfermanager/shared_test.go b/feature/s3/transfermanager/shared_test.go new file mode 100644 index 00000000000..364423e96c2 --- /dev/null +++ b/feature/s3/transfermanager/shared_test.go @@ -0,0 +1,4 @@ +package transfermanager + +var buf20MB = make([]byte, 1024*1024*20) +var buf2MB = make([]byte, 1024*1024*2) diff --git a/feature/s3/transfermanager/types/types.go b/feature/s3/transfermanager/types/types.go new file mode 100644 index 00000000000..8a2d877e461 --- /dev/null +++ b/feature/s3/transfermanager/types/types.go @@ -0,0 +1,346 @@ +package types + +import ( + "io" + + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" +) + +// ReadSeekCloser wraps a io.Reader returning a ReaderSeekerCloser. Allows the +// SDK to accept an io.Reader that is not also an io.Seeker for unsigned +// streaming payload API operations. +// +// A readSeekCloser wrapping an nonseekable io.Reader used in an API operation's +// input will prevent that operation being retried in the case of +// network errors, and cause operation requests to fail if the operation +// requires payload signing. +func ReadSeekCloser(r io.Reader) *ReaderSeekerCloser { + return &ReaderSeekerCloser{r} +} + +// ReaderSeekerCloser represents a reader that can also delegate io.Seeker and +// io.Closer interfaces to the underlying object if they are available. +type ReaderSeekerCloser struct { + r io.Reader +} + +// SeekerLen attempts to get the number of bytes remaining at the seeker's +// current position. Returns the number of bytes remaining or error. +func SeekerLen(s io.Seeker) (int64, error) { + // Determine if the seeker is actually seekable. ReaderSeekerCloser + // hides the fact that a io.Readers might not actually be seekable. + switch v := s.(type) { + case *ReaderSeekerCloser: + return v.GetLen() + } + + return computeSeekerLength(s) +} + +// GetLen returns the length of the bytes remaining in the underlying reader. +// Checks first for Len(), then io.Seeker to determine the size of the +// underlying reader. +// +// Will return -1 if the length cannot be determined. +func (r *ReaderSeekerCloser) GetLen() (int64, error) { + if l, ok := r.HasLen(); ok { + return int64(l), nil + } + + if s, ok := r.r.(io.Seeker); ok { + return computeSeekerLength(s) + } + + return -1, nil +} + +func computeSeekerLength(s io.Seeker) (int64, error) { + curOffset, err := s.Seek(0, io.SeekCurrent) + if err != nil { + return 0, err + } + + endOffset, err := s.Seek(0, io.SeekEnd) + if err != nil { + return 0, err + } + + _, err = s.Seek(curOffset, io.SeekStart) + if err != nil { + return 0, err + } + + return endOffset - curOffset, nil +} + +// HasLen returns the length of the underlying reader if the value implements +// the Len() int method. +func (r *ReaderSeekerCloser) HasLen() (int, bool) { + type lenner interface { + Len() int + } + + if lr, ok := r.r.(lenner); ok { + return lr.Len(), true + } + + return 0, false +} + +// Read reads from the reader up to size of p. The number of bytes read, and +// error if it occurred will be returned. +// +// If the reader is not an io.Reader zero bytes read, and nil error will be +// returned. +// +// Performs the same functionality as io.Reader Read +func (r *ReaderSeekerCloser) Read(p []byte) (int, error) { + switch t := r.r.(type) { + case io.Reader: + return t.Read(p) + } + return 0, nil +} + +// Seek sets the offset for the next Read to offset, interpreted according to +// whence: 0 means relative to the origin of the file, 1 means relative to the +// current offset, and 2 means relative to the end. Seek returns the new offset +// and an error, if any. +// +// If the ReaderSeekerCloser is not an io.Seeker nothing will be done. +func (r *ReaderSeekerCloser) Seek(offset int64, whence int) (int64, error) { + switch t := r.r.(type) { + case io.Seeker: + return t.Seek(offset, whence) + } + return int64(0), nil +} + +// IsSeeker returns if the underlying reader is also a seeker. +func (r *ReaderSeekerCloser) IsSeeker() bool { + _, ok := r.r.(io.Seeker) + return ok +} + +// Close closes the ReaderSeekerCloser. +// +// If the ReaderSeekerCloser is not an io.Closer nothing will be done. +func (r *ReaderSeekerCloser) Close() error { + switch t := r.r.(type) { + case io.Closer: + return t.Close() + } + return nil +} + +// ChecksumAlgorithm indicates the algorithm used to create the checksum for the object +type ChecksumAlgorithm string + +// Enum values for ChecksumAlgorithm +const ( + ChecksumAlgorithmCrc32 ChecksumAlgorithm = "CRC32" + ChecksumAlgorithmCrc32c = "CRC32C" + ChecksumAlgorithmSha1 = "SHA1" + ChecksumAlgorithmSha256 = "SHA256" +) + +// ObjectCannedACL defines the canned ACL to apply to the object, see [Canned ACL] in the +// Amazon S3 User Guide. +type ObjectCannedACL string + +// Enum values for ObjectCannedACL +const ( + ObjectCannedACLPrivate ObjectCannedACL = "private" + ObjectCannedACLPublicRead = "public-read" + ObjectCannedACLPublicReadWrite = "public-read-write" + ObjectCannedACLAuthenticatedRead = "authenticated-read" + ObjectCannedACLAwsExecRead = "aws-exec-read" + ObjectCannedACLBucketOwnerRead = "bucket-owner-read" + ObjectCannedACLBucketOwnerFullControl = "bucket-owner-full-control" +) + +// Values returns all known values for ObjectCannedACL. Note that this can be +// expanded in the future, and so it is only as up to date as the client. +// +// The ordering of this slice is not guaranteed to be stable across updates. +func (ObjectCannedACL) Values() []ObjectCannedACL { + return []ObjectCannedACL{ + "private", + "public-read", + "public-read-write", + "authenticated-read", + "aws-exec-read", + "bucket-owner-read", + "bucket-owner-full-control", + } +} + +// ObjectLockLegalHoldStatus specifies whether a legal hold will be applied to this object. For more +// information about S3 Object Lock, see [Object Lock] in the Amazon S3 User Guide. +type ObjectLockLegalHoldStatus string + +// Enum values for ObjectLockLegalHoldStatus +const ( + ObjectLockLegalHoldStatusOn ObjectLockLegalHoldStatus = "ON" + ObjectLockLegalHoldStatusOff = "OFF" +) + +// ObjectLockMode is the Object Lock mode that you want to apply to this object. +type ObjectLockMode string + +// Enum values for ObjectLockMode +const ( + ObjectLockModeGovernance ObjectLockMode = "GOVERNANCE" + ObjectLockModeCompliance = "COMPLIANCE" +) + +// RequestPayer confirms that the requester knows that they will be charged for the request. +// Bucket owners need not specify this parameter in their requests. If either the +// source or destination S3 bucket has Requester Pays enabled, the requester will +// pay for corresponding charges to copy the object. For information about +// downloading objects from Requester Pays buckets, see [Downloading Objects in Requester Pays Buckets]in the Amazon S3 User +// Guide. +type RequestPayer string + +// Enum values for RequestPayer +const ( + RequestPayerRequester RequestPayer = "requester" +) + +// ServerSideEncryption indicates the server-side encryption algorithm that was used when you store this object +// in Amazon S3 (for example, AES256 , aws:kms , aws:kms:dsse ) +type ServerSideEncryption string + +// Enum values for ServerSideEncryption +const ( + ServerSideEncryptionAes256 ServerSideEncryption = "AES256" + ServerSideEncryptionAwsKms = "aws:kms" + ServerSideEncryptionAwsKmsDsse = "aws:kms:dsse" +) + +// StorageClass specifies class to store newly created +// objects, which has default value of STANDARD. For more information, see +// [Storage Classes] in the Amazon S3 User Guide. +type StorageClass string + +// Enum values for StorageClass +const ( + StorageClassStandard StorageClass = "STANDARD" + StorageClassReducedRedundancy = "REDUCED_REDUNDANCY" + StorageClassStandardIa = "STANDARD_IA" + StorageClassOnezoneIa = "ONEZONE_IA" + StorageClassIntelligentTiering = "INTELLIGENT_TIERING" + StorageClassGlacier = "GLACIER" + StorageClassDeepArchive = "DEEP_ARCHIVE" + StorageClassOutposts = "OUTPOSTS" + StorageClassGlacierIr = "GLACIER_IR" + StorageClassSnow = "SNOW" + StorageClassExpressOnezone = "EXPRESS_ONEZONE" +) + +// CompletedPart includes details of the parts that were uploaded. +type CompletedPart struct { + + // The base64-encoded, 32-bit CRC32 checksum of the object. This will only be + // present if it was uploaded with the object. When you use an API operation on an + // object that was uploaded using multipart uploads, this value may not be a direct + // checksum value of the full object. Instead, it's a calculation based on the + // checksum values of each individual part. For more information about how + // checksums are calculated with multipart uploads, see [Checking object integrity]in the Amazon S3 User + // Guide. + // + // [Checking object integrity]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#large-object-checksums + ChecksumCRC32 *string + + // The base64-encoded, 32-bit CRC32C checksum of the object. This will only be + // present if it was uploaded with the object. When you use an API operation on an + // object that was uploaded using multipart uploads, this value may not be a direct + // checksum value of the full object. Instead, it's a calculation based on the + // checksum values of each individual part. For more information about how + // checksums are calculated with multipart uploads, see [Checking object integrity]in the Amazon S3 User + // Guide. + // + // [Checking object integrity]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#large-object-checksums + ChecksumCRC32C *string + + // The base64-encoded, 160-bit SHA-1 digest of the object. This will only be + // present if it was uploaded with the object. When you use the API operation on an + // object that was uploaded using multipart uploads, this value may not be a direct + // checksum value of the full object. Instead, it's a calculation based on the + // checksum values of each individual part. For more information about how + // checksums are calculated with multipart uploads, see [Checking object integrity]in the Amazon S3 User + // Guide. + // + // [Checking object integrity]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#large-object-checksums + ChecksumSHA1 *string + + // The base64-encoded, 256-bit SHA-256 digest of the object. This will only be + // present if it was uploaded with the object. When you use an API operation on an + // object that was uploaded using multipart uploads, this value may not be a direct + // checksum value of the full object. Instead, it's a calculation based on the + // checksum values of each individual part. For more information about how + // checksums are calculated with multipart uploads, see [Checking object integrity]in the Amazon S3 User + // Guide. + // + // [Checking object integrity]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#large-object-checksums + ChecksumSHA256 *string + + // Entity tag returned when the part was uploaded. + ETag *string + + // Part number that identifies the part. This is a positive integer between 1 and + // 10,000. + // + // - General purpose buckets - In CompleteMultipartUpload , when a additional + // checksum (including x-amz-checksum-crc32 , x-amz-checksum-crc32c , + // x-amz-checksum-sha1 , or x-amz-checksum-sha256 ) is applied to each part, the + // PartNumber must start at 1 and the part numbers must be consecutive. + // Otherwise, Amazon S3 generates an HTTP 400 Bad Request status code and an + // InvalidPartOrder error code. + // + // - Directory buckets - In CompleteMultipartUpload , the PartNumber must start + // at 1 and the part numbers must be consecutive. + PartNumber *int32 +} + +// MapCompletedPart maps CompletedPart to s3 types +func (cp CompletedPart) MapCompletedPart() types.CompletedPart { + return types.CompletedPart{ + ChecksumCRC32: cp.ChecksumCRC32, + ChecksumCRC32C: cp.ChecksumCRC32C, + ChecksumSHA1: cp.ChecksumSHA1, + ChecksumSHA256: cp.ChecksumSHA256, + ETag: cp.ETag, + PartNumber: cp.PartNumber, + } +} + +// MapFrom set CompletedPart fields from s3 UploadPartOutput +func (cp *CompletedPart) MapFrom(resp *s3.UploadPartOutput, partNum *int32) { + cp.ChecksumCRC32 = resp.ChecksumCRC32 + cp.ChecksumCRC32C = resp.ChecksumCRC32C + cp.ChecksumSHA1 = resp.ChecksumSHA1 + cp.ChecksumSHA256 = resp.ChecksumSHA256 + cp.ETag = resp.ETag + cp.PartNumber = partNum +} + +// RequestCharged indicates that the requester was successfully charged for the request. +type RequestCharged string + +// Enum values for RequestCharged +const ( + RequestChargedRequester RequestCharged = "requester" +) + +// Metadata provides storing and reading metadata values. Keys may be any +// comparable value type. Get and set will panic if key is not a comparable +// value type. +// +// Metadata uses lazy initialization, and Set method must be called as an +// addressable value, or pointer. Not doing so may cause key/value pair to not +// be set. +type Metadata struct { + values map[interface{}]interface{} +} From ff114b75e4af4ae0bffd697edbcf3ee2232d2ddb Mon Sep 17 00:00:00 2001 From: Tianyi Wang Date: Fri, 1 Nov 2024 11:31:37 -0400 Subject: [PATCH 02/12] change pool to store slice pointer --- feature/s3/transfermanager/api_op_PutObject.go | 4 ++-- feature/s3/transfermanager/pool.go | 15 ++++++++------- feature/s3/transfermanager/pool_test.go | 5 +++-- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/feature/s3/transfermanager/api_op_PutObject.go b/feature/s3/transfermanager/api_op_PutObject.go index bba6b8036d3..0bca2aca0f7 100644 --- a/feature/s3/transfermanager/api_op_PutObject.go +++ b/feature/s3/transfermanager/api_op_PutObject.go @@ -753,12 +753,12 @@ func (u *uploader) nextReader(ctx context.Context) (io.Reader, int, func(), erro return nil, 0, func() {}, err } - n, err := readFillBuf(u.in.Body, part) + n, err := readFillBuf(u.in.Body, *part) cleanup := func() { u.partPool.Put(part) } - return bytes.NewReader(part[0:n]), n, cleanup, err + return bytes.NewReader((*part)[0:n]), n, cleanup, err } func readFillBuf(r io.Reader, b []byte) (offset int, err error) { diff --git a/feature/s3/transfermanager/pool.go b/feature/s3/transfermanager/pool.go index d681861a121..04b7aec640e 100644 --- a/feature/s3/transfermanager/pool.go +++ b/feature/s3/transfermanager/pool.go @@ -6,21 +6,22 @@ import ( ) type bytesBufferPool interface { - Get(context.Context) ([]byte, error) - Put([]byte) + Get(context.Context) (*[]byte, error) + Put(*[]byte) Close() } type defaultSlicePool struct { - slices chan []byte + slices chan *[]byte } func newDefaultSlicePool(sliceSize int64, capacity int) *defaultSlicePool { p := &defaultSlicePool{} - slices := make(chan []byte, capacity) + slices := make(chan *[]byte, capacity) for i := 0; i < capacity; i++ { - slices <- make([]byte, sliceSize) + s := make([]byte, sliceSize) + slices <- &s } p.slices = slices @@ -29,7 +30,7 @@ func newDefaultSlicePool(sliceSize int64, capacity int) *defaultSlicePool { var errZeroCapacity = fmt.Errorf("get called on zero capacity pool") -func (p *defaultSlicePool) Get(ctx context.Context) ([]byte, error) { +func (p *defaultSlicePool) Get(ctx context.Context) (*[]byte, error) { select { case <-ctx.Done(): return nil, ctx.Err() @@ -50,7 +51,7 @@ func (p *defaultSlicePool) Get(ctx context.Context) ([]byte, error) { } } -func (p *defaultSlicePool) Put(bs []byte) { +func (p *defaultSlicePool) Put(bs *[]byte) { p.slices <- bs } diff --git a/feature/s3/transfermanager/pool_test.go b/feature/s3/transfermanager/pool_test.go index d6e107bc98c..b30d0d710dc 100644 --- a/feature/s3/transfermanager/pool_test.go +++ b/feature/s3/transfermanager/pool_test.go @@ -9,7 +9,7 @@ import ( func TestDefaultSlicePool(t *testing.T) { pool := newDefaultSlicePool(1, 2) - var bs []byte + var bs *[]byte var err error var wg sync.WaitGroup @@ -17,7 +17,8 @@ func TestDefaultSlicePool(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - pool.Put(bs) + bs := make([]byte, 1) + pool.Put(&bs) }() } // wait for a slice to be put back From 32c3e06cf4c66bdb7ea9fb333757d2cd8d3780c2 Mon Sep 17 00:00:00 2001 From: Tianyi Wang Date: Mon, 4 Nov 2024 17:17:05 -0500 Subject: [PATCH 03/12] add integ test for putobject --- .../s3/transfermanager/api_op_PutObject.go | 4 +- feature/s3/transfermanager/pool.go | 16 +- feature/s3/transfermanager/pool_test.go | 5 +- .../internal/integrationtest/s3/setup_test.go | 4 +- .../s3transfermanager/api_test.go | 24 ++ .../s3transfermanager/setup_test.go | 259 ++++++++++++++++++ 6 files changed, 296 insertions(+), 16 deletions(-) create mode 100644 service/internal/integrationtest/s3transfermanager/api_test.go create mode 100644 service/internal/integrationtest/s3transfermanager/setup_test.go diff --git a/feature/s3/transfermanager/api_op_PutObject.go b/feature/s3/transfermanager/api_op_PutObject.go index 0bca2aca0f7..bba6b8036d3 100644 --- a/feature/s3/transfermanager/api_op_PutObject.go +++ b/feature/s3/transfermanager/api_op_PutObject.go @@ -753,12 +753,12 @@ func (u *uploader) nextReader(ctx context.Context) (io.Reader, int, func(), erro return nil, 0, func() {}, err } - n, err := readFillBuf(u.in.Body, *part) + n, err := readFillBuf(u.in.Body, part) cleanup := func() { u.partPool.Put(part) } - return bytes.NewReader((*part)[0:n]), n, cleanup, err + return bytes.NewReader(part[0:n]), n, cleanup, err } func readFillBuf(r io.Reader, b []byte) (offset int, err error) { diff --git a/feature/s3/transfermanager/pool.go b/feature/s3/transfermanager/pool.go index 04b7aec640e..a4a786e2357 100644 --- a/feature/s3/transfermanager/pool.go +++ b/feature/s3/transfermanager/pool.go @@ -6,22 +6,21 @@ import ( ) type bytesBufferPool interface { - Get(context.Context) (*[]byte, error) - Put(*[]byte) + Get(context.Context) ([]byte, error) + Put([]byte) Close() } type defaultSlicePool struct { - slices chan *[]byte + slices chan []byte } func newDefaultSlicePool(sliceSize int64, capacity int) *defaultSlicePool { p := &defaultSlicePool{} - slices := make(chan *[]byte, capacity) + slices := make(chan []byte, capacity) for i := 0; i < capacity; i++ { - s := make([]byte, sliceSize) - slices <- &s + slices <- make([]byte, sliceSize) } p.slices = slices @@ -30,7 +29,7 @@ func newDefaultSlicePool(sliceSize int64, capacity int) *defaultSlicePool { var errZeroCapacity = fmt.Errorf("get called on zero capacity pool") -func (p *defaultSlicePool) Get(ctx context.Context) (*[]byte, error) { +func (p *defaultSlicePool) Get(ctx context.Context) ([]byte, error) { select { case <-ctx.Done(): return nil, ctx.Err() @@ -46,12 +45,11 @@ func (p *defaultSlicePool) Get(ctx context.Context) (*[]byte, error) { return bs, nil case <-ctx.Done(): return nil, ctx.Err() - default: } } } -func (p *defaultSlicePool) Put(bs *[]byte) { +func (p *defaultSlicePool) Put(bs []byte) { p.slices <- bs } diff --git a/feature/s3/transfermanager/pool_test.go b/feature/s3/transfermanager/pool_test.go index b30d0d710dc..74a6d9b49b0 100644 --- a/feature/s3/transfermanager/pool_test.go +++ b/feature/s3/transfermanager/pool_test.go @@ -9,7 +9,7 @@ import ( func TestDefaultSlicePool(t *testing.T) { pool := newDefaultSlicePool(1, 2) - var bs *[]byte + var bs []byte var err error var wg sync.WaitGroup @@ -17,8 +17,7 @@ func TestDefaultSlicePool(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - bs := make([]byte, 1) - pool.Put(&bs) + pool.Put(make([]byte, 1)) }() } // wait for a slice to be put back diff --git a/service/internal/integrationtest/s3/setup_test.go b/service/internal/integrationtest/s3/setup_test.go index ea00037d6e3..e164336f6be 100644 --- a/service/internal/integrationtest/s3/setup_test.go +++ b/service/internal/integrationtest/s3/setup_test.go @@ -346,8 +346,8 @@ func testWriteToObject(t *testing.T, bucket string, testData writeToObjectTestDa } } else { - if len(testData.ExpectError) != 0 { - t.Fatalf("expected error: %v, got none", err) + if e := testData.ExpectError; len(e) != 0 { + t.Fatalf("expected error: %v, got none", e) } } diff --git a/service/internal/integrationtest/s3transfermanager/api_test.go b/service/internal/integrationtest/s3transfermanager/api_test.go new file mode 100644 index 00000000000..67b3de1881d --- /dev/null +++ b/service/internal/integrationtest/s3transfermanager/api_test.go @@ -0,0 +1,24 @@ +//go:build integration +// +build integration + +package s3transfermanager + +import ( + "bytes" + "strings" + "testing" +) + +func TestInteg_PutObject(t *testing.T) { + cases := map[string]putObjectTestData{ + "seekable body": {Body: strings.NewReader("hello world"), ExpectBody: []byte("hello world")}, + "empty string body": {Body: strings.NewReader(""), ExpectBody: []byte("")}, + "multipart upload body": {Body: bytes.NewReader(largeObjectBuf), ExpectBody: largeObjectBuf}, + } + + for name, c := range cases { + t.Run(name, func(t *testing.T) { + testPutObject(t, setupMetadata.Buckets.Source.Name, c) + }) + } +} diff --git a/service/internal/integrationtest/s3transfermanager/setup_test.go b/service/internal/integrationtest/s3transfermanager/setup_test.go new file mode 100644 index 00000000000..6ac6a16f4a9 --- /dev/null +++ b/service/internal/integrationtest/s3transfermanager/setup_test.go @@ -0,0 +1,259 @@ +//go:build integration +// +build integration + +package s3transfermanager + +import ( + "bytes" + "context" + "crypto/rand" + "crypto/tls" + "flag" + "fmt" + "io" + "io/ioutil" + "net/http" + "os" + "strings" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/aws/arn" + tm "github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager" + "github.com/aws/aws-sdk-go-v2/service/internal/integrationtest" + "github.com/aws/aws-sdk-go-v2/service/internal/integrationtest/s3shared" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/sts" +) + +var setupMetadata = struct { + AccountID string + Region string + Buckets struct { + Source struct { + Name string + ARN string + } + } +}{} + +// s3 client to use for integ testing +var s3Client *s3.Client + +// s3TransferManagerClient to use for integ testing +var s3TransferManagerClient *tm.Client + +// sts client to use for integ testing +var stsClient *sts.Client + +// http client setting to use for integ testing +var httpClient *http.Client + +var region = "us-west-2" + +// large object buffer to test multipart upload +var largeObjectBuf []byte + +// TestMain executes at start of package tests +func TestMain(m *testing.M) { + flag.Parse() + flag.CommandLine.Visit(func(f *flag.Flag) { + if !(f.Name == "run" || f.Name == "test.run") { + return + } + value := f.Value.String() + if value == `NONE` { + os.Exit(0) + } + }) + + var result int + defer func() { + if r := recover(); r != nil { + fmt.Fprintln(os.Stderr, "S3 TransferManager integration tests panic,", r) + result = 1 + } + os.Exit(result) + }() + + var verifyTLS bool + var s3Endpoint string + + flag.StringVar(&s3Endpoint, "s3-endpoint", "", "integration endpoint for S3") + + flag.StringVar(&setupMetadata.AccountID, "account", "", "integration account id") + flag.BoolVar(&verifyTLS, "verify-tls", true, "verify server TLS certificate") + flag.Parse() + + httpClient = &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: verifyTLS}, + }, + } + + cfg, err := integrationtest.LoadConfigWithDefaultRegion(region) + if err != nil { + fmt.Fprintf(os.Stderr, "Error occurred while loading config with region %v, %v", region, err) + result = 1 + return + } + + // assign the http client + cfg.HTTPClient = httpClient + + // create a s3 client + s3cfg := cfg.Copy() + if len(s3Endpoint) != 0 { + s3cfg.EndpointResolver = aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) { + return aws.Endpoint{ + URL: s3Endpoint, + PartitionID: "aws", + SigningName: "s3", + SigningRegion: region, + SigningMethod: "s3v4", + }, nil + }) + } + + // build s3 client from config + s3Client = s3.NewFromConfig(s3cfg) + + // build s3 transfermanager client from config + s3TransferManagerClient = tm.NewFromConfig(s3Client, s3cfg) + + // build sts client from config + stsClient = sts.NewFromConfig(cfg) + + // context + ctx := context.Background() + + setupMetadata.AccountID, err = getAccountID(ctx) + if err != nil { + fmt.Fprintf(os.Stderr, "failed to get integration aws account id: %v\n", err) + result = 1 + return + } + + bucketCleanup, err := setupBuckets(ctx) + defer bucketCleanup() + if err != nil { + fmt.Fprintf(os.Stderr, "failed to setup integration test buckets: %v\n", err) + result = 1 + return + } + + largeObjectBuf = make([]byte, 20*1024*1024) + _, err = rand.Read(largeObjectBuf) + if err != nil { + fmt.Fprintf(os.Stderr, "failed to generate large object for multipart upload: %v\n", err) + result = 1 + return + } + + result = m.Run() +} + +// getAccountID retrieves account id +func getAccountID(ctx context.Context) (string, error) { + if len(setupMetadata.AccountID) != 0 { + return setupMetadata.AccountID, nil + } + identity, err := stsClient.GetCallerIdentity(ctx, nil) + if err != nil { + return "", fmt.Errorf("error fetching caller identity, %w", err) + } + return *identity.Account, nil +} + +// setupBuckets creates buckets needed for integration test +func setupBuckets(ctx context.Context) (func(), error) { + var cleanups []func() + + cleanup := func() { + for i := range cleanups { + cleanups[i]() + } + } + + bucketCreates := []struct { + name *string + arn *string + }{ + {name: &setupMetadata.Buckets.Source.Name, arn: &setupMetadata.Buckets.Source.ARN}, + } + + for _, bucket := range bucketCreates { + *bucket.name = s3shared.GenerateBucketName() + + if err := s3shared.SetupBucket(ctx, s3Client, *bucket.name); err != nil { + return cleanup, err + } + + // Compute ARN + bARN := arn.ARN{ + Partition: "aws", + Service: "s3", + Region: region, + AccountID: setupMetadata.AccountID, + Resource: fmt.Sprintf("bucket_name:%s", *bucket.name), + }.String() + + *bucket.arn = bARN + + bucketName := *bucket.name + cleanups = append(cleanups, func() { + if err := s3shared.CleanupBucket(ctx, s3Client, bucketName); err != nil { + fmt.Fprintln(os.Stderr, err) + } + }) + } + + return cleanup, nil +} + +type putObjectTestData struct { + Body io.Reader + ExpectBody []byte + ExpectError string +} + +func testPutObject(t *testing.T, bucket string, testData putObjectTestData, opts ...func(options *tm.Options)) { + key := integrationtest.UniqueID() + + _, err := s3TransferManagerClient.PutObject(context.Background(), + &tm.PutObjectInput{ + Bucket: bucket, + Key: key, + Body: testData.Body, + }, opts...) + if err != nil { + if len(testData.ExpectError) == 0 { + t.Fatalf("expect no error, got %v", err) + } + if e, a := testData.ExpectError, err.Error(); !strings.Contains(a, e) { + t.Fatalf("expect error to contain %v, got %v", e, a) + } + } else { + if e := testData.ExpectError; len(e) != 0 { + t.Fatalf("expect error: %v, got none", e) + } + } + + if len(testData.ExpectError) != 0 { + return + } + + resp, err := s3Client.GetObject(context.Background(), + &s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) + if err != nil { + t.Fatalf("expect no error, got %v", err) + } + + b, _ := ioutil.ReadAll(resp.Body) + if e, a := testData.ExpectBody, b; !bytes.EqualFold(e, a) { + t.Errorf("expect %s, got %s", e, a) + } +} From fe670516ce8fd34ceaa49d149a30ac77efe430a6 Mon Sep 17 00:00:00 2001 From: Tianyi Wang Date: Mon, 4 Nov 2024 23:51:41 -0500 Subject: [PATCH 04/12] update go mod --- feature/s3/transfermanager/go.mod | 4 +++- service/internal/integrationtest/go.mod | 3 +++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/feature/s3/transfermanager/go.mod b/feature/s3/transfermanager/go.mod index 066daeb30ed..51338c4bf6d 100644 --- a/feature/s3/transfermanager/go.mod +++ b/feature/s3/transfermanager/go.mod @@ -3,7 +3,7 @@ module github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager go 1.20 require ( - github.com/aws/aws-sdk-go-v2 v1.30.3 + github.com/aws/aws-sdk-go-v2 v1.32.3 github.com/aws/aws-sdk-go-v2/config v1.27.27 github.com/aws/aws-sdk-go-v2/service/s3 v1.56.1 github.com/aws/smithy-go v1.20.3 @@ -25,3 +25,5 @@ require ( github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.4 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.30.3 // indirect ) + +replace github.com/aws/aws-sdk-go-v2 => ../../../ diff --git a/service/internal/integrationtest/go.mod b/service/internal/integrationtest/go.mod index 87c5b3b83bb..f429c8b627a 100644 --- a/service/internal/integrationtest/go.mod +++ b/service/internal/integrationtest/go.mod @@ -4,6 +4,7 @@ require ( github.com/aws/aws-sdk-go-v2 v1.32.3 github.com/aws/aws-sdk-go-v2/config v1.28.1 github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.35 + github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager v1.0.0 github.com/aws/aws-sdk-go-v2/service/acm v1.30.3 github.com/aws/aws-sdk-go-v2/service/apigateway v1.27.3 github.com/aws/aws-sdk-go-v2/service/applicationautoscaling v1.33.3 @@ -296,3 +297,5 @@ replace github.com/aws/aws-sdk-go-v2/service/wafregional => ../../../service/waf replace github.com/aws/aws-sdk-go-v2/service/wafv2 => ../../../service/wafv2/ replace github.com/aws/aws-sdk-go-v2/service/workspaces => ../../../service/workspaces/ + +replace github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager => ../../../feature/s3/transfermanager From 5744e645566003eafb1735c33163273461475e71 Mon Sep 17 00:00:00 2001 From: Tianyi Wang Date: Tue, 5 Nov 2024 13:11:11 -0500 Subject: [PATCH 05/12] minor changes for v0.1.0 --- feature/s3/transfermanager/pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/feature/s3/transfermanager/pool.go b/feature/s3/transfermanager/pool.go index a4a786e2357..51aa180ce57 100644 --- a/feature/s3/transfermanager/pool.go +++ b/feature/s3/transfermanager/pool.go @@ -56,7 +56,7 @@ func (p *defaultSlicePool) Put(bs []byte) { func (p *defaultSlicePool) Close() { close(p.slices) for range p.slices { - // drain channel + // drain the whole channel } p.slices = nil } From 4916e3ae3252a9a105163e89df3c46d8bb8c3de8 Mon Sep 17 00:00:00 2001 From: Tianyi Wang Date: Tue, 5 Nov 2024 13:15:44 -0500 Subject: [PATCH 06/12] update tags --- feature/s3/transfermanager/pool.go | 1 - 1 file changed, 1 deletion(-) diff --git a/feature/s3/transfermanager/pool.go b/feature/s3/transfermanager/pool.go index 51aa180ce57..003f62679f7 100644 --- a/feature/s3/transfermanager/pool.go +++ b/feature/s3/transfermanager/pool.go @@ -56,7 +56,6 @@ func (p *defaultSlicePool) Put(bs []byte) { func (p *defaultSlicePool) Close() { close(p.slices) for range p.slices { - // drain the whole channel } p.slices = nil } From 87da1104a875f361f48a77b8eb800f0d8d62c97f Mon Sep 17 00:00:00 2001 From: Tianyi Wang Date: Tue, 5 Nov 2024 13:19:12 -0500 Subject: [PATCH 07/12] update tags --- feature/s3/transfermanager/pool.go | 1 + 1 file changed, 1 insertion(+) diff --git a/feature/s3/transfermanager/pool.go b/feature/s3/transfermanager/pool.go index 003f62679f7..a4a786e2357 100644 --- a/feature/s3/transfermanager/pool.go +++ b/feature/s3/transfermanager/pool.go @@ -56,6 +56,7 @@ func (p *defaultSlicePool) Put(bs []byte) { func (p *defaultSlicePool) Close() { close(p.slices) for range p.slices { + // drain channel } p.slices = nil } From 99ef473369a87138df75e2252a0456b8ecf440e9 Mon Sep 17 00:00:00 2001 From: Tianyi Wang Date: Tue, 5 Nov 2024 13:30:21 -0500 Subject: [PATCH 08/12] update integ test dependency version --- feature/s3/transfermanager/pool.go | 4 ++-- service/internal/integrationtest/go.mod | 2 +- service/internal/integrationtest/go.sum | 6 ++++-- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/feature/s3/transfermanager/pool.go b/feature/s3/transfermanager/pool.go index a4a786e2357..a75e2a54673 100644 --- a/feature/s3/transfermanager/pool.go +++ b/feature/s3/transfermanager/pool.go @@ -27,7 +27,7 @@ func newDefaultSlicePool(sliceSize int64, capacity int) *defaultSlicePool { return p } -var errZeroCapacity = fmt.Errorf("get called on zero capacity pool") +var zeroCapacityErr = fmt.Errorf("get called on zero capacity pool") func (p *defaultSlicePool) Get(ctx context.Context) ([]byte, error) { select { @@ -40,7 +40,7 @@ func (p *defaultSlicePool) Get(ctx context.Context) ([]byte, error) { select { case bs, ok := <-p.slices: if !ok { - return nil, errZeroCapacity + return nil, zeroCapacityErr } return bs, nil case <-ctx.Done(): diff --git a/service/internal/integrationtest/go.mod b/service/internal/integrationtest/go.mod index f429c8b627a..0c918c3af93 100644 --- a/service/internal/integrationtest/go.mod +++ b/service/internal/integrationtest/go.mod @@ -4,7 +4,7 @@ require ( github.com/aws/aws-sdk-go-v2 v1.32.3 github.com/aws/aws-sdk-go-v2/config v1.28.1 github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.35 - github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager v1.0.0 + github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager v0.1.0 github.com/aws/aws-sdk-go-v2/service/acm v1.30.3 github.com/aws/aws-sdk-go-v2/service/apigateway v1.27.3 github.com/aws/aws-sdk-go-v2/service/applicationautoscaling v1.33.3 diff --git a/service/internal/integrationtest/go.sum b/service/internal/integrationtest/go.sum index 609e6296b2c..7c80ae776bc 100644 --- a/service/internal/integrationtest/go.sum +++ b/service/internal/integrationtest/go.sum @@ -1,7 +1,8 @@ github.com/aws/smithy-go v1.22.0 h1:uunKnWlcoL3zO7q+gG2Pk53joueEOsnNB28QdMsmiMM= github.com/aws/smithy-go v1.22.0/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= @@ -10,5 +11,6 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= From 74f42d375d7cf8b14c4002c3a49a77a419dddcee Mon Sep 17 00:00:00 2001 From: Tianyi Wang Date: Tue, 5 Nov 2024 13:34:40 -0500 Subject: [PATCH 09/12] change err var name --- feature/s3/transfermanager/pool.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/feature/s3/transfermanager/pool.go b/feature/s3/transfermanager/pool.go index a75e2a54673..a4a786e2357 100644 --- a/feature/s3/transfermanager/pool.go +++ b/feature/s3/transfermanager/pool.go @@ -27,7 +27,7 @@ func newDefaultSlicePool(sliceSize int64, capacity int) *defaultSlicePool { return p } -var zeroCapacityErr = fmt.Errorf("get called on zero capacity pool") +var errZeroCapacity = fmt.Errorf("get called on zero capacity pool") func (p *defaultSlicePool) Get(ctx context.Context) ([]byte, error) { select { @@ -40,7 +40,7 @@ func (p *defaultSlicePool) Get(ctx context.Context) ([]byte, error) { select { case bs, ok := <-p.slices: if !ok { - return nil, zeroCapacityErr + return nil, errZeroCapacity } return bs, nil case <-ctx.Done(): From 8fcb58c597d59174e84812c49e1a9fb93d4e80e3 Mon Sep 17 00:00:00 2001 From: Tianyi Wang Date: Tue, 5 Nov 2024 13:44:10 -0500 Subject: [PATCH 10/12] update go mod --- feature/s3/transfermanager/go.mod | 13 ++++--------- feature/s3/transfermanager/go.sum | 20 ++------------------ 2 files changed, 6 insertions(+), 27 deletions(-) diff --git a/feature/s3/transfermanager/go.mod b/feature/s3/transfermanager/go.mod index 51338c4bf6d..ea0e3d4bbbd 100644 --- a/feature/s3/transfermanager/go.mod +++ b/feature/s3/transfermanager/go.mod @@ -1,29 +1,24 @@ module github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager -go 1.20 +go 1.21 + +toolchain go1.22.4 require ( github.com/aws/aws-sdk-go-v2 v1.32.3 - github.com/aws/aws-sdk-go-v2/config v1.27.27 github.com/aws/aws-sdk-go-v2/service/s3 v1.56.1 - github.com/aws/smithy-go v1.20.3 + github.com/aws/smithy-go v1.22.0 ) require ( github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 // indirect - github.com/aws/aws-sdk-go-v2/credentials v1.17.27 // indirect - github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.11 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.15 // indirect github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.15 // indirect - github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.12 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.3 // indirect github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.14 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.17 // indirect github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.12 // indirect - github.com/aws/aws-sdk-go-v2/service/sso v1.22.4 // indirect - github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.4 // indirect - github.com/aws/aws-sdk-go-v2/service/sts v1.30.3 // indirect ) replace github.com/aws/aws-sdk-go-v2 => ../../../ diff --git a/feature/s3/transfermanager/go.sum b/feature/s3/transfermanager/go.sum index 2147a83948c..b9beb150394 100644 --- a/feature/s3/transfermanager/go.sum +++ b/feature/s3/transfermanager/go.sum @@ -1,19 +1,9 @@ -github.com/aws/aws-sdk-go-v2 v1.30.3 h1:jUeBtG0Ih+ZIFH0F4UkmL9w3cSpaMv9tYYDbzILP8dY= -github.com/aws/aws-sdk-go-v2 v1.30.3/go.mod h1:nIQjQVp5sfpQcTc9mPSr1B0PaWK5ByX9MOoDadSN4lc= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 h1:x6xsQXGSmW6frevwDA+vi/wqhp1ct18mVXYN08/93to= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2/go.mod h1:lPprDr1e6cJdyYeGXnRaJoP4Md+cDBvi2eOj00BlGmg= -github.com/aws/aws-sdk-go-v2/config v1.27.27 h1:HdqgGt1OAP0HkEDDShEl0oSYa9ZZBSOmKpdpsDMdO90= -github.com/aws/aws-sdk-go-v2/config v1.27.27/go.mod h1:MVYamCg76dFNINkZFu4n4RjDixhVr51HLj4ErWzrVwg= -github.com/aws/aws-sdk-go-v2/credentials v1.17.27 h1:2raNba6gr2IfA0eqqiP2XiQ0UVOpGPgDSi0I9iAP+UI= -github.com/aws/aws-sdk-go-v2/credentials v1.17.27/go.mod h1:gniiwbGahQByxan6YjQUMcW4Aov6bLC3m+evgcoN4r4= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.11 h1:KreluoV8FZDEtI6Co2xuNk/UqI9iwMrOx/87PBNIKqw= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.11/go.mod h1:SeSUYBLsMYFoRvHE0Tjvn7kbxaUhl75CJi1sbfhMxkU= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.15 h1:SoNJ4RlFEQEbtDcCEt+QG56MY4fm4W8rYirAmq+/DdU= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.15/go.mod h1:U9ke74k1n2bf+RIgoX1SXFed1HLs51OgUSs+Ph0KJP8= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.15 h1:C6WHdGnTDIYETAm5iErQUiVNsclNx9qbJVPIt03B6bI= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.15/go.mod h1:ZQLZqhcu+JhSrA9/NXRm8SkDvsycE+JkV3WGY41e+IM= -github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU= -github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY= github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.12 h1:DXFWyt7ymx/l1ygdyTTS0X923e+Q2wXIxConJzrgwc0= github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.12/go.mod h1:mVOr/LbvaNySK1/BTy4cBOCjhCNY2raWBwK4v+WR5J4= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.3 h1:dT3MqvGhSoaIhRseqw2I0yH81l7wiR2vjs57O51EAm8= @@ -26,11 +16,5 @@ github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.12 h1:tzha+v1SCEBpX github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.12/go.mod h1:n+nt2qjHGoseWeLHt1vEr6ZRCCxIN2KcNpJxBcYQSwI= github.com/aws/aws-sdk-go-v2/service/s3 v1.56.1 h1:wsg9Z/vNnCmxWikfGIoOlnExtEU459cR+2d+iDJ8elo= github.com/aws/aws-sdk-go-v2/service/s3 v1.56.1/go.mod h1:8rDw3mVwmvIWWX/+LWY3PPIMZuwnQdJMCt0iVFVT3qw= -github.com/aws/aws-sdk-go-v2/service/sso v1.22.4 h1:BXx0ZIxvrJdSgSvKTZ+yRBeSqqgPM89VPlulEcl37tM= -github.com/aws/aws-sdk-go-v2/service/sso v1.22.4/go.mod h1:ooyCOXjvJEsUw7x+ZDHeISPMhtwI3ZCB7ggFMcFfWLU= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.4 h1:yiwVzJW2ZxZTurVbYWA7QOrAaCYQR72t0wrSBfoesUE= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.4/go.mod h1:0oxfLkpz3rQ/CHlx5hB7H69YUpFiI1tql6Q6Ne+1bCw= -github.com/aws/aws-sdk-go-v2/service/sts v1.30.3 h1:ZsDKRLXGWHk8WdtyYMoGNO7bTudrvuKpDKgMVRlepGE= -github.com/aws/aws-sdk-go-v2/service/sts v1.30.3/go.mod h1:zwySh8fpFyXp9yOr/KVzxOl8SRqgf/IDw5aUt9UKFcQ= -github.com/aws/smithy-go v1.20.3 h1:ryHwveWzPV5BIof6fyDvor6V3iUL7nTfiTKXHiW05nE= -github.com/aws/smithy-go v1.20.3/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= +github.com/aws/smithy-go v1.22.0 h1:uunKnWlcoL3zO7q+gG2Pk53joueEOsnNB28QdMsmiMM= +github.com/aws/smithy-go v1.22.0/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= From 52b37fe04ccc360b83684000c905b180aa693fbd Mon Sep 17 00:00:00 2001 From: Tianyi Wang Date: Wed, 6 Nov 2024 12:41:22 -0500 Subject: [PATCH 11/12] change input/output type comment --- feature/s3/transfermanager/api_op_PutObject.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/feature/s3/transfermanager/api_op_PutObject.go b/feature/s3/transfermanager/api_op_PutObject.go index bba6b8036d3..367327cb45c 100644 --- a/feature/s3/transfermanager/api_op_PutObject.go +++ b/feature/s3/transfermanager/api_op_PutObject.go @@ -76,7 +76,8 @@ func (m *multipartUploadError) UploadID() string { return m.uploadID } -// PutObjectInput represents a request to the PutObject() call. +// PutObjectInput represents a request to the PutObject() call. It contains common fields +// of s3 PutObject and CreateMultipartUpload input type PutObjectInput struct { // Bucket the object is uploaded into Bucket string @@ -535,7 +536,8 @@ func (i *PutObjectInput) mapAbortMultipartUploadInput(uploadID *string) *s3.Abor return input } -// PutObjectOutput represents a response from the Upload() call. +// PutObjectOutput represents a response from the Upload() call. It contains common fields +// of s3 PutObject and CompleteMultipartUpload output type PutObjectOutput struct { // The ID for a multipart upload to S3. In the case of an error the error // can be cast to the MultiUploadFailure interface to extract the upload ID. From 51a6ea0847de77dd2bf41a0f492e9cdf1cad1c75 Mon Sep 17 00:00:00 2001 From: Tianyi Wang Date: Wed, 6 Nov 2024 16:42:37 -0500 Subject: [PATCH 12/12] minor change --- feature/s3/transfermanager/api_op_PutObject.go | 3 --- .../s3/transfermanager/internal/testing/upload.go | 12 ++---------- 2 files changed, 2 insertions(+), 13 deletions(-) diff --git a/feature/s3/transfermanager/api_op_PutObject.go b/feature/s3/transfermanager/api_op_PutObject.go index 367327cb45c..0785ec4e81e 100644 --- a/feature/s3/transfermanager/api_op_PutObject.go +++ b/feature/s3/transfermanager/api_op_PutObject.go @@ -55,8 +55,6 @@ type multipartUploadError struct { // Error returns the string representation of the error. // -// # See apierr.BaseError ErrorWithExtra for output format -// // Satisfies the error interface. func (m *multipartUploadError) Error() string { var extra string @@ -936,7 +934,6 @@ func (u *multiUploader) seterr(e error) { u.err = e } -// fail will abort the multipart unless LeavePartsOnError is set to true. func (u *multiUploader) fail(ctx context.Context, clientOptions ...func(*s3.Options)) { params := u.in.mapAbortMultipartUploadInput(u.uploadID) _, err := u.options.S3.AbortMultipartUpload(ctx, params, clientOptions...) diff --git a/feature/s3/transfermanager/internal/testing/upload.go b/feature/s3/transfermanager/internal/testing/upload.go index 10009ccdc0f..1764fc089e2 100644 --- a/feature/s3/transfermanager/internal/testing/upload.go +++ b/feature/s3/transfermanager/internal/testing/upload.go @@ -7,6 +7,7 @@ import ( "io/ioutil" "net/http" "net/url" + "slices" "sync" "github.com/aws/aws-sdk-go-v2/aws" @@ -68,7 +69,7 @@ func (f httpDoFunc) Do(r *http.Request) (*http.Response, error) { } func (u *UploadLoggingClient) traceOperation(name string, params interface{}) { - if contains(u.ignoredOperations, name) { + if slices.Contains(u.ignoredOperations, name) { return } u.Invocations = append(u.Invocations, name) @@ -190,12 +191,3 @@ func NewUploadLoggingClient(ignoredOps []string) (*UploadLoggingClient, *[]strin return c, &c.Invocations, &c.Params } - -func contains(src []string, s string) bool { - for _, v := range src { - if v == s { - return true - } - } - return false -}