Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed Jun 13, 2024
1 parent ee7af84 commit 94434ec
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 28 deletions.
50 changes: 30 additions & 20 deletions clients/bigquery/cast.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,28 +59,14 @@ func castColVal(colVal any, colKind columns.Column, additionalDateFmts []string)
return extTime.String(dialect.BQStreamingTimeFormat), nil
}
case typing.Struct.Kind:
// TODO: See if we can improve this eval and find a better location, see: https://github.com/artie-labs/transfer/pull/697#discussion_r1609280164
if strings.Contains(fmt.Sprint(colVal), constants.ToastUnavailableValuePlaceholder) {
return fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder), nil
}

// Structs from relational and Mongo are different.
// MongoDB will return the native objects back such as `map[string]any{"hello": "world"}`
// Relational will return a string representation of the struct such as `{"hello": "world"}`
if colValString, isOk := colVal.(string); isOk {
if colValString == "" {
return nil, nil
}

return colValString, nil
}

colValBytes, err := json.Marshal(colVal)
stringValue, err := EncodeStructToJSONString(colVal)
if err != nil {
return nil, fmt.Errorf("failed to marshal colVal: %w", err)
return nil, err
} else if stringValue == "" {
return nil, nil
} else {
return stringValue, nil
}

return string(colValBytes), nil
case typing.Array.Kind:
arrayString, err := array.InterfaceToArrayString(colVal, true)
if err != nil {
Expand All @@ -98,3 +84,27 @@ func castColVal(colVal any, colKind columns.Column, additionalDateFmts []string)
slog.Error("Unexpected BigQuery Data Type", slog.Any("colKind", colKind.KindDetails.Kind), slog.Any("colVal", colVal))
return fmt.Sprint(colVal), nil
}

func EncodeStructToJSONString(value any) (string, error) {
if strings.Contains(fmt.Sprint(value), constants.ToastUnavailableValuePlaceholder) {
return fmt.Sprintf(`{"key":"%s"}`, constants.ToastUnavailableValuePlaceholder), nil
}

// Structs from relational and Mongo are different.
// MongoDB will return the native objects back such as `map[string]any{"hello": "world"}`
// Relational will return a string representation of the struct such as `{"hello": "world"}`
if colValString, isOk := value.(string); isOk {
if colValString == "" {
return "", nil
}

return colValString, nil
}

colValBytes, err := json.Marshal(value)
if err != nil {
return "", fmt.Errorf("failed to marshal colVal: %w", err)
}

return string(colValBytes), nil
}
11 changes: 6 additions & 5 deletions clients/bigquery/storagewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ func rowToMessage(row map[string]any, columns []columns.Column, messageDescripto

value := row[column.Name()]

fmt.Printf("%s %v %s %T %v\n", column.Name(), column.KindDetails, field.Kind(), value, value)

if value == nil {
continue
}
Expand Down Expand Up @@ -106,10 +104,13 @@ func rowToMessage(row map[string]any, columns []columns.Column, messageDescripto
return nil, fmt.Errorf("unsupported extended time details: %s", column.KindDetails.ExtendedTimeDetails.Type)
}
case typing.Struct.Kind:
if stringValue, ok := value.(string); ok {
message.Set(field, protoreflect.ValueOfString(stringValue))
stringValue, err := EncodeStructToJSONString(value)
if err != nil {
return nil, err
} else if stringValue == "" {
continue
} else {
return nil, fmt.Errorf("expected string received %T with value %v", value, value)
message.Set(field, protoreflect.ValueOfString(stringValue))
}
case typing.Array.Kind:
values, err := array.InterfaceToArrayString(value, true)
Expand Down
2 changes: 1 addition & 1 deletion lib/config/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type BigQuery struct {
ProjectID string `yaml:"projectID"`
Location string `yaml:"location"`
BatchSize int `yaml:"batchSize"`
UseStorageWriteAPI bool `yaml:"useStorageWriteAPI"`
UseStorageWriteAPI bool `yaml:"__useStorageWriteAPI"` // Not officially supported yet.
}

func (b *BigQuery) LoadDefaultValues() {
Expand Down
2 changes: 0 additions & 2 deletions processes/consumer/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ func (p processArgs) process(ctx context.Context, cfg config.Config, inMemDB *mo
"what": "success",
}

fmt.Printf("Got a message %v\n", tags)

st := time.Now()
// We are wrapping this in a defer function so that the values do not get immediately evaluated and miss with our actual process duration.
defer func() {
Expand Down

0 comments on commit 94434ec

Please sign in to comment.