Skip to content

Commit

Permalink
Moving Types.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Sep 25, 2024
1 parent 756a7fe commit a612e9f
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 115 deletions.
9 changes: 0 additions & 9 deletions lib/config/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,6 @@ package config

import "fmt"

type BigQuery struct {
// PathToCredentials is _optional_ if you have GOOGLE_APPLICATION_CREDENTIALS set as an env var
// Links to credentials: https://cloud.google.com/docs/authentication/application-default-credentials#GAC
PathToCredentials string `yaml:"pathToCredentials"`
DefaultDataset string `yaml:"defaultDataset"`
ProjectID string `yaml:"projectID"`
Location string `yaml:"location"`
}

// DSN - returns the notation for BigQuery following this format: bigquery://projectID/[location/]datasetID?queryString
// If location is passed in, we'll specify it. Else, it'll default to empty and our library will set it to US.
func (b *BigQuery) DSN() string {
Expand Down
85 changes: 0 additions & 85 deletions lib/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,43 +24,10 @@ const (
FlushIntervalSecondsMax = 6 * 60 * 60
)

type Sentry struct {
DSN string `yaml:"dsn"`
}

type Pubsub struct {
ProjectID string `yaml:"projectID"`
TopicConfigs []*kafkalib.TopicConfig `yaml:"topicConfigs"`
PathToCredentials string `yaml:"pathToCredentials"`
}

type Kafka struct {
// Comma-separated Kafka servers to port.
// e.g. host1:port1,host2:port2,...
// Following kafka's spec mentioned here: https://kafka.apache.org/documentation/#producerconfigs_bootstrap.servers
BootstrapServer string `yaml:"bootstrapServer"`
GroupID string `yaml:"groupID"`
TopicConfigs []*kafkalib.TopicConfig `yaml:"topicConfigs"`

// Optional parameters
Username string `yaml:"username,omitempty"`
Password string `yaml:"password,omitempty"`
EnableAWSMSKIAM bool `yaml:"enableAWSMKSIAM,omitempty"`
DisableTLS bool `yaml:"disableTLS,omitempty"`
}

func (k *Kafka) BootstrapServers() []string {
return strings.Split(k.BootstrapServer, ",")
}

type S3Settings struct {
FolderName string `yaml:"folderName"`
Bucket string `yaml:"bucket"`
AwsAccessKeyID string `yaml:"awsAccessKeyID"`
AwsSecretAccessKey string `yaml:"awsSecretAccessKey"`
OutputFormat constants.S3OutputFormat `yaml:"outputFormat"`
}

func (s *S3Settings) Validate() error {
if s == nil {
return fmt.Errorf("s3 settings are nil")
Expand All @@ -77,18 +44,6 @@ func (s *S3Settings) Validate() error {
return nil
}

type Redshift struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
Database string `yaml:"database"`
Username string `yaml:"username"`
Password string `yaml:"password"`
Bucket string `yaml:"bucket"`
OptionalS3Prefix string `yaml:"optionalS3Prefix"`
// https://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-authorization.html
CredentialsClause string `yaml:"credentialsClause"`
}

func (p *Pubsub) String() string {
return fmt.Sprintf("project_id=%s, pathToCredentials=%s", p.ProjectID, p.PathToCredentials)
}
Expand All @@ -110,50 +65,10 @@ func (c Config) TopicConfigs() ([]*kafkalib.TopicConfig, error) {
return nil, fmt.Errorf("unsupported queue: %q", c.Queue)
}

type Mode string

const (
History Mode = "history"
Replication Mode = "replication"
)

func (m Mode) String() string {
return string(m)
}

type Config struct {
Mode Mode `yaml:"mode"`
Output constants.DestinationKind `yaml:"outputSource"`
Queue constants.QueueKind `yaml:"queue"`

// Flush rules
FlushIntervalSeconds int `yaml:"flushIntervalSeconds"`
FlushSizeKb int `yaml:"flushSizeKb"`
BufferRows uint `yaml:"bufferRows"`

// Supported message queues
Pubsub *Pubsub `yaml:"pubsub,omitempty"`
Kafka *Kafka `yaml:"kafka,omitempty"`

// Supported destinations
MSSQL *MSSQL `yaml:"mssql,omitempty"`
BigQuery *BigQuery `yaml:"bigquery,omitempty"`
Snowflake *Snowflake `yaml:"snowflake,omitempty"`
Redshift *Redshift `yaml:"redshift,omitempty"`
S3 *S3Settings `yaml:"s3,omitempty"`

Reporting struct {
Sentry *Sentry `yaml:"sentry"`
}

Telemetry struct {
Metrics struct {
Provider constants.ExporterKind `yaml:"provider"`
Settings map[string]any `yaml:"settings,omitempty"`
}
}
}

func readFileToConfig(pathToConfig string) (*Config, error) {
file, err := os.Open(pathToConfig)
if err != nil {
Expand Down
53 changes: 53 additions & 0 deletions lib/config/destination_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package config

import "github.com/artie-labs/transfer/lib/config/constants"

// S3Settings holds the configuration for using S3 as a destination.
type S3Settings struct {
// FolderName is the folder (object key prefix) within the bucket that files are written under.
FolderName string `yaml:"folderName"`
Bucket string `yaml:"bucket"`
AwsAccessKeyID string `yaml:"awsAccessKeyID"`
AwsSecretAccessKey string `yaml:"awsSecretAccessKey"`
// OutputFormat - file format for objects written to S3 (see constants.S3OutputFormat).
OutputFormat constants.S3OutputFormat `yaml:"outputFormat"`
}

// MSSQL holds the connection settings for a Microsoft SQL Server destination.
// The DSN is assembled from these fields elsewhere (see MSSQL.DSN in mssql.go).
type MSSQL struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
Username string `yaml:"username"`
Password string `yaml:"password"`
Database string `yaml:"database"`
}

// BigQuery holds the connection settings for a Google BigQuery destination.
// The DSN is assembled from these fields elsewhere (see BigQuery.DSN in bigquery.go).
type BigQuery struct {
// PathToCredentials is _optional_ if you have GOOGLE_APPLICATION_CREDENTIALS set as an env var
// Links to credentials: https://cloud.google.com/docs/authentication/application-default-credentials#GAC
PathToCredentials string `yaml:"pathToCredentials"`
DefaultDataset string `yaml:"defaultDataset"`
ProjectID string `yaml:"projectID"`
// Location - dataset location; if empty, the BigQuery library defaults it to US (per the DSN docs in bigquery.go).
Location string `yaml:"location"`
}

// Redshift holds the connection settings for an Amazon Redshift destination,
// plus the S3 staging-bucket details used by COPY.
type Redshift struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
Database string `yaml:"database"`
Username string `yaml:"username"`
Password string `yaml:"password"`
// Bucket - S3 bucket used for staging data; presumably loaded via COPY — see the credentials link below.
Bucket string `yaml:"bucket"`
OptionalS3Prefix string `yaml:"optionalS3Prefix"`
// CredentialsClause is the authorization clause passed to Redshift COPY:
// https://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-authorization.html
CredentialsClause string `yaml:"credentialsClause"`
}

// Snowflake holds the connection settings for a Snowflake destination.
// These fields are converted into a gosnowflake.Config elsewhere (see Snowflake.ToConfig in snowflake.go).
type Snowflake struct {
AccountID string `yaml:"account"`
Username string `yaml:"username"`
// If pathToPrivateKey is specified, the password field will be ignored
PathToPrivateKey string `yaml:"pathToPrivateKey,omitempty"`
Password string `yaml:"password,omitempty"`

Warehouse string `yaml:"warehouse"`
Region string `yaml:"region"`
Host string `yaml:"host"`
// Application - client application name reported to Snowflake.
Application string `yaml:"application"`
}
8 changes: 0 additions & 8 deletions lib/config/mssql.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,6 @@ import (
"github.com/artie-labs/transfer/lib/stringutil"
)

type MSSQL struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
Username string `yaml:"username"`
Password string `yaml:"password"`
Database string `yaml:"database"`
}

func (m MSSQL) DSN() string {
query := url.Values{}
query.Add("database", m.Database)
Expand Down
13 changes: 0 additions & 13 deletions lib/config/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,6 @@ import (
"github.com/snowflakedb/gosnowflake"
)

type Snowflake struct {
AccountID string `yaml:"account"`
Username string `yaml:"username"`
// If pathToPrivateKey is specified, the password field will be ignored
PathToPrivateKey string `yaml:"pathToPrivateKey,omitempty"`
Password string `yaml:"password,omitempty"`

Warehouse string `yaml:"warehouse"`
Region string `yaml:"region"`
Host string `yaml:"host"`
Application string `yaml:"application"`
}

func (s Snowflake) ToConfig() (*gosnowflake.Config, error) {
cfg := &gosnowflake.Config{
Account: s.AccountID,
Expand Down
70 changes: 70 additions & 0 deletions lib/config/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package config

import (
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/kafkalib"
)

// Mode determines how the pipeline processes incoming data.
type Mode string

// Supported modes. NOTE(review): exact semantics of each mode are not visible in this file —
// History and Replication are the only declared values; confirm behavior at the call sites.
const (
History Mode = "history"
Replication Mode = "replication"
)

// Sentry holds the error-reporting settings (referenced under Config.Reporting).
type Sentry struct {
// DSN - the Sentry data source name that events are reported to.
DSN string `yaml:"dsn"`
}

// Pubsub holds the Google Cloud Pub/Sub settings used when Pub/Sub is the message queue.
type Pubsub struct {
ProjectID string `yaml:"projectID"`
TopicConfigs []*kafkalib.TopicConfig `yaml:"topicConfigs"`
// PathToCredentials - path to a GCP credentials file.
PathToCredentials string `yaml:"pathToCredentials"`
}
// Kafka holds the consumer settings used when Kafka is the message queue.
type Kafka struct {
// Comma-separated Kafka servers to port.
// e.g. host1:port1,host2:port2,...
// Following kafka's spec mentioned here: https://kafka.apache.org/documentation/#producerconfigs_bootstrap.servers
BootstrapServer string `yaml:"bootstrapServer"`
// GroupID - the Kafka consumer group ID.
GroupID string `yaml:"groupID"`
TopicConfigs []*kafkalib.TopicConfig `yaml:"topicConfigs"`

// Optional parameters
Username string `yaml:"username,omitempty"`
Password string `yaml:"password,omitempty"`
// EnableAWSMSKIAM enables IAM-based authentication for AWS MSK.
// NOTE(review): the yaml key spells "MKSIAM" (letters transposed relative to the field name).
// It is preserved as-is because renaming the key would break existing config files — confirm before changing.
EnableAWSMSKIAM bool `yaml:"enableAWSMKSIAM,omitempty"`
DisableTLS bool `yaml:"disableTLS,omitempty"`
}

// Config is the top-level application configuration, deserialized from YAML
// (see readFileToConfig in config.go).
type Config struct {
Mode Mode `yaml:"mode"`
// Output - which destination to write to.
Output constants.DestinationKind `yaml:"outputSource"`
// Queue - which message queue to consume from.
Queue constants.QueueKind `yaml:"queue"`

// Flush rules
FlushIntervalSeconds int `yaml:"flushIntervalSeconds"`
FlushSizeKb int `yaml:"flushSizeKb"`
BufferRows uint `yaml:"bufferRows"`

// Supported message queues — presumably only the one matching Queue is set; confirm against Config.TopicConfigs.
Pubsub *Pubsub `yaml:"pubsub,omitempty"`
Kafka *Kafka `yaml:"kafka,omitempty"`

// Supported destinations — presumably only the one matching Output is set; confirm at the call sites.
MSSQL *MSSQL `yaml:"mssql,omitempty"`
BigQuery *BigQuery `yaml:"bigquery,omitempty"`
Snowflake *Snowflake `yaml:"snowflake,omitempty"`
Redshift *Redshift `yaml:"redshift,omitempty"`
S3 *S3Settings `yaml:"s3,omitempty"`

// Reporting - error-reporting configuration.
Reporting struct {
Sentry *Sentry `yaml:"sentry"`
}

// Telemetry - metrics-exporter configuration.
Telemetry struct {
Metrics struct {
Provider constants.ExporterKind `yaml:"provider"`
Settings map[string]any `yaml:"settings,omitempty"`
}
}
}

0 comments on commit a612e9f

Please sign in to comment.