diff --git a/README.md b/README.md
index 91bbf32ba..8e2384568 100644
--- a/README.md
+++ b/README.md
@@ -47,9 +47,14 @@ To run Artie Transfer's stack locally, please refer to the [examples folder](htt
 ## What is currently supported?
 Transfer is aiming to provide coverage across all OLTP and OLAP databases. Currently Transfer supports:
 
+- Message Queues
+  - Kafka (default)
+  - Google Pub/Sub
+
 - [Destinations](https://docs.artie.so/configurations/real-time-destinations/overview):
   - Snowflake
   - BigQuery
+
 - [Sources](https://docs.artie.so/configurations/real-time-sources/overview):
   - MongoDB (w/ Debezium)
   - Postgres (w/ Debezium), we support the following replication slot plug-ins: `pgoutput, decoderbufs, wal2json`
diff --git a/go.mod b/go.mod
index 154214afc..09593ca1b 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,7 @@ module github.com/artie-labs/transfer
 go 1.19
 
 require (
+    cloud.google.com/go/pubsub v1.28.0
     github.com/DataDog/datadog-go v4.8.3+incompatible
     github.com/evalphobia/logrus_sentry v0.8.2
     github.com/google/uuid v1.3.0
@@ -13,12 +14,15 @@ require (
     github.com/stretchr/testify v1.8.1
     github.com/viant/bigquery v0.2.1-0.20230129024722-24ed6fd5555f
     go.mongodb.org/mongo-driver v1.11.0
+    google.golang.org/api v0.103.0
     gopkg.in/yaml.v3 v3.0.1
 )
 
 require (
+    cloud.google.com/go v0.105.0 // indirect
     cloud.google.com/go/compute v1.13.0 // indirect
     cloud.google.com/go/compute/metadata v0.2.1 // indirect
+    cloud.google.com/go/iam v0.7.0 // indirect
     github.com/Azure/azure-pipeline-go v0.2.3 // indirect
     github.com/Azure/azure-storage-blob-go v0.15.0 // indirect
     github.com/Microsoft/go-winio v0.6.0 // indirect
@@ -49,6 +53,7 @@ require (
     github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
     github.com/golang/protobuf v1.5.2 // indirect
     github.com/google/flatbuffers v22.9.29+incompatible // indirect
+    github.com/google/go-cmp v0.5.9 // indirect
     github.com/googleapis/enterprise-certificate-proxy v0.2.0 // indirect
     github.com/googleapis/gax-go/v2 v2.7.0 // indirect
     github.com/jmespath/go-jmespath v0.4.0 // indirect
@@ -74,11 +79,11 @@ require (
     golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
     golang.org/x/net v0.7.0 // indirect
     golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 // indirect
+    golang.org/x/sync v0.1.0 // indirect
     golang.org/x/sys v0.5.0 // indirect
     golang.org/x/text v0.7.0 // indirect
     golang.org/x/tools v0.1.12 // indirect
     golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
-    google.golang.org/api v0.103.0 // indirect
     google.golang.org/appengine v1.6.7 // indirect
     google.golang.org/genproto v0.0.0-20221202195650-67e5cbc046fd // indirect
     google.golang.org/grpc v1.51.0 // indirect
diff --git a/go.sum b/go.sum
index 3f2907b9b..1954a9dff 100644
--- a/go.sum
+++ b/go.sum
@@ -3,11 +3,17 @@ cloud.google.com/go v0.31.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
 cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
 cloud.google.com/go v0.37.0/go.mod h1:TS1dMSSfndXH133OKGwekG838Om/cQT0BUHV3HcBgoo=
 cloud.google.com/go v0.105.0 h1:DNtEKRBAAzeS4KyIory52wWHuClNaXJ5x1F7xa4q+5Y=
+cloud.google.com/go v0.105.0/go.mod h1:PrLgOJNe5nfE9UMxKxgXj4mD3voiP+YQ6gdt6KMFOKM=
 cloud.google.com/go/compute v1.13.0 h1:AYrLkB8NPdDRslNp4Jxmzrhdr03fUAIDbiGFjLWowoU=
 cloud.google.com/go/compute v1.13.0/go.mod h1:5aPTS0cUNMIc1CE546K+Th6weJUNQErARyZtRXDJ8GE=
 cloud.google.com/go/compute/metadata v0.2.1 h1:efOwf5ymceDhK6PKMnnrTHP4pppY5L22mle96M1yP48=
 cloud.google.com/go/compute/metadata v0.2.1/go.mod h1:jgHgmJd2RKBGzXqF5LR2EZMGxBkeanZ9wwa75XHJgOM=
+cloud.google.com/go/iam v0.7.0 h1:k4MuwOsS7zGJJ+QfZ5vBK8SgHBAvYN/23BWsiihJ1vs=
+cloud.google.com/go/iam v0.7.0/go.mod h1:H5Br8wRaDGNc8XP3keLc4unfUUZeyH3Sfl9XpQEYOeg=
+cloud.google.com/go/kms v1.6.0 h1:OWRZzrPmOZUzurjI2FBGtgY2mB1WaJkqhw6oIwSj0Yg=
 cloud.google.com/go/longrunning v0.3.0 h1:NjljC+FYPV3uh5/OwWT6pVU+doBqMg2x/rZlE+CamDs=
+cloud.google.com/go/pubsub v1.28.0 h1:XzabfdPx/+eNrsVVGLFgeUnQQKPGkMb8klRCeYK52is=
+cloud.google.com/go/pubsub v1.28.0/go.mod h1:vuXFpwaVoIPQMGXqRyUQigu/AX1S3IWugR9xznmcXX8=
 dmitri.shuralyov.com/app/changes v0.0.0-20180602232624-0a106ad413e3/go.mod h1:Yl+fi1br7+Rr3LqpNJf1/uxUdtRUV+Tnj0o93V2B9MU=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
 dmitri.shuralyov.com/html/belt v0.0.0-20180602232347-f7d459c86be0/go.mod h1:JLBrvjyP0v+ecvNYvCpyZgu5/xkfAUhi6wJj28eUfSU=
@@ -175,6 +181,7 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
 github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
 github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
+github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
 github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ=
 github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
 github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
@@ -418,6 +425,8 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ
 golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
+golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20181029174526-d69651ed3497/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
diff --git a/lib/artie/message.go b/lib/artie/message.go
new file mode 100644
index 000000000..5ea71bcf5
--- /dev/null
+++ b/lib/artie/message.go
@@ -0,0 +1,125 @@
+package artie
+
+import (
+    "cloud.google.com/go/pubsub"
+    "context"
+    "fmt"
+    "github.com/artie-labs/transfer/lib/telemetry/metrics"
+    "github.com/segmentio/kafka-go"
+    "time"
+)
+
+type Kind int
+
+const (
+    Invalid Kind = iota
+    Kafka
+    PubSub
+)
+
+type pubsubWrapper struct {
+    topic string
+    *pubsub.Message
+}
+
+type Message struct {
+    KafkaMsg *kafka.Message
+    PubSub   *pubsubWrapper
+}
+
+func NewMessage(kafkaMsg *kafka.Message, pubsubMsg *pubsub.Message, topic string) Message {
+    var msg Message
+    if kafkaMsg != nil {
+        msg.KafkaMsg = kafkaMsg
+    }
+
+    if pubsubMsg != nil {
+        msg.PubSub = &pubsubWrapper{
+            topic:   topic,
+            Message: pubsubMsg,
+        }
+    }
+
+    return msg
+}
+
+func (m *Message) Kind() Kind {
+    if m.KafkaMsg != nil {
+        return Kafka
+    }
+
+    if m.PubSub != nil {
+        return PubSub
+    }
+
+    return Invalid
+}
+
+func (m *Message) EmitIngestionLag(ctx context.Context, groupID string) {
+    metrics.FromContext(ctx).Timing("ingestion.lag", time.Since(m.PublishTime()), map[string]string{
+        "groupID":   groupID,
+        "topic":     m.Topic(),
+        "partition": m.Partition(),
+    })
+}
+
+func (m *Message) PublishTime() time.Time {
+    if m.KafkaMsg != nil {
+        return m.KafkaMsg.Time
+    }
+
+    if m.PubSub != nil {
+        return m.PubSub.PublishTime
+    }
+
+    return time.Time{}
+}
+
+func (m *Message) Topic() string {
+    if m.KafkaMsg != nil {
+        return m.KafkaMsg.Topic
+    }
+
+    if m.PubSub != nil {
+        return m.PubSub.topic
+    }
+
+    return ""
+}
+
+func (m *Message) Partition() string {
+    if m.KafkaMsg != nil {
+        return fmt.Sprint(m.KafkaMsg.Partition)
+    }
+
+    if m.PubSub != nil {
+        // Pub/Sub doesn't have partitions.
+        return "no_partition"
+    }
+
+    return ""
+}
+
+func (m *Message) Key() []byte {
+    if m.KafkaMsg != nil {
+        return m.KafkaMsg.Key
+    }
+
+    if m.PubSub != nil {
+        return []byte(m.PubSub.OrderingKey)
+    }
+
+    return nil
+}
+
+func (m *Message) Value() []byte {
+    if m.KafkaMsg != nil {
+        return m.KafkaMsg.Value
+    }
+
+    if m.PubSub != nil {
+        return m.PubSub.Data
+    }
+
+    return nil
+}
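Reviewer note: the new artie.Message type above is the heart of this change: a single wrapper that carries either a *kafka.Message or a *pubsub.Message and exposes queue-agnostic accessors (Topic, Partition, Key, Value, PublishTime). A minimal sketch of how it is meant to be consumed follows; the topic and key values are made up, and only packages introduced or already used in this PR are assumed.

```go
package main

import (
	"fmt"

	"cloud.google.com/go/pubsub"
	"github.com/artie-labs/transfer/lib/artie"
	"github.com/segmentio/kafka-go"
)

func main() {
	// A message that arrived via Kafka: the topic and partition ride along on kafka.Message.
	kafkaMsg := kafka.Message{Topic: "orders", Partition: 3, Key: []byte("Struct{id=1}"), Value: []byte("{}")}
	msg := artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic)
	fmt.Println(msg.Kind() == artie.Kafka, msg.Topic(), msg.Partition()) // true orders 3

	// The same payload arriving via Pub/Sub: the topic must be passed explicitly because
	// pubsub.Message does not carry it, and the ordering key stands in for the Kafka key.
	pubsubMsg := &pubsub.Message{OrderingKey: "Struct{id=1}", Data: []byte("{}")}
	msg = artie.NewMessage(nil, pubsubMsg, "orders")
	fmt.Println(msg.Kind() == artie.PubSub, msg.Topic(), msg.Partition()) // true orders no_partition
}
```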
diff --git a/lib/artie/message_test.go b/lib/artie/message_test.go
new file mode 100644
index 000000000..c6d3f2e30
--- /dev/null
+++ b/lib/artie/message_test.go
@@ -0,0 +1,43 @@
+package artie
+
+import (
+    "cloud.google.com/go/pubsub"
+    "github.com/segmentio/kafka-go"
+    "github.com/stretchr/testify/assert"
+    "testing"
+)
+
+const keyString = "Struct{id=12}"
+
+func TestNewMessage_Pubsub(t *testing.T) {
+    msg := NewMessage(nil, nil, "")
+    assert.Equal(t, msg.Topic(), "", "empty topic")
+
+    pubsubMsg := &pubsub.Message{}
+    msg = NewMessage(nil, pubsubMsg, "")
+    assert.Equal(t, "no_partition", msg.Partition())
+
+    pubsubMsg.Data = []byte("hello_world")
+    assert.Equal(t, "hello_world", string(msg.Value()))
+
+    pubsubMsg.OrderingKey = keyString
+    assert.Equal(t, []byte(keyString), msg.Key())
+
+    msg = NewMessage(nil, pubsubMsg, "database.schema.table")
+    assert.Equal(t, "database.schema.table", msg.Topic())
+}
+
+func TestNewMessage(t *testing.T) {
+    kafkaMsg := &kafka.Message{
+        Topic:     "test_topic",
+        Partition: 5,
+        Key:       []byte(keyString),
+        Value:     []byte("kafka_value"),
+    }
+
+    msg := NewMessage(kafkaMsg, nil, "")
+    assert.Equal(t, "test_topic", msg.Topic())
+    assert.Equal(t, "5", msg.Partition())
+    assert.Equal(t, keyString, string(msg.Key()))
+    assert.Equal(t, "kafka_value", string(msg.Value()))
+}
diff --git a/lib/config/config.go b/lib/config/config.go
index f2ee39ff9..7c19338ac 100644
--- a/lib/config/config.go
+++ b/lib/config/config.go
@@ -16,6 +16,12 @@ type Sentry struct {
     DSN string `yaml:"dsn"`
 }
 
+type Pubsub struct {
+    ProjectID         string                  `yaml:"projectID"`
+    TopicConfigs      []*kafkalib.TopicConfig `yaml:"topicConfigs"`
+    PathToCredentials string                  `yaml:"pathToCredentials"`
+}
+
 type Kafka struct {
     BootstrapServer string `yaml:"bootstrapServer"`
     GroupID         string `yaml:"groupID"`
@@ -24,6 +30,10 @@ type Kafka struct {
     TopicConfigs []*kafkalib.TopicConfig `yaml:"topicConfigs"`
 }
 
+func (p *Pubsub) String() string {
+    return fmt.Sprintf("project_id=%s, pathToCredentials=%s", p.ProjectID, p.PathToCredentials)
+}
+
 func (k *Kafka) String() string {
     // Don't log credentials.
     return fmt.Sprintf("bootstrapServer=%s, groupID=%s, user_set=%v, pass_set=%v",
@@ -32,6 +42,9 @@ func (k *Kafka) String() string {
 
 type Config struct {
     Output constants.DestinationKind `yaml:"outputSource"`
+    Queue  constants.QueueKind       `yaml:"queue"`
+
+    Pubsub *Pubsub
     Kafka  *Kafka
 
     BigQuery struct {
@@ -83,6 +96,11 @@ func readFileToConfig(pathToConfig string) (*Config, error) {
         return nil, err
     }
 
+    if config.Queue == "" {
+        // We default to Kafka for backwards compatibility.
+        config.Queue = constants.Kafka
+    }
+
     return &config, nil
 }
 
@@ -98,21 +116,37 @@ func (c *Config) Validate() error {
         return fmt.Errorf("output: %s is invalid", c.Output)
     }
 
-    // TopicConfigs
-    if c.Kafka == nil || len(c.Kafka.TopicConfigs) == 0 {
-        return fmt.Errorf("no kafka topic configs, kafka: %v", c.Kafka)
-    }
+    if c.Queue == constants.Kafka {
+        if c.Kafka == nil || len(c.Kafka.TopicConfigs) == 0 {
+            return fmt.Errorf("no kafka topic configs, kafka: %v", c.Kafka)
+        }
+
+        for _, topicConfig := range c.Kafka.TopicConfigs {
+            if valid := topicConfig.Valid(); !valid {
+                return fmt.Errorf("topic config is invalid, tc: %s", topicConfig.String())
+            }
+        }
 
-    for _, topicConfig := range c.Kafka.TopicConfigs {
-        if valid := topicConfig.Valid(); !valid {
-            return fmt.Errorf("topic config is invalid, tc: %s", topicConfig.String())
+        // Username and password are not required (e.g. if it's within the same VPC or connecting locally).
+        if array.Empty([]string{c.Kafka.GroupID, c.Kafka.BootstrapServer}) {
+            return fmt.Errorf("kafka settings is invalid, kafka: %s", c.Kafka.String())
         }
     }
 
-    // Kafka config
-    // Username and password are not required (if it's within the same VPC or connecting locally
-    if array.Empty([]string{c.Kafka.GroupID, c.Kafka.BootstrapServer}) {
-        return fmt.Errorf("kafka settings is invalid, kafka: %s", c.Kafka.String())
+    if c.Queue == constants.PubSub {
+        if c.Pubsub == nil || len(c.Pubsub.TopicConfigs) == 0 {
+            return fmt.Errorf("no pubsub topic configs, pubsub: %v", c.Pubsub)
+        }
+
+        for _, topicConfig := range c.Pubsub.TopicConfigs {
+            if valid := topicConfig.Valid(); !valid {
+                return fmt.Errorf("topic config is invalid, tc: %s", topicConfig.String())
+            }
+        }
+
+        if array.Empty([]string{c.Pubsub.ProjectID, c.Pubsub.PathToCredentials}) {
+            return fmt.Errorf("pubsub settings is invalid, pubsub: %s", c.Pubsub.String())
+        }
     }
 
     return nil
return fmt.Sprintf("bootstrapServer=%s, groupID=%s, user_set=%v, pass_set=%v", @@ -32,6 +42,9 @@ func (k *Kafka) String() string { type Config struct { Output constants.DestinationKind `yaml:"outputSource"` + Queue constants.QueueKind `yaml:"queue"` + + Pubsub *Pubsub Kafka *Kafka BigQuery struct { @@ -83,6 +96,11 @@ func readFileToConfig(pathToConfig string) (*Config, error) { return nil, err } + if config.Queue == "" { + // We default to Kafka for backwards compatibility + config.Queue = constants.Kafka + } + return &config, nil } @@ -98,21 +116,37 @@ func (c *Config) Validate() error { return fmt.Errorf("output: %s is invalid", c.Output) } - // TopicConfigs - if c.Kafka == nil || len(c.Kafka.TopicConfigs) == 0 { - return fmt.Errorf("no kafka topic configs, kafka: %v", c.Kafka) - } + if c.Queue == constants.Kafka { + if c.Kafka == nil || len(c.Kafka.TopicConfigs) == 0 { + return fmt.Errorf("no kafka topic configs, kafka: %v", c.Kafka) + } + + for _, topicConfig := range c.Kafka.TopicConfigs { + if valid := topicConfig.Valid(); !valid { + return fmt.Errorf("topic config is invalid, tc: %s", topicConfig.String()) + } + } - for _, topicConfig := range c.Kafka.TopicConfigs { - if valid := topicConfig.Valid(); !valid { - return fmt.Errorf("topic config is invalid, tc: %s", topicConfig.String()) + // Username and password are not required (if it's within the same VPC or connecting locally + if array.Empty([]string{c.Kafka.GroupID, c.Kafka.BootstrapServer}) { + return fmt.Errorf("kafka settings is invalid, kafka: %s", c.Kafka.String()) } } - // Kafka config - // Username and password are not required (if it's within the same VPC or connecting locally - if array.Empty([]string{c.Kafka.GroupID, c.Kafka.BootstrapServer}) { - return fmt.Errorf("kafka settings is invalid, kafka: %s", c.Kafka.String()) + if c.Queue == constants.PubSub { + if c.Pubsub == nil || len(c.Pubsub.TopicConfigs) == 0 { + return fmt.Errorf("no pubsub topic configs, pubsub: %v", c.Pubsub) + } + + for _, topicConfig := range c.Pubsub.TopicConfigs { + if valid := topicConfig.Valid(); !valid { + return fmt.Errorf("topic config is invalid, tc: %s", topicConfig.String()) + } + } + + if array.Empty([]string{c.Pubsub.ProjectID, c.Pubsub.PathToCredentials}) { + return fmt.Errorf("pubsub settings is invalid, pubsub: %s", c.Pubsub.String()) + } } return nil diff --git a/lib/config/config_test.go b/lib/config/config_test.go index 2da2037d4..c25be6dc9 100644 --- a/lib/config/config_test.go +++ b/lib/config/config_test.go @@ -2,6 +2,8 @@ package config import ( "fmt" + "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/kafkalib" "io" "os" "strings" @@ -338,3 +340,34 @@ reporting: assert.Equal(t, config.Snowflake.Region, region) assert.Equal(t, config.Reporting.Sentry.DSN, sentryDSN) } + +func TestConfig_Validate(t *testing.T) { + pubsub := &Pubsub{} + cfg := &Config{ + Pubsub: pubsub, + } + + assert.Contains(t, cfg.Validate().Error(), "is invalid") + + cfg.Output = constants.Snowflake + cfg.Queue = constants.PubSub + assert.Contains(t, cfg.Validate().Error(), "no pubsub topic configs") + + pubsub.TopicConfigs = []*kafkalib.TopicConfig{ + { + Database: "db", + TableName: "table", + Schema: "schema", + Topic: "topic", + }, + } + + assert.Contains(t, cfg.Validate().Error(), "topic config is invalid") + pubsub.TopicConfigs[0].CDCFormat = constants.DBZPostgresAltFormat + pubsub.TopicConfigs[0].CDCKeyFormat = "org.apache.kafka.connect.json.JsonConverter" + assert.Contains(t, cfg.Validate().Error(), "pubsub 
settings is invalid") + + pubsub.ProjectID = "project_id" + pubsub.PathToCredentials = "/tmp/abc" + assert.Nil(t, cfg.Validate()) +} diff --git a/lib/config/constants/constants.go b/lib/config/constants/constants.go index 23a6c674e..ff96daf7d 100644 --- a/lib/config/constants/constants.go +++ b/lib/config/constants/constants.go @@ -10,6 +10,7 @@ const ( // SnowflakeArraySize is used because Snowflake has a max of 16,384 elements in an expression, // https://github.com/snowflakedb/snowflake-connector-python/issues/37 SnowflakeArraySize = 15000 + FlushTimeInterval = 10 * time.Second // DBZPostgresFormat is the only supported CDC format right now DBZPostgresFormat = "debezium.postgres" @@ -32,6 +33,13 @@ const ( Delete ColumnOperation = "drop" ) +type QueueKind string + +const ( + Kafka QueueKind = "kafka" + PubSub QueueKind = "pubsub" +) + type DestinationKind string const ( diff --git a/lib/optimization/event.go b/lib/optimization/event.go index 25c410ae1..154bcc79c 100644 --- a/lib/optimization/event.go +++ b/lib/optimization/event.go @@ -1,11 +1,10 @@ package optimization import ( + "github.com/artie-labs/transfer/lib/artie" "strings" "time" - "github.com/segmentio/kafka-go" - "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/ext" @@ -17,8 +16,10 @@ type TableData struct { PrimaryKey string kafkalib.TopicConfig - // Partition to the latest offset. - PartitionsToLastMessage map[int]kafka.Message + // Partition to the latest offset(s). + // For Kafka, we only need the last message to commit the offset + // However, pub/sub requires every single message to be acked + PartitionsToLastMessage map[string][]artie.Message // This is used for the automatic schema detection LatestCDCTs time.Time diff --git a/main.go b/main.go index af3075508..2849d5d1c 100644 --- a/main.go +++ b/main.go @@ -4,14 +4,14 @@ import ( "context" "os" "sync" - "time" "github.com/artie-labs/transfer/lib/config" + "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/dwh/utils" "github.com/artie-labs/transfer/lib/logger" "github.com/artie-labs/transfer/lib/telemetry/metrics" "github.com/artie-labs/transfer/models" - "github.com/artie-labs/transfer/processes/kafka" + "github.com/artie-labs/transfer/processes/consumer" "github.com/artie-labs/transfer/processes/pool" ) @@ -32,14 +32,25 @@ func main() { wg.Add(1) go func() { defer wg.Done() - pool.StartPool(ctx, 10*time.Second, flushChan) + pool.StartPool(ctx, constants.FlushTimeInterval, flushChan) }() wg.Add(1) - go func() { + go func(ctx context.Context) { defer wg.Done() - kafka.StartConsumer(ctx, flushChan) - }() + + switch config.GetSettings().Config.Queue { + case constants.Kafka: + consumer.StartConsumer(ctx, flushChan) + break + case constants.PubSub: + consumer.StartSubscriber(ctx, flushChan) + break + default: + logger.FromContext(ctx).Fatalf("message queue: %s not supported", config.GetSettings().Config.Queue) + } + + }(ctx) wg.Wait() } diff --git a/models/flush/flush.go b/models/flush/flush.go index 4f89d2fef..0270e8899 100644 --- a/models/flush/flush.go +++ b/models/flush/flush.go @@ -3,12 +3,11 @@ package flush import ( "context" "github.com/artie-labs/transfer/lib/dwh/utils" - "time" - "github.com/artie-labs/transfer/lib/logger" "github.com/artie-labs/transfer/lib/telemetry/metrics" "github.com/artie-labs/transfer/models" - "github.com/artie-labs/transfer/processes/kafka" + "github.com/artie-labs/transfer/processes/consumer" + "time" ) func 
diff --git a/models/flush/flush_suite_test.go b/models/flush/flush_suite_test.go
index 0b0adac44..adbf31bf4 100644
--- a/models/flush/flush_suite_test.go
+++ b/models/flush/flush_suite_test.go
@@ -8,7 +8,7 @@ import (
     "github.com/artie-labs/transfer/lib/kafkalib"
     "github.com/artie-labs/transfer/lib/mocks"
     "github.com/artie-labs/transfer/models"
-    "github.com/artie-labs/transfer/processes/kafka"
+    "github.com/artie-labs/transfer/processes/consumer"
     "github.com/stretchr/testify/suite"
     "testing"
 )
@@ -34,7 +34,7 @@ func (f *FlushTestSuite) SetupTest() {
     models.LoadMemoryDB()
 
     f.fakeConsumer = &mocks.FakeConsumer{}
-    kafka.SetKafkaConsumer(map[string]kafkalib.Consumer{"foo": f.fakeConsumer})
+    consumer.SetKafkaConsumer(map[string]kafkalib.Consumer{"foo": f.fakeConsumer})
 }
 
 func TestFlushTestSuite(t *testing.T) {
diff --git a/models/flush/flush_test.go b/models/flush/flush_test.go
index ba134879f..5783b5e2b 100644
--- a/models/flush/flush_test.go
+++ b/models/flush/flush_test.go
@@ -2,6 +2,7 @@ package flush
 
 import (
     "fmt"
+    "github.com/artie-labs/transfer/lib/artie"
    "github.com/artie-labs/transfer/lib/config/constants"
     "github.com/segmentio/kafka-go"
     "github.com/stretchr/testify/assert"
@@ -30,7 +31,9 @@ func (f *FlushTestSuite) TestMemoryBasic() {
                 "hi": "hello",
             },
         }
-        _, err := event.Save(topicConfig, kafka.Message{Partition: 1, Offset: 1})
+
+        kafkaMsg := kafka.Message{Partition: 1, Offset: 1}
+        _, err := event.Save(topicConfig, artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic))
         assert.Nil(f.T(), err)
         assert.Equal(f.T(), len(models.GetMemoryDB().TableData["foo"].RowsData), i+1)
     }
@@ -52,7 +55,8 @@ func (f *FlushTestSuite) TestShouldFlush() {
         }
 
         var err error
-        flush, err = event.Save(topicConfig, kafka.Message{Partition: 1, Offset: int64(i)})
+        kafkaMsg := kafka.Message{Partition: 1, Offset: int64(i)}
+        flush, err = event.Save(topicConfig, artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic))
         assert.Nil(f.T(), err)
 
         if flush {
@@ -85,7 +89,8 @@ func (f *FlushTestSuite) TestMemoryConcurrency() {
                 },
             }
 
-            _, err := event.Save(topicConfig, kafka.Message{Partition: 1, Offset: int64(i)})
+            kafkaMsg := kafka.Message{Partition: 1, Offset: int64(i)}
+            _, err := event.Save(topicConfig, artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic))
             assert.Nil(f.T(), err)
         }
diff --git a/models/memory.go b/models/memory.go
index 328788730..5ad9ae948 100644
--- a/models/memory.go
+++ b/models/memory.go
@@ -3,12 +3,12 @@ package models
 import (
     "errors"
     "fmt"
+    "github.com/artie-labs/transfer/lib/artie"
     "github.com/artie-labs/transfer/lib/config/constants"
     "github.com/artie-labs/transfer/lib/kafkalib"
     "github.com/artie-labs/transfer/lib/optimization"
     "github.com/artie-labs/transfer/lib/stringutil"
     "github.com/artie-labs/transfer/lib/typing"
-    "github.com/segmentio/kafka-go"
     "strings"
     "sync"
 )
@@ -35,7 +35,7 @@ func (d *DatabaseData) ClearTableConfig(tableName string) {
 // The boolean signifies whether we should flush immediately or not. This is because Snowflake has a constraint
 // on the number of elements within an expression.
 // The other return value, error, indicates whether anything went awry.
-func (e *Event) Save(topicConfig *kafkalib.TopicConfig, message kafka.Message) (bool, error) {
+func (e *Event) Save(topicConfig *kafkalib.TopicConfig, message artie.Message) (bool, error) {
     if topicConfig == nil {
         return false, errors.New("topicConfig is missing")
     }
@@ -55,13 +55,21 @@ func (e *Event) Save(topicConfig *kafkalib.TopicConfig, message kafka.Message) (
             InMemoryColumns:         map[string]typing.KindDetails{},
             PrimaryKey:              e.PrimaryKeyName,
             TopicConfig:             *topicConfig,
-            PartitionsToLastMessage: map[int]kafka.Message{},
+            PartitionsToLastMessage: map[string][]artie.Message{},
         }
     }
 
     // Update the key, offset and TS
     inMemoryDB.TableData[e.Table].RowsData[fmt.Sprint(e.PrimaryKeyValue)] = e.Data
-    inMemoryDB.TableData[e.Table].PartitionsToLastMessage[message.Partition] = message
+
+    // If the message is from Kafka, we only need to keep the latest one per partition.
+    // If it's from Pub/Sub, we store all of them in memory because GCP Pub/Sub requires us to ack every single message.
+    if message.Kind() == artie.Kafka {
+        inMemoryDB.TableData[e.Table].PartitionsToLastMessage[message.Partition()] = []artie.Message{message}
+    } else {
+        inMemoryDB.TableData[e.Table].PartitionsToLastMessage[message.Partition()] = append(inMemoryDB.TableData[e.Table].PartitionsToLastMessage[message.Partition()], message)
+    }
+
     inMemoryDB.TableData[e.Table].LatestCDCTs = e.ExecutionTime
 
     // Increment row count
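Reviewer note: the asymmetry in Save above deserves a callout. Kafka offsets are cumulative, so committing the last offset of a partition acknowledges everything before it and one retained message per partition suffices; Pub/Sub has no offsets, so every message must be kept until it is individually acked at flush time. A self-contained sketch of the resulting bookkeeping, using the same map shape as TableData.PartitionsToLastMessage (all values are hypothetical):

```go
package main

import (
	"fmt"

	"cloud.google.com/go/pubsub"
	"github.com/artie-labs/transfer/lib/artie"
	"github.com/segmentio/kafka-go"
)

func main() {
	partitionsToLastMessage := map[string][]artie.Message{}

	// Kafka: each new message on a partition replaces the previous one.
	for offset := int64(0); offset < 3; offset++ {
		kafkaMsg := kafka.Message{Partition: 1, Offset: offset}
		msg := artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic)
		partitionsToLastMessage[msg.Partition()] = []artie.Message{msg}
	}
	fmt.Println(len(partitionsToLastMessage["1"])) // 1

	// Pub/Sub: messages accumulate under the synthetic "no_partition" key until flush.
	for i := 0; i < 3; i++ {
		msg := artie.NewMessage(nil, &pubsub.Message{}, "some-topic")
		partitionsToLastMessage[msg.Partition()] = append(partitionsToLastMessage[msg.Partition()], msg)
	}
	fmt.Println(len(partitionsToLastMessage["no_partition"])) // 3
}
```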
diff --git a/models/memory_test.go b/models/memory_test.go
index 1c080b14a..99567f0a2 100644
--- a/models/memory_test.go
+++ b/models/memory_test.go
@@ -1,6 +1,7 @@
 package models
 
 import (
+    "github.com/artie-labs/transfer/lib/artie"
     "github.com/artie-labs/transfer/lib/config/constants"
     "github.com/artie-labs/transfer/lib/kafkalib"
     "github.com/artie-labs/transfer/lib/typing"
@@ -31,7 +32,8 @@ func (m *ModelsTestSuite) SaveEvent() {
         },
     }
 
-    _, err := event.Save(topicConfig, kafka.Message{})
+    kafkaMsg := kafka.Message{}
+    _, err := event.Save(topicConfig, artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic))
     assert.Nil(m.T(), err)
 
     optimization := GetMemoryDB().TableData["foo"]
@@ -61,7 +63,8 @@ func (m *ModelsTestSuite) SaveEvent() {
         },
     }
 
-    _, err = edgeCaseEvent.Save(topicConfig, kafka.Message{})
+    newKafkaMsg := kafka.Message{}
+    _, err = edgeCaseEvent.Save(topicConfig, artie.NewMessage(&newKafkaMsg, nil, newKafkaMsg.Topic))
     assert.NoError(m.T(), err)
     val, isOk := GetMemoryDB().TableData["foo"].InMemoryColumns[badColumn]
     assert.True(m.T(), isOk)
diff --git a/processes/consumer/configs.go b/processes/consumer/configs.go
new file mode 100644
index 000000000..92287aa0e
--- /dev/null
+++ b/processes/consumer/configs.go
@@ -0,0 +1,33 @@
+package consumer
+
+import (
+    "context"
+    "github.com/artie-labs/transfer/lib/artie"
+    "github.com/artie-labs/transfer/lib/cdc"
+    "github.com/artie-labs/transfer/lib/kafkalib"
+)
+
+type TopicConfigFormatter struct {
+    tc *kafkalib.TopicConfig
+    cdc.Format
+}
+
+func CommitOffset(ctx context.Context, topic string, partitionsToOffset map[string][]artie.Message) error {
+    var err error
+    for _, msgs := range partitionsToOffset {
+        for _, msg := range msgs {
+            if msg.KafkaMsg != nil {
+                err = topicToConsumer[topic].CommitMessages(ctx, *msg.KafkaMsg)
+                if err != nil {
+                    return err
+                }
+            }
+
+            if msg.PubSub != nil {
+                msg.PubSub.Ack()
+            }
+        }
+    }
+
+    return err
+}
diff --git a/processes/kafka/consumer.go b/processes/consumer/kafka.go
similarity index 75%
rename from processes/kafka/consumer.go
rename to processes/consumer/kafka.go
index 37214e958..92b70fb20 100644
--- a/processes/kafka/consumer.go
+++ b/processes/consumer/kafka.go
@@ -1,28 +1,21 @@
-package kafka
+package consumer
 
 import (
     "context"
     "crypto/tls"
-    "fmt"
+    "github.com/artie-labs/transfer/lib/artie"
     "sync"
     "time"
 
     "github.com/segmentio/kafka-go"
     "github.com/segmentio/kafka-go/sasl/plain"
 
-    "github.com/artie-labs/transfer/lib/cdc"
     "github.com/artie-labs/transfer/lib/cdc/format"
     "github.com/artie-labs/transfer/lib/config"
     "github.com/artie-labs/transfer/lib/kafkalib"
     "github.com/artie-labs/transfer/lib/logger"
-    "github.com/artie-labs/transfer/lib/telemetry/metrics"
 )
 
-type TopicConfigFormatter struct {
-    tc *kafkalib.TopicConfig
-    cdc.Format
-}
-
 var topicToConsumer map[string]kafkalib.Consumer
 
 // SetKafkaConsumer - This is used for tests.
@@ -30,18 +23,6 @@ func SetKafkaConsumer(_topicToConsumer map[string]kafkalib.Consumer) {
     topicToConsumer = _topicToConsumer
 }
 
-func CommitOffset(ctx context.Context, topic string, partitionsToOffset map[int]kafka.Message) error {
-    var err error
-    for _, msg := range partitionsToOffset {
-        err = topicToConsumer[topic].CommitMessages(ctx, msg)
-        if err != nil {
-            return err
-        }
-    }
-
-    return err
-}
-
 func StartConsumer(ctx context.Context, flushChan chan bool) {
     log := logger.FromContext(ctx)
     log.Info("Starting Kafka consumer...", config.GetSettings().Config.Kafka)
@@ -88,12 +69,13 @@ func StartConsumer(ctx context.Context, flushChan chan bool) {
             topicToConsumer[topic] = kafkaConsumer
 
             for {
-                msg, err := kafkaConsumer.FetchMessage(ctx)
+                kafkaMsg, err := kafkaConsumer.FetchMessage(ctx)
+                msg := artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic)
                 logFields := map[string]interface{}{
-                    "topic":  msg.Topic,
-                    "offset": msg.Offset,
-                    "key":    string(msg.Key),
-                    "value":  string(msg.Value),
+                    "topic":  msg.Topic(),
+                    "offset": kafkaMsg.Offset,
+                    "key":    string(msg.Key()),
+                    "value":  string(msg.Value()),
                 }
 
                 if err != nil {
@@ -101,12 +83,7 @@ func StartConsumer(ctx context.Context, flushChan chan bool) {
                     continue
                 }
 
-                metrics.FromContext(ctx).Timing("ingestion.lag", time.Since(msg.Time), map[string]string{
-                    "groupID":   kafkaConsumer.Config().GroupID,
-                    "topic":     msg.Topic,
-                    "partition": fmt.Sprint(msg.Partition),
-                })
-
+                msg.EmitIngestionLag(ctx, kafkaConsumer.Config().GroupID)
                 shouldFlush, processErr := processMessage(ctx, msg, topicToConfigFmtMap, kafkaConsumer.Config().GroupID)
                 if processErr != nil {
                     log.WithError(processErr).WithFields(logFields).Warn("skipping message...")
diff --git a/processes/kafka/process.go b/processes/consumer/process.go
similarity index 70%
rename from processes/kafka/process.go
rename to processes/consumer/process.go
index 05c73aadc..13f9aea00 100644
--- a/processes/kafka/process.go
+++ b/processes/consumer/process.go
@@ -1,18 +1,18 @@
-package kafka
+package consumer
 
 import (
     "context"
     "fmt"
+    "github.com/artie-labs/transfer/lib/artie"
     "github.com/artie-labs/transfer/lib/telemetry/metrics"
     "github.com/artie-labs/transfer/models"
-    "github.com/segmentio/kafka-go"
     "time"
 )
 
-func processMessage(ctx context.Context, msg kafka.Message, topicToConfigFmtMap map[string]TopicConfigFormatter, groupID string) (shouldFlush bool, err error) {
+func processMessage(ctx context.Context, msg artie.Message, topicToConfigFmtMap map[string]TopicConfigFormatter, groupID string) (shouldFlush bool, err error) {
     tags := map[string]string{
         "groupID": groupID,
-        "topic":   msg.Topic,
+        "topic":   msg.Topic(),
         "what":    "success",
     }
     st := time.Now()
@@ -20,27 +20,24 @@ func processMessage(ctx context.Context, msg kafka.Message, topicToConfigFmtMap
         metrics.FromContext(ctx).Timing("process.message", time.Since(st), tags)
     }()
 
-    topicConfig, isOk := topicToConfigFmtMap[msg.Topic]
+    topicConfig, isOk := topicToConfigFmtMap[msg.Topic()]
     if !isOk {
         tags["what"] = "failed_topic_lookup"
-        return false, fmt.Errorf("failed to get topic name: %s", msg.Topic)
+        return false, fmt.Errorf("failed to get topic name: %s", msg.Topic())
     }
 
     tags["database"] = topicConfig.tc.Database
     tags["schema"] = topicConfig.tc.Schema
     tags["table"] = topicConfig.tc.TableName
 
-    pkName, pkValue, err := topicConfig.GetPrimaryKey(ctx, msg.Key, topicConfig.tc)
+    pkName, pkValue, err := topicConfig.GetPrimaryKey(ctx, msg.Key(), topicConfig.tc)
     if err != nil {
         tags["what"] = "marshall_pk_err"
-        return false, fmt.Errorf("cannot unmarshall key, key: %s, err: %v", string(msg.Key), err)
+        return false, fmt.Errorf("cannot unmarshall key, key: %s, err: %v", string(msg.Key()), err)
     }
 
-    event, err := topicConfig.GetEventFromBytes(ctx, msg.Value)
+    event, err := topicConfig.GetEventFromBytes(ctx, msg.Value())
     if err != nil {
-        // TODO: Can we filter tombstone events?
-        // A tombstone event will be sent to Kafka when a DELETE happens.
-        // Which causes marshalling error.
         tags["what"] = "marshall_value_err"
         return false, fmt.Errorf("cannot unmarshall event, err: %v", err)
     }
diff --git a/processes/kafka/process_test.go b/processes/consumer/process_test.go
similarity index 85%
rename from processes/kafka/process_test.go
rename to processes/consumer/process_test.go
index e94ce533b..350bd5c33 100644
--- a/processes/kafka/process_test.go
+++ b/processes/consumer/process_test.go
@@ -1,8 +1,9 @@
-package kafka
+package consumer
 
 import (
     "context"
     "fmt"
+    "github.com/artie-labs/transfer/lib/artie"
     "github.com/artie-labs/transfer/lib/cdc/mongo"
     "github.com/artie-labs/transfer/lib/config/constants"
     "github.com/artie-labs/transfer/lib/kafkalib"
@@ -18,7 +19,7 @@ func TestProcessMessageFailures(t *testing.T) {
     models.LoadMemoryDB()
 
     ctx := context.Background()
-    msg := kafka.Message{
+    kafkaMsg := kafka.Message{
         Topic:     "foo",
         Partition: 0,
         Offset:    0,
@@ -29,6 +30,7 @@ func TestProcessMessageFailures(t *testing.T) {
         Time: time.Time{},
     }
 
+    msg := artie.NewMessage(&kafkaMsg, nil, kafkaMsg.Topic)
     shouldFlush, err := processMessage(ctx, msg, nil, "foo")
     assert.False(t, shouldFlush)
     assert.True(t, strings.Contains(err.Error(), "failed to get topic"), err.Error())
@@ -41,12 +43,12 @@ func TestProcessMessageFailures(t *testing.T) {
     )
 
     topicToConfigFmtMap := map[string]TopicConfigFormatter{
-        msg.Topic: {
+        msg.Topic(): {
             tc: &kafkalib.TopicConfig{
                 Database:      db,
                 TableName:     table,
                 Schema:        schema,
-                Topic:         msg.Topic,
+                Topic:         msg.Topic(),
                 IdempotentKey: "",
                 CDCFormat:     "",
                 CDCKeyFormat:  "",
@@ -58,10 +60,10 @@ func TestProcessMessageFailures(t *testing.T) {
     shouldFlush, err = processMessage(ctx, msg, topicToConfigFmtMap, "foo")
     assert.False(t, shouldFlush)
     assert.True(t, strings.Contains(err.Error(),
-        fmt.Sprintf("err: format: %s is not supported", topicToConfigFmtMap[msg.Topic].tc.CDCKeyFormat)), err.Error())
+        fmt.Sprintf("err: format: %s is not supported", topicToConfigFmtMap[msg.Topic()].tc.CDCKeyFormat)), err.Error())
     assert.True(t, strings.Contains(err.Error(), "cannot unmarshall key"), err.Error())
 
-    topicToConfigFmtMap[msg.Topic].tc.CDCKeyFormat = "org.apache.kafka.connect.storage.StringConverter"
+    topicToConfigFmtMap[msg.Topic()].tc.CDCKeyFormat = "org.apache.kafka.connect.storage.StringConverter"
 
     vals := []string{
         "",
@@ -99,9 +101,9 @@ func TestProcessMessageFailures(t *testing.T) {
     memoryDB := models.GetMemoryDB()
     for _, val := range vals {
         idx += 1
-        msg.Key = []byte(fmt.Sprintf("Struct{id=%v}", idx))
+        msg.KafkaMsg.Key = []byte(fmt.Sprintf("Struct{id=%v}", idx))
         if val != "" {
-            msg.Value = []byte(val)
+            msg.KafkaMsg.Value = []byte(val)
         }
 
         shouldFlush, err := processMessage(ctx, msg, topicToConfigFmtMap, "foo")
@@ -122,7 +124,7 @@ func TestProcessMessageFailures(t *testing.T) {
     assert.True(t, isOk)
     assert.False(t, val.(bool))
 
-    msg.Value = []byte("not a json object")
+    msg.KafkaMsg.Value = []byte("not a json object")
     shouldFlush, err = processMessage(ctx, msg, topicToConfigFmtMap, "foo")
     assert.False(t, shouldFlush)
     assert.Error(t, err)
+ return nil, fmt.Errorf("failed to fetch gcp topic, exists: %v, err: %v", exists, err) + } + + sub, err = client.CreateSubscription(ctx, subscriptionName, gcp_pubsub.SubscriptionConfig{ + Topic: gcpTopic, + AckDeadline: defaultAckDeadline, + // Enable ordering given the `partition key` which is known as ordering key in Pub/Sub + EnableMessageOrdering: true, + }) + + if err != nil { + return nil, fmt.Errorf("failed to create subscription, topic: %s, err: %v", topic, err) + } + } + + return sub, err +} + +func StartSubscriber(ctx context.Context, flushChan chan bool) { + log := logger.FromContext(ctx) + client, clientErr := gcp_pubsub.NewClient(ctx, config.GetSettings().Config.Pubsub.ProjectID, + option.WithCredentialsFile(config.GetSettings().Config.Pubsub.PathToCredentials)) + if clientErr != nil { + log.Fatalf("failed to create a pubsub client, err: %v", clientErr) + } + + topicToConfigFmtMap := make(map[string]TopicConfigFormatter) + var topics []string + for _, topicConfig := range config.GetSettings().Config.Pubsub.TopicConfigs { + topicToConfigFmtMap[topicConfig.Topic] = TopicConfigFormatter{ + tc: topicConfig, + Format: format.GetFormatParser(ctx, topicConfig.CDCFormat), + } + topics = append(topics, topicConfig.Topic) + } + + var wg sync.WaitGroup + for _, topicConfig := range config.GetSettings().Config.Pubsub.TopicConfigs { + wg.Add(1) + go func(ctx context.Context, client *gcp_pubsub.Client, topic string) { + defer wg.Done() + sub, err := findOrCreateSubscription(ctx, client, topic) + if err != nil { + log.Fatalf("failed to find or create subscription, err: %v", err) + } + + err = sub.Receive(ctx, func(_ context.Context, pubsubMsg *gcp_pubsub.Message) { + msg := artie.NewMessage(nil, pubsubMsg, topic) + msg.EmitIngestionLag(ctx, subscriptionName) + logFields := map[string]interface{}{ + "topic": msg.Topic(), + "msgID": msg.PubSub.ID, + "key": string(msg.Key()), + "value": string(msg.Value()), + } + + shouldFlush, processErr := processMessage(ctx, msg, topicToConfigFmtMap, subscriptionName) + if processErr != nil { + log.WithError(processErr).WithFields(logFields).Warn("skipping message...") + } + + if shouldFlush { + flushChan <- true + } + }) + + if err != nil { + log.Fatalf("sub receive error, err: %v", err) + } + }(ctx, client, topicConfig.Topic) + + } + + wg.Wait() +}