Skip to content

Commit

Permalink
Merge branch 'master' into dana/throttle-dynamo
Browse files Browse the repository at this point in the history
  • Loading branch information
danafallon committed Jul 31, 2024
2 parents 40d3fd5 + b4c2cbc commit a4ea600
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 33 deletions.
16 changes: 11 additions & 5 deletions examples/dynamodb/service_account.tf
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ resource "aws_iam_role" "dynamodb_streams_role" {
Version = "2012-10-17",
Statement = [
{
Action = "sts:AssumeRole",
Action = "sts:AssumeRole",
Principal = {
Service = "ec2.amazonaws.com"
},
Expand All @@ -45,8 +45,11 @@ resource "aws_iam_policy" "dynamodb_streams_access" {
"dynamodb:GetRecords",
"dynamodb:ListStreams",

// Stuff only required for export (snapshot)
"dynamodb:DescribeTable"
// Required for export
"dynamodb:DescribeTable",
"dynamodb:ListExports",
"dynamodb:DescribeExport",
"dynamodb:ExportTableToPointInTime"
],
// Don't want to use "*"? You can specify like this:
// Resource = [ TABLE_ARN, TABLE_ARN + "/stream/*" ]
Expand All @@ -63,9 +66,12 @@ resource "aws_iam_policy" "dynamodb_streams_access" {
{
"Effect" : "Allow",
"Action" : [
"s3:GetObject"
"s3:GetObject",
// Required for export
"s3:PutObject",
"s3:GetBucketLocation"
],
"Resource" : "arn:aws:s3:::artie-transfer-test/AWSDynamoDB/*"
"Resource" : "arn:aws:s3:::artie-transfer-test/*"
}
]
})
Expand Down
3 changes: 3 additions & 0 deletions lib/kafkalib/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ func (b *BatchWriter) Write(ctx context.Context, rawMsgs []lib.RawMessage) error
}

