From 46eab86666a5bd650d253bfc8c3d2455e5f45745 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Tue, 17 Sep 2024 10:54:18 -0700 Subject: [PATCH] Simplifying ptrs (#910) --- clients/bigquery/dialect/dialect_test.go | 3 +- clients/mssql/dialect/dialect.go | 3 +- clients/mssql/dialect/dialect_test.go | 3 +- clients/mssql/values_test.go | 4 +-- clients/redshift/cast_test.go | 4 +-- clients/redshift/dialect/dialect.go | 3 +- clients/redshift/dialect/dialect_test.go | 5 ++-- clients/s3/s3.go | 24 +++++++--------- clients/shared/merge.go | 4 +-- clients/snowflake/dialect/dialect.go | 3 +- clients/snowflake/dialect/dialect_test.go | 3 +- clients/snowflake/staging_test.go | 5 ++-- lib/config/snowflake.go | 7 ++--- lib/debezium/schema.go | 3 +- lib/debezium/schema_test.go | 3 +- lib/optimization/event_update_test.go | 3 +- lib/parquetutil/generate_schema.go | 14 ++++------ lib/ptr/ptr.go | 27 ------------------ lib/typing/columns/columns_test.go | 7 ++--- lib/typing/parquet.go | 34 +++++++++++------------ lib/typing/ptr.go | 5 ++++ models/event/event.go | 5 ++-- processes/pool/writes.go | 7 ++--- 23 files changed, 64 insertions(+), 115 deletions(-) delete mode 100644 lib/ptr/ptr.go create mode 100644 lib/typing/ptr.go diff --git a/clients/bigquery/dialect/dialect_test.go b/clients/bigquery/dialect/dialect_test.go index 9f15356a4..afecfb961 100644 --- a/clients/bigquery/dialect/dialect_test.go +++ b/clients/bigquery/dialect/dialect_test.go @@ -8,7 +8,6 @@ import ( "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/mocks" - "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" "github.com/artie-labs/transfer/lib/typing/ext" @@ -28,7 +27,7 @@ func TestBigQueryDialect_DataTypeForKind(t *testing.T) { assert.Equal(t, "string", BigQueryDialect{}.DataTypeForKind(typing.String, false)) } { - assert.Equal(t, "string", BigQueryDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.String.Kind, OptionalStringPrecision: ptr.ToInt32(12345)}, true)) + assert.Equal(t, "string", BigQueryDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.String.Kind, OptionalStringPrecision: typing.ToPtr(int32(12345))}, true)) } } } diff --git a/clients/mssql/dialect/dialect.go b/clients/mssql/dialect/dialect.go index bbd9539d0..25a916250 100644 --- a/clients/mssql/dialect/dialect.go +++ b/clients/mssql/dialect/dialect.go @@ -8,7 +8,6 @@ import ( mssql "github.com/microsoft/go-mssqldb" "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" @@ -92,7 +91,7 @@ func (MSSQLDialect) KindForDataType(rawType string, stringPrecision string) (typ var strPrecision *int32 precision, err := strconv.ParseInt(stringPrecision, 10, 32) if err == nil { - strPrecision = ptr.ToInt32(int32(precision)) + strPrecision = typing.ToPtr(int32(precision)) } // precision of -1 means it's MAX. diff --git a/clients/mssql/dialect/dialect_test.go b/clients/mssql/dialect/dialect_test.go index 86c8680c0..cfd813c6c 100644 --- a/clients/mssql/dialect/dialect_test.go +++ b/clients/mssql/dialect/dialect_test.go @@ -6,7 +6,6 @@ import ( "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/mocks" - "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" "github.com/artie-labs/transfer/lib/typing/ext" @@ -34,7 +33,7 @@ func TestMSSQLDialect_DataTypeForKind(t *testing.T) { { kd: typing.KindDetails{ Kind: typing.String.Kind, - OptionalStringPrecision: ptr.ToInt32(12345), + OptionalStringPrecision: typing.ToPtr(int32(12345)), }, expected: "VARCHAR(12345)", expectedIsPk: "VARCHAR(900)", diff --git a/clients/mssql/values_test.go b/clients/mssql/values_test.go index e96824b41..6ed8e7af9 100644 --- a/clients/mssql/values_test.go +++ b/clients/mssql/values_test.go @@ -5,8 +5,6 @@ import ( "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/ptr" - "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" @@ -31,7 +29,7 @@ func TestParseValue(t *testing.T) { // If the string precision exceeds the value, we'll need to insert an exceeded value. stringCol := columns.NewColumn("foo", typing.String) - stringCol.KindDetails.OptionalStringPrecision = ptr.ToInt32(25) + stringCol.KindDetails.OptionalStringPrecision = typing.ToPtr(int32(25)) val, err = parseValue(`abcdefabcdefabcdefabcdef113321`, stringCol) assert.NoError(t, err) diff --git a/clients/redshift/cast_test.go b/clients/redshift/cast_test.go index d308ae58b..e54fd283c 100644 --- a/clients/redshift/cast_test.go +++ b/clients/redshift/cast_test.go @@ -3,8 +3,6 @@ package redshift import ( "fmt" - "github.com/artie-labs/transfer/lib/ptr" - "github.com/artie-labs/transfer/lib/stringutil" "github.com/artie-labs/transfer/lib/config/constants" @@ -22,7 +20,7 @@ func (r *RedshiftTestSuite) TestReplaceExceededValues() { // Masked, reached the string precision limit stringKd := typing.KindDetails{ Kind: typing.String.Kind, - OptionalStringPrecision: ptr.ToInt32(3), + OptionalStringPrecision: typing.ToPtr(int32(3)), } assert.Equal(r.T(), constants.ExceededValueMarker, replaceExceededValues("hello", stringKd)) diff --git a/clients/redshift/dialect/dialect.go b/clients/redshift/dialect/dialect.go index 29bddc52d..bd6fc2a1e 100644 --- a/clients/redshift/dialect/dialect.go +++ b/clients/redshift/dialect/dialect.go @@ -6,7 +6,6 @@ import ( "strings" "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" @@ -81,7 +80,7 @@ func (RedshiftDialect) KindForDataType(rawType string, stringPrecision string) ( return typing.KindDetails{ Kind: typing.String.Kind, - OptionalStringPrecision: ptr.ToInt32(int32(precision)), + OptionalStringPrecision: typing.ToPtr(int32(precision)), }, nil } diff --git a/clients/redshift/dialect/dialect_test.go b/clients/redshift/dialect/dialect_test.go index 1ae7fbb4b..fe2f22023 100644 --- a/clients/redshift/dialect/dialect_test.go +++ b/clients/redshift/dialect/dialect_test.go @@ -8,7 +8,6 @@ import ( "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/mocks" - "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" "github.com/artie-labs/transfer/lib/typing/ext" @@ -27,7 +26,7 @@ func TestRedshiftDialect_DataTypeForKind(t *testing.T) { assert.Equal(t, "VARCHAR(MAX)", RedshiftDialect{}.DataTypeForKind(typing.String, true)) } { - assert.Equal(t, "VARCHAR(12345)", RedshiftDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.String.Kind, OptionalStringPrecision: ptr.ToInt32(12345)}, false)) + assert.Equal(t, "VARCHAR(12345)", RedshiftDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.String.Kind, OptionalStringPrecision: typing.ToPtr(int32(12345))}, false)) } } } @@ -92,7 +91,7 @@ func TestRedshiftDialect_KindForDataType(t *testing.T) { // String with precision kd, err := dialect.KindForDataType("character varying", "65535") assert.NoError(t, err) - assert.Equal(t, typing.KindDetails{Kind: typing.String.Kind, OptionalStringPrecision: ptr.ToInt32(65535)}, kd) + assert.Equal(t, typing.KindDetails{Kind: typing.String.Kind, OptionalStringPrecision: typing.ToPtr(int32(65535))}, kd) } { // Times diff --git a/clients/s3/s3.go b/clients/s3/s3.go index 12701fe71..5a05c5539 100644 --- a/clients/s3/s3.go +++ b/clients/s3/s3.go @@ -8,23 +8,19 @@ import ( "os" "strings" - "github.com/artie-labs/transfer/lib/kafkalib" - "github.com/artie-labs/transfer/lib/ptr" - "github.com/artie-labs/transfer/lib/sql" - - "github.com/artie-labs/transfer/lib/stringutil" - - "github.com/artie-labs/transfer/lib/typing/ext" - - "github.com/artie-labs/transfer/lib/s3lib" - + "github.com/xitongsys/parquet-go-source/local" "github.com/xitongsys/parquet-go/parquet" + "github.com/xitongsys/parquet-go/writer" "github.com/artie-labs/transfer/lib/config" + "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/optimization" "github.com/artie-labs/transfer/lib/parquetutil" - "github.com/xitongsys/parquet-go-source/local" - "github.com/xitongsys/parquet-go/writer" + "github.com/artie-labs/transfer/lib/s3lib" + "github.com/artie-labs/transfer/lib/sql" + "github.com/artie-labs/transfer/lib/stringutil" + "github.com/artie-labs/transfer/lib/typing" + "github.com/artie-labs/transfer/lib/typing/ext" ) type Store struct { @@ -135,8 +131,8 @@ func (s *Store) Merge(tableData *optimization.TableData) error { Bucket: s.config.S3.Bucket, OptionalS3Prefix: s.ObjectPrefix(tableData), FilePath: fp, - OverrideAWSAccessKeyID: ptr.ToString(s.config.S3.AwsAccessKeyID), - OverrideAWSAccessKeySecret: ptr.ToString(s.config.S3.AwsSecretAccessKey), + OverrideAWSAccessKeyID: typing.ToPtr(s.config.S3.AwsAccessKeyID), + OverrideAWSAccessKeySecret: typing.ToPtr(s.config.S3.AwsSecretAccessKey), }); err != nil { return fmt.Errorf("failed to upload file to s3: %w", err) } diff --git a/clients/shared/merge.go b/clients/shared/merge.go index 57a75a987..255f6d33d 100644 --- a/clients/shared/merge.go +++ b/clients/shared/merge.go @@ -11,7 +11,7 @@ import ( "github.com/artie-labs/transfer/lib/destination/types" "github.com/artie-labs/transfer/lib/jitter" "github.com/artie-labs/transfer/lib/optimization" - "github.com/artie-labs/transfer/lib/ptr" + "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" ) @@ -92,7 +92,7 @@ func Merge(dwh destination.DataWarehouse, tableData *optimization.TableData, opt backfillErr = BackfillColumn(dwh, col, tableID) if backfillErr == nil { tableConfig.Columns().UpsertColumn(col.Name(), columns.UpsertColumnArg{ - Backfilled: ptr.ToBool(true), + Backfilled: typing.ToPtr(true), }) break } diff --git a/clients/snowflake/dialect/dialect.go b/clients/snowflake/dialect/dialect.go index a5e8b6850..7257460a9 100644 --- a/clients/snowflake/dialect/dialect.go +++ b/clients/snowflake/dialect/dialect.go @@ -6,7 +6,6 @@ import ( "strings" "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/sql" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" @@ -89,7 +88,7 @@ func (SnowflakeDialect) KindForDataType(snowflakeType string, _ string) (typing. return typing.KindDetails{ Kind: typing.String.Kind, - OptionalStringPrecision: ptr.ToInt32(int32(precision)), + OptionalStringPrecision: typing.ToPtr(int32(precision)), }, nil default: return typing.Invalid, fmt.Errorf("expected at most one type parameters, received %d", len(parameters)) diff --git a/clients/snowflake/dialect/dialect_test.go b/clients/snowflake/dialect/dialect_test.go index 7190b3f6e..18b7cba46 100644 --- a/clients/snowflake/dialect/dialect_test.go +++ b/clients/snowflake/dialect/dialect_test.go @@ -7,7 +7,6 @@ import ( "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/mocks" - "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" "github.com/artie-labs/transfer/lib/typing/ext" @@ -27,7 +26,7 @@ func TestSnowflakeDialect_DataTypeForKind(t *testing.T) { assert.Equal(t, "string", SnowflakeDialect{}.DataTypeForKind(typing.String, false)) } { - assert.Equal(t, "string", SnowflakeDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.String.Kind, OptionalStringPrecision: ptr.ToInt32(12345)}, false)) + assert.Equal(t, "string", SnowflakeDialect{}.DataTypeForKind(typing.KindDetails{Kind: typing.String.Kind, OptionalStringPrecision: typing.ToPtr(int32(12345))}, false)) } } } diff --git a/clients/snowflake/staging_test.go b/clients/snowflake/staging_test.go index 9d2d63f8d..ae7447c18 100644 --- a/clients/snowflake/staging_test.go +++ b/clients/snowflake/staging_test.go @@ -9,7 +9,6 @@ import ( "github.com/artie-labs/transfer/clients/shared" "github.com/artie-labs/transfer/lib/config" - "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/typing/columns" "github.com/artie-labs/transfer/lib/destination/types" @@ -30,12 +29,12 @@ func (s *SnowflakeTestSuite) TestReplaceExceededValues() { // String + OptionalStringPrecision set + equal to OptionalStringPrecision: assert.Equal(s.T(), strings.Repeat("a", 100), - replaceExceededValues(strings.Repeat("a", 100), typing.KindDetails{Kind: typing.String.Kind, OptionalStringPrecision: ptr.ToInt32(100)}), + replaceExceededValues(strings.Repeat("a", 100), typing.KindDetails{Kind: typing.String.Kind, OptionalStringPrecision: typing.ToPtr(int32(100))}), ) // String + OptionalStringPrecision set + larger than OptionalStringPrecision: assert.Equal(s.T(), constants.ExceededValueMarker, - replaceExceededValues(strings.Repeat("a", 101), typing.KindDetails{Kind: typing.String.Kind, OptionalStringPrecision: ptr.ToInt32(100)}), + replaceExceededValues(strings.Repeat("a", 101), typing.KindDetails{Kind: typing.String.Kind, OptionalStringPrecision: typing.ToPtr(int32(100))}), ) } diff --git a/lib/config/snowflake.go b/lib/config/snowflake.go index 6000e0102..9bfec9c28 100644 --- a/lib/config/snowflake.go +++ b/lib/config/snowflake.go @@ -4,8 +4,7 @@ import ( "fmt" "github.com/artie-labs/transfer/lib/crypto" - - "github.com/artie-labs/transfer/lib/ptr" + "github.com/artie-labs/transfer/lib/typing" "github.com/snowflakedb/gosnowflake" ) @@ -32,10 +31,10 @@ func (s Snowflake) ToConfig() (*gosnowflake.Config, error) { Params: map[string]*string{ // This parameter will cancel in-progress queries if connectivity is lost. // https://docs.snowflake.com/en/sql-reference/parameters#abort-detached-query - "ABORT_DETACHED_QUERY": ptr.ToString("true"), + "ABORT_DETACHED_QUERY": typing.ToPtr("true"), // This parameter must be set to prevent the auth token from expiring after 4 hours. // https://docs.snowflake.com/en/user-guide/session-policies#considerations - "CLIENT_SESSION_KEEP_ALIVE": ptr.ToString("true"), + "CLIENT_SESSION_KEEP_ALIVE": typing.ToPtr("true"), }, } diff --git a/lib/debezium/schema.go b/lib/debezium/schema.go index 1eb4cf2e9..3ad21ef10 100644 --- a/lib/debezium/schema.go +++ b/lib/debezium/schema.go @@ -6,7 +6,6 @@ import ( "github.com/artie-labs/transfer/lib/debezium/converters" "github.com/artie-labs/transfer/lib/maputil" - "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/decimal" ) @@ -76,7 +75,7 @@ func (f Field) GetScaleAndPrecision() (int32, *int32, error) { return 0, nil, precisionErr } - precisionPtr = ptr.ToInt32(precision) + precisionPtr = typing.ToPtr(int32(precision)) } return scale, precisionPtr, nil diff --git a/lib/debezium/schema_test.go b/lib/debezium/schema_test.go index 753614534..93a6677dd 100644 --- a/lib/debezium/schema_test.go +++ b/lib/debezium/schema_test.go @@ -5,7 +5,6 @@ import ( "github.com/stretchr/testify/assert" - "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/decimal" "github.com/artie-labs/transfer/lib/typing/ext" @@ -38,7 +37,7 @@ func TestField_GetScaleAndPrecision(t *testing.T) { KafkaDecimalPrecisionKey: 10, }, expectedScale: 5, - expectedPrecision: ptr.ToInt32(10), + expectedPrecision: typing.ToPtr(int32(10)), }, { name: "Test Case 4: Invalid Scale Type", diff --git a/lib/optimization/event_update_test.go b/lib/optimization/event_update_test.go index f7e4322b1..1298afd4d 100644 --- a/lib/optimization/event_update_test.go +++ b/lib/optimization/event_update_test.go @@ -3,7 +3,6 @@ package optimization import ( "testing" - "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" "github.com/artie-labs/transfer/lib/typing/decimal" @@ -180,7 +179,7 @@ func TestTableData_UpdateInMemoryColumnsFromDestination(t *testing.T) { // Testing string precision stringKindWithPrecision := typing.KindDetails{ Kind: typing.String.Kind, - OptionalStringPrecision: ptr.ToInt32(123), + OptionalStringPrecision: typing.ToPtr(int32(123)), } assert.NoError(t, tableData.MergeColumnsFromDestination(columns.NewColumn(strCol, stringKindWithPrecision))) diff --git a/lib/parquetutil/generate_schema.go b/lib/parquetutil/generate_schema.go index 8f02657d2..5e9d07c5c 100644 --- a/lib/parquetutil/generate_schema.go +++ b/lib/parquetutil/generate_schema.go @@ -3,7 +3,6 @@ package parquetutil import ( "encoding/json" - "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" ) @@ -20,13 +19,12 @@ func GenerateJSONSchema(columns []columns.Column) (string, error) { fields = append(fields, *field) } - schemaBytes, err := json.Marshal(typing.Field{ - Tag: typing.FieldTag{ - Name: "parquet-go-root", - RepetitionType: ptr.ToString("REQUIRED"), - }.String(), - Fields: fields, - }) + schemaBytes, err := json.Marshal( + typing.Field{ + Tag: typing.FieldTag{Name: "parquet-go-root", RepetitionType: typing.ToPtr("REQUIRED")}.String(), + Fields: fields, + }, + ) if err != nil { return "", err diff --git a/lib/ptr/ptr.go b/lib/ptr/ptr.go deleted file mode 100644 index 7196fda0e..000000000 --- a/lib/ptr/ptr.go +++ /dev/null @@ -1,27 +0,0 @@ -package ptr - -import "time" - -func ToString(val string) *string { - return &val -} - -func ToInt(val int) *int { - return &val -} - -func ToInt32(val int32) *int32 { - return &val -} - -func ToInt64(val int64) *int64 { - return &val -} - -func ToBool(val bool) *bool { - return &val -} - -func ToDuration(duration time.Duration) *time.Duration { - return &duration -} diff --git a/lib/typing/columns/columns_test.go b/lib/typing/columns/columns_test.go index 08585255f..7c283e09e 100644 --- a/lib/typing/columns/columns_test.go +++ b/lib/typing/columns/columns_test.go @@ -6,7 +6,6 @@ import ( "testing" "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/typing" "github.com/stretchr/testify/assert" ) @@ -196,7 +195,7 @@ func TestColumns_UpsertColumns(t *testing.T) { // Now selectively update only a, b for _, key := range []string{"a", "b"} { cols.UpsertColumn(key, UpsertColumnArg{ - ToastCol: ptr.ToBool(true), + ToastCol: typing.ToPtr(true), }) // Now inspect. @@ -211,8 +210,8 @@ func TestColumns_UpsertColumns(t *testing.T) { assert.Equal(t, zzzCol.KindDetails, typing.Invalid) cols.UpsertColumn("aaa", UpsertColumnArg{ - ToastCol: ptr.ToBool(true), - PrimaryKey: ptr.ToBool(true), + ToastCol: typing.ToPtr(true), + PrimaryKey: typing.ToPtr(true), }) aaaCol, _ := cols.GetColumn("aaa") assert.True(t, aaaCol.ToastColumn) diff --git a/lib/typing/parquet.go b/lib/typing/parquet.go index 3ba663aa9..5a51ac2ce 100644 --- a/lib/typing/parquet.go +++ b/lib/typing/parquet.go @@ -6,8 +6,6 @@ import ( "github.com/artie-labs/transfer/lib/typing/decimal" "github.com/artie-labs/transfer/lib/typing/ext" - - "github.com/artie-labs/transfer/lib/ptr" ) type FieldTag struct { @@ -87,8 +85,8 @@ func (k *KindDetails) ParquetAnnotation(colName string) (*Field, error) { Tag: FieldTag{ Name: colName, InName: &colName, - Type: ptr.ToString("BYTE_ARRAY"), - ConvertedType: ptr.ToString("UTF8"), + Type: ToPtr("BYTE_ARRAY"), + ConvertedType: ToPtr("UTF8"), }.String(), }, nil } @@ -99,7 +97,7 @@ func (k *KindDetails) ParquetAnnotation(colName string) (*Field, error) { Tag: FieldTag{ Name: colName, InName: &colName, - Type: ptr.ToString("FLOAT"), + Type: ToPtr("FLOAT"), }.String(), }, nil case Integer.Kind, ETime.Kind: @@ -109,7 +107,7 @@ func (k *KindDetails) ParquetAnnotation(colName string) (*Field, error) { Tag: FieldTag{ Name: colName, InName: &colName, - Type: ptr.ToString("INT64"), + Type: ToPtr("INT64"), }.String(), }, nil case EDecimal.Kind: @@ -120,8 +118,8 @@ func (k *KindDetails) ParquetAnnotation(colName string) (*Field, error) { Tag: FieldTag{ Name: colName, InName: &colName, - Type: ptr.ToString("BYTE_ARRAY"), - ConvertedType: ptr.ToString("UTF8"), + Type: ToPtr("BYTE_ARRAY"), + ConvertedType: ToPtr("UTF8"), }.String(), }, nil } @@ -131,10 +129,10 @@ func (k *KindDetails) ParquetAnnotation(colName string) (*Field, error) { Tag: FieldTag{ Name: colName, InName: &colName, - Type: ptr.ToString("BYTE_ARRAY"), - ConvertedType: ptr.ToString("DECIMAL"), - Precision: ptr.ToInt(int(precision)), - Scale: ptr.ToInt(int(scale)), + Type: ToPtr("BYTE_ARRAY"), + ConvertedType: ToPtr("DECIMAL"), + Precision: ToPtr(int(precision)), + Scale: ToPtr(int(scale)), }.String(), }, nil case Boolean.Kind: @@ -142,7 +140,7 @@ func (k *KindDetails) ParquetAnnotation(colName string) (*Field, error) { Tag: FieldTag{ Name: colName, InName: &colName, - Type: ptr.ToString("BOOLEAN"), + Type: ToPtr("BOOLEAN"), }.String(), }, nil case Array.Kind: @@ -150,16 +148,16 @@ func (k *KindDetails) ParquetAnnotation(colName string) (*Field, error) { Tag: FieldTag{ Name: colName, InName: &colName, - Type: ptr.ToString("LIST"), - RepetitionType: ptr.ToString("REQUIRED"), + Type: ToPtr("LIST"), + RepetitionType: ToPtr("REQUIRED"), }.String(), Fields: []Field{ { Tag: FieldTag{ Name: "element", - Type: ptr.ToString("BYTE_ARRAY"), - ConvertedType: ptr.ToString("UTF8"), - RepetitionType: ptr.ToString("REQUIRED"), + Type: ToPtr("BYTE_ARRAY"), + ConvertedType: ToPtr("UTF8"), + RepetitionType: ToPtr("REQUIRED"), }.String(), }, }, diff --git a/lib/typing/ptr.go b/lib/typing/ptr.go new file mode 100644 index 000000000..e4892a3f1 --- /dev/null +++ b/lib/typing/ptr.go @@ -0,0 +1,5 @@ +package typing + +func ToPtr[T any](v T) *T { + return &v +} diff --git a/models/event/event.go b/models/event/event.go index cd05684c5..4d87a13af 100644 --- a/models/event/event.go +++ b/models/event/event.go @@ -15,7 +15,6 @@ import ( "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/optimization" - "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/stringutil" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" @@ -47,7 +46,7 @@ func ToMemoryEvent(event cdc.Event, pkMap map[string]any, tc kafkalib.TopicConfi // We need to escape the column name similar to have parity with event.GetColumns() columns.EscapeName(primaryKey), columns.UpsertColumnArg{ - PrimaryKey: ptr.ToBool(true), + PrimaryKey: typing.ToPtr(true), }, ) } @@ -196,7 +195,7 @@ func (e *Event) Save(cfg config.Config, inMemDB *models.DatabaseData, tc kafkali if toastedCol { inMemoryColumns.UpsertColumn(newColName, columns.UpsertColumnArg{ - ToastCol: ptr.ToBool(true), + ToastCol: typing.ToPtr(true), }) } else { retrievedColumn, isOk := inMemoryColumns.GetColumn(newColName) diff --git a/processes/pool/writes.go b/processes/pool/writes.go index 18b0e8aa6..2e8a84a3b 100644 --- a/processes/pool/writes.go +++ b/processes/pool/writes.go @@ -6,8 +6,8 @@ import ( "time" "github.com/artie-labs/transfer/lib/destination" - "github.com/artie-labs/transfer/lib/ptr" "github.com/artie-labs/transfer/lib/telemetry/metrics/base" + "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/models" "github.com/artie-labs/transfer/processes/consumer" ) @@ -17,10 +17,7 @@ func StartPool(ctx context.Context, inMemDB *models.DatabaseData, dest destinati ticker := time.NewTicker(td) for range ticker.C { slog.Info("Flushing via pool...") - if err := consumer.Flush(ctx, inMemDB, dest, metricsClient, consumer.Args{ - Reason: "time", - CoolDown: ptr.ToDuration(td), - }); err != nil { + if err := consumer.Flush(ctx, inMemDB, dest, metricsClient, consumer.Args{Reason: "time", CoolDown: typing.ToPtr(td)}); err != nil { slog.Error("Failed to flush via pool", slog.Any("err", err)) } }