diff --git a/examples/dynamodb/service_account.tf b/examples/dynamodb/service_account.tf index b3709fc9..1276c316 100644 --- a/examples/dynamodb/service_account.tf +++ b/examples/dynamodb/service_account.tf @@ -19,7 +19,7 @@ resource "aws_iam_role" "dynamodb_streams_role" { Version = "2012-10-17", Statement = [ { - Action = "sts:AssumeRole", + Action = "sts:AssumeRole", Principal = { Service = "ec2.amazonaws.com" }, @@ -45,8 +45,11 @@ resource "aws_iam_policy" "dynamodb_streams_access" { "dynamodb:GetRecords", "dynamodb:ListStreams", - // Stuff only required for export (snapshot) - "dynamodb:DescribeTable" + // Required for export + "dynamodb:DescribeTable", + "dynamodb:ListExports", + "dynamodb:DescribeExport", + "dynamodb:ExportTableToPointInTime" ], // Don't want to use "*"? You can specify like this: // Resource = [ TABLE_ARN, TABLE_ARN + "/stream/*" ] @@ -63,9 +66,12 @@ resource "aws_iam_policy" "dynamodb_streams_access" { { "Effect" : "Allow", "Action" : [ - "s3:GetObject" + "s3:GetObject", + // Required for export + "s3:PutObject", + "s3:GetBucketLocation" ], - "Resource" : "arn:aws:s3:::artie-transfer-test/AWSDynamoDB/*" + "Resource" : "arn:aws:s3:::artie-transfer-test/*" } ] }) diff --git a/lib/kafkalib/writer.go b/lib/kafkalib/writer.go index cc312d1f..7533dc11 100644 --- a/lib/kafkalib/writer.go +++ b/lib/kafkalib/writer.go @@ -89,7 +89,9 @@ func (b *BatchWriter) Write(ctx context.Context, rawMsgs []lib.RawMessage) error } var msgs []kafka.Message + var sampleExecutionTime time.Time for _, rawMsg := range rawMsgs { + sampleExecutionTime = rawMsg.Event().GetExecutionTime() kafkaMsg, err := newMessage(b.cfg.TopicPrefix, rawMsg) if err != nil { return fmt.Errorf("failed to encode kafka message: %w", err) @@ -135,6 +137,7 @@ func (b *BatchWriter) Write(ctx context.Context, rawMsgs []lib.RawMessage) error if b.statsD != nil { b.statsD.Count("kafka.publish", int64(len(batch)), tags) + b.statsD.Gauge("kafka.lag_ms", float64(time.Since(sampleExecutionTime).Milliseconds()), tags) } if kafkaErr != nil { diff --git a/sources/mongo/message.go b/sources/mongo/message.go index b18e3a06..73afdd72 100644 --- a/sources/mongo/message.go +++ b/sources/mongo/message.go @@ -10,6 +10,7 @@ import ( "github.com/artie-labs/transfer/lib/cdc/mongo" "github.com/artie-labs/transfer/lib/debezium" "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" ) type Message struct { @@ -40,32 +41,49 @@ func (m *Message) ToRawMessage(collection config.Collection, database string) (l } func ParseMessage(result bson.M, op string) (*Message, error) { + bsonPk, isOk := result["_id"] + if !isOk { + return nil, fmt.Errorf("failed to get partition key, row: %v", result) + } + // When canonical is enabled, it will emphasize type preservation jsonExtendedBytes, err := bson.MarshalExtJSON(result, true, false) if err != nil { return nil, fmt.Errorf("failed to marshal document to JSON extended: %w", err) } - var jsonExtendedMap map[string]any - if err = json.Unmarshal(jsonExtendedBytes, &jsonExtendedMap); err != nil { - return nil, fmt.Errorf("failed to unmarshal JSON extended to map: %w", err) - } + var idString string + switch castedPk := bsonPk.(type) { + case primitive.ObjectID: + idString = fmt.Sprintf(`{"$oid":"%s"}`, castedPk.Hex()) + case string: + idString = castedPk + case int, int32, int64: + idString = fmt.Sprintf("%d", castedPk) + default: + var jsonExtendedMap map[string]any + if err = json.Unmarshal(jsonExtendedBytes, &jsonExtendedMap); err != nil { + return nil, fmt.Errorf("failed to unmarshal JSON extended to map: %w", err) + } - pk, isOk := jsonExtendedMap["_id"] - if !isOk { - return nil, fmt.Errorf("failed to get partition key, row: %v", jsonExtendedMap) - } + pk, isOk := jsonExtendedMap["_id"] + if !isOk { + return nil, fmt.Errorf("failed to get partition key, row: %v", jsonExtendedMap) + } - pkBytes, err := json.Marshal(pk) - if err != nil { - return nil, fmt.Errorf("failed to marshal ext json: %w", err) + pkBytes, err := json.Marshal(pk) + if err != nil { + return nil, fmt.Errorf("failed to marshal ext json: %w", err) + } + + idString = string(pkBytes) } return &Message{ jsonExtendedString: string(jsonExtendedBytes), operation: op, pkMap: map[string]any{ - "id": string(pkBytes), + "id": idString, }, }, nil } diff --git a/sources/mongo/message_test.go b/sources/mongo/message_test.go index 32325ea0..87fd8fea 100644 --- a/sources/mongo/message_test.go +++ b/sources/mongo/message_test.go @@ -2,6 +2,7 @@ package mongo import ( "encoding/json" + "fmt" "testing" "time" @@ -15,22 +16,39 @@ import ( ) func TestParseMessagePartitionKey(t *testing.T) { - objId, err := primitive.ObjectIDFromHex("507f1f77bcf86cd799439011") - assert.NoError(t, err) - msg, err := ParseMessage(bson.M{"_id": objId}, "r") - assert.NoError(t, err) - assert.Equal(t, `{"$oid":"507f1f77bcf86cd799439011"}`, msg.pkMap["id"]) - - rawMsg, err := msg.ToRawMessage(config.Collection{}, "database") - assert.NoError(t, err) - - rawMsgBytes, err := json.Marshal(rawMsg.PartitionKey()) - assert.NoError(t, err) - - var dbz transferMongo.Debezium - pkMap, err := dbz.GetPrimaryKey(rawMsgBytes, &kafkalib.TopicConfig{CDCKeyFormat: kafkalib.JSONKeyFmt}) - assert.NoError(t, err) - assert.Equal(t, "507f1f77bcf86cd799439011", pkMap["_id"]) + { + // Primary key as object ID + objId, err := primitive.ObjectIDFromHex("507f1f77bcf86cd799439011") + assert.NoError(t, err) + msg, err := ParseMessage(bson.M{"_id": objId}, "r") + assert.NoError(t, err) + assert.Equal(t, `{"$oid":"507f1f77bcf86cd799439011"}`, msg.pkMap["id"]) + + rawMsg, err := msg.ToRawMessage(config.Collection{}, "database") + assert.NoError(t, err) + + rawMsgBytes, err := json.Marshal(rawMsg.PartitionKey()) + assert.NoError(t, err) + + var dbz transferMongo.Debezium + pkMap, err := dbz.GetPrimaryKey(rawMsgBytes, &kafkalib.TopicConfig{CDCKeyFormat: kafkalib.JSONKeyFmt}) + assert.NoError(t, err) + assert.Equal(t, "507f1f77bcf86cd799439011", pkMap["_id"]) + } + { + // Primary key as string + msg, err := ParseMessage(bson.M{"_id": "hello world"}, "r") + assert.NoError(t, err) + assert.Equal(t, "hello world", msg.pkMap["id"]) + } + { + // Primary key as ints + for _, val := range []any{1001, int32(1002), int64(1003)} { + msg, err := ParseMessage(bson.M{"_id": val}, "r") + assert.NoError(t, err) + assert.Equal(t, fmt.Sprint(val), msg.pkMap["id"]) + } + } } func TestParseMessage(t *testing.T) {