Skip to content

Commit

Permalink
More
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed Jun 14, 2024
1 parent 4e717c8 commit 2f038bc
Showing 1 changed file with 23 additions and 26 deletions.
49 changes: 23 additions & 26 deletions clients/bigquery/storagewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"fmt"
"time"

"cloud.google.com/go/bigquery"
"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
"cloud.google.com/go/bigquery/storage/managedwriter/adapt"
"github.com/artie-labs/transfer/lib/array"
"github.com/artie-labs/transfer/lib/typing"
Expand All @@ -16,63 +16,60 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)

// NOTE(review): this span is a diff rendering whose +/- markers were lost, so the
// old and new version of each changed line appear back to back. The lines that
// reference the bigquery package appear to be the removed side and the lines that
// reference storagepb the added side (this matches the commit summary of
// 23 additions and 26 deletions) -- confirm against the actual commit.
//
// columnToFieldSchema (old name) / columnToTableFieldSchema (new name) maps a
// column to a BigQuery schema field. It switches on column.KindDetails.Kind to
// pick the field type and returns a schema entry holding the column's name, that
// type, and whether the field is repeated. It returns an error when the kind, or
// the extended time type of an ETime column, is not one of the handled cases.
func columnToFieldSchema(column columns.Column) (*bigquery.FieldSchema, error) {
var fieldType bigquery.FieldType
var repeated bool
func columnToTableFieldSchema(column columns.Column) (*storagepb.TableFieldSchema, error) {
var fieldType storagepb.TableFieldSchema_Type
// Every field starts out NULLABLE; only the array case below overrides the mode.
mode := storagepb.TableFieldSchema_NULLABLE

switch column.KindDetails.Kind {
case typing.Boolean.Kind:
fieldType = bigquery.BooleanFieldType
fieldType = storagepb.TableFieldSchema_BOOL
case typing.Integer.Kind:
fieldType = bigquery.IntegerFieldType
fieldType = storagepb.TableFieldSchema_INT64
case typing.Float.Kind:
fieldType = bigquery.FloatFieldType
fieldType = storagepb.TableFieldSchema_DOUBLE
case typing.String.Kind:
fieldType = bigquery.StringFieldType
fieldType = storagepb.TableFieldSchema_STRING
case typing.EDecimal.Kind:
// Decimals are declared as string fields, not as a numeric type.
fieldType = bigquery.StringFieldType
fieldType = storagepb.TableFieldSchema_STRING
case typing.ETime.Kind:
// Extended time columns are split further by ExtendedTimeDetails.Type into
// date, time, or timestamp fields; any other type is rejected.
switch column.KindDetails.ExtendedTimeDetails.Type {
case ext.DateKindType:
fieldType = bigquery.DateFieldType
fieldType = storagepb.TableFieldSchema_DATE
case ext.TimeKindType:
fieldType = bigquery.TimeFieldType
fieldType = storagepb.TableFieldSchema_TIME
case ext.DateTimeKindType:
fieldType = bigquery.TimestampFieldType
fieldType = storagepb.TableFieldSchema_TIMESTAMP
default:
return nil, fmt.Errorf("unsupported extended time details type: %s", column.KindDetails.ExtendedTimeDetails.Type)
}
case typing.Struct.Kind:
// Structs are declared as string fields; presumably the value is serialized
// by the code that builds the row -- TODO confirm against the caller.
fieldType = bigquery.StringFieldType
fieldType = storagepb.TableFieldSchema_STRING
case typing.Array.Kind:
// Arrays are declared as a repeated string field.
fieldType = bigquery.StringFieldType
repeated = true
fieldType = storagepb.TableFieldSchema_STRING
mode = storagepb.TableFieldSchema_REPEATED
default:
return nil, fmt.Errorf("unsupported column kind: %s", column.KindDetails.Kind)
}

// Old side: bigquery.FieldSchema with a Repeated flag.
return &bigquery.FieldSchema{
Name: column.Name(),
Type: fieldType,
Repeated: repeated,
// New side: storagepb.TableFieldSchema with an explicit Mode.
return &storagepb.TableFieldSchema{
Name: column.Name(),
Type: fieldType,
Mode: mode,
}, nil
}

func columnsToMessageDescriptor(cols []columns.Column) (*protoreflect.MessageDescriptor, error) {
fields := make([]*bigquery.FieldSchema, len(cols))
fields := make([]*storagepb.TableFieldSchema, len(cols))
for i, col := range cols {
field, err := columnToFieldSchema(col)
field, err := columnToTableFieldSchema(col)
if err != nil {
return nil, err
}
fields[i] = field
}
tableSchema := storagepb.TableSchema{Fields: fields}

storageSchema, err := adapt.BQSchemaToStorageTableSchema(fields)
if err != nil {
return nil, fmt.Errorf("failed to adapt BigQuery schema to storage table schema: %w", err)
}
descriptor, err := adapt.StorageSchemaToProto2Descriptor(storageSchema, "root")
descriptor, err := adapt.StorageSchemaToProto2Descriptor(&tableSchema, "root")
if err != nil {
return nil, fmt.Errorf("failed to build proto descriptor: %w", err)
}
Expand Down

0 comments on commit 2f038bc

Please sign in to comment.