Skip to content

Commit

Permalink
Merge pull request #57 from lifeomic/logger-obfucate-option
Browse files Browse the repository at this point in the history
feat!: add obfuscate logging option
  • Loading branch information
rmneidermyer authored Nov 14, 2023
2 parents d1ae182 + 8e0e3b2 commit 7d246a3
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 35 deletions.
22 changes: 17 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import { DynamoStreamHandler } from '@lifeomic/delta';

const stream = new DynamoStreamHandler({
logger,
// Optionally specify a list of image keys to obfuscate the values of
loggerObfuscateImageKeys: ['api-secret'],
parse: (item) => {
// parse the item using your custom logic, e.g. using zod or ajv.
return { id: item.id };
Expand Down Expand Up @@ -165,12 +167,22 @@ test('something', async () => {

### Parallel Processing + Ordering

By default, the abstractions in `@lifeomic/delta` (`DynamoStreamHandler` and `SQSMessageHandler`) will process events in parallel. To control the parallelization, specify a `concurrency` value when creating the handler.
By default, the abstractions in `@lifeomic/delta` (`DynamoStreamHandler` and
`SQSMessageHandler`) will process events in parallel. To control the
parallelization, specify a `concurrency` value when creating the handler.

These abstractions also ensure that within a batch of events correct _ordering_ of events is maintained according to the ordering semantics of the upstream event source, even when processing in parallel.
These abstractions also ensure that within a batch of events correct _ordering_
of events is maintained according to the ordering semantics of the upstream
event source, even when processing in parallel.

In `DynamoStreamHandler`, events for the same _key_ will always be processed serially -- events from different keys will be processed in parallel.
In `DynamoStreamHandler`, events for the same _key_ will always be processed
serially -- events from different keys will be processed in parallel.

In `SQSMessageHandler`, events with the same `MessageGroupId` will always processed serially -- events with different `MessageGroupId` values will be processed in parallel.
In `SQSMessageHandler`, events with the same `MessageGroupId` will always be
processed serially -- events with different `MessageGroupId` values will be
processed in parallel.

**Note**: while the ordering semantics above will always be preserved, events that do _not_ need to be ordered will not necessarily be processed in the same order they were received in the batch (even when using a `concurrency` value of `1`).
**Note**: while the ordering semantics above will always be preserved, events
that do _not_ need to be ordered will not necessarily be processed in the same
order they were received in the batch (even when using a `concurrency` value of
`1`).
25 changes: 18 additions & 7 deletions src/dynamo-streams.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@ describe('DynamoStreamHandler', () => {
);

expect(dataSources.doSomething).toHaveBeenCalledTimes(5);

expect(dataSources.doSomething).toHaveBeenNthCalledWith(1, 'insert 1', {
id: 'test-id-1',
});
Expand Down Expand Up @@ -431,6 +430,7 @@ describe('DynamoStreamHandler', () => {
describe('error scenarios', () => {
const lambda = new DynamoStreamHandler({
logger,
loggerObfuscateImageKeys: ['secret'],
parse: testSerializer.parse,
createRunContext: () => ({ logger, dataSources }),
}).lambda();
Expand All @@ -443,7 +443,6 @@ describe('DynamoStreamHandler', () => {
);

expect(logger.error).toHaveBeenCalledWith(
expect.anything(),
'The dynamodb property was not present on event',
);
});
Expand All @@ -456,7 +455,6 @@ describe('DynamoStreamHandler', () => {
);

expect(logger.error).toHaveBeenCalledWith(
expect.anything(),
'No NewImage was defined for an INSERT event',
);
});
Expand All @@ -476,9 +474,14 @@ describe('DynamoStreamHandler', () => {
);

expect(logger.error).toHaveBeenCalledWith(
expect.anything(),
'No NewImage was defined for a MODIFY event',
);
expect(logger.child).toHaveBeenCalledWith({
record: {
eventName: 'MODIFY',
dynamodb: { OldImage: { id: { S: 'test-id' } } },
},
});
});

test('MODIFY with no OldImage', async () => {
Expand All @@ -487,7 +490,9 @@ describe('DynamoStreamHandler', () => {
Records: [
{
eventName: 'MODIFY',
dynamodb: { NewImage: { id: { S: 'test-id' } } },
dynamodb: {
NewImage: { id: { S: 'test-id' }, secret: { S: 'test-id' } },
},
},
],
},
Expand All @@ -496,9 +501,16 @@ describe('DynamoStreamHandler', () => {
);

expect(logger.error).toHaveBeenCalledWith(
expect.anything(),
'No OldImage was defined for a MODIFY event',
);
expect(logger.child).toHaveBeenCalledWith({
record: {
eventName: 'MODIFY',
dynamodb: {
NewImage: { id: { S: 'test-id' }, secret: { S: 'obfuscated' } },
},
},
});
});

test('REMOVE with no OldImage', async () => {
Expand All @@ -509,7 +521,6 @@ describe('DynamoStreamHandler', () => {
);

expect(logger.error).toHaveBeenCalledWith(
expect.anything(),
'No OldImage was defined for a REMOVE event',
);
});
Expand Down
85 changes: 62 additions & 23 deletions src/dynamo-streams.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import { LoggerInterface } from '@lifeomic/logging';
import { v4 as uuid } from 'uuid';
import { DynamoDBStreamEvent, DynamoDBStreamHandler } from 'aws-lambda';
import {
DynamoDBStreamEvent,
DynamoDBStreamHandler,
DynamoDBRecord,
} from 'aws-lambda';
import { marshall, unmarshall } from '@aws-sdk/util-dynamodb';
import {
BaseContext,
Expand All @@ -13,6 +17,11 @@ export type DynamoStreamHandlerConfig<Entity, Context> = {
* A logger to use in the context.
*/
logger: LoggerInterface;
/**
* A listing of keys within a dynamo record's images to obfuscate in logging
* output.
*/
loggerObfuscateImageKeys?: string[];
/**
* A function for parsing images from the stream into your custom type.
*
Expand Down Expand Up @@ -60,7 +69,6 @@ export type DynamoStreamHandlerHarnessConfig<Context> = {
* An optional override for the logger.
*/
logger?: LoggerInterface;

/**
* An optional override for creating the run context.
*/
Expand Down Expand Up @@ -127,6 +135,45 @@ export class DynamoStreamHandler<Entity, Context> {
return copy;
}

/**
 * Returns a copy of a DynamoDB image with the values of the configured
 * keys replaced by a `{ S: 'obfuscated' }` placeholder, for safe logging.
 *
 * @param blob - A marshalled DynamoDB image (attribute-name -> AttributeValue
 *   map), or `undefined` when the record has no such image.
 * @param keys - Top-level attribute names whose values should be hidden.
 * @returns The obfuscated copy, or `undefined` when `blob` was `undefined`.
 */
private obfuscate(blob: any, keys: string[]): any {
  if (blob === undefined) return undefined;
  // Shallow-copy before mutating: the same image object is handed to the
  // parser and action handlers later. Assigning placeholders directly onto
  // `blob` would leak 'obfuscated' values into actual event processing,
  // not just into the log output.
  const obfuscated = { ...blob };
  keys.forEach((k) => {
    if (obfuscated[k]) {
      obfuscated[k] = { S: 'obfuscated' };
    }
  });
  return obfuscated;
}

/**
 * Produces a logging-safe view of a stream record by obfuscating the
 * configured image keys. Returns the record unchanged when no obfuscation
 * keys are configured or the record carries no `dynamodb` payload.
 */
private obfuscateRecord(dynamoRecord: DynamoDBRecord): DynamoDBRecord {
  const keysToHide = this.config.loggerObfuscateImageKeys;
  // Guard clause: nothing to obfuscate.
  if (!keysToHide || !dynamoRecord.dynamodb) {
    return dynamoRecord;
  }
  const { NewImage, OldImage } = dynamoRecord.dynamodb;
  return {
    ...dynamoRecord,
    dynamodb: {
      ...dynamoRecord.dynamodb,
      NewImage: this.obfuscate(NewImage, keysToHide),
      OldImage: this.obfuscate(OldImage, keysToHide),
    },
  };
}

/**
 * Produces a logging-safe view of a whole stream event by obfuscating
 * every record in the batch.
 */
private obfuscateEvent(
  dynamoEvent: DynamoDBStreamEvent,
): DynamoDBStreamEvent {
  const Records = dynamoEvent.Records.map((record) =>
    this.obfuscateRecord(record),
  );
  return { Records };
}

/**
* Adds an "INSERT" event handler.
*/
Expand Down Expand Up @@ -178,7 +225,10 @@ export class DynamoStreamHandler<Entity, Context> {
...base,
};

context.logger.info({ event }, 'Processing DynamoDB stream event');
context.logger.info(
{ event: this.obfuscateEvent(event) },
'Processing DynamoDB stream event',
);

await processWithOrdering(
{
Expand All @@ -194,7 +244,7 @@ export class DynamoStreamHandler<Entity, Context> {
// We need to order by key -- so, just stringify the key.
//
// But, add custom logic to ensure that the key object is stringified
// determinstically, regardless of the order of its keys. (e.g. we
// deterministically, regardless of the order of its keys. (e.g. we
// should stringify { a: 1, b: 2 } and { b: 2, a: 1 } to the same string)
//
// It's possible that AWS already ensures that the keys are deterministically
Expand All @@ -210,10 +260,11 @@ export class DynamoStreamHandler<Entity, Context> {
stopOnError: false,
},
async (record) => {
const recordLogger = this.config.logger.child({ record });
const recordLogger = this.config.logger.child({
record: this.obfuscateRecord(record),
});
if (!record.dynamodb) {
recordLogger.error(
{ record },
'The dynamodb property was not present on event',
);
return;
Expand All @@ -233,10 +284,7 @@ export class DynamoStreamHandler<Entity, Context> {
// Handle INSERT events -- invoke the INSERT actions in order.
if (record.eventName === 'INSERT') {
if (!newEntity) {
recordLogger.error(
{ record },
'No NewImage was defined for an INSERT event',
);
recordLogger.error('No NewImage was defined for an INSERT event');
return;
}

Expand All @@ -247,17 +295,11 @@ export class DynamoStreamHandler<Entity, Context> {
// Handle MODIFY events -- invoke the MODIFY actions in order.
else if (record.eventName === 'MODIFY') {
if (!oldEntity) {
recordLogger.error(
{ record },
'No OldImage was defined for a MODIFY event',
);
recordLogger.error('No OldImage was defined for a MODIFY event');
return;
}
if (!newEntity) {
recordLogger.error(
{ record },
'No NewImage was defined for a MODIFY event',
);
recordLogger.error('No NewImage was defined for a MODIFY event');
return;
}

Expand All @@ -272,10 +314,7 @@ export class DynamoStreamHandler<Entity, Context> {
// Handle REMOVE events -- invoke the REMOVE actions in order.
else if (record.eventName === 'REMOVE') {
if (!oldEntity) {
recordLogger.error(
{ record },
'No OldImage was defined for a REMOVE event',
);
recordLogger.error('No OldImage was defined for a REMOVE event');
return;
}

Expand All @@ -290,7 +329,7 @@ export class DynamoStreamHandler<Entity, Context> {

/**
* Returns a test harness for exercising the handler, with an optional
* overriden context.
* overridden context.
*/
harness(
options?: DynamoStreamHandlerHarnessConfig<Context>,
Expand Down

0 comments on commit 7d246a3

Please sign in to comment.