Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie committed Jun 12, 2024
1 parent 4f3d7e9 commit ee7af84
Showing 1 changed file with 6 additions and 20 deletions.
26 changes: 6 additions & 20 deletions clients/bigquery/storagewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func rowToMessage(row map[string]any, columns []columns.Column, messageDescripto
fmt.Printf("%s %v %s %T %v\n", column.Name(), column.KindDetails, field.Kind(), value, value)

if value == nil {
// message.Set(field, protoreflect.Value{})
continue
}

Expand Down Expand Up @@ -94,7 +93,7 @@ func rowToMessage(row map[string]any, columns []columns.Column, messageDescripto
switch column.KindDetails.ExtendedTimeDetails.Type {
case ext.DateKindType:
daysSinceEpoch := extTime.Unix() / (60 * 60 * 24)
message.Set(field, protoreflect.ValueOfInt64(daysSinceEpoch))
message.Set(field, protoreflect.ValueOfInt32(int32(daysSinceEpoch)))
case ext.TimeKindType:
message.Set(field, protoreflect.ValueOfInt64(encodePacked64TimeMicros(extTime.Time)))
case ext.DateTimeKindType:
Expand All @@ -104,38 +103,25 @@ func rowToMessage(row map[string]any, columns []columns.Column, messageDescripto
}
message.Set(field, protoreflect.ValueOfInt64(extTime.UnixMicro()))
default:
return nil, fmt.Errorf("error")
return nil, fmt.Errorf("unsupported extended time details: %s", column.KindDetails.ExtendedTimeDetails.Type)
}
case typing.Struct.Kind:
if stringValue, ok := value.(string); ok {
message.Set(field, protoreflect.ValueOfString(stringValue))
} else {
return nil, fmt.Errorf("expected string received %T with value %v", value, value)
}

// bytes, err := json.Marshal(value)
// if err != nil {
// return nil, err
// }

// msg := message.Mutable(field).Message().(protoreflect.ProtoMessage)
// err = protojson.Unmarshal(bytes, msg)
// if err != nil {
// return nil, fmt.Errorf("fail: %w", err)
// }

// message.Set(field, protoreflect.ValueOfString(string(bytes)))
case typing.Array.Kind:
value, err := array.InterfaceToArrayString(value, true)
values, err := array.InterfaceToArrayString(value, true)
if err != nil {
return nil, err
}
list := message.Mutable(field).List()
for _, j := range value {
list.Append(protoreflect.ValueOf(j))
for _, value := range values {
list.Append(protoreflect.ValueOf(value))
}
default:
return nil, fmt.Errorf("error")
return nil, fmt.Errorf("unsupported column kind: %s", column.KindDetails.Kind)
}

}
Expand Down

0 comments on commit ee7af84

Please sign in to comment.