From 4344149f07ff34cead80aebbc3e44ef617602f28 Mon Sep 17 00:00:00 2001
From: Robin Tang
Date: Thu, 21 Nov 2024 11:48:12 -0800
Subject: [PATCH] Improve Parquet (#1064)

---
 clients/s3/s3.go                     | 23 +++++++-----------
 clients/s3/s3_test.go                |  8 ++++---
 lib/parquetutil/generate_schema.go   | 24 ++---------------
 lib/parquetutil/parse_values.go      |  7 +++---
 lib/parquetutil/parse_values_test.go | 19 +++++++--------
 lib/typing/parquet.go                | 36 ++++++----------------
 lib/typing/parquet_test.go           |  6 ++---
 7 files changed, 40 insertions(+), 83 deletions(-)

diff --git a/clients/s3/s3.go b/clients/s3/s3.go
index ec12bb1b6..f0b10be9e 100644
--- a/clients/s3/s3.go
+++ b/clients/s3/s3.go
@@ -2,11 +2,11 @@ package s3
 
 import (
 	"context"
-	"encoding/json"
 	"fmt"
 	"log/slog"
 	"os"
 	"strings"
+	"time"
 
 	"github.com/xitongsys/parquet-go-source/local"
 	"github.com/xitongsys/parquet-go/parquet"
@@ -49,8 +49,8 @@ func (s *Store) IdentifierFor(topicConfig kafkalib.TopicConfig, table string) sq
 func (s *Store) ObjectPrefix(tableData *optimization.TableData) string {
 	tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name())
 	fqTableName := tableID.FullyQualifiedName()
-	yyyyMMDDFormat := tableData.LatestCDCTs.Format(ext.PostgresDateFormat)
-
+	// Adding date= prefix so that it adheres to the partitioning format for Hive.
+	yyyyMMDDFormat := fmt.Sprintf("date=%s", time.Now().Format(ext.PostgresDateFormat))
 	if len(s.config.S3.FolderName) > 0 {
 		return strings.Join([]string{s.config.S3.FolderName, fqTableName, yyyyMMDDFormat}, "/")
 	}
@@ -78,7 +78,7 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er
 	}
 
 	cols := tableData.ReadOnlyInMemoryCols().ValidColumns()
-	schema, err := parquetutil.GenerateJSONSchema(cols)
+	schema, err := parquetutil.BuildCSVSchema(cols)
 	if err != nil {
 		return fmt.Errorf("failed to generate parquet schema: %w", err)
 	}
@@ -89,29 +89,24 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er
 		return fmt.Errorf("failed to create a local parquet file: %w", err)
 	}
 
-	pw, err := writer.NewJSONWriter(schema, fw, 4)
+	pw, err := writer.NewCSVWriter(schema, fw, 4)
 	if err != nil {
 		return fmt.Errorf("failed to instantiate parquet writer: %w", err)
 	}
 
 	pw.CompressionType = parquet.CompressionCodec_GZIP
 	for _, val := range tableData.Rows() {
-		row := make(map[string]any)
+		var row []any
 		for _, col := range cols {
-			value, err := parquetutil.ParseValue(val[col.Name()], col)
+			value, err := parquetutil.ParseValue(val[col.Name()], col.KindDetails)
 			if err != nil {
 				return fmt.Errorf("failed to parse value, err: %w, value: %v, column: %q", err, val[col.Name()], col.Name())
 			}
 
-			row[col.Name()] = value
-		}
-
-		rowBytes, err := json.Marshal(row)
-		if err != nil {
-			return fmt.Errorf("failed to marshal row: %w", err)
+			row = append(row, value)
 		}
 
-		if err = pw.Write(string(rowBytes)); err != nil {
+		if err = pw.Write(row); err != nil {
 			return fmt.Errorf("failed to write row: %w", err)
 		}
 	}
diff --git a/clients/s3/s3_test.go b/clients/s3/s3_test.go
index 140085e58..b374d63be 100644
--- a/clients/s3/s3_test.go
+++ b/clients/s3/s3_test.go
@@ -1,10 +1,13 @@
 package s3
 
 import (
+	"fmt"
 	"strings"
 	"testing"
 	"time"
 
+	"github.com/artie-labs/transfer/lib/typing/ext"
+
 	"github.com/stretchr/testify/assert"
 
 	"github.com/artie-labs/transfer/lib/config"
@@ -27,7 +30,6 @@ func TestObjectPrefix(t *testing.T) {
 		Schema:   "public",
 	}, "table")
 
-	td.LatestCDCTs = time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
 	testCases := []struct {
 		name           string
 		tableData      *optimization.TableData
@@ -49,7 +51,7 @@ func TestObjectPrefix(t *testing.T) {
 				AwsAccessKeyID:  "bar",
 				OutputFormat:    constants.ParquetFormat,
 			},
-			expectedFormat: "db.public.table/2020-01-01",
+			expectedFormat: fmt.Sprintf("db.public.table/date=%s", time.Now().Format(ext.PostgresDateFormat)),
 		},
 		{
 			name: "valid #2 w/ folder",
@@ -61,7 +63,7 @@ func TestObjectPrefix(t *testing.T) {
 				OutputFormat:    constants.ParquetFormat,
 				FolderName:      "foo",
 			},
-			expectedFormat: "foo/db.public.table/2020-01-01",
+			expectedFormat: fmt.Sprintf("foo/db.public.table/date=%s", time.Now().Format(ext.PostgresDateFormat)),
 		},
 	}
 
diff --git a/lib/parquetutil/generate_schema.go b/lib/parquetutil/generate_schema.go
index 5e9d07c5c..6d2633190 100644
--- a/lib/parquetutil/generate_schema.go
+++ b/lib/parquetutil/generate_schema.go
@@ -1,34 +1,20 @@
 package parquetutil
 
 import (
-	"encoding/json"
-
-	"github.com/artie-labs/transfer/lib/typing"
 	"github.com/artie-labs/transfer/lib/typing/columns"
 )
 
-func GenerateJSONSchema(columns []columns.Column) (string, error) {
-	var fields []typing.Field
+func BuildCSVSchema(columns []columns.Column) ([]string, error) {
+	var fields []string
 	for _, column := range columns {
 		// We don't need to escape the column name here.
 		field, err := column.KindDetails.ParquetAnnotation(column.Name())
 		if err != nil {
-			return "", err
+			return nil, err
 		}
 
-		fields = append(fields, *field)
-	}
-
-	schemaBytes, err := json.Marshal(
-		typing.Field{
-			Tag:    typing.FieldTag{Name: "parquet-go-root", RepetitionType: typing.ToPtr("REQUIRED")}.String(),
-			Fields: fields,
-		},
-	)
-
-	if err != nil {
-		return "", err
+		fields = append(fields, field.Tag)
 	}
 
-	return string(schemaBytes), nil
+	return fields, nil
 }
diff --git a/lib/parquetutil/parse_values.go b/lib/parquetutil/parse_values.go
index 911a6a1b3..a1009bcc6 100644
--- a/lib/parquetutil/parse_values.go
+++ b/lib/parquetutil/parse_values.go
@@ -9,17 +9,16 @@ import (
 	"github.com/artie-labs/transfer/lib/array"
 	"github.com/artie-labs/transfer/lib/config/constants"
 	"github.com/artie-labs/transfer/lib/typing"
-	"github.com/artie-labs/transfer/lib/typing/columns"
 	"github.com/artie-labs/transfer/lib/typing/decimal"
 	"github.com/artie-labs/transfer/lib/typing/ext"
 )
 
-func ParseValue(colVal any, colKind columns.Column) (any, error) {
+func ParseValue(colVal any, colKind typing.KindDetails) (any, error) {
 	if colVal == nil {
 		return nil, nil
 	}
 
-	switch colKind.KindDetails.Kind {
+	switch colKind.Kind {
 	case typing.Date.Kind:
 		_time, err := ext.ParseDateFromAny(colVal)
 		if err != nil {
@@ -51,7 +50,7 @@ func ParseValue(colVal any, colKind columns.Column) (any, error) {
 	case typing.String.Kind:
 		return colVal, nil
 	case typing.Struct.Kind:
-		if colKind.KindDetails == typing.Struct {
+		if colKind == typing.Struct {
 			if strings.Contains(fmt.Sprint(colVal), constants.ToastUnavailableValuePlaceholder) {
 				colVal = map[string]any{
 					"key": constants.ToastUnavailableValuePlaceholder,
diff --git a/lib/parquetutil/parse_values_test.go b/lib/parquetutil/parse_values_test.go
index 1f692888a..301bbf25d 100644
--- a/lib/parquetutil/parse_values_test.go
+++ b/lib/parquetutil/parse_values_test.go
@@ -5,7 +5,6 @@ import (
 
 	"github.com/artie-labs/transfer/lib/numbers"
 	"github.com/artie-labs/transfer/lib/typing"
-	"github.com/artie-labs/transfer/lib/typing/columns"
 	"github.com/artie-labs/transfer/lib/typing/decimal"
 	"github.com/stretchr/testify/assert"
 )
@@ -13,19 +12,19 @@ import (
 func TestParseValue(t *testing.T) {
 	{
 		// Nil
-		value, err := ParseValue(nil, columns.Column{})
+		value, err := ParseValue(nil, typing.KindDetails{})
 		assert.NoError(t, err)
 		assert.Nil(t, value)
 	}
 	{
 		// String
-		value, err := ParseValue("test", columns.NewColumn("", typing.String))
+		value, err := ParseValue("test", typing.String)
 		assert.NoError(t, err)
 		assert.Equal(t, "test", value)
 	}
 	{
 		// Struct
-		value, err := ParseValue(map[string]any{"foo": "bar"}, columns.NewColumn("", typing.Struct))
+		value, err := ParseValue(map[string]any{"foo": "bar"}, typing.Struct)
 		assert.NoError(t, err)
 		assert.Equal(t, `{"foo":"bar"}`, value)
 	}
@@ -33,13 +32,13 @@ func TestParseValue(t *testing.T) {
 	// Arrays
 	{
 		// Arrays (numbers - converted to string)
-		value, err := ParseValue([]any{123, 456}, columns.NewColumn("", typing.Array))
+		value, err := ParseValue([]any{123, 456}, typing.Array)
 		assert.NoError(t, err)
 		assert.Equal(t, []string{"123", "456"}, value)
 	}
 	{
 		// Arrays (booleans - converted to string)
-		value, err := ParseValue([]any{false, true, false}, columns.NewColumn("", typing.Array))
+		value, err := ParseValue([]any{false, true, false}, typing.Array)
 		assert.NoError(t, err)
 		assert.Equal(t, []string{"false", "true", "false"}, value)
 	}
@@ -48,7 +47,7 @@ func TestParseValue(t *testing.T) {
 		// Decimal
 		value, err := ParseValue(decimal.NewDecimalWithPrecision(
 			numbers.MustParseDecimal("5000.22320"), 30),
-			columns.NewColumn("", typing.NewDecimalDetailsFromTemplate(typing.EDecimal, decimal.NewDetails(30, 5))),
+			typing.NewDecimalDetailsFromTemplate(typing.EDecimal, decimal.NewDetails(30, 5)),
 		)
 
 		assert.NoError(t, err)
@@ -56,19 +55,19 @@ func TestParseValue(t *testing.T) {
 	}
 	{
 		// Time
-		value, err := ParseValue("03:15:00", columns.NewColumn("", typing.Time))
+		value, err := ParseValue("03:15:00", typing.Time)
 		assert.NoError(t, err)
 		assert.Equal(t, "03:15:00Z", value)
 	}
 	{
 		// Date
-		value, err := ParseValue("2022-12-25", columns.NewColumn("", typing.Date))
+		value, err := ParseValue("2022-12-25", typing.Date)
 		assert.NoError(t, err)
 		assert.Equal(t, "2022-12-25", value)
 	}
 	{
 		// Timestamp TZ
-		value, err := ParseValue("2023-04-24T17:29:05.69944Z", columns.NewColumn("", typing.TimestampTZ))
+		value, err := ParseValue("2023-04-24T17:29:05.69944Z", typing.TimestampTZ)
 		assert.NoError(t, err)
 		assert.Equal(t, int64(1682357345699), value)
 	}
diff --git a/lib/typing/parquet.go b/lib/typing/parquet.go
index f124119d1..33c14bb61 100644
--- a/lib/typing/parquet.go
+++ b/lib/typing/parquet.go
@@ -9,7 +9,6 @@ import (
 
 type FieldTag struct {
 	Name               string
-	InName             *string
 	Type               *string
 	ConvertedType      *string
 	ValueConvertedType *string
@@ -25,10 +24,6 @@ func (f FieldTag) String() string {
 		fmt.Sprintf("name=%s", f.Name),
 	}
 
-	if f.InName != nil {
-		parts = append(parts, fmt.Sprintf("inname=%s", *f.InName))
-	}
-
 	if f.Type != nil {
 		parts = append(parts, fmt.Sprintf("type=%s", *f.Type))
 	}
@@ -69,18 +64,10 @@ type Field struct {
 
 func (k *KindDetails) ParquetAnnotation(colName string) (*Field, error) {
 	switch k.Kind {
-	case
-		String.Kind,
-		Struct.Kind,
-		Date.Kind,
-		Time.Kind:
-		// We could go further with struct, but it's very possible that it has inconsistent column headers across all the rows.
-		// It's much safer to just treat this as a string. When we do bring this data out into another destination,
-		// then just parse it as a JSON string, into a VARIANT column.
+	case String.Kind, Struct.Kind, Date.Kind, Time.Kind:
 		return &Field{
 			Tag: FieldTag{
 				Name:          colName,
-				InName:        &colName,
 				Type:          ToPtr("BYTE_ARRAY"),
 				ConvertedType: ToPtr("UTF8"),
 			}.String(),
@@ -88,39 +75,32 @@ func (k *KindDetails) ParquetAnnotation(colName string) (*Field, error) {
 	case Float.Kind:
 		return &Field{
 			Tag: FieldTag{
-				Name:   colName,
-				InName: &colName,
-				Type:   ToPtr("FLOAT"),
+				Name: colName,
+				Type: ToPtr("FLOAT"),
 			}.String(),
 		}, nil
 	case Integer.Kind, TimestampNTZ.Kind, TimestampTZ.Kind:
-		// Parquet doesn't have native time types, so we are using int64 and casting the value as UNIX ts.
 		return &Field{
 			Tag: FieldTag{
-				Name:   colName,
-				InName: &colName,
-				Type:   ToPtr("INT64"),
+				Name: colName,
+				Type: ToPtr("INT64"),
 			}.String(),
 		}, nil
 	case EDecimal.Kind:
 		precision := k.ExtendedDecimalDetails.Precision()
 		if precision == decimal.PrecisionNotSpecified {
-			// This is a variable precision decimal, so we'll just treat it as a string.
 			return &Field{
 				Tag: FieldTag{
 					Name:          colName,
-					InName:        &colName,
 					Type:          ToPtr("BYTE_ARRAY"),
 					ConvertedType: ToPtr("UTF8"),
 				}.String(),
 			}, nil
 		}
 
-		scale := k.ExtendedDecimalDetails.Scale()
 		return &Field{
 			Tag: FieldTag{
 				Name:          colName,
-				InName:        &colName,
 				Type:          ToPtr("BYTE_ARRAY"),
 				ConvertedType: ToPtr("DECIMAL"),
 				Precision:     ToPtr(int(precision)),
@@ -130,16 +110,14 @@ func (k *KindDetails) ParquetAnnotation(colName string) (*Field, error) {
 	case Boolean.Kind:
 		return &Field{
 			Tag: FieldTag{
-				Name:   colName,
-				InName: &colName,
-				Type:   ToPtr("BOOLEAN"),
+				Name: colName,
+				Type: ToPtr("BOOLEAN"),
 			}.String(),
 		}, nil
 	case Array.Kind:
 		return &Field{
 			Tag: FieldTag{
 				Name:           colName,
-				InName:         &colName,
 				Type:           ToPtr("LIST"),
 				RepetitionType: ToPtr("REQUIRED"),
 			}.String(),
diff --git a/lib/typing/parquet_test.go b/lib/typing/parquet_test.go
index 21e9bc734..83a8193c2 100644
--- a/lib/typing/parquet_test.go
+++ b/lib/typing/parquet_test.go
@@ -16,7 +16,6 @@ func TestKindDetails_ParquetAnnotation(t *testing.T) {
 			Field{
 				Tag: FieldTag{
 					Name:          "foo",
-					InName:        ToPtr("foo"),
 					Type:          ToPtr("BYTE_ARRAY"),
 					ConvertedType: ToPtr("UTF8"),
 				}.String(),
@@ -33,9 +32,8 @@ func TestKindDetails_ParquetAnnotation(t *testing.T) {
 		assert.Equal(t,
 			Field{
 				Tag: FieldTag{
-					Name:   "foo",
-					InName: ToPtr("foo"),
-					Type:   ToPtr("INT64"),
+					Name: "foo",
+					Type: ToPtr("INT64"),
 				}.String(),
 			}, *field,
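
Editorial note, not part of the commit: the patch above swaps parquet-go's JSONWriter (a JSON schema string plus one JSON-encoded map per row) for its CSVWriter (a flat list of field tags plus one positional []any per row). The standalone sketch below shows that CSVWriter write path in isolation, following the library's documented CSV usage; the output path, the two column tags, and the row values are made-up placeholders, not anything produced by this repository's BuildCSVSchema or ParseValue.

package main

import (
	"log"

	"github.com/xitongsys/parquet-go-source/local"
	"github.com/xitongsys/parquet-go/parquet"
	"github.com/xitongsys/parquet-go/writer"
)

func main() {
	// One tag per column, in the same name=/type= shape that FieldTag.String()
	// builds now that the inname= component is gone.
	schema := []string{
		"name=id, type=INT64",
		"name=first_name, type=BYTE_ARRAY, convertedtype=UTF8",
	}

	fw, err := local.NewLocalFileWriter("/tmp/example.parquet")
	if err != nil {
		log.Fatalf("failed to create a local parquet file: %v", err)
	}
	defer fw.Close()

	pw, err := writer.NewCSVWriter(schema, fw, 4)
	if err != nil {
		log.Fatalf("failed to instantiate parquet writer: %v", err)
	}
	pw.CompressionType = parquet.CompressionCodec_GZIP

	// Rows are positional: one value per column, in schema order, which is why
	// Merge now appends parsed values to a slice instead of building a map.
	rows := [][]any{
		{int64(1), "alice"},
		{int64(2), "bob"},
	}
	for _, row := range rows {
		if err := pw.Write(row); err != nil {
			log.Fatalf("failed to write row: %v", err)
		}
	}
	if err := pw.WriteStop(); err != nil {
		log.Fatalf("failed to finalize parquet file: %v", err)
	}
}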