From 6f4c88931085525fbb265b92a450b951df1e5647 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Sat, 6 Jul 2024 15:04:54 -1000 Subject: [PATCH] Checkpoint. --- lib/postgres/parse/parse.go | 9 ++++++++- lib/postgres/scanner.go | 5 ----- sources/postgres/adapter/adapter.go | 4 +++- sources/postgres/adapter/converters.go | 28 ++++++++++++++++++++++++++ 4 files changed, 39 insertions(+), 7 deletions(-) diff --git a/lib/postgres/parse/parse.go b/lib/postgres/parse/parse.go index b846aba1..1d486cb2 100644 --- a/lib/postgres/parse/parse.go +++ b/lib/postgres/parse/parse.go @@ -45,7 +45,14 @@ func ParseValue(colKind schema.DataType, value any) (any, error) { } return nil, fmt.Errorf("value: %v not of string type for Numeric or VariableNumeric", value) - case schema.Time, schema.TimeWithTimeZone: + case schema.TimeWithTimeZone: + stringValue, ok := value.(string) + if !ok { + return nil, fmt.Errorf("expected string got %T with value: %v", value, value) + } + + return stringValue, nil + case schema.Time: stringValue, ok := value.(string) if !ok { return nil, fmt.Errorf("expected string got %T with value: %v", value, value) diff --git a/lib/postgres/scanner.go b/lib/postgres/scanner.go index ee914d5f..4dca6337 100644 --- a/lib/postgres/scanner.go +++ b/lib/postgres/scanner.go @@ -142,11 +142,6 @@ func (s scanAdapter) ParsePrimaryKeyValueForOverrides(columnName string, value s func castColumn(col schema.Column) string { colName := pgx.Identifier{col.Name}.Sanitize() switch col.Type { - case schema.TimeWithTimeZone: - // If we don't convert `time with time zone` to UTC we end up with strings like `10:23:54-02` - // And pgtype.Time doesn't parse the offset properly. - // See https://github.com/jackc/pgx/issues/1940 - return fmt.Sprintf(`%s AT TIME ZONE 'UTC' AS %q`, colName, col.Name) case schema.Array: return fmt.Sprintf(`ARRAY_TO_JSON(%s)::TEXT as %q`, colName, col.Name) default: diff --git a/sources/postgres/adapter/adapter.go b/sources/postgres/adapter/adapter.go index 68a8f10d..7492ae86 100644 --- a/sources/postgres/adapter/adapter.go +++ b/sources/postgres/adapter/adapter.go @@ -103,7 +103,9 @@ func valueConverterForType(dataType schema.DataType, opts *schema.Opts) (convert return converters.BytesPassthrough{}, nil case schema.Text, schema.UserDefinedText: return converters.StringPassthrough{}, nil - case schema.Time, schema.TimeWithTimeZone: + case schema.TimeWithTimeZone: + return TimeWithTimezoneConverter{}, nil + case schema.Time: return PgTimeConverter{}, nil case schema.Date: return converters.DateConverter{}, nil diff --git a/sources/postgres/adapter/converters.go b/sources/postgres/adapter/converters.go index 3a0a9cdf..0f12df7a 100644 --- a/sources/postgres/adapter/converters.go +++ b/sources/postgres/adapter/converters.go @@ -9,6 +9,34 @@ import ( "github.com/jackc/pgx/v5/pgtype" ) +type TimeWithTimezoneConverter struct{} + +func (TimeWithTimezoneConverter) ToField(name string) debezium.Field { + return debezium.Field{ + FieldName: name, + Type: debezium.String, + DebeziumType: debezium.TimeWithTimezone, + } +} + +func (TimeWithTimezoneConverter) Convert(value any) (any, error) { + stringValue, ok := value.(string) + if !ok { + return nil, fmt.Errorf("expected string got %T with value: %v", value, value) + } + + inputLayout := "15:04:05.000000-07" + timeValue, err := time.Parse(inputLayout, stringValue) + if err != nil { + return nil, fmt.Errorf("failed to parse time value %q: %w", stringValue, err) + } + + // We need to parse this value into `time.Time` + // Then convert it back into a string where the timezone is GMT to match Debezium. + outputLayout := "15:04:05.000000Z" + return timeValue.UTC().Format(outputLayout), nil +} + type PgTimeConverter struct{} func (PgTimeConverter) ToField(name string) debezium.Field {