From c061a8fe90d3f97fad653a65a7028c5bcef34f21 Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Fri, 23 Aug 2024 15:51:51 -0700 Subject: [PATCH 1/2] Warn if value scale is different from schema scale --- lib/debezium/converters/decimal.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/lib/debezium/converters/decimal.go b/lib/debezium/converters/decimal.go index 08a960bc..c4d314e3 100644 --- a/lib/debezium/converters/decimal.go +++ b/lib/debezium/converters/decimal.go @@ -2,6 +2,7 @@ package converters import ( "fmt" + "log/slog" "github.com/artie-labs/transfer/lib/debezium" "github.com/artie-labs/transfer/lib/typing" @@ -40,6 +41,14 @@ func decimalWithNewExponent(decimal *apd.Decimal, newExponent int32) *apd.Decima func encodeDecimalWithScale(decimal *apd.Decimal, scale int32) []byte { targetExponent := -scale // Negate scale since [Decimal.Exponent] is negative. if decimal.Exponent != targetExponent { + // TODO: We may be able to remove this conversion and just return an error to maintain parity with `org.apache.kafka.connect.data.Decimal` + // https://github.com/a0x8o/kafka/blob/54eff6af115ee647f60129f2ce6a044cb17215d0/connect/api/src/main/java/org/apache/kafka/connect/data/Decimal.java#L69 + slog.Warn("Value scale is different from expected scale", + slog.Any("value", decimal.Text('f')), + slog.Any("actual", -decimal.Exponent), + slog.Any("expected", scale), + ) + decimal = decimalWithNewExponent(decimal, targetExponent) } bytes, _ := debezium.EncodeDecimal(decimal) From 3b2ef401ceeb4e6b74076df60f590337b3634cc9 Mon Sep 17 00:00:00 2001 From: Nathan <148575555+nathan-artie@users.noreply.github.com> Date: Fri, 23 Aug 2024 15:53:29 -0700 Subject: [PATCH 2/2] Change word --- lib/debezium/converters/decimal.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/debezium/converters/decimal.go b/lib/debezium/converters/decimal.go index c4d314e3..ce2bd0b8 100644 --- a/lib/debezium/converters/decimal.go +++ b/lib/debezium/converters/decimal.go @@ -43,7 +43,7 @@ func encodeDecimalWithScale(decimal *apd.Decimal, scale int32) []byte { if decimal.Exponent != targetExponent { // TODO: We may be able to remove this conversion and just return an error to maintain parity with `org.apache.kafka.connect.data.Decimal` // https://github.com/a0x8o/kafka/blob/54eff6af115ee647f60129f2ce6a044cb17215d0/connect/api/src/main/java/org/apache/kafka/connect/data/Decimal.java#L69 - slog.Warn("Value scale is different from expected scale", + slog.Warn("Value scale is different from schema scale", slog.Any("value", decimal.Text('f')), slog.Any("actual", -decimal.Exponent), slog.Any("expected", scale),