Skip to content

Commit

Permalink
[sqs] Implement ReceiptHandle and deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
dzbarsky committed Sep 20, 2023
1 parent 91eda0b commit 2d0c2f0
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 1 deletion.
8 changes: 8 additions & 0 deletions services/sqs/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,11 @@ func EmptyBatchRequest(message string) *awserrors.Error {
func TooManyEntriesInBatchRequest(message string) *awserrors.Error {
return awserrors.Generate400Exception("TooManyEntriesInBatchRequest", message)
}

func InvalidIdFormat(message string) *awserrors.Error {
return awserrors.Generate400Exception("InvalidIdFormat", message)
}

func ReceiptHandleIsInvalid(message string) *awserrors.Error {
return awserrors.Generate400Exception("ReceiptHandleIsInvalid", message)
}
25 changes: 24 additions & 1 deletion services/sqs/sqs.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package sqs

import (
"bytes"
"crypto/md5"
"encoding/base64"
"encoding/hex"
"log/slog"
"maps"
Expand All @@ -11,6 +13,8 @@ import (
"sync"
"time"

"github.com/gofrs/uuid/v5"

"aws-in-a-box/arn"
"aws-in-a-box/awserrors"
)
Expand All @@ -36,6 +40,8 @@ var (
)

type Message struct {
UUID uuid.UUID

Body string
MD5OfBody string
MessageAttributes map[string]APIAttribute
Expand Down Expand Up @@ -184,6 +190,7 @@ func (s *SQS) SendMessage(input SendMessageInput) (*SendMessageOutput, *awserror

MD5OfBody := hexMD5([]byte(input.MessageBody))
queue.Messages = append(queue.Messages, &Message{
UUID: uuid.Must(uuid.NewV4()),
Body: input.MessageBody,
MD5OfBody: MD5OfBody,
MessageAttributes: input.MessageAttributes,
Expand Down Expand Up @@ -345,6 +352,10 @@ func (s *SQS) ReceiveMessage(input ReceiveMessageInput) (*ReceiveMessageOutput,
Body: message.Body,
MD5OfBody: message.MD5OfBody,
MessageAttributes: filterAttributes(message.MessageAttributes, input.MessageAttributeNames),
MessageId: message.UUID.String(),
// TODO: It seems AWS encodes additional data in here, there's some sort of binary blob
// This is fine for now as long as nobody tries to get clever with the representation.
ReceiptHandle: base64.StdEncoding.EncodeToString(message.UUID[:]),
})

message.VisibleAt = now.Add(visibilityTimeout)
Expand Down Expand Up @@ -388,7 +399,19 @@ func (s *SQS) DeleteMessage(input DeleteMessageInput) (*DeleteMessageOutput, *aw
}

func (s *SQS) lockedDeleteMessage(queue *Queue, receiptHandle string) *awserrors.Error {
return nil
uuid, err := base64.StdEncoding.DecodeString(receiptHandle)
if err != nil {
return InvalidIdFormat("")
}

for _, message := range queue.Messages {
if bytes.Equal(message.UUID.Bytes(), uuid) {
message.Deleted = true
return nil
}
}

return ReceiptHandleIsInvalid("")
}

// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_DeleteMessageBatch.html
Expand Down

0 comments on commit 2d0c2f0

Please sign in to comment.