From 94434ec9338ff58b4fe7c33717f5be741ff6f088 Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Wed, 12 Jun 2024 23:15:27 -0700 Subject: [PATCH] Cleanup --- clients/bigquery/cast.go | 50 +++++++++++++++++++------------- clients/bigquery/storagewrite.go | 11 +++---- lib/config/bigquery.go | 2 +- processes/consumer/process.go | 2 -- 4 files changed, 37 insertions(+), 28 deletions(-) diff --git a/clients/bigquery/cast.go b/clients/bigquery/cast.go index 916d354f2..321fc0491 100644 --- a/clients/bigquery/cast.go +++ b/clients/bigquery/cast.go @@ -59,28 +59,14 @@ func castColVal(colVal any, colKind columns.Column, additionalDateFmts []string) return extTime.String(dialect.BQStreamingTimeFormat), nil } case typing.Struct.Kind: - // TODO: See if we can improve this eval and find a better location, see: https://github.com/artie-labs/transfer/pull/697#discussion_r1609280164 - if strings.Contains(fmt.Sprint(colVal), constants.ToastUnavailableValuePlaceholder) { - return fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder), nil - } - - // Structs from relational and Mongo are different. - // MongoDB will return the native objects back such as `map[string]any{"hello": "world"}` - // Relational will return a string representation of the struct such as `{"hello": "world"}` - if colValString, isOk := colVal.(string); isOk { - if colValString == "" { - return nil, nil - } - - return colValString, nil - } - - colValBytes, err := json.Marshal(colVal) + stringValue, err := EncodeStructToJSONString(colVal) if err != nil { - return nil, fmt.Errorf("failed to marshal colVal: %w", err) + return nil, err + } else if stringValue == "" { + return nil, nil + } else { + return stringValue, nil } - - return string(colValBytes), nil case typing.Array.Kind: arrayString, err := array.InterfaceToArrayString(colVal, true) if err != nil { @@ -98,3 +84,27 @@ func castColVal(colVal any, colKind columns.Column, additionalDateFmts []string) slog.Error("Unexpected BigQuery Data Type", slog.Any("colKind", colKind.KindDetails.Kind), slog.Any("colVal", colVal)) return fmt.Sprint(colVal), nil } + +func EncodeStructToJSONString(value any) (string, error) { + if strings.Contains(fmt.Sprint(value), constants.ToastUnavailableValuePlaceholder) { + return fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder), nil + } + + // Structs from relational and Mongo are different. + // MongoDB will return the native objects back such as `map[string]any{"hello": "world"}` + // Relational will return a string representation of the struct such as `{"hello": "world"}` + if colValString, isOk := value.(string); isOk { + if colValString == "" { + return "", nil + } + + return colValString, nil + } + + colValBytes, err := json.Marshal(value) + if err != nil { + return "", fmt.Errorf("failed to marshal colVal: %w", err) + } + + return string(colValBytes), nil +} diff --git a/clients/bigquery/storagewrite.go b/clients/bigquery/storagewrite.go index ce511c664..1f9b13445 100644 --- a/clients/bigquery/storagewrite.go +++ b/clients/bigquery/storagewrite.go @@ -35,8 +35,6 @@ func rowToMessage(row map[string]any, columns []columns.Column, messageDescripto value := row[column.Name()] - fmt.Printf("%s %v %s %T %v\n", column.Name(), column.KindDetails, field.Kind(), value, value) - if value == nil { continue } @@ -106,10 +104,13 @@ func rowToMessage(row map[string]any, columns []columns.Column, messageDescripto return nil, fmt.Errorf("unsupported extended time details: %s", column.KindDetails.ExtendedTimeDetails.Type) } case typing.Struct.Kind: - if stringValue, ok := value.(string); ok { - message.Set(field, protoreflect.ValueOfString(stringValue)) + stringValue, err := EncodeStructToJSONString(value) + if err != nil { + return nil, err + } else if stringValue == "" { + continue } else { - return nil, fmt.Errorf("expected string received %T with value %v", value, value) + message.Set(field, protoreflect.ValueOfString(stringValue)) } case typing.Array.Kind: values, err := array.InterfaceToArrayString(value, true) diff --git a/lib/config/bigquery.go b/lib/config/bigquery.go index 75d96b79a..c1d5c123f 100644 --- a/lib/config/bigquery.go +++ b/lib/config/bigquery.go @@ -10,7 +10,7 @@ type BigQuery struct { ProjectID string `yaml:"projectID"` Location string `yaml:"location"` BatchSize int `yaml:"batchSize"` - UseStorageWriteAPI bool `yaml:"useStorageWriteAPI"` + UseStorageWriteAPI bool `yaml:"__useStorageWriteAPI"` // Not officially supported yet. } func (b *BigQuery) LoadDefaultValues() { diff --git a/processes/consumer/process.go b/processes/consumer/process.go index fd14dfdd5..8e30a303b 100644 --- a/processes/consumer/process.go +++ b/processes/consumer/process.go @@ -31,8 +31,6 @@ func (p processArgs) process(ctx context.Context, cfg config.Config, inMemDB *mo "what": "success", } - fmt.Printf("Got a message %v\n", tags) - st := time.Now() // We are wrapping this in a defer function so that the values do not get immediately evaluated and miss with our actual process duration. defer func() {