Skip to content

Commit

Permalink
[DynamoDB] Removing TODO (#403)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored May 31, 2024
1 parent 9b4d39a commit 51f3eb8
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 34 deletions.
61 changes: 38 additions & 23 deletions lib/dynamo/message.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package dynamo

import (
"log/slog"
"fmt"
"strconv"
"time"

Expand All @@ -26,66 +26,81 @@ func stringToFloat64(s string) (float64, error) {

// transformAttributeValue converts a DynamoDB AttributeValue to a Go type.
// References: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html
func transformAttributeValue(attr *dynamodb.AttributeValue) any {
func transformAttributeValue(attr *dynamodb.AttributeValue) (any, error) {
switch {
case attr.S != nil:
return *attr.S
return *attr.S, nil
case attr.N != nil:
number, err := stringToFloat64(*attr.N)
if err == nil {
return number
} else {
slog.Error("Failed to convert string to float64", slog.Any("err", err), slog.String("string", *attr.N))
// TODO - should we throw an error here?
return nil
if err != nil {
return nil, fmt.Errorf("failed to convert string to float64: %w", err)
}

return number, nil
case attr.BOOL != nil:
return *attr.BOOL
return *attr.BOOL, nil
case attr.M != nil:
result := make(map[string]any)
for k, v := range attr.M {
result[k] = transformAttributeValue(v)
val, err := transformAttributeValue(v)
if err != nil {
return nil, fmt.Errorf("failed to transform attribute value: %w", err)
}

result[k] = val
}
return result

return result, nil
case attr.L != nil:
list := make([]any, len(attr.L))
for i, item := range attr.L {
list[i] = transformAttributeValue(item)
val, err := transformAttributeValue(item)
if err != nil {
return nil, fmt.Errorf("failed to transform attribute value: %w", err)
}

list[i] = val
}
return list

return list, nil
case attr.SS != nil:
// Convert the string set to a slice of strings.
strSet := make([]string, len(attr.SS))
for i, s := range attr.SS {
strSet[i] = *s
}
return strSet

return strSet, nil
case attr.NS != nil:
// Convert the number set to a slice of strings (since the numbers are stored as strings).
numSet := make([]float64, len(attr.NS))
for i, n := range attr.NS {
number, err := stringToFloat64(*n)
if err != nil {
slog.Error("Failed to convert string to float64", slog.Any("err", err), slog.String("string", *n))
// TODO - should we throw an error here?
return nil
return nil, fmt.Errorf("failed to convert string to float64: %w", err)
}

numSet[i] = number
}

return numSet
return numSet, nil
}

return nil
return nil, nil
}

func transformImage(data map[string]*dynamodb.AttributeValue) map[string]any {
func transformImage(data map[string]*dynamodb.AttributeValue) (map[string]any, error) {
transformed := make(map[string]any)
for key, attrValue := range data {
transformed[key] = transformAttributeValue(attrValue)
val, err := transformAttributeValue(attrValue)
if err != nil {
return nil, fmt.Errorf("failed to transform attribute value: %w", err)
}

transformed[key] = val
}
return transformed

return transformed, nil
}

func (m *Message) artieMessage() *util.SchemaEventPayload {
Expand Down
3 changes: 2 additions & 1 deletion lib/dynamo/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ func TestTransformAttributeValue(t *testing.T) {
}

for _, tc := range tcs {
actualValue := transformAttributeValue(tc.attr)
actualValue, err := transformAttributeValue(tc.attr)
assert.NoError(t, err, tc.name)
assert.Equal(t, tc.expectedValue, actualValue, tc.name)
}
}
37 changes: 27 additions & 10 deletions lib/dynamo/parse_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ func NewMessageFromExport(item dynamodb.ItemResponse, keys []string, tableName s
return nil, fmt.Errorf("keys is nil")
}

// Snapshot time does not exist on the row
// Perhaps we can have it inferred from the manifest file in the future.
executionTime := time.Now()
rowData, err := transformImage(item.Item)
if err != nil {
return nil, fmt.Errorf("failed to transform item: %w", err)
}

rowData := transformImage(item.Item)
primaryKeys := make(map[string]any)
for _, key := range keys {
val, isOk := rowData[key]
Expand All @@ -33,9 +33,11 @@ func NewMessageFromExport(item dynamodb.ItemResponse, keys []string, tableName s
}

return &Message{
op: "r",
tableName: tableName,
executionTime: executionTime,
op: "r",
tableName: tableName,
// Snapshot time does not exist on the row
// Perhaps we can have it inferred from the manifest file in the future.
executionTime: time.Now(),
afterRowData: rowData,
primaryKey: primaryKeys,
}, nil
Expand Down Expand Up @@ -67,12 +69,27 @@ func NewMessage(record *dynamodbstreams.Record, tableName string) (*Message, err
}
}

beforeData, err := transformImage(record.Dynamodb.OldImage)
if err != nil {
return nil, fmt.Errorf("failed to transform old image: %w", err)
}

afterData, err := transformImage(record.Dynamodb.NewImage)
if err != nil {
return nil, fmt.Errorf("failed to transform new image: %w", err)
}

primaryKey, err := transformImage(record.Dynamodb.Keys)
if err != nil {
return nil, fmt.Errorf("failed to transform keys: %w", err)
}

return &Message{
op: op,
tableName: tableName,
executionTime: executionTime,
beforeRowData: transformImage(record.Dynamodb.OldImage),
afterRowData: transformImage(record.Dynamodb.NewImage),
primaryKey: transformImage(record.Dynamodb.Keys),
beforeRowData: beforeData,
afterRowData: afterData,
primaryKey: primaryKey,
}, nil
}

0 comments on commit 51f3eb8

Please sign in to comment.