Skip to content

Commit

Permalink
Wrap errors
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed Feb 9, 2024
1 parent ba4eefe commit 3373d79
Show file tree
Hide file tree
Showing 25 changed files with 66 additions and 56 deletions.
2 changes: 1 addition & 1 deletion clients/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (s *Store) PutTable(ctx context.Context, dataset, tableName string, rows []
inserter := client.Dataset(dataset).Table(tableName).Inserter()
for batch.HasNext() {
if err := inserter.Put(ctx, batch.NextChunk()); err != nil {
return fmt.Errorf("failed to insert rows, err: %v", err)
return fmt.Errorf("failed to insert rows, err: %w", err)
}
}

Expand Down
2 changes: 1 addition & 1 deletion clients/bigquery/cast.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func castColVal(colVal interface{}, colKind columns.Column, additionalDateFmts [
case typing.ETime.Kind:
extTime, err := ext.ParseFromInterface(colVal, additionalDateFmts)
if err != nil {
return nil, fmt.Errorf("failed to cast colVal as time.Time, colVal: %v, err: %v", colVal, err)
return nil, fmt.Errorf("failed to cast colVal as time.Time, colVal: %v, err: %w", colVal, err)
}

if colKind.KindDetails.ExtendedTimeDetails == nil {
Expand Down
14 changes: 7 additions & 7 deletions clients/bigquery/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (s *Store) merge(tableData *optimization.TableData) ([]*Row, error) {
colKind, _ := tableData.ReadOnlyInMemoryCols().GetColumn(col)
colVal, err := castColVal(value[col], colKind, additionalDateFmts)
if err != nil {
return nil, fmt.Errorf("failed to cast col: %v, err: %v", col, err)
return nil, fmt.Errorf("failed to cast col: %v, err: %w", col, err)
}

if colVal != nil {
Expand All @@ -68,7 +68,7 @@ func (s *Store) backfillColumn(column columns.Column, fqTableName string) error

defaultVal, err := column.DefaultValue(&columns.DefaultValueArgs{Escape: true, DestKind: s.Label()}, additionalDateFmts)
if err != nil {
return fmt.Errorf("failed to escape default value, err: %v", err)
return fmt.Errorf("failed to escape default value, err: %w", err)
}

escapedCol := column.Name(s.config.SharedDestinationConfig.UppercaseEscapedNames, &sql.NameArgs{Escape: true, DestKind: s.Label()})
Expand All @@ -84,7 +84,7 @@ func (s *Store) backfillColumn(column columns.Column, fqTableName string) error
)
_, err = s.Exec(query)
if err != nil {
return fmt.Errorf("failed to backfill, err: %v, query: %v", err, query)
return fmt.Errorf("failed to backfill, err: %w, query: %v", err, query)
}

query = fmt.Sprintf("ALTER TABLE %s ALTER COLUMN %s SET OPTIONS (description=`%s`);",
Expand Down Expand Up @@ -166,7 +166,7 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er
}

if err = ddl.AlterTable(tempAlterTableArgs, tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil {
return fmt.Errorf("failed to create temp table, err: %v", err)
return fmt.Errorf("failed to create temp table, err: %w", err)
}
// End temporary table creation

Expand All @@ -191,7 +191,7 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er
attempts += 1
time.Sleep(jitter.Jitter(1500, jitter.DefaultMaxMs, attempts))
} else {
return fmt.Errorf("failed to backfill col: %v, default value: %v, err: %v", col.RawName(), col.RawDefaultValue(), err)
return fmt.Errorf("failed to backfill col: %v, default value: %v, err: %w", col.RawName(), col.RawDefaultValue(), err)
}
}

Expand All @@ -207,7 +207,7 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er
tableName := fmt.Sprintf("%s_%s", tableData.RawName(), tableData.TempTableSuffix())
err = s.PutTable(ctx, tableData.TopicConfig.Database, tableName, rows)
if err != nil {
return fmt.Errorf("failed to insert into temp table: %s, err: %v", tableName, err)
return fmt.Errorf("failed to insert into temp table: %s, err: %w", tableName, err)
}

defer func() {
Expand All @@ -222,7 +222,7 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er
additionalDateFmts := s.config.SharedTransferConfig.TypingSettings.AdditionalDateFormats
distinctDates, err := tableData.DistinctDates(tableData.TopicConfig.BigQueryPartitionSettings.PartitionField, additionalDateFmts)
if err != nil {
return fmt.Errorf("failed to generate distinct dates, err: %v", err)
return fmt.Errorf("failed to generate distinct dates, err: %w", err)
}

mergeString, err := tableData.TopicConfig.BigQueryPartitionSettings.GenerateMergeString(distinctDates)
Expand Down
10 changes: 5 additions & 5 deletions clients/redshift/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er

err = utils.BackfillColumn(s.config, s, col, fqName)
if err != nil {
return fmt.Errorf("failed to backfill col: %v, default value: %v, err: %v", col.RawName(), col.RawDefaultValue(), err)
return fmt.Errorf("failed to backfill col: %v, default value: %v, err: %w", col.RawName(), col.RawDefaultValue(), err)
}

tableConfig.Columns().UpsertColumn(col.RawName(), columns.UpsertColumnArg{
Expand All @@ -115,23 +115,23 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er
// Prepare merge statement
mergeParts, err := mergeArg.GetParts()
if err != nil {
return fmt.Errorf("failed to generate merge statement, err: %v", err)
return fmt.Errorf("failed to generate merge statement, err: %w", err)
}

tx, err := s.Begin()
if err != nil {
return fmt.Errorf("failed to start tx, err: %v", err)
return fmt.Errorf("failed to start tx, err: %w", err)
}

for _, mergeQuery := range mergeParts {
_, err = tx.Exec(mergeQuery)
if err != nil {
return fmt.Errorf("failed to merge, query: %v, err: %v", mergeQuery, err)
return fmt.Errorf("failed to merge, query: %v, err: %w", mergeQuery, err)
}
}

if err = tx.Commit(); err != nil {
return fmt.Errorf("failed to merge, parts: %v, err: %v", mergeParts, err)
return fmt.Errorf("failed to merge, parts: %v, err: %w", mergeParts, err)
}

return err
Expand Down
4 changes: 2 additions & 2 deletions clients/redshift/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,13 @@ func (s *Store) loadTemporaryTable(tableData *optimization.TableData, newTableNa
}

if err = writer.Write(row); err != nil {
return "", fmt.Errorf("failed to write to csv, err: %v", err)
return "", fmt.Errorf("failed to write to csv, err: %w", err)
}
}

writer.Flush()
if err = writer.Error(); err != nil {
return "", fmt.Errorf("failed to flush csv writer, err: %v", err)
return "", fmt.Errorf("failed to flush csv writer, err: %w", err)
}

return filePath, nil
Expand Down
18 changes: 9 additions & 9 deletions clients/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,18 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er

schema, err := parquetutil.GenerateJSONSchema(cols)
if err != nil {
return fmt.Errorf("failed to generate parquet schema, err: %v", err)
return fmt.Errorf("failed to generate parquet schema, err: %w", err)
}

fp := fmt.Sprintf("/tmp/%v_%s.parquet.gz", tableData.LatestCDCTs.UnixMilli(), stringutil.Random(4))
fw, err := local.NewLocalFileWriter(fp)
if err != nil {
return fmt.Errorf("failed to create a local parquet file, err: %v", err)
return fmt.Errorf("failed to create a local parquet file, err: %w", err)
}

pw, err := writer.NewJSONWriter(schema, fw, 4)
if err != nil {
return fmt.Errorf("failed to instantiate parquet writer, err: %v", err)
return fmt.Errorf("failed to instantiate parquet writer, err: %w", err)
}

additionalDateFmts := s.config.SharedTransferConfig.TypingSettings.AdditionalDateFormats
Expand All @@ -115,28 +115,28 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er

value, err := parquetutil.ParseValue(val[col], colKind, additionalDateFmts)
if err != nil {
return fmt.Errorf("failed to parse value, err: %v, value: %v, column: %v", err, val[col], col)
return fmt.Errorf("failed to parse value, err: %w, value: %v, column: %v", err, val[col], col)
}

row[col] = value
}

rowBytes, err := json.Marshal(row)
if err != nil {
return fmt.Errorf("failed to marshal row, err: %v", err)
return fmt.Errorf("failed to marshal row, err: %w", err)
}

if err = pw.Write(string(rowBytes)); err != nil {
return fmt.Errorf("failed to write row, err: %v", err)
return fmt.Errorf("failed to write row, err: %w", err)
}
}

if err = pw.WriteStop(); err != nil {
return fmt.Errorf("failed to write stop, err: %v", err)
return fmt.Errorf("failed to write stop, err: %w", err)
}

if err = fw.Close(); err != nil {
return fmt.Errorf("failed to close filewriter, err: %v", err)
return fmt.Errorf("failed to close filewriter, err: %w", err)
}

defer func() {
Expand All @@ -153,7 +153,7 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er
OverrideAWSAccessKeyID: ptr.ToString(s.config.S3.AwsAccessKeyID),
OverrideAWSAccessKeySecret: ptr.ToString(s.config.S3.AwsSecretAccessKey),
}); err != nil {
return fmt.Errorf("failed to upload file to s3, err: %v", err)
return fmt.Errorf("failed to upload file to s3, err: %w", err)
}

return nil
Expand Down
6 changes: 3 additions & 3 deletions clients/snowflake/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (s *Store) loadTemporaryTable(tableData *optimization.TableData, newTableNa
}

if err = writer.Write(row); err != nil {
return "", fmt.Errorf("failed to write to csv, err: %v", err)
return "", fmt.Errorf("failed to write to csv, err: %w", err)
}
}

Expand Down Expand Up @@ -205,7 +205,7 @@ func (s *Store) mergeWithStages(tableData *optimization.TableData) error {

err = utils.BackfillColumn(s.config, s, col, fqName)
if err != nil {
return fmt.Errorf("failed to backfill col: %v, default value: %v, err: %v", col.RawName(), col.RawDefaultValue(), err)
return fmt.Errorf("failed to backfill col: %v, default value: %v, err: %w", col.RawName(), col.RawDefaultValue(), err)
}

tableConfig.Columns().UpsertColumn(col.RawName(), columns.UpsertColumnArg{
Expand All @@ -225,7 +225,7 @@ func (s *Store) mergeWithStages(tableData *optimization.TableData) error {

mergeQuery, err := mergeArg.GetStatement()
if err != nil {
return fmt.Errorf("failed to generate merge statement, err: %v", err)
return fmt.Errorf("failed to generate merge statement, err: %w", err)
}

slog.Debug("Executing...", slog.String("query", mergeQuery))
Expand Down
6 changes: 3 additions & 3 deletions clients/utils/table_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ func GetTableConfig(args GetTableCfgArgs) (*types.DwhTableConfig, error) {
tableMissing = true
err = nil
} else {
return nil, fmt.Errorf("failed to query %v, err: %v, query: %v", args.Dwh.Label(), err, args.Query)
return nil, fmt.Errorf("failed to query %v, err: %w, query: %v", args.Dwh.Label(), err, args.Query)
}
default:
return nil, fmt.Errorf("failed to query %v, err: %v", args.Dwh.Label(), err)
return nil, fmt.Errorf("failed to query %v, err: %w", args.Dwh.Label(), err)
}
}

Expand Down Expand Up @@ -124,7 +124,7 @@ func GetTableConfig(args GetTableCfgArgs) (*types.DwhTableConfig, error) {
var _colComment constants.ColComment
err = json.Unmarshal([]byte(comment), &_colComment)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal comment, err: %v", err)
return nil, fmt.Errorf("failed to unmarshal comment, err: %w", err)
}

col.SetBackfilled(_colComment.Backfilled)
Expand Down
4 changes: 2 additions & 2 deletions clients/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func BackfillColumn(cfg config.Config, dwh destination.DataWarehouse, column col
additionalDateFmts := cfg.SharedTransferConfig.TypingSettings.AdditionalDateFormats
defaultVal, err := column.DefaultValue(&columns.DefaultValueArgs{Escape: true, DestKind: dwh.Label()}, additionalDateFmts)
if err != nil {
return fmt.Errorf("failed to escape default value, err: %v", err)
return fmt.Errorf("failed to escape default value, err: %w", err)
}

uppercaseEscNames := cfg.SharedDestinationConfig.UppercaseEscapedNames
Expand All @@ -53,7 +53,7 @@ func BackfillColumn(cfg config.Config, dwh destination.DataWarehouse, column col

_, err = dwh.Exec(query)
if err != nil {
return fmt.Errorf("failed to backfill, err: %v, query: %v", err, query)
return fmt.Errorf("failed to backfill, err: %w, query: %v", err, query)
}

query = fmt.Sprintf(`COMMENT ON COLUMN %s.%s IS '%v';`, fqTableName, escapedCol, `{"backfilled": true}`)
Expand Down
6 changes: 3 additions & 3 deletions lib/cdc/mongo/debezium.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (d *Debezium) GetEventFromBytes(typingSettings typing.Settings, bytes []byt

err := json.Unmarshal(bytes, &schemaEventPayload)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal json, err: %v", err)
return nil, fmt.Errorf("failed to unmarshal json, err: %w", err)
}

// Now marshal before & after string.
Expand All @@ -46,7 +46,7 @@ func (d *Debezium) GetEventFromBytes(typingSettings typing.Settings, bytes []byt
if schemaEventPayload.Payload.After != nil {
after, err := mongo.JSONEToMap([]byte(*schemaEventPayload.Payload.After))
if err != nil {
return nil, fmt.Errorf("mongo JSONEToMap err: %v", err)
return nil, fmt.Errorf("mongo JSONEToMap err: %w", err)
}

// Now, we need to iterate over each key and if the value is JSON
Expand All @@ -55,7 +55,7 @@ func (d *Debezium) GetEventFromBytes(typingSettings typing.Settings, bytes []byt
if typing.ParseValue(typingSettings, key, nil, value) == typing.Struct {
valBytes, err := json.Marshal(value)
if err != nil {
return nil, fmt.Errorf("failed to marshal, err: %v", err)
return nil, fmt.Errorf("failed to marshal, err: %w", err)
}

after[key] = string(valBytes)
Expand Down
12 changes: 11 additions & 1 deletion lib/db/errors.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
package db

import "strings"
import (
"errors"
"log/slog"
"strings"
"syscall"
)

func retryableError(err error) bool {
if err != nil {
if errors.Is(err, syscall.ECONNRESET) || errors.Is(err, syscall.ECONNREFUSED) {
return true
}
if strings.Contains(err.Error(), "read: connection reset by peer") {
slog.Warn("matched 'read: connection reset by peer' by string")
return true
} else if strings.Contains(err.Error(), "connect: connection refused") {
slog.Warn("matched 'connect: connection refused' by string")
return true
}
}
Expand Down
2 changes: 1 addition & 1 deletion lib/debezium/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func parsePartitionKeyStruct(keyBytes []byte) (map[string]interface{}, error) {
var pkStruct map[string]interface{}
err := json.Unmarshal(keyBytes, &pkStruct)
if err != nil {
return nil, fmt.Errorf("failed to json unmarshal, err: %v", err)
return nil, fmt.Errorf("failed to json unmarshal, err: %w", err)
}

if len(pkStruct) == 0 {
Expand Down
4 changes: 2 additions & 2 deletions lib/debezium/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,12 @@ func FromDebeziumTypeToTime(supportedType SupportedDebeziumType, val int64) (*ex
func (f Field) DecodeDecimal(encoded string) (*decimal.Decimal, error) {
results, err := f.GetScaleAndPrecision()
if err != nil {
return nil, fmt.Errorf("failed to get scale and/or precision, err: %v", err)
return nil, fmt.Errorf("failed to get scale and/or precision, err: %w", err)
}

data, err := base64.StdEncoding.DecodeString(encoded)
if err != nil {
return nil, fmt.Errorf("failed to bae64 decode, err: %v", err)
return nil, fmt.Errorf("failed to bae64 decode, err: %w", err)
}

bigInt := new(big.Int)
Expand Down
2 changes: 1 addition & 1 deletion lib/destination/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func AlterTable(args AlterTableArgs, cols ...columns.Column) error {
if ColumnAlreadyExistErr(err, args.Dwh.Label()) {
err = nil
} else if err != nil {
return fmt.Errorf("failed to apply ddl, sql: %v, err: %v", sqlQuery, err)
return fmt.Errorf("failed to apply ddl, sql: %v, err: %w", sqlQuery, err)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion lib/kafkalib/partition/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type BigQuerySettings struct {
// GenerateMergeString this is used as an equality string for the MERGE statement.
func (b *BigQuerySettings) GenerateMergeString(values []string) (string, error) {
if err := b.Valid(); err != nil {
return "", fmt.Errorf("failed to validate bigQuerySettings, err: %v", err)
return "", fmt.Errorf("failed to validate bigQuerySettings, err: %w", err)
}

if len(values) == 0 {
Expand Down
2 changes: 1 addition & 1 deletion lib/maputil/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func GetIntegerFromMap(obj map[string]interface{}, key string) (int, error) {

val, err := strconv.Atoi(fmt.Sprint(valInterface))
if err != nil {
return 0, fmt.Errorf("key: %s is not type integer, err: %v", key, err)
return 0, fmt.Errorf("key: %s is not type integer, err: %w", key, err)
}

return val, nil
Expand Down
2 changes: 1 addition & 1 deletion lib/optimization/table_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func (t *TableData) DistinctDates(colName string, additionalDateFmts []string) (

extTime, err := ext.ParseFromInterface(val, additionalDateFmts)
if err != nil {
return nil, fmt.Errorf("col: %v is not a time column, value: %v, err: %v", colName, val, err)
return nil, fmt.Errorf("col: %v is not a time column, value: %v, err: %w", colName, val, err)
}

retMap[extTime.String(ext.PostgresDateFormat)] = true
Expand Down
2 changes: 1 addition & 1 deletion lib/parquetutil/parse_values.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func ParseValue(colVal interface{}, colKind columns.Column, additionalDateFmts [
case typing.ETime.Kind:
extTime, err := ext.ParseFromInterface(colVal, additionalDateFmts)
if err != nil {
return "", fmt.Errorf("failed to cast colVal as time.Time, colVal: %v, err: %v", colVal, err)
return "", fmt.Errorf("failed to cast colVal as time.Time, colVal: %v, err: %w", colVal, err)
}

if colKind.KindDetails.ExtendedTimeDetails == nil {
Expand Down
2 changes: 1 addition & 1 deletion lib/s3lib/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func UploadLocalFileToS3(ctx context.Context, args UploadArgs) (string, error) {
}

if err != nil {
return "", fmt.Errorf("failed loading config, err: %v", err)
return "", fmt.Errorf("failed loading config, err: %w", err)
}

s3Client := s3.NewFromConfig(cfg)
Expand Down
Loading

0 comments on commit 3373d79

Please sign in to comment.