Skip to content

Commit

Permalink
feat: upgrade to node 20 and aws sdk v3
Browse files Browse the repository at this point in the history
  • Loading branch information
sammarks committed Dec 26, 2023
1 parent da55de9 commit 9dab372
Show file tree
Hide file tree
Showing 7 changed files with 1,312 additions and 278 deletions.
7 changes: 5 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
"deploy": "./deploy-to-s3.sh"
},
"devDependencies": {
"aws-sdk": "^2.717.0",
"aws-sdk-mock": "^5.1.0",
"coveralls": "^3.1.0",
"eslint": "^7.5.0",
"eslint-config-standard": "^14.1.1",
Expand All @@ -32,5 +30,10 @@
},
"resolutions": {
"lodash": "4.17.19"
},
"dependencies": {
"@aws-sdk/client-dynamodb": "^3.478.0",
"@aws-sdk/client-sns": "^3.481.0",
"@aws-sdk/lib-dynamodb": "^3.478.0"
}
}
4 changes: 2 additions & 2 deletions sam-template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Resources:
Type: 'AWS::Serverless::Function'
Properties:
Handler: src/ingest.handler
Runtime: nodejs18.x
Runtime: nodejs20.x
CodeUri: src
MemorySize: 128
Timeout: 3
Expand Down Expand Up @@ -58,7 +58,7 @@ Resources:
Type: 'AWS::Serverless::Function'
Properties:
Handler: src/schedule.handler
Runtime: nodejs18.x
Runtime: nodejs20.x
CodeUri: src
MemorySize: 128
Timeout: 60
Expand Down
14 changes: 8 additions & 6 deletions src/ingest.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
const AWS = require('aws-sdk')
const { DynamoDBClient } = require('@aws-sdk/client-dynamodb')
const { DynamoDBDocumentClient, UpdateCommand, DeleteCommand } = require('@aws-sdk/lib-dynamodb')

module.exports.handler = async (event) => {
const documentClient = new AWS.DynamoDB.DocumentClient()
const dynamoDbClient = new DynamoDBClient({})
const documentClient = DynamoDBDocumentClient.from(dynamoDbClient)
return Promise.all(event.Records.map(async (record) => {
const { executeTime, taskId, topicArn, payload = {} } = JSON.parse(record.Sns.Message)
console.info(`Processing incoming task request: ${taskId}...`)
if (executeTime) {
console.info(`Creating task '${taskId}' to execute at '${executeTime}'...`)
await documentClient.update({
await documentClient.send(new UpdateCommand({
TableName: process.env.TASKS_TABLE,
Key: { taskId },
UpdateExpression: 'SET #executeTime = :executeTime, #executeHuman = :executeHuman, #topicArn = :topicArn, #payload = :payload',
Expand All @@ -23,13 +25,13 @@ module.exports.handler = async (event) => {
':topicArn': topicArn,
':payload': payload
}
}).promise()
}))
} else {
console.info(`Deleting task '${taskId}'...`)
await documentClient.delete({
await documentClient.send(new DeleteCommand({
TableName: process.env.TASKS_TABLE,
Key: { taskId }
}).promise()
}))
}
}))
}
63 changes: 43 additions & 20 deletions src/schedule.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,20 @@
const AWS = require('aws-sdk')
const { DynamoDBClient } = require('@aws-sdk/client-dynamodb')
const {
DynamoDBDocumentClient,
ScanCommand,
DeleteCommand
} = require('@aws-sdk/lib-dynamodb')
const { SNSClient, PublishCommand } = require('@aws-sdk/client-sns')

