Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compatibility with Amazon SQS Extended Client Library for Java #6

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 109 additions & 42 deletions src/ExtendedSqsClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ const S3_MESSAGE_KEY_MARKER = '-..s3Key..-';
const S3_BUCKET_NAME_MARKER = '-..s3BucketName..-';

const S3_MESSAGE_BODY_KEY = 'S3MessageBodyKey';
const COMPATIBLE_ATTRIBUTE_NAME = "ExtendedPayloadSize";
const COMPATIBLE_ATTRIBUTE_NAME_LEGACY = "SQSLargePayloadSize";

function defaultSendTransform(alwaysUseS3, messageSizeThreshold) {
return (message) => {
Expand All @@ -24,31 +26,6 @@ function defaultReceiveTransform() {
};
}

function getS3MessageKeyAndBucket(message) {
const messageAttributes = message.messageAttributes || message.MessageAttributes || {};

if (!messageAttributes[S3_MESSAGE_BODY_KEY]) {
return {
bucketName: null,
s3MessageKey: null,
};
}

const s3MessageKeyAttr = messageAttributes[S3_MESSAGE_BODY_KEY];
const s3MessageKey = s3MessageKeyAttr.stringValue || s3MessageKeyAttr.StringValue;

if (!s3MessageKey) {
throw new Error(`Invalid ${S3_MESSAGE_BODY_KEY} message attribute: Missing stringValue/StringValue`);
}

const s3MessageKeyRegexMatch = s3MessageKey.match(/^\((.*)\)(.*)?/);

return {
bucketName: s3MessageKeyRegexMatch[1],
s3MessageKey: s3MessageKeyRegexMatch[2],
};
}

function embedS3MarkersInReceiptHandle(bucketName, s3MessageKey, receiptHandle) {
return `${S3_BUCKET_NAME_MARKER}${bucketName}${S3_BUCKET_NAME_MARKER}${S3_MESSAGE_KEY_MARKER}${s3MessageKey}${S3_MESSAGE_KEY_MARKER}${receiptHandle}`;
}
Expand Down Expand Up @@ -81,16 +58,6 @@ function getOriginReceiptHandle(receiptHandle) {
: receiptHandle;
}

function addS3MessageKeyAttribute(s3MessageKey, attributes) {
return {
...attributes,
[S3_MESSAGE_BODY_KEY]: {
DataType: 'String',
StringValue: s3MessageKey,
},
};
}

function wrapRequest(request, callback, sendFn) {
if (callback) {
sendFn(callback);
Expand Down Expand Up @@ -184,6 +151,26 @@ class ExtendedSqsClient {
this.sendTransform =
options.sendTransform || defaultSendTransform(options.alwaysUseS3, options.messageSizeThreshold);
this.receiveTransform = options.receiveTransform || defaultReceiveTransform();

// Compatible with Amazon SQS Extended Client Library for Java
this.compatibleMode = options.compatibleMode || false;

// Change attribute name for compatibility with sending to older client using e.g. 'SQSLargePayloadSize'
this.useS3AttributeName = options.useS3AttributeName || null;

if (options.useS3AttributeName) {
this.useS3AttributeNameForSend = options.useS3AttributeName;
} else if (this.compatibleMode) {
this.useS3AttributeNameForSend = COMPATIBLE_ATTRIBUTE_NAME;
} else {
this.useS3AttributeNameForSend = S3_MESSAGE_BODY_KEY;
}

if (options.useS3AttributeName && !this.compatibleMode) {
this.s3MessageBodyKey = options.useS3AttributeName;
} else {
this.s3MessageBodyKey = S3_MESSAGE_BODY_KEY;
}
}

_storeS3Content(key, s3Content) {
Expand Down Expand Up @@ -220,7 +207,7 @@ class ExtendedSqsClient {
before: async ({ event }) => {
await Promise.all(
event.Records.map(async (record) => {
const { bucketName, s3MessageKey } = getS3MessageKeyAndBucket(record);
const { bucketName, s3MessageKey } = this._getS3MessageKeyAndBucket(record);

if (s3MessageKey) {
/* eslint-disable-next-line no-param-reassign */
Expand Down Expand Up @@ -324,18 +311,28 @@ class ExtendedSqsClient {

const sendObj = this.sendTransform(sendParams);
const existingS3MessageKey =
params.MessageAttributes && params.MessageAttributes[ExtendedSqsClient.RESERVED_ATTRIBUTE_NAME];
params.MessageAttributes && params.MessageAttributes[this.s3MessageBodyKey];
let s3MessageKey;

if (!sendObj.s3Content || existingS3MessageKey) {
sendParams.MessageBody = sendObj.messageBody || existingS3MessageKey.StringValue;
} else {
s3MessageKey = uuidv4();
sendParams.MessageAttributes = addS3MessageKeyAttribute(
sendParams.MessageAttributes = this._addS3MessageKeyAttribute(
`(${this.bucketName})${s3MessageKey}`,
sendParams.MessageAttributes
sendParams.MessageAttributes,
sendParams.MessageBody
);
sendParams.MessageBody = sendObj.messageBody || s3MessageKey;
if (sendObj.messageBody) {
sendParams.MessageBody = sendObj.messageBody;
} else if (this.compatibleMode) {
sendParams.MessageBody = JSON.stringify({
s3BucketName: this.bucketName,
s3Key: s3MessageKey
});
} else {
sendParams.MessageBody = s3MessageKey;
}
}

return {
Expand All @@ -345,6 +342,25 @@ class ExtendedSqsClient {
};
}

_addS3MessageKeyAttribute(s3MessageKey, attributes, messageBody) {
let xAttrVal;
if (this.compatibleMode) {
xAttrVal = {
DataType: 'Number',
StringValue: messageBody.length.toString()
};
} else {
xAttrVal = {
DataType: 'String',
StringValue: s3MessageKey,
};
}
return {
...attributes,
[this.useS3AttributeNameForSend]: xAttrVal,
};
}

sendMessage(params, callback) {
if (!this.bucketName) {
throw new Error('bucketName option is required for sending messages');
Expand Down Expand Up @@ -398,7 +414,7 @@ class ExtendedSqsClient {
return (response) =>
Promise.all(
(response.Messages || []).map(async (message) => {
const { bucketName, s3MessageKey } = getS3MessageKeyAndBucket(message);
const { bucketName, s3MessageKey } = this._getS3MessageKeyAndBucket(message);

if (s3MessageKey) {
/* eslint-disable-next-line no-param-reassign */
Expand All @@ -420,12 +436,63 @@ class ExtendedSqsClient {
);
}

_getS3MessageKeyAndBucket(message) {
const messageAttributes = message.messageAttributes || message.MessageAttributes || {};

if (messageAttributes[this.s3MessageBodyKey]) {
const s3MessageKeyAttr = messageAttributes[this.s3MessageBodyKey];
const s3MessageKey = s3MessageKeyAttr.stringValue || s3MessageKeyAttr.StringValue;

if (!s3MessageKey) {
throw new Error(`Invalid ${this.s3MessageBodyKey} message attribute: Missing stringValue/StringValue`);
}

const s3MessageKeyRegexMatch = s3MessageKey.match(/^\((.*)\)(.*)?/);

return {
bucketName: s3MessageKeyRegexMatch[1],
s3MessageKey: s3MessageKeyRegexMatch[2],
};
}

if (this.compatibleMode && (
messageAttributes[COMPATIBLE_ATTRIBUTE_NAME]
|| (messageAttributes[COMPATIBLE_ATTRIBUTE_NAME_LEGACY])
|| (this.useS3AttributeName && messageAttributes[this.useS3AttributeName])
)) {
let body;
try {
body = JSON.parse(message.Body);
}
catch(err) {
throw new Error(`Invalid message body: Cannot parse JSON for useS3 message`);
}
if (!(body.s3BucketName && body.s3Key)) {
throw new Error(`Invalid message body: Mising s3BucketName and/or s3Key`);
}
return {
bucketName: body.s3BucketName,
s3MessageKey: body.s3Key
};
}

return {
bucketName: null,
s3MessageKey: null,
};
}

receiveMessage(params, callback) {
const modifiedParams = {
...params,
MessageAttributeNames: [...(params.MessageAttributeNames || []), ExtendedSqsClient.RESERVED_ATTRIBUTE_NAME],
};

if (this.compatibleMode) {
modifiedParams.MessageAttributeNames.push(COMPATIBLE_ATTRIBUTE_NAME, COMPATIBLE_ATTRIBUTE_NAME_LEGACY);
}
if (this.useS3AttributeName) {
modifiedParams.MessageAttributeNames.push(this.useS3AttributeName);
}
const request = this.sqs.receiveMessage(modifiedParams);
return wrapRequest(request, callback, invokeFnAfterRequest(request, this._processReceive()));
}
Expand Down