Skip to content

Commit

Permalink
Checkpoint.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Jul 30, 2024
1 parent c01d76d commit 81cc2be
Show file tree
Hide file tree
Showing 18 changed files with 536 additions and 494 deletions.
2 changes: 1 addition & 1 deletion lib/cdc/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type Event interface {
DeletePayload() bool
GetTableName() string
GetData(pkMap map[string]any, config *kafkalib.TopicConfig) (map[string]any, error)
GetOptionalSchema() map[string]typing.KindDetails
GetOptionalSchema() (map[string]typing.KindDetails, error)
// GetColumns will inspect the envelope's payload right now and return.
GetColumns() (*columns.Columns, error)
}
4 changes: 2 additions & 2 deletions lib/cdc/mongo/debezium.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ func (s *SchemaEventPayload) GetTableName() string {
return s.Payload.Source.Collection
}

func (s *SchemaEventPayload) GetOptionalSchema() map[string]typing.KindDetails {
func (s *SchemaEventPayload) GetOptionalSchema() (map[string]typing.KindDetails, error) {
// MongoDB does not have a schema at the database level.
return nil
return nil, nil
}

func (s *SchemaEventPayload) GetColumns() (*columns.Columns, error) {
Expand Down
9 changes: 7 additions & 2 deletions lib/cdc/relational/debezium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,11 @@ func (r *RelationTestSuite) TestPostgresEventWithSchemaAndTimestampNoTZ() {
// Testing typing.
assert.Equal(r.T(), evtData["id"], int64(1001))
assert.Equal(r.T(), evtData["another_id"], int64(333))
assert.Equal(r.T(), typing.ParseValue(typing.Settings{}, "another_id", evt.GetOptionalSchema(), evtData["another_id"]), typing.Integer)

schema, err := evt.GetOptionalSchema()
assert.NoError(r.T(), err)

assert.Equal(r.T(), typing.ParseValue(typing.Settings{}, "another_id", schema, evtData["another_id"]), typing.Integer)

assert.Equal(r.T(), evtData["email"], "[email protected]")

Expand Down Expand Up @@ -518,7 +522,8 @@ func (r *RelationTestSuite) TestGetEventFromBytes_MySQL() {
assert.Equal(r.T(), time.Date(2023, time.March, 13, 19, 19, 24, 0, time.UTC), evt.GetExecutionTime())
assert.Equal(r.T(), "customers", evt.GetTableName())

schema := evt.GetOptionalSchema()
schema, err := evt.GetOptionalSchema()
assert.NoError(r.T(), err)
assert.Equal(r.T(), typing.Struct, schema["custom_fields"])

kvMap := map[string]any{
Expand Down
13 changes: 9 additions & 4 deletions lib/cdc/util/optional_schema.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
package util

import (
"fmt"
"log/slog"

"github.com/artie-labs/transfer/lib/debezium"
"github.com/artie-labs/transfer/lib/typing"
)

func (s *SchemaEventPayload) GetOptionalSchema() map[string]typing.KindDetails {
func (s *SchemaEventPayload) GetOptionalSchema() (map[string]typing.KindDetails, error) {
fieldsObject := s.Schema.GetSchemaFromLabel(debezium.After)
if fieldsObject == nil {
// AFTER schema does not exist.
return nil
return nil, nil
}

schema := make(map[string]typing.KindDetails)
for _, field := range fieldsObject.Fields {
kd := field.ToKindDetails()
kd, err := field.ToKindDetails()
if err != nil {
return nil, fmt.Errorf("failed to convert field (%v), to kind details: %w", field, err)
}

if kd == typing.Invalid {
slog.Warn("Skipping field from optional schema b/c we cannot determine the data type", slog.String("field", field.FieldName))
continue
Expand All @@ -25,5 +30,5 @@ func (s *SchemaEventPayload) GetOptionalSchema() map[string]typing.KindDetails {
schema[field.FieldName] = kd
}

return schema
return schema, nil
}
4 changes: 3 additions & 1 deletion lib/cdc/util/optional_schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ func TestGetOptionalSchema(t *testing.T) {
err := json.Unmarshal([]byte(tc.body), &schemaEventPayload)
assert.NoError(t, err, idx)

actualData := schemaEventPayload.GetOptionalSchema()
actualData, err := schemaEventPayload.GetOptionalSchema()
assert.NoError(t, err)

for actualKey, actualVal := range actualData {
testMsg := fmt.Sprintf("key: %s, actualKind: %s, index: %d", actualKey, actualVal.Kind, idx)

Expand Down
5 changes: 4 additions & 1 deletion lib/cdc/util/relational_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ func TestSource_GetOptionalSchema(t *testing.T) {
}`), &schemaEventPayload)

assert.NoError(t, err)
optionalSchema := schemaEventPayload.GetOptionalSchema()

optionalSchema, err := schemaEventPayload.GetOptionalSchema()
assert.NoError(t, err)

value, isOk := optionalSchema["last_modified"]
assert.True(t, isOk)
assert.Equal(t, value, typing.String)
Expand Down
86 changes: 86 additions & 0 deletions lib/debezium/converters/decimal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package converters

import (
"encoding/base64"
"fmt"

"github.com/artie-labs/transfer/lib/debezium/encode"

"github.com/artie-labs/transfer/lib/maputil"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/decimal"
)

// toBytes normalizes value into a byte slice.
//   - A []byte is handed back untouched (no copy, no decoding).
//   - A string is treated as standard base64 and decoded.
//   - Any other type is rejected with an error.
func toBytes(value any) ([]byte, error) {
	if raw, ok := value.([]byte); ok {
		return raw, nil
	}

	encoded, ok := value.(string)
	if !ok {
		return nil, fmt.Errorf("failed to cast value '%v' with type '%T' to []byte", value, value)
	}

	decoded, err := base64.StdEncoding.DecodeString(encoded)
	if err != nil {
		// Drop any partially decoded output; callers only get an error.
		return nil, fmt.Errorf("failed to base64 decode: %w", err)
	}

	return decoded, nil
}

// Decimal is a converter for encoded decimal values.
type Decimal struct {
	// precision and scale describe the decimal reported by ToKindDetails;
	// Convert also applies them when variableNumeric is false.
	precision, scale int32

	// variableNumeric makes Convert expect a map[string]any carrying its own
	// "scale" and "value" entries rather than the encoded bytes directly.
	variableNumeric bool
}

// NewDecimal returns a pointer to a Decimal converter populated with the given
// precision, scale and variableNumeric flag.
func NewDecimal(precision int32, scale int32, variableNumeric bool) *Decimal {
	converter := new(Decimal)
	converter.precision = precision
	converter.scale = scale
	converter.variableNumeric = variableNumeric
	return converter
}

// ToKindDetails returns the EDecimal kind built from this converter's
// precision and scale.
func (d Decimal) ToKindDetails() typing.KindDetails {
	details := decimal.NewDetails(d.precision, d.scale)
	return typing.NewDecimalDetailsFromTemplate(typing.EDecimal, details)
}

// Convert decodes val into a decimal value.
//
// When variableNumeric is false, val must be a []byte or a base64 string; it is
// decoded with the converter's own scale and tagged with its precision.
//
// When variableNumeric is true, val must be a map[string]any holding a "scale"
// entry and a "value" entry ([]byte or base64 string); the scale from the map
// is used and no precision is attached.
//
// An error is returned if val has the wrong shape, a required key is missing,
// or the encoded bytes cannot be obtained.
func (d Decimal) Convert(val any) (any, error) {
	// Fixed-scale path: the payload is the encoded number itself.
	if !d.variableNumeric {
		raw, err := toBytes(val)
		if err != nil {
			return nil, err
		}

		return decimal.NewDecimalWithPrecision(encode.DecodeDecimal(raw, d.scale), d.precision), nil
	}

	// Variable-scale path: the payload is a struct carrying its own scale.
	payload, ok := val.(map[string]any)
	if !ok {
		return nil, fmt.Errorf("value is not map[string]any type")
	}

	scale, err := maputil.GetInt32FromMap(payload, "scale")
	if err != nil {
		return nil, err
	}

	encoded, found := payload["value"]
	if !found {
		return nil, fmt.Errorf("encoded value does not exist")
	}

	raw, err := toBytes(encoded)
	if err != nil {
		return nil, err
	}

	return decimal.NewDecimal(encode.DecodeDecimal(raw, scale)), nil
}
44 changes: 44 additions & 0 deletions lib/debezium/converters/decimal_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package converters

import (
"testing"

"github.com/artie-labs/transfer/lib/typing/decimal"
"github.com/stretchr/testify/assert"
)

// BenchmarkDecodeDecimal_P64_S10 measures converting a base64-encoded decimal
// with precision 64 and scale 10.
//
// The input is passed as a string on purpose: Convert base64-decodes strings,
// whereas a []byte is taken as already-decoded bytes. Wrapping the base64 text
// in []byte(...) would feed its ASCII characters to the decoder and the
// expected value below could never match.
func BenchmarkDecodeDecimal_P64_S10(b *testing.B) {
	converter := NewDecimal(64, 10, false)
	for i := 0; i < b.N; i++ {
		val, err := converter.Convert("AwBGAw8m9GLXrCGifrnVP/8jPHrNEtd1r4rS")
		assert.NoError(b, err)

		dec, isOk := val.(decimal.Decimal)
		assert.True(b, isOk)
		assert.Equal(b, "123456789012345678901234567890123456789012345678901234.1234567890", dec.String())
	}
}

// BenchmarkDecodeDecimal_P38_S2 measures converting a base64-encoded decimal
// with precision 38 and scale 2, checking the decoded value on every iteration.
func BenchmarkDecodeDecimal_P38_S2(b *testing.B) {
	const (
		encoded = "AMCXznvJBxWzS58P/////w=="
		want    = "9999999999999999999999999999999999.99"
	)

	converter := NewDecimal(38, 2, false)
	for iter := 0; iter < b.N; iter++ {
		converted, err := converter.Convert(encoded)
		assert.NoError(b, err)

		got, ok := converted.(decimal.Decimal)
		assert.True(b, ok)
		assert.Equal(b, want, got.String())
	}
}

// BenchmarkDecodeDecimal_P5_S2 measures converting a base64-encoded decimal
// with precision 5 and scale 2, checking the decoded value on every iteration.
func BenchmarkDecodeDecimal_P5_S2(b *testing.B) {
	const (
		encoded = "AOHJ"
		want    = "578.01"
	)

	converter := NewDecimal(5, 2, false)
	for iter := 0; iter < b.N; iter++ {
		converted, err := converter.Convert(encoded)
		assert.NoError(b, err)

		got, ok := converted.(decimal.Decimal)
		assert.True(b, ok)
		assert.Equal(b, want, got.String())
	}
}
Loading

0 comments on commit 81cc2be

Please sign in to comment.