From 12da05aecaefe60c175bd1bcbc81fa98e7c43839 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 25 Sep 2024 15:59:02 -0700 Subject: [PATCH] Moving types around (#927) --- lib/config/bigquery.go | 24 ----- lib/config/config.go | 105 ++++--------------- lib/config/config_test.go | 26 ----- lib/config/destination_types.go | 53 ++++++++++ lib/config/{snowflake.go => destinations.go} | 36 +++++-- lib/config/mssql.go | 51 --------- lib/config/types.go | 70 +++++++++++++ 7 files changed, 168 insertions(+), 197 deletions(-) delete mode 100644 lib/config/bigquery.go create mode 100644 lib/config/destination_types.go rename lib/config/{snowflake.go => destinations.go} (62%) delete mode 100644 lib/config/mssql.go create mode 100644 lib/config/types.go diff --git a/lib/config/bigquery.go b/lib/config/bigquery.go deleted file mode 100644 index 5226fa95f..000000000 --- a/lib/config/bigquery.go +++ /dev/null @@ -1,24 +0,0 @@ -package config - -import "fmt" - -type BigQuery struct { - // PathToCredentials is _optional_ if you have GOOGLE_APPLICATION_CREDENTIALS set as an env var - // Links to credentials: https://cloud.google.com/docs/authentication/application-default-credentials#GAC - PathToCredentials string `yaml:"pathToCredentials"` - DefaultDataset string `yaml:"defaultDataset"` - ProjectID string `yaml:"projectID"` - Location string `yaml:"location"` -} - -// DSN - returns the notation for BigQuery following this format: bigquery://projectID/[location/]datasetID?queryString -// If location is passed in, we'll specify it. Else, it'll default to empty and our library will set it to US. -func (b *BigQuery) DSN() string { - dsn := fmt.Sprintf("bigquery://%s/%s", b.ProjectID, b.DefaultDataset) - - if b.Location != "" { - dsn = fmt.Sprintf("bigquery://%s/%s/%s", b.ProjectID, b.Location, b.DefaultDataset) - } - - return dsn -} diff --git a/lib/config/config.go b/lib/config/config.go index b16b710d0..c21ef64f9 100644 --- a/lib/config/config.go +++ b/lib/config/config.go @@ -24,43 +24,10 @@ const ( FlushIntervalSecondsMax = 6 * 60 * 60 ) -type Sentry struct { - DSN string `yaml:"dsn"` -} - -type Pubsub struct { - ProjectID string `yaml:"projectID"` - TopicConfigs []*kafkalib.TopicConfig `yaml:"topicConfigs"` - PathToCredentials string `yaml:"pathToCredentials"` -} - -type Kafka struct { - // Comma-separated Kafka servers to port. - // e.g. host1:port1,host2:port2,... - // Following kafka's spec mentioned here: https://kafka.apache.org/documentation/#producerconfigs_bootstrap.servers - BootstrapServer string `yaml:"bootstrapServer"` - GroupID string `yaml:"groupID"` - TopicConfigs []*kafkalib.TopicConfig `yaml:"topicConfigs"` - - // Optional parameters - Username string `yaml:"username,omitempty"` - Password string `yaml:"password,omitempty"` - EnableAWSMSKIAM bool `yaml:"enableAWSMKSIAM,omitempty"` - DisableTLS bool `yaml:"disableTLS,omitempty"` -} - func (k *Kafka) BootstrapServers() []string { return strings.Split(k.BootstrapServer, ",") } -type S3Settings struct { - FolderName string `yaml:"folderName"` - Bucket string `yaml:"bucket"` - AwsAccessKeyID string `yaml:"awsAccessKeyID"` - AwsSecretAccessKey string `yaml:"awsSecretAccessKey"` - OutputFormat constants.S3OutputFormat `yaml:"outputFormat"` -} - func (s *S3Settings) Validate() error { if s == nil { return fmt.Errorf("s3 settings are nil") @@ -77,18 +44,6 @@ func (s *S3Settings) Validate() error { return nil } -type Redshift struct { - Host string `yaml:"host"` - Port int `yaml:"port"` - Database string `yaml:"database"` - Username string `yaml:"username"` - Password string `yaml:"password"` - Bucket string `yaml:"bucket"` - OptionalS3Prefix string `yaml:"optionalS3Prefix"` - // https://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-authorization.html - CredentialsClause string `yaml:"credentialsClause"` -} - func (p *Pubsub) String() string { return fmt.Sprintf("project_id=%s, pathToCredentials=%s", p.ProjectID, p.PathToCredentials) } @@ -110,50 +65,10 @@ func (c Config) TopicConfigs() ([]*kafkalib.TopicConfig, error) { return nil, fmt.Errorf("unsupported queue: %q", c.Queue) } -type Mode string - -const ( - History Mode = "history" - Replication Mode = "replication" -) - func (m Mode) String() string { return string(m) } -type Config struct { - Mode Mode `yaml:"mode"` - Output constants.DestinationKind `yaml:"outputSource"` - Queue constants.QueueKind `yaml:"queue"` - - // Flush rules - FlushIntervalSeconds int `yaml:"flushIntervalSeconds"` - FlushSizeKb int `yaml:"flushSizeKb"` - BufferRows uint `yaml:"bufferRows"` - - // Supported message queues - Pubsub *Pubsub `yaml:"pubsub,omitempty"` - Kafka *Kafka `yaml:"kafka,omitempty"` - - // Supported destinations - MSSQL *MSSQL `yaml:"mssql,omitempty"` - BigQuery *BigQuery `yaml:"bigquery,omitempty"` - Snowflake *Snowflake `yaml:"snowflake,omitempty"` - Redshift *Redshift `yaml:"redshift,omitempty"` - S3 *S3Settings `yaml:"s3,omitempty"` - - Reporting struct { - Sentry *Sentry `yaml:"sentry"` - } - - Telemetry struct { - Metrics struct { - Provider constants.ExporterKind `yaml:"provider"` - Settings map[string]any `yaml:"settings,omitempty"` - } - } -} - func readFileToConfig(pathToConfig string) (*Config, error) { file, err := os.Open(pathToConfig) if err != nil { @@ -219,6 +134,26 @@ func (c Config) ValidateRedshift() error { return nil } +func (c Config) ValidateMSSQL() error { + if c.Output != constants.MSSQL { + return fmt.Errorf("output is not mssql, output: %v", c.Output) + } + + if c.MSSQL == nil { + return fmt.Errorf("mssql config is nil") + } + + if empty := stringutil.Empty(c.MSSQL.Host, c.MSSQL.Username, c.MSSQL.Password, c.MSSQL.Database); empty { + return fmt.Errorf("one of mssql settings is empty (host, username, password, database)") + } + + if c.MSSQL.Port <= 0 { + return fmt.Errorf("invalid mssql port: %d", c.MSSQL.Port) + } + + return nil +} + // Validate will check the output source validity // It will also check if a topic exists + iterate over each topic to make sure it's valid. // The actual output source (like Snowflake) and CDC parser will be loaded and checked by other funcs. diff --git a/lib/config/config_test.go b/lib/config/config_test.go index cd2716003..2e1d545e1 100644 --- a/lib/config/config_test.go +++ b/lib/config/config_test.go @@ -38,32 +38,6 @@ func TestBigQuery_DSN(t *testing.T) { assert.Equal(t, "bigquery://project/eu/dataset", b.DSN()) } -func TestKafka_BootstrapServers(t *testing.T) { - type _tc struct { - bootstrapServerString string - expectedBootstrapServers []string - } - - tcs := []_tc{ - { - bootstrapServerString: "localhost:9092", - expectedBootstrapServers: []string{"localhost:9092"}, - }, - { - bootstrapServerString: "a:9092,b:9093,c:9094", - expectedBootstrapServers: []string{"a:9092", "b:9093", "c:9094"}, - }, - } - - for idx, tc := range tcs { - k := Kafka{ - BootstrapServer: tc.bootstrapServerString, - } - - assert.Equal(t, tc.expectedBootstrapServers, k.BootstrapServers(), idx) - } -} - func TestKafka_String(t *testing.T) { k := Kafka{ BootstrapServer: "server", diff --git a/lib/config/destination_types.go b/lib/config/destination_types.go new file mode 100644 index 000000000..47cd94304 --- /dev/null +++ b/lib/config/destination_types.go @@ -0,0 +1,53 @@ +package config + +import "github.com/artie-labs/transfer/lib/config/constants" + +type BigQuery struct { + // PathToCredentials is _optional_ if you have GOOGLE_APPLICATION_CREDENTIALS set as an env var + // Links to credentials: https://cloud.google.com/docs/authentication/application-default-credentials#GAC + PathToCredentials string `yaml:"pathToCredentials"` + DefaultDataset string `yaml:"defaultDataset"` + ProjectID string `yaml:"projectID"` + Location string `yaml:"location"` +} + +type MSSQL struct { + Host string `yaml:"host"` + Port int `yaml:"port"` + Username string `yaml:"username"` + Password string `yaml:"password"` + Database string `yaml:"database"` +} + +type Redshift struct { + Host string `yaml:"host"` + Port int `yaml:"port"` + Database string `yaml:"database"` + Username string `yaml:"username"` + Password string `yaml:"password"` + Bucket string `yaml:"bucket"` + OptionalS3Prefix string `yaml:"optionalS3Prefix"` + // https://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-authorization.html + CredentialsClause string `yaml:"credentialsClause"` +} + +type S3Settings struct { + FolderName string `yaml:"folderName"` + Bucket string `yaml:"bucket"` + AwsAccessKeyID string `yaml:"awsAccessKeyID"` + AwsSecretAccessKey string `yaml:"awsSecretAccessKey"` + OutputFormat constants.S3OutputFormat `yaml:"outputFormat"` +} + +type Snowflake struct { + AccountID string `yaml:"account"` + Username string `yaml:"username"` + // If pathToPrivateKey is specified, the password field will be ignored + PathToPrivateKey string `yaml:"pathToPrivateKey,omitempty"` + Password string `yaml:"password,omitempty"` + + Warehouse string `yaml:"warehouse"` + Region string `yaml:"region"` + Host string `yaml:"host"` + Application string `yaml:"application"` +} diff --git a/lib/config/snowflake.go b/lib/config/destinations.go similarity index 62% rename from lib/config/snowflake.go rename to lib/config/destinations.go index 89bdcd339..af04a0038 100644 --- a/lib/config/snowflake.go +++ b/lib/config/destinations.go @@ -2,23 +2,37 @@ package config import ( "fmt" + "net/url" "github.com/artie-labs/transfer/lib/cryptography" "github.com/artie-labs/transfer/lib/typing" "github.com/snowflakedb/gosnowflake" ) -type Snowflake struct { - AccountID string `yaml:"account"` - Username string `yaml:"username"` - // If pathToPrivateKey is specified, the password field will be ignored - PathToPrivateKey string `yaml:"pathToPrivateKey,omitempty"` - Password string `yaml:"password,omitempty"` - - Warehouse string `yaml:"warehouse"` - Region string `yaml:"region"` - Host string `yaml:"host"` - Application string `yaml:"application"` +// DSN - returns the notation for BigQuery following this format: bigquery://projectID/[location/]datasetID?queryString +// If location is passed in, we'll specify it. Else, it'll default to empty and our library will set it to US. +func (b *BigQuery) DSN() string { + dsn := fmt.Sprintf("bigquery://%s/%s", b.ProjectID, b.DefaultDataset) + + if b.Location != "" { + dsn = fmt.Sprintf("bigquery://%s/%s/%s", b.ProjectID, b.Location, b.DefaultDataset) + } + + return dsn +} + +func (m MSSQL) DSN() string { + query := url.Values{} + query.Add("database", m.Database) + + u := &url.URL{ + Scheme: "sqlserver", + User: url.UserPassword(m.Username, m.Password), + Host: fmt.Sprintf("%s:%d", m.Host, m.Port), + RawQuery: query.Encode(), + } + + return u.String() } func (s Snowflake) ToConfig() (*gosnowflake.Config, error) { diff --git a/lib/config/mssql.go b/lib/config/mssql.go deleted file mode 100644 index 15eb4d3f4..000000000 --- a/lib/config/mssql.go +++ /dev/null @@ -1,51 +0,0 @@ -package config - -import ( - "fmt" - "net/url" - - "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/stringutil" -) - -type MSSQL struct { - Host string `yaml:"host"` - Port int `yaml:"port"` - Username string `yaml:"username"` - Password string `yaml:"password"` - Database string `yaml:"database"` -} - -func (m MSSQL) DSN() string { - query := url.Values{} - query.Add("database", m.Database) - - u := &url.URL{ - Scheme: "sqlserver", - User: url.UserPassword(m.Username, m.Password), - Host: fmt.Sprintf("%s:%d", m.Host, m.Port), - RawQuery: query.Encode(), - } - - return u.String() -} - -func (c Config) ValidateMSSQL() error { - if c.Output != constants.MSSQL { - return fmt.Errorf("output is not mssql, output: %v", c.Output) - } - - if c.MSSQL == nil { - return fmt.Errorf("mssql config is nil") - } - - if empty := stringutil.Empty(c.MSSQL.Host, c.MSSQL.Username, c.MSSQL.Password, c.MSSQL.Database); empty { - return fmt.Errorf("one of mssql settings is empty (host, username, password, database)") - } - - if c.MSSQL.Port <= 0 { - return fmt.Errorf("invalid mssql port: %d", c.MSSQL.Port) - } - - return nil -} diff --git a/lib/config/types.go b/lib/config/types.go new file mode 100644 index 000000000..f84615364 --- /dev/null +++ b/lib/config/types.go @@ -0,0 +1,70 @@ +package config + +import ( + "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/kafkalib" +) + +type Mode string + +const ( + History Mode = "history" + Replication Mode = "replication" +) + +type Sentry struct { + DSN string `yaml:"dsn"` +} + +type Pubsub struct { + ProjectID string `yaml:"projectID"` + TopicConfigs []*kafkalib.TopicConfig `yaml:"topicConfigs"` + PathToCredentials string `yaml:"pathToCredentials"` +} +type Kafka struct { + // Comma-separated Kafka servers to port. + // e.g. host1:port1,host2:port2,... + // Following kafka's spec mentioned here: https://kafka.apache.org/documentation/#producerconfigs_bootstrap.servers + BootstrapServer string `yaml:"bootstrapServer"` + GroupID string `yaml:"groupID"` + TopicConfigs []*kafkalib.TopicConfig `yaml:"topicConfigs"` + + // Optional parameters + Username string `yaml:"username,omitempty"` + Password string `yaml:"password,omitempty"` + EnableAWSMSKIAM bool `yaml:"enableAWSMKSIAM,omitempty"` + DisableTLS bool `yaml:"disableTLS,omitempty"` +} + +type Config struct { + Mode Mode `yaml:"mode"` + Output constants.DestinationKind `yaml:"outputSource"` + Queue constants.QueueKind `yaml:"queue"` + + // Flush rules + FlushIntervalSeconds int `yaml:"flushIntervalSeconds"` + FlushSizeKb int `yaml:"flushSizeKb"` + BufferRows uint `yaml:"bufferRows"` + + // Supported message queues + Pubsub *Pubsub `yaml:"pubsub,omitempty"` + Kafka *Kafka `yaml:"kafka,omitempty"` + + // Supported destinations + MSSQL *MSSQL `yaml:"mssql,omitempty"` + BigQuery *BigQuery `yaml:"bigquery,omitempty"` + Snowflake *Snowflake `yaml:"snowflake,omitempty"` + Redshift *Redshift `yaml:"redshift,omitempty"` + S3 *S3Settings `yaml:"s3,omitempty"` + + Reporting struct { + Sentry *Sentry `yaml:"sentry"` + } + + Telemetry struct { + Metrics struct { + Provider constants.ExporterKind `yaml:"provider"` + Settings map[string]any `yaml:"settings,omitempty"` + } + } +}