diff --git a/lib/dynamo/message.go b/lib/dynamo/message.go index 20c19727..f9bd09cc 100644 --- a/lib/dynamo/message.go +++ b/lib/dynamo/message.go @@ -1,7 +1,7 @@ package dynamo import ( - "log/slog" + "fmt" "strconv" "time" @@ -26,66 +26,81 @@ func stringToFloat64(s string) (float64, error) { // transformAttributeValue converts a DynamoDB AttributeValue to a Go type. // References: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html -func transformAttributeValue(attr *dynamodb.AttributeValue) any { +func transformAttributeValue(attr *dynamodb.AttributeValue) (any, error) { switch { case attr.S != nil: - return *attr.S + return *attr.S, nil case attr.N != nil: number, err := stringToFloat64(*attr.N) - if err == nil { - return number - } else { - slog.Error("Failed to convert string to float64", slog.Any("err", err), slog.String("string", *attr.N)) - // TODO - should we throw an error here? - return nil + if err != nil { + return nil, fmt.Errorf("failed to convert string to float64: %w", err) } + + return number, nil case attr.BOOL != nil: - return *attr.BOOL + return *attr.BOOL, nil case attr.M != nil: result := make(map[string]any) for k, v := range attr.M { - result[k] = transformAttributeValue(v) + val, err := transformAttributeValue(v) + if err != nil { + return nil, fmt.Errorf("failed to transform attribute value: %w", err) + } + + result[k] = val } - return result + + return result, nil case attr.L != nil: list := make([]any, len(attr.L)) for i, item := range attr.L { - list[i] = transformAttributeValue(item) + val, err := transformAttributeValue(item) + if err != nil { + return nil, fmt.Errorf("failed to transform attribute value: %w", err) + } + + list[i] = val } - return list + + return list, nil case attr.SS != nil: // Convert the string set to a slice of strings. strSet := make([]string, len(attr.SS)) for i, s := range attr.SS { strSet[i] = *s } - return strSet + + return strSet, nil case attr.NS != nil: // Convert the number set to a slice of strings (since the numbers are stored as strings). numSet := make([]float64, len(attr.NS)) for i, n := range attr.NS { number, err := stringToFloat64(*n) if err != nil { - slog.Error("Failed to convert string to float64", slog.Any("err", err), slog.String("string", *n)) - // TODO - should we throw an error here? - return nil + return nil, fmt.Errorf("failed to convert string to float64: %w", err) } numSet[i] = number } - return numSet + return numSet, nil } - return nil + return nil, nil } -func transformImage(data map[string]*dynamodb.AttributeValue) map[string]any { +func transformImage(data map[string]*dynamodb.AttributeValue) (map[string]any, error) { transformed := make(map[string]any) for key, attrValue := range data { - transformed[key] = transformAttributeValue(attrValue) + val, err := transformAttributeValue(attrValue) + if err != nil { + return nil, fmt.Errorf("failed to transform attribute value: %w", err) + } + + transformed[key] = val } - return transformed + + return transformed, nil } func (m *Message) artieMessage() *util.SchemaEventPayload { diff --git a/lib/dynamo/message_test.go b/lib/dynamo/message_test.go index 6f151885..25ad2081 100644 --- a/lib/dynamo/message_test.go +++ b/lib/dynamo/message_test.go @@ -119,7 +119,8 @@ func TestTransformAttributeValue(t *testing.T) { } for _, tc := range tcs { - actualValue := transformAttributeValue(tc.attr) + actualValue, err := transformAttributeValue(tc.attr) + assert.NoError(t, err, tc.name) assert.Equal(t, tc.expectedValue, actualValue, tc.name) } } diff --git a/lib/dynamo/parse_message.go b/lib/dynamo/parse_message.go index ea2a6f25..28290627 100644 --- a/lib/dynamo/parse_message.go +++ b/lib/dynamo/parse_message.go @@ -17,11 +17,11 @@ func NewMessageFromExport(item dynamodb.ItemResponse, keys []string, tableName s return nil, fmt.Errorf("keys is nil") } - // Snapshot time does not exist on the row - // Perhaps we can have it inferred from the manifest file in the future. - executionTime := time.Now() + rowData, err := transformImage(item.Item) + if err != nil { + return nil, fmt.Errorf("failed to transform item: %w", err) + } - rowData := transformImage(item.Item) primaryKeys := make(map[string]any) for _, key := range keys { val, isOk := rowData[key] @@ -33,9 +33,11 @@ func NewMessageFromExport(item dynamodb.ItemResponse, keys []string, tableName s } return &Message{ - op: "r", - tableName: tableName, - executionTime: executionTime, + op: "r", + tableName: tableName, + // Snapshot time does not exist on the row + // Perhaps we can have it inferred from the manifest file in the future. + executionTime: time.Now(), afterRowData: rowData, primaryKey: primaryKeys, }, nil @@ -67,12 +69,27 @@ func NewMessage(record *dynamodbstreams.Record, tableName string) (*Message, err } } + beforeData, err := transformImage(record.Dynamodb.OldImage) + if err != nil { + return nil, fmt.Errorf("failed to transform old image: %w", err) + } + + afterData, err := transformImage(record.Dynamodb.NewImage) + if err != nil { + return nil, fmt.Errorf("failed to transform new image: %w", err) + } + + primaryKey, err := transformImage(record.Dynamodb.Keys) + if err != nil { + return nil, fmt.Errorf("failed to transform keys: %w", err) + } + return &Message{ op: op, tableName: tableName, executionTime: executionTime, - beforeRowData: transformImage(record.Dynamodb.OldImage), - afterRowData: transformImage(record.Dynamodb.NewImage), - primaryKey: transformImage(record.Dynamodb.Keys), + beforeRowData: beforeData, + afterRowData: afterData, + primaryKey: primaryKey, }, nil }