Skip to content

Commit

Permalink
Ability to use GCP Pub/Sub instead of Kafka (#61)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Mar 7, 2023
1 parent 1c601b4 commit d73c021
Show file tree
Hide file tree
Showing 20 changed files with 494 additions and 89 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,14 @@ To run Artie Transfer's stack locally, please refer to the [examples folder](htt
## What is currently supported?
Transfer is aiming to provide coverage across all OLTPs and OLAPs databases. Currently Transfer supports:

- Message Queues
- Kafka (default)
- Google Pub/Sub

- [Destinations](https://docs.artie.so/configurations/real-time-destinations/overview):
- Snowflake
- BigQuery

- [Sources](https://docs.artie.so/configurations/real-time-sources/overview):
- MongoDB (w/ Debezium)
- Postgres (w/ Debezium), we support the following replication slot plug-ins: `pgoutput, decoderbufs, wal2json`
Expand Down
7 changes: 6 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/artie-labs/transfer
go 1.19

require (
cloud.google.com/go/pubsub v1.28.0
github.com/DataDog/datadog-go v4.8.3+incompatible
github.com/evalphobia/logrus_sentry v0.8.2
github.com/google/uuid v1.3.0
Expand All @@ -13,12 +14,15 @@ require (
github.com/stretchr/testify v1.8.1
github.com/viant/bigquery v0.2.1-0.20230129024722-24ed6fd5555f
go.mongodb.org/mongo-driver v1.11.0
google.golang.org/api v0.103.0
gopkg.in/yaml.v3 v3.0.1
)

require (
cloud.google.com/go v0.105.0 // indirect
cloud.google.com/go/compute v1.13.0 // indirect
cloud.google.com/go/compute/metadata v0.2.1 // indirect
cloud.google.com/go/iam v0.7.0 // indirect
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
github.com/Azure/azure-storage-blob-go v0.15.0 // indirect
github.com/Microsoft/go-winio v0.6.0 // indirect
Expand Down Expand Up @@ -49,6 +53,7 @@ require (
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/flatbuffers v22.9.29+incompatible // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.0 // indirect
github.com/googleapis/gax-go/v2 v2.7.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
Expand All @@ -74,11 +79,11 @@ require (
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
golang.org/x/tools v0.1.12 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/api v0.103.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20221202195650-67e5cbc046fd // indirect
google.golang.org/grpc v1.51.0 // indirect
Expand Down
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,17 @@ cloud.google.com/go v0.31.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.37.0/go.mod h1:TS1dMSSfndXH133OKGwekG838Om/cQT0BUHV3HcBgoo=
cloud.google.com/go v0.105.0 h1:DNtEKRBAAzeS4KyIory52wWHuClNaXJ5x1F7xa4q+5Y=
cloud.google.com/go v0.105.0/go.mod h1:PrLgOJNe5nfE9UMxKxgXj4mD3voiP+YQ6gdt6KMFOKM=
cloud.google.com/go/compute v1.13.0 h1:AYrLkB8NPdDRslNp4Jxmzrhdr03fUAIDbiGFjLWowoU=
cloud.google.com/go/compute v1.13.0/go.mod h1:5aPTS0cUNMIc1CE546K+Th6weJUNQErARyZtRXDJ8GE=
cloud.google.com/go/compute/metadata v0.2.1 h1:efOwf5ymceDhK6PKMnnrTHP4pppY5L22mle96M1yP48=
cloud.google.com/go/compute/metadata v0.2.1/go.mod h1:jgHgmJd2RKBGzXqF5LR2EZMGxBkeanZ9wwa75XHJgOM=
cloud.google.com/go/iam v0.7.0 h1:k4MuwOsS7zGJJ+QfZ5vBK8SgHBAvYN/23BWsiihJ1vs=
cloud.google.com/go/iam v0.7.0/go.mod h1:H5Br8wRaDGNc8XP3keLc4unfUUZeyH3Sfl9XpQEYOeg=
cloud.google.com/go/kms v1.6.0 h1:OWRZzrPmOZUzurjI2FBGtgY2mB1WaJkqhw6oIwSj0Yg=
cloud.google.com/go/longrunning v0.3.0 h1:NjljC+FYPV3uh5/OwWT6pVU+doBqMg2x/rZlE+CamDs=
cloud.google.com/go/pubsub v1.28.0 h1:XzabfdPx/+eNrsVVGLFgeUnQQKPGkMb8klRCeYK52is=
cloud.google.com/go/pubsub v1.28.0/go.mod h1:vuXFpwaVoIPQMGXqRyUQigu/AX1S3IWugR9xznmcXX8=
dmitri.shuralyov.com/app/changes v0.0.0-20180602232624-0a106ad413e3/go.mod h1:Yl+fi1br7+Rr3LqpNJf1/uxUdtRUV+Tnj0o93V2B9MU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
dmitri.shuralyov.com/html/belt v0.0.0-20180602232347-f7d459c86be0/go.mod h1:JLBrvjyP0v+ecvNYvCpyZgu5/xkfAUhi6wJj28eUfSU=
Expand Down Expand Up @@ -175,6 +181,7 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ=
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
Expand Down Expand Up @@ -418,6 +425,8 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181029174526-d69651ed3497/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
125 changes: 125 additions & 0 deletions lib/artie/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package artie

import (
"cloud.google.com/go/pubsub"
"context"
"fmt"
"github.com/artie-labs/transfer/lib/telemetry/metrics"
"github.com/segmentio/kafka-go"
"time"
)

type Kind int

const (
Invalid Kind = iota
Kafka
PubSub
)

type pubsubWrapper struct {
topic string
*pubsub.Message
}

type Message struct {
KafkaMsg *kafka.Message
PubSub *pubsubWrapper
}

func NewMessage(kafkaMsg *kafka.Message, pubsubMsg *pubsub.Message, topic string) Message {
var msg Message
if kafkaMsg != nil {
msg.KafkaMsg = kafkaMsg
}

if pubsubMsg != nil {
msg.PubSub = &pubsubWrapper{
topic: topic,
Message: pubsubMsg,
}
}

return msg
}

func (m *Message) Kind() Kind {
if m.KafkaMsg != nil {
return Kafka
}

if m.PubSub != nil {
return PubSub
}

return Invalid
}

func (m *Message) EmitIngestionLag(ctx context.Context, groupID string) {
metrics.FromContext(ctx).Timing("ingestion.lag", time.Since(m.PublishTime()), map[string]string{
"groupID": groupID,
"topic": m.Topic(),
"partition": m.Partition(),
})
}

func (m *Message) PublishTime() time.Time {
if m.KafkaMsg != nil {
return m.KafkaMsg.Time
}

if m.PubSub != nil {
return m.PubSub.PublishTime
}

return time.Time{}
}

func (m *Message) Topic() string {
if m.KafkaMsg != nil {
return m.KafkaMsg.Topic
}

if m.PubSub != nil {
return m.PubSub.topic
}

return ""
}

func (m *Message) Partition() string {
if m.KafkaMsg != nil {
return fmt.Sprint(m.KafkaMsg.Partition)
}

if m.PubSub != nil {
// Pub/Sub doesn't have partitions.
return "no_partition"
}

return ""
}

func (m *Message) Key() []byte {
if m.KafkaMsg != nil {
return m.KafkaMsg.Key
}

if m.PubSub != nil {
return []byte(m.PubSub.OrderingKey)
}

return nil
}

func (m *Message) Value() []byte {
if m.KafkaMsg != nil {
return m.KafkaMsg.Value
}

if m.PubSub != nil {
return m.PubSub.Data
}

return nil
}
43 changes: 43 additions & 0 deletions lib/artie/message_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package artie

import (
"cloud.google.com/go/pubsub"
"github.com/segmentio/kafka-go"
"github.com/stretchr/testify/assert"
"testing"
)

const keyString = "Struct{id=12}"

func TestNewMessage_Pubsub(t *testing.T) {
msg := NewMessage(nil, nil, "")
assert.Equal(t, msg.Topic(), "", "empty topic")

pubsubMsg := &pubsub.Message{}
msg = NewMessage(nil, pubsubMsg, "")
assert.Equal(t, "no_partition", msg.Partition())

pubsubMsg.Data = []byte("hello_world")
assert.Equal(t, "hello_world", string(msg.Value()))

pubsubMsg.OrderingKey = keyString
assert.Equal(t, []byte(keyString), msg.Key())

msg = NewMessage(nil, pubsubMsg, "database.schema.table")
assert.Equal(t, "database.schema.table", msg.Topic())
}

func TestNewMessage(t *testing.T) {
kafkaMsg := &kafka.Message{
Topic: "test_topic",
Partition: 5,
Key: []byte(keyString),
Value: []byte("kafka_value"),
}

msg := NewMessage(kafkaMsg, nil, "")
assert.Equal(t, "test_topic", msg.Topic())
assert.Equal(t, "5", msg.Partition())
assert.Equal(t, keyString, string(msg.Key()))
assert.Equal(t, "kafka_value", string(msg.Value()))
}
56 changes: 45 additions & 11 deletions lib/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ type Sentry struct {
DSN string `yaml:"dsn"`
}

type Pubsub struct {
ProjectID string `yaml:"projectID"`
TopicConfigs []*kafkalib.TopicConfig `yaml:"topicConfigs"`
PathToCredentials string `yaml:"pathToCredentials"`
}

type Kafka struct {
BootstrapServer string `yaml:"bootstrapServer"`
GroupID string `yaml:"groupID"`
Expand All @@ -24,6 +30,10 @@ type Kafka struct {
TopicConfigs []*kafkalib.TopicConfig `yaml:"topicConfigs"`
}

func (p *Pubsub) String() string {
return fmt.Sprintf("project_id=%s, pathToCredentials=%s", p.ProjectID, p.PathToCredentials)
}

func (k *Kafka) String() string {
// Don't log credentials.
return fmt.Sprintf("bootstrapServer=%s, groupID=%s, user_set=%v, pass_set=%v",
Expand All @@ -32,6 +42,9 @@ func (k *Kafka) String() string {

type Config struct {
Output constants.DestinationKind `yaml:"outputSource"`
Queue constants.QueueKind `yaml:"queue"`

Pubsub *Pubsub
Kafka *Kafka

BigQuery struct {
Expand Down Expand Up @@ -83,6 +96,11 @@ func readFileToConfig(pathToConfig string) (*Config, error) {
return nil, err
}

if config.Queue == "" {
// We default to Kafka for backwards compatibility
config.Queue = constants.Kafka
}

return &config, nil
}

Expand All @@ -98,21 +116,37 @@ func (c *Config) Validate() error {
return fmt.Errorf("output: %s is invalid", c.Output)
}

// TopicConfigs
if c.Kafka == nil || len(c.Kafka.TopicConfigs) == 0 {
return fmt.Errorf("no kafka topic configs, kafka: %v", c.Kafka)
}
if c.Queue == constants.Kafka {
if c.Kafka == nil || len(c.Kafka.TopicConfigs) == 0 {
return fmt.Errorf("no kafka topic configs, kafka: %v", c.Kafka)
}

for _, topicConfig := range c.Kafka.TopicConfigs {
if valid := topicConfig.Valid(); !valid {
return fmt.Errorf("topic config is invalid, tc: %s", topicConfig.String())
}
}

for _, topicConfig := range c.Kafka.TopicConfigs {
if valid := topicConfig.Valid(); !valid {
return fmt.Errorf("topic config is invalid, tc: %s", topicConfig.String())
// Username and password are not required (if it's within the same VPC or connecting locally
if array.Empty([]string{c.Kafka.GroupID, c.Kafka.BootstrapServer}) {
return fmt.Errorf("kafka settings is invalid, kafka: %s", c.Kafka.String())
}
}

// Kafka config
// Username and password are not required (if it's within the same VPC or connecting locally
if array.Empty([]string{c.Kafka.GroupID, c.Kafka.BootstrapServer}) {
return fmt.Errorf("kafka settings is invalid, kafka: %s", c.Kafka.String())
if c.Queue == constants.PubSub {
if c.Pubsub == nil || len(c.Pubsub.TopicConfigs) == 0 {
return fmt.Errorf("no pubsub topic configs, pubsub: %v", c.Pubsub)
}

for _, topicConfig := range c.Pubsub.TopicConfigs {
if valid := topicConfig.Valid(); !valid {
return fmt.Errorf("topic config is invalid, tc: %s", topicConfig.String())
}
}

if array.Empty([]string{c.Pubsub.ProjectID, c.Pubsub.PathToCredentials}) {
return fmt.Errorf("pubsub settings is invalid, pubsub: %s", c.Pubsub.String())
}
}

return nil
Expand Down
Loading

0 comments on commit d73c021

Please sign in to comment.