Skip to content

Commit

Permalink
WIP.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Aug 29, 2024
1 parent 19fa197 commit 9dd64bc
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 4 deletions.
1 change: 1 addition & 0 deletions config/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type PostgreSQLTable struct {

// Optional settings
BatchSize uint `yaml:"batchSize,omitempty"`
PrimaryKeysOverride []string `yaml:"primaryKeysOverride,omitempty"`
OptionalPrimaryKeyValStart string `yaml:"optionalPrimaryKeyValStart,omitempty"`
OptionalPrimaryKeyValEnd string `yaml:"optionalPrimaryKeyValEnd,omitempty"`
ExcludeColumns []string `yaml:"excludeColumns,omitempty"`
Expand Down
10 changes: 7 additions & 3 deletions lib/postgres/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Table struct {
PrimaryKeys []string
}

func LoadTable(db *sql.DB, _schema string, name string) (*Table, error) {
func LoadTable(db *sql.DB, _schema string, name string, primaryKeysOverride []string) (*Table, error) {
tbl := &Table{
Name: name,
Schema: _schema,
Expand All @@ -29,8 +29,12 @@ func LoadTable(db *sql.DB, _schema string, name string) (*Table, error) {
return nil, fmt.Errorf("failed to describe table %s.%s: %w", tbl.Schema, tbl.Name, err)
}

if tbl.PrimaryKeys, err = schema.FetchPrimaryKeys(db, tbl.Schema, tbl.Name); err != nil {
return nil, fmt.Errorf("failed to retrieve primary keys: %w", err)
if len(primaryKeysOverride) > 0 {
tbl.PrimaryKeys = primaryKeysOverride
} else {
if tbl.PrimaryKeys, err = schema.FetchPrimaryKeys(db, tbl.Schema, tbl.Name); err != nil {
return nil, fmt.Errorf("failed to retrieve primary keys: %w", err)
}
}

return tbl, nil
Expand Down
2 changes: 1 addition & 1 deletion sources/postgres/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type PostgresAdapter struct {

func NewPostgresAdapter(db *sql.DB, tableCfg config.PostgreSQLTable) (PostgresAdapter, error) {
slog.Info("Loading metadata for table")
table, err := postgres.LoadTable(db, tableCfg.Schema, tableCfg.Name)
table, err := postgres.LoadTable(db, tableCfg.Schema, tableCfg.Name, tableCfg.PrimaryKeysOverride)
if err != nil {
return PostgresAdapter{}, fmt.Errorf("failed to load metadata for table %s.%s: %w", tableCfg.Schema, tableCfg.Name, err)
}
Expand Down

0 comments on commit 9dd64bc

Please sign in to comment.