const scanPage = async (documentClient, maxItems = 25, items = [], lastEvaluatedKey = null) => {
console.info(`Scanning with ${maxItems} max items and ${items.length} items.`)
const scanPage = async (
documentClient,
maxItems = 25,
items = [],
lastEvaluatedKey = null
) => {
console.info(
`Scanning with ${maxItems} max items and ${items.length} items.`
)
console.info(`LastEvaluatedKey: ${lastEvaluatedKey}`)
const params = {
TableName: process.env.TASKS_TABLE,
Expand All @@ -10,20 +23,23 @@ const scanPage = async (documentClient, maxItems = 25, items = [], lastEvaluated
'#executeTime': 'executeTime'
},
ExpressionAttributeValues: {
':executeTime': (new Date()).getTime() / 1000
':executeTime': new Date().getTime() / 1000
}
}
if (lastEvaluatedKey) {
params.ExclusiveStartKey = lastEvaluatedKey
}
const { Items, LastEvaluatedKey } = await documentClient.scan(params).promise()
const { Items, LastEvaluatedKey } = await documentClient
.send(new ScanCommand(params))
const newItems = items.concat(Items)
if (newItems.length >= maxItems) {
console.info(`${newItems.length} >= ${maxItems} - returning value`)
return newItems
} else {
if (LastEvaluatedKey) {
console.info(`${newItems.length} < ${maxItems} - performing another scan`)
console.info(
`${newItems.length} < ${maxItems} - performing another scan`
)
return scanPage(documentClient, maxItems, newItems, LastEvaluatedKey)
} else {
console.info('No LastEvaluatedKey found. Returning results')
Expand All @@ -33,20 +49,27 @@ const scanPage = async (documentClient, maxItems = 25, items = [], lastEvaluated
}

module.exports.handler = async () => {
const sns = new AWS.SNS()
const documentClient = new AWS.DynamoDB.DocumentClient()
const sns = new SNSClient({})
const dynamoDBClient = new DynamoDBClient({})
const documentClient = DynamoDBDocumentClient.from(dynamoDBClient)
const Items = await scanPage(documentClient)
console.info(`Found ${Items.length} tasks to trigger...`)
await Promise.all(Items.map(async ({ topicArn, payload, taskId }) => {
console.info(`Triggering task ID '${taskId}' - sending to '${topicArn}'...`)
await sns.publish({
TopicArn: topicArn,
Message: JSON.stringify(payload || {})
}).promise()
console.info(`Deleting '${taskId}'...`)
await documentClient.delete({
TableName: process.env.TASKS_TABLE,
Key: { taskId }
}).promise()
}))
await Promise.all(
Items.map(async ({ topicArn, payload, taskId }) => {
console.info(
`Triggering task ID '${taskId}' - sending to '${topicArn}'...`
)
await sns
.send(new PublishCommand({
TopicArn: topicArn,
Message: JSON.stringify(payload || {})
}))
console.info(`Deleting '${taskId}'...`)
await documentClient
.send(new DeleteCommand({
TableName: process.env.TASKS_TABLE,
Key: { taskId }
}))
})
)
}
80 changes: 47 additions & 33 deletions test/ingest.test.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,25 @@
const { handler } = require('../src/ingest')
const AWS = require('aws-sdk-mock')
const { DynamoDBDocumentClient, UpdateCommand, DeleteCommand } = require('@aws-sdk/lib-dynamodb')

jest.mock('@aws-sdk/lib-dynamodb', () => {
const originalModule = jest.requireActual('@aws-sdk/lib-dynamodb')
const sendMock = jest.fn()
return {
...originalModule,
DynamoDBDocumentClient: {
from: () => ({
send: sendMock
})
}
}
})
afterEach(() => {
AWS.restore()
jest.clearAllMocks()
})

const { handler } = require('../src/ingest')

describe('ingest handler', () => {
let event, updateStub, deleteStub
let event, sendMock

beforeEach(() => {
process.env.TASKS_TABLE = 'tasks-table'
Expand All @@ -26,33 +39,33 @@ describe('ingest handler', () => {
}
]
}
updateStub = jest.fn((params, callback) => callback(null, 'success'))
deleteStub = jest.fn((params, callback) => callback(null, 'success'))
AWS.mock('DynamoDB.DocumentClient', 'update', updateStub)
AWS.mock('DynamoDB.DocumentClient', 'delete', deleteStub)
sendMock = DynamoDBDocumentClient.from().send.mockImplementation(async command => Promise.resolve('success'))
})

it('creates or updates the task', async () => {
await handler(event)
expect(updateStub.mock.calls.length).toEqual(1)
expect(deleteStub.mock.calls.length).toEqual(0)
expect(updateStub.mock.calls[0][0]).toEqual({
TableName: 'tasks-table',
Key: { taskId: 'test-task-id' },
UpdateExpression: 'SET #executeTime = :executeTime, #executeHuman = :executeHuman, #topicArn = :topicArn, #payload = :payload',
ExpressionAttributeNames: {
'#executeTime': 'executeTime',
'#executeHuman': 'executeHuman',
'#topicArn': 'topicArn',
'#payload': 'payload'
},
ExpressionAttributeValues: {
':executeTime': 20,
':executeHuman': (new Date(20000)).toString(),
':topicArn': 'arn:aws:sns:us-east-1:123456789:test-topic',
':payload': { foo: 'bar' }
}
})
expect(sendMock).toHaveBeenCalledTimes(1)
expect(sendMock).toHaveBeenCalledWith(expect.any(UpdateCommand))
expect(sendMock.mock.calls[0][0].clientCommand.input).toEqual(
expect.objectContaining({
TableName: 'tasks-table',
Key: { taskId: 'test-task-id' },
UpdateExpression:
'SET #executeTime = :executeTime, #executeHuman = :executeHuman, #topicArn = :topicArn, #payload = :payload',
ExpressionAttributeNames: {
'#executeTime': 'executeTime',
'#executeHuman': 'executeHuman',
'#topicArn': 'topicArn',
'#payload': 'payload'
},
ExpressionAttributeValues: {
':executeTime': 20,
':executeHuman': new Date(20000).toString(),
':topicArn': 'arn:aws:sns:us-east-1:123456789:test-topic',
':payload': { foo: 'bar' }
}
})
)
})

it('defaults payload to an empty object', async () => {
Expand All @@ -62,8 +75,9 @@ describe('ingest handler', () => {
topicArn: 'arn:aws:sns:us-east-1:123456789:test-topic'
})
await handler(event)
expect(updateStub.mock.calls[0][0].ExpressionAttributeValues[':payload']).toEqual({})
expect(deleteStub.mock.calls.length).toEqual(0)
expect(sendMock).toHaveBeenCalledTimes(1)
expect(sendMock).toHaveBeenCalledWith(expect.any(UpdateCommand))
expect(sendMock.mock.calls[0][0].clientCommand.input.ExpressionAttributeValues[':payload']).toEqual({})
})

it('deletes the entry if executeTime is falsy', async () => {
Expand All @@ -73,11 +87,11 @@ describe('ingest handler', () => {
topicArn: 'arn:aws:sns:us-east-1:123456789:test-topic'
})
await handler(event)
expect(updateStub.mock.calls.length).toEqual(0)
expect(deleteStub.mock.calls.length).toEqual(1)
expect(deleteStub.mock.calls[0][0]).toEqual({
expect(sendMock).toHaveBeenCalledTimes(1)
expect(sendMock).toHaveBeenCalledWith(expect.any(DeleteCommand))
expect(sendMock.mock.calls[0][0].clientCommand.input).toEqual(expect.objectContaining({
TableName: 'tasks-table',
Key: { taskId: 'test-task-id' }
})
}))
})
})
Loading

0 comments on commit 9dab372

Please sign in to comment.