Skip to content

Commit

Permalink
Clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed May 31, 2024
1 parent eb0f2aa commit 350185f
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 7 deletions.
3 changes: 2 additions & 1 deletion lib/dynamo/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ func TestTransformAttributeValue(t *testing.T) {
}

for _, tc := range tcs {
actualValue := transformAttributeValue(tc.attr)
actualValue, err := transformAttributeValue(tc.attr)
assert.NoError(t, err, tc.name)
assert.Equal(t, tc.expectedValue, actualValue, tc.name)
}
}
29 changes: 23 additions & 6 deletions lib/dynamo/parse_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ func NewMessageFromExport(item dynamodb.ItemResponse, keys []string, tableName s

// Snapshot time does not exist on the row
// Perhaps we can have it inferred from the manifest file in the future.
executionTime := time.Now()
rowData, err := transformImage(item.Item)
if err != nil {
return nil, fmt.Errorf("failed to transform item: %w", err)
}

rowData := transformImage(item.Item)
primaryKeys := make(map[string]any)
for _, key := range keys {
val, isOk := rowData[key]
Expand All @@ -35,7 +37,7 @@ func NewMessageFromExport(item dynamodb.ItemResponse, keys []string, tableName s
return &Message{
op: "r",
tableName: tableName,
executionTime: executionTime,
executionTime: time.Now(),
afterRowData: rowData,
primaryKey: primaryKeys,
}, nil
Expand Down Expand Up @@ -67,12 +69,27 @@ func NewMessage(record *dynamodbstreams.Record, tableName string) (*Message, err
}
}

beforeData, err := transformImage(record.Dynamodb.OldImage)
if err != nil {
return nil, fmt.Errorf("failed to transform old image: %w", err)
}

afterData, err := transformImage(record.Dynamodb.NewImage)
if err != nil {
return nil, fmt.Errorf("failed to transform new image: %w", err)
}

primaryKey, err := transformImage(record.Dynamodb.Keys)
if err != nil {
return nil, fmt.Errorf("failed to transform keys: %w", err)
}

return &Message{
op: op,
tableName: tableName,
executionTime: executionTime,
beforeRowData: transformImage(record.Dynamodb.OldImage),
afterRowData: transformImage(record.Dynamodb.NewImage),
primaryKey: transformImage(record.Dynamodb.Keys),
beforeRowData: beforeData,
afterRowData: afterData,
primaryKey: primaryKey,
}, nil
}

0 comments on commit 350185f

Please sign in to comment.