var msgs []kafka.Message
var sampleExecutionTime time.Time
for _, rawMsg := range rawMsgs {
sampleExecutionTime = rawMsg.Event().GetExecutionTime()
kafkaMsg, err := newMessage(b.cfg.TopicPrefix, rawMsg)
if err != nil {
return fmt.Errorf("failed to encode kafka message: %w", err)
Expand Down Expand Up @@ -135,6 +137,7 @@ func (b *BatchWriter) Write(ctx context.Context, rawMsgs []lib.RawMessage) error

if b.statsD != nil {
b.statsD.Count("kafka.publish", int64(len(batch)), tags)
b.statsD.Gauge("kafka.lag_ms", float64(time.Since(sampleExecutionTime).Milliseconds()), tags)
}

if kafkaErr != nil {
Expand Down
42 changes: 30 additions & 12 deletions sources/mongo/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/artie-labs/transfer/lib/cdc/mongo"
"github.com/artie-labs/transfer/lib/debezium"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
)

type Message struct {
Expand Down Expand Up @@ -40,32 +41,49 @@ func (m *Message) ToRawMessage(collection config.Collection, database string) (l
}

func ParseMessage(result bson.M, op string) (*Message, error) {
bsonPk, isOk := result["_id"]
if !isOk {
return nil, fmt.Errorf("failed to get partition key, row: %v", result)
}

// When canonical is enabled, it will emphasize type preservation
jsonExtendedBytes, err := bson.MarshalExtJSON(result, true, false)
if err != nil {
return nil, fmt.Errorf("failed to marshal document to JSON extended: %w", err)
}

var jsonExtendedMap map[string]any
if err = json.Unmarshal(jsonExtendedBytes, &jsonExtendedMap); err != nil {
return nil, fmt.Errorf("failed to unmarshal JSON extended to map: %w", err)
}
var idString string
switch castedPk := bsonPk.(type) {
case primitive.ObjectID:
idString = fmt.Sprintf(`{"$oid":"%s"}`, castedPk.Hex())
case string:
idString = castedPk
case int, int32, int64:
idString = fmt.Sprintf("%d", castedPk)
default:
var jsonExtendedMap map[string]any
if err = json.Unmarshal(jsonExtendedBytes, &jsonExtendedMap); err != nil {
return nil, fmt.Errorf("failed to unmarshal JSON extended to map: %w", err)
}

pk, isOk := jsonExtendedMap["_id"]
if !isOk {
return nil, fmt.Errorf("failed to get partition key, row: %v", jsonExtendedMap)
}
pk, isOk := jsonExtendedMap["_id"]
if !isOk {
return nil, fmt.Errorf("failed to get partition key, row: %v", jsonExtendedMap)
}

pkBytes, err := json.Marshal(pk)
if err != nil {
return nil, fmt.Errorf("failed to marshal ext json: %w", err)
pkBytes, err := json.Marshal(pk)
if err != nil {
return nil, fmt.Errorf("failed to marshal ext json: %w", err)
}

idString = string(pkBytes)
}

return &Message{
jsonExtendedString: string(jsonExtendedBytes),
operation: op,
pkMap: map[string]any{
"id": string(pkBytes),
"id": idString,
},
}, nil
}
50 changes: 34 additions & 16 deletions sources/mongo/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mongo

import (
"encoding/json"
"fmt"
"testing"
"time"

Expand All @@ -15,22 +16,39 @@ import (
)

func TestParseMessagePartitionKey(t *testing.T) {
objId, err := primitive.ObjectIDFromHex("507f1f77bcf86cd799439011")
assert.NoError(t, err)
msg, err := ParseMessage(bson.M{"_id": objId}, "r")
assert.NoError(t, err)
assert.Equal(t, `{"$oid":"507f1f77bcf86cd799439011"}`, msg.pkMap["id"])

rawMsg, err := msg.ToRawMessage(config.Collection{}, "database")
assert.NoError(t, err)

rawMsgBytes, err := json.Marshal(rawMsg.PartitionKey())
assert.NoError(t, err)

var dbz transferMongo.Debezium
pkMap, err := dbz.GetPrimaryKey(rawMsgBytes, &kafkalib.TopicConfig{CDCKeyFormat: kafkalib.JSONKeyFmt})
assert.NoError(t, err)
assert.Equal(t, "507f1f77bcf86cd799439011", pkMap["_id"])
{
// Primary key as object ID
objId, err := primitive.ObjectIDFromHex("507f1f77bcf86cd799439011")
assert.NoError(t, err)
msg, err := ParseMessage(bson.M{"_id": objId}, "r")
assert.NoError(t, err)
assert.Equal(t, `{"$oid":"507f1f77bcf86cd799439011"}`, msg.pkMap["id"])

rawMsg, err := msg.ToRawMessage(config.Collection{}, "database")
assert.NoError(t, err)

rawMsgBytes, err := json.Marshal(rawMsg.PartitionKey())
assert.NoError(t, err)

var dbz transferMongo.Debezium
pkMap, err := dbz.GetPrimaryKey(rawMsgBytes, &kafkalib.TopicConfig{CDCKeyFormat: kafkalib.JSONKeyFmt})
assert.NoError(t, err)
assert.Equal(t, "507f1f77bcf86cd799439011", pkMap["_id"])
}
{
// Primary key as string
msg, err := ParseMessage(bson.M{"_id": "hello world"}, "r")
assert.NoError(t, err)
assert.Equal(t, "hello world", msg.pkMap["id"])
}
{
// Primary key as ints
for _, val := range []any{1001, int32(1002), int64(1003)} {
msg, err := ParseMessage(bson.M{"_id": val}, "r")
assert.NoError(t, err)
assert.Equal(t, fmt.Sprint(val), msg.pkMap["id"])
}
}
}

func TestParseMessage(t *testing.T) {
Expand Down

0 comments on commit a4ea600

Please sign in to comment.