Skip to content

Commit

Permalink
fix: paginate through all DynamoDB scan results
Browse files Browse the repository at this point in the history
  • Loading branch information
sammarks committed Aug 21, 2019
1 parent 7106f3a commit 850cda4
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 23 deletions.
36 changes: 29 additions & 7 deletions src/schedule.js
Original file line number Diff line number Diff line change
@@ -1,25 +1,47 @@
const AWS = require('aws-sdk')

module.exports.handler = async () => {
const sns = new AWS.SNS()
const documentClient = new AWS.DynamoDB.DocumentClient()
const { Items } = await documentClient.scan({
const scanPage = async (documentClient, maxItems = 25, items = [], lastEvaluatedKey = null) => {
console.info(`Scanning with ${maxItems} max items and ${items.length} items.`)
console.info(`LastEvaluatedKey: ${lastEvaluatedKey}`)
const params = {
TableName: process.env.TASKS_TABLE,
Limit: 25,
FilterExpression: '#executeTime <= :executeTime',
ExpressionAttributeNames: {
'#executeTime': 'executeTime'
},
ExpressionAttributeValues: {
':executeTime': (new Date()).getTime() / 1000
}
}).promise()
}
if (lastEvaluatedKey) {
params.ExclusiveStartKey = lastEvaluatedKey
}
const { Items, LastEvaluatedKey } = await documentClient.scan(params).promise()
const newItems = items.concat(Items)
if (newItems.length >= maxItems) {
console.info(`${newItems.length} >= ${maxItems} - returning value`)
return newItems
} else {
if (LastEvaluatedKey) {
console.info(`${newItems.length} < ${maxItems} - performing another scan`)
return scanPage(documentClient, maxItems, newItems, LastEvaluatedKey)
} else {
console.info('No LastEvaluatedKey found. Returning results')
return newItems
}
}
}

module.exports.handler = async () => {
const sns = new AWS.SNS()
const documentClient = new AWS.DynamoDB.DocumentClient()
const Items = await scanPage(documentClient)
console.info(`Found ${Items.length} tasks to trigger...`)
await Promise.all(Items.map(async ({ topicArn, payload, taskId }) => {
console.info(`Triggering task ID '${taskId}' - sending to '${topicArn}'...`)
await sns.publish({
TopicArn: topicArn,
Message: JSON.stringify(payload)
Message: JSON.stringify(payload || {})
}).promise()
console.info(`Deleting '${taskId}'...`)
await documentClient.delete({
Expand Down
65 changes: 49 additions & 16 deletions test/schedule.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,35 @@ describe('schedule handler', () => {
})
describe('when there are tasks in the past that need to be executed', () => {
beforeEach(() => {
scanStub = jest.fn((params, callback) => callback(null, {
Items: [
{
taskId: 'test-task-one',
payload: { foo: 'bar' },
topicArn: 'arn:aws:sns:us-east-1:123456789:test-topic'
},
{
taskId: 'test-task-two',
payload: { foo2: 'bar2' },
topicArn: 'arn:aws:sns:us-east-1:123456789:test-topic'
}
]
}))
scanStub = jest
.fn()
.mockImplementationOnce((params, callback) => callback(null, {
LastEvaluatedKey: 'last-evaluated-key',
Items: [
{
taskId: 'test-task-one',
payload: { foo: 'bar' },
topicArn: 'arn:aws:sns:us-east-1:123456789:test-topic'
}
]
}))
.mockImplementationOnce((params, callback) => callback(null, {
LastEvaluatedKey: null,
Items: [
{
taskId: 'test-task-two',
payload: { foo2: 'bar2' },
topicArn: 'arn:aws:sns:us-east-1:123456789:test-topic'
}
]
}))
AWS.mock('DynamoDB.DocumentClient', 'scan', scanStub)
return handler()
})
it('scans for the items properly', () => {
expect(scanStub.mock.calls.length).toEqual(1)
expect(scanStub.mock.calls.length).toEqual(2)
expect(scanStub.mock.calls[0][0]).toEqual({
TableName: 'tasks-table',
Limit: 25,
FilterExpression: '#executeTime <= :executeTime',
ExpressionAttributeNames: {
'#executeTime': 'executeTime'
Expand All @@ -52,6 +59,7 @@ describe('schedule handler', () => {
':executeTime': 20
}
})
expect(scanStub.mock.calls[1][0].ExclusiveStartKey).toEqual('last-evaluated-key')
})
it('posts a message to the SNS topic with the payload', () => {
expect(publishStub.mock.calls.length).toEqual(2)
Expand Down Expand Up @@ -89,4 +97,29 @@ describe('schedule handler', () => {
expect(deleteStub.mock.calls.length).toEqual(0)
})
})
describe('when there are more than 25 items in the past that need to be executed', () => {
beforeEach(() => {
const task = {
taskId: 'test-task-one',
topicArn: 'arn:aws:sns:us-east-1:123456789:test-topic'
}
scanStub = jest
.fn()
.mockImplementationOnce((params, callback) => callback(null, {
LastEvaluatedKey: 'last-evaluated-key',
Items: (new Array(23)).fill(task)
}))
.mockImplementationOnce((params, callback) => callback(null, {
LastEvaluatedKey: 'another-evaluated-key',
Items: (new Array(20)).fill(task)
}))
AWS.mock('DynamoDB.DocumentClient', 'scan', scanStub)
return handler()
})
it('only executes the first 25', () => {
expect(scanStub.mock.calls.length).toEqual(2)
expect(publishStub.mock.calls.length).toEqual(43)
expect(deleteStub.mock.calls.length).toEqual(43)
})
})
})

0 comments on commit 850cda4

Please sign in to comment.