Skip to content

Commit

Permalink
feat!: support partial batch responses on all handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
swain committed May 24, 2024
1 parent 4c9edbe commit 6c9a325
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 100 deletions.
22 changes: 16 additions & 6 deletions src/dynamo-streams.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import { LoggerInterface } from '@lifeomic/logging';
import { v4 as uuid } from 'uuid';
import {
Context as AWSContext,
DynamoDBStreamEvent,
DynamoDBStreamHandler,
DynamoDBRecord,
} from 'aws-lambda';
import { marshall, unmarshall } from '@aws-sdk/util-dynamodb';
import {
BaseContext,
BaseHandlerConfig,
PartialBatchResponse,
handleUnprocessedRecords,
processWithOrdering,
withHealthCheckHandling,
} from './utils';
Expand Down Expand Up @@ -198,7 +200,10 @@ export class DynamoStreamHandler<Entity, Context> {
* Returns a DynamoDB stream lambda handler that will perform the configured
* actions.
*/
lambda(): DynamoDBStreamHandler {
lambda(): (
event: DynamoDBStreamEvent,
context: AWSContext,
) => Promise<PartialBatchResponse> {
return withHealthCheckHandling(async (event, ctx) => {
const correlationId = uuid();

Expand All @@ -220,7 +225,7 @@ export class DynamoStreamHandler<Entity, Context> {
'Processing DynamoDB stream event',
);

const processingResult = await processWithOrdering(
const { unprocessedRecords } = await processWithOrdering(
{
items: event.Records,
orderBy: (record) => {
Expand Down Expand Up @@ -314,8 +319,13 @@ export class DynamoStreamHandler<Entity, Context> {
},
);

processingResult.throwOnUnprocessedRecords();
context.logger.info('Successfully processed all DynamoDB stream records');
return handleUnprocessedRecords({
logger: context.logger,
unprocessedRecords,
usePartialBatchResponses: !!this.config.usePartialBatchResponses,
getItemIdentifier: (record) =>
record.dynamodb?.SequenceNumber ?? uuid(),
});
});
}

Expand Down Expand Up @@ -362,7 +372,7 @@ export class DynamoStreamHandler<Entity, Context> {
};

// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
await lambda(dynamoEvent, {} as any, null as any);
await lambda(dynamoEvent, {} as any);
},
};
}
Expand Down
18 changes: 14 additions & 4 deletions src/kinesis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import { KinesisStreamEvent, Context as AWSContext } from 'aws-lambda';
import {
BaseContext,
BaseHandlerConfig,
PartialBatchResponse,
handleUnprocessedRecords,
processWithOrdering,
withHealthCheckHandling,
} from './utils';
Expand Down Expand Up @@ -62,7 +64,10 @@ export class KinesisEventHandler<Event, Context> {
return this;
}

lambda(): (event: KinesisStreamEvent, context: AWSContext) => Promise<void> {
lambda(): (
event: KinesisStreamEvent,
context: AWSContext,
) => Promise<PartialBatchResponse> {
return withHealthCheckHandling(async (event, awsContext) => {
// 1. Build the context.
const correlationId = uuid();
Expand All @@ -77,7 +82,7 @@ export class KinesisEventHandler<Event, Context> {
Object.assign(context, await this.config.createRunContext(context));

// 2. Process all the records.
const processingResult = await processWithOrdering(
const { unprocessedRecords } = await processWithOrdering(
{
items: event.Records,
orderBy: (record) => record.kinesis.partitionKey,
Expand All @@ -100,8 +105,13 @@ export class KinesisEventHandler<Event, Context> {
},
);

processingResult.throwOnUnprocessedRecords();
context.logger.info('Successfully processed all Kinesis records');
// 3. Handle unprocessed records, if need be.
return handleUnprocessedRecords({
logger: context.logger,
unprocessedRecords,
usePartialBatchResponses: !!this.config.usePartialBatchResponses,
getItemIdentifier: (record) => record.kinesis.sequenceNumber,
});
});
}

Expand Down
73 changes: 12 additions & 61 deletions src/sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import { SQSEvent, Context as AWSContext, SQSRecord } from 'aws-lambda';
import {
BaseContext,
BaseHandlerConfig,
PartialBatchResponse,
handleUnprocessedRecords,
processWithOrdering,
withHealthCheckHandling,
} from './utils';
Expand All @@ -16,15 +18,6 @@ export type SQSMessageHandlerConfig<Message, Context> =
*/
parseMessage: (body: string) => Message;

/**
* Whether or not to use SQS partial batch responses. If set to true, make
* sure to also turn on partial batch responses when configuring your event
* source mapping by specifying ReportBatchItemFailures for the
* FunctionResponseTypes action. For more details see:
* https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting
*/
usePartialBatchResponses?: boolean;

redactionConfig?: {
/**
* This will be called to redact the message body before logging it. By
Expand Down Expand Up @@ -73,12 +66,6 @@ export type SQSMessageHandlerHarnessContext<Message> = {
sendEvent: (event: { messages: Message[] }) => Promise<void>;
};

export type SQSPartialBatchResponse = {
batchItemFailures: {
itemIdentifier: string;
}[];
};

const safeRedactor =
(
logger: LoggerInterface,
Expand Down Expand Up @@ -143,7 +130,7 @@ export class SQSMessageHandler<Message, Context> {
lambda(): (
event: SQSEvent,
context: AWSContext,
) => Promise<void | SQSPartialBatchResponse> {
) => Promise<PartialBatchResponse> {
return withHealthCheckHandling(async (event, awsContext) => {
// 1. Build the context.
const correlationId = uuid();
Expand Down Expand Up @@ -181,13 +168,14 @@ export class SQSMessageHandler<Message, Context> {
'Processing SQS topic message',
);

const processingResult = await processWithOrdering(
const { unprocessedRecords } = await processWithOrdering(
{
items: event.Records,
// If there is not a MessageGroupId, then we don't care about
// the ordering for the event. We can just generate a UUID for the
// ordering key.
orderBy: (record) => record.attributes.MessageGroupId ?? uuid(),
orderBy: (record: SQSRecord) =>
record.attributes.MessageGroupId ?? uuid(),
concurrency: this.config.concurrency ?? 5,
},
async (record) => {
Expand All @@ -205,49 +193,12 @@ export class SQSMessageHandler<Message, Context> {
},
);

const unprocessedRecordsByGroupIdEntries = Object.entries(
processingResult.unprocessedRecordsByGroupId,
);

if (!unprocessedRecordsByGroupIdEntries.length) {
context.logger.info('Successfully processed all SQS messages');
return;
}

if (!this.config.usePartialBatchResponses) {
processingResult.throwOnUnprocessedRecords();
}

// SQS partial batching expects that you return an ordered list of
// failures. We map through each group and add them to the batch item
// failures in order for each group.
const batchItemFailures = unprocessedRecordsByGroupIdEntries
.map(([groupId, record]) => {
const [failedRecord, ...subsequentUnprocessedRecords] = record.items;

context.logger.error(
{
groupId,
err: record.error,
failedRecord: redactRecord(failedRecord),
subsequentUnprocessedRecords:
subsequentUnprocessedRecords.map(redactRecord),
},
'Failed to fully process message group',
);

return record.items.map((item) => ({
itemIdentifier: item.messageId,
}));
})
.flat();

context.logger.info(
{ batchItemFailures },
'Sending SQS partial batch response',
);

return { batchItemFailures };
return handleUnprocessedRecords({
logger: context.logger,
unprocessedRecords,
usePartialBatchResponses: !!this.config.usePartialBatchResponses,
getItemIdentifier: (record) => record.messageId,
});
});
}

Expand Down
95 changes: 66 additions & 29 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ export type BaseHandlerConfig<Context> = {
* @default 5
*/
concurrency?: number;

/**
* Whether or not to use partial batch responses. If set to true, make
* sure to also turn on partial batch responses when configuring your event
* source mapping by specifying ReportBatchItemFailures for the
* FunctionResponseTypes action. For more details see:
* https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting
*/
usePartialBatchResponses?: boolean;
};

export const withHealthCheckHandling =
Expand All @@ -46,6 +55,10 @@ export const withHealthCheckHandling =
return handler(event, context);
};

/**
 * The value a handler returns to Lambda after processing a batch: either a
 * partial-batch failure report (a list of item identifiers that were not
 * processed), or `undefined` when no report is returned.
 */
export type PartialBatchResponse =
  | { batchItemFailures: { itemIdentifier: string }[] }
  | undefined;

export type ProcessWithOrderingParams<T> = {
/**
* The list of items to process.
Expand All @@ -62,6 +75,10 @@ export type ProcessWithOrderingParams<T> = {
concurrency: number;
};

/**
 * Result of `processWithOrdering`: the records that were not successfully
 * processed. Entries carrying an `error` failed directly; entries without
 * one were skipped because an earlier record in the same ordering group
 * failed.
 */
export type ProcessWithOrderingReturn<T> = {
  unprocessedRecords: { item: T; error?: any }[];
};

/**
* A utility for performing parallel asynchronous processing of a
* list of items, while also maintaining ordering.
Expand Down Expand Up @@ -89,52 +106,72 @@ export type ProcessWithOrderingParams<T> = {
export const processWithOrdering = async <T>(
params: ProcessWithOrderingParams<T>,
process: (item: T) => Promise<void>,
) => {
): Promise<ProcessWithOrderingReturn<T>> => {
const groupedItems = groupBy(params.items, params.orderBy);
const groupIds = Object.keys(groupedItems);
const groups = Object.values(groupedItems);
const unprocessedRecordsByGroupId: Record<
string,
{
error: any;
items: T[];
}
> = {};
const unprocessedRecords: { item: T; error?: any }[] = [];

await pMap(
groups,
async (group, groupIndex) => {
async (group) => {
for (let i = 0; i < group.length; i++) {
const item = group[i];

try {
await process(item);
} catch (error) {
// Keep track of all unprocessed items and stop processing the current
// group as soon as we encounter the first error.
unprocessedRecordsByGroupId[groupIds[groupIndex]] = {
error,
items: group.slice(i),
};
// Track the error on this item.
unprocessedRecords.push({ item, error });
// Also, track any subsequent items in this group.
unprocessedRecords.push(
...group.slice(i + 1).map((item) => ({ item })),
);
return;
}
}
},
{
concurrency: params.concurrency,
},
{ concurrency: params.concurrency },
);

return {
unprocessedRecordsByGroupId,
throwOnUnprocessedRecords: () => {
const aggregateErrors = Object.values(unprocessedRecordsByGroupId).map(
(record) => record.error,
return { unprocessedRecords };
};

/**
 * Return type of `handleUnprocessedRecords` — either a partial-batch
 * failure report, or `undefined`. Structurally identical to
 * `PartialBatchResponse`.
 */
export type HandleUnprocessedRecordsReturn =
  | { batchItemFailures: { itemIdentifier: string }[] }
  | undefined;

export const handleUnprocessedRecords = <Record>(params: {
logger: LoggerInterface;
unprocessedRecords: ProcessWithOrderingReturn<Record>['unprocessedRecords'];
usePartialBatchResponses: boolean;
getItemIdentifier: (record: Record) => string;
}): HandleUnprocessedRecordsReturn => {
if (params.unprocessedRecords.length === 0) {
params.logger.info('Successfully processed all messages');
return { batchItemFailures: [] };
}

if (!params.usePartialBatchResponses) {
throw new AggregateError(
params.unprocessedRecords.filter((i) => 'error' in i).map((e) => e.error),
);
}

// Log all the failures.
for (const { item, error } of params.unprocessedRecords) {
if (error) {
params.logger.error(
{ err: error, identifier: params.getItemIdentifier(item) },
'Encountered error processing message',
);
}
}

if (aggregateErrors.length) {
throw new AggregateError(aggregateErrors);
}
},
};
const batchItemFailures = params.unprocessedRecords.map(({ item }) => ({
itemIdentifier: params.getItemIdentifier(item),
}));

params.logger.info({ batchItemFailures }, 'Sending partial batch response');

return { batchItemFailures };
};

0 comments on commit 6c9a325

Please sign in to comment.