diff --git a/lib/dynamo/message_test.go b/lib/dynamo/message_test.go index 6f151885..25ad2081 100644 --- a/lib/dynamo/message_test.go +++ b/lib/dynamo/message_test.go @@ -119,7 +119,8 @@ func TestTransformAttributeValue(t *testing.T) { } for _, tc := range tcs { - actualValue := transformAttributeValue(tc.attr) + actualValue, err := transformAttributeValue(tc.attr) + assert.NoError(t, err, tc.name) assert.Equal(t, tc.expectedValue, actualValue, tc.name) } } diff --git a/lib/dynamo/parse_message.go b/lib/dynamo/parse_message.go index ea2a6f25..65786454 100644 --- a/lib/dynamo/parse_message.go +++ b/lib/dynamo/parse_message.go @@ -19,9 +19,11 @@ func NewMessageFromExport(item dynamodb.ItemResponse, keys []string, tableName s // Snapshot time does not exist on the row // Perhaps we can have it inferred from the manifest file in the future. - executionTime := time.Now() + rowData, err := transformImage(item.Item) + if err != nil { + return nil, fmt.Errorf("failed to transform item: %w", err) + } - rowData := transformImage(item.Item) primaryKeys := make(map[string]any) for _, key := range keys { val, isOk := rowData[key] @@ -35,7 +37,7 @@ func NewMessageFromExport(item dynamodb.ItemResponse, keys []string, tableName s return &Message{ op: "r", tableName: tableName, - executionTime: executionTime, + executionTime: time.Now(), afterRowData: rowData, primaryKey: primaryKeys, }, nil @@ -67,12 +69,27 @@ func NewMessage(record *dynamodbstreams.Record, tableName string) (*Message, err } } + beforeData, err := transformImage(record.Dynamodb.OldImage) + if err != nil { + return nil, fmt.Errorf("failed to transform old image: %w", err) + } + + afterData, err := transformImage(record.Dynamodb.NewImage) + if err != nil { + return nil, fmt.Errorf("failed to transform new image: %w", err) + } + + primaryKey, err := transformImage(record.Dynamodb.Keys) + if err != nil { + return nil, fmt.Errorf("failed to transform keys: %w", err) + } + return &Message{ op: op, tableName: tableName, executionTime: executionTime, - beforeRowData: transformImage(record.Dynamodb.OldImage), - afterRowData: transformImage(record.Dynamodb.NewImage), - primaryKey: transformImage(record.Dynamodb.Keys), + beforeRowData: beforeData, + afterRowData: afterData, + primaryKey: primaryKey, }, nil }