Improve Parquet (#1064)
Tang8330 authored Nov 21, 2024
1 parent 5c148f5 commit 4344149
Showing 7 changed files with 40 additions and 83 deletions.
23 changes: 9 additions & 14 deletions clients/s3/s3.go
@@ -2,11 +2,11 @@ package s3

import (
"context"
"encoding/json"
"fmt"
"log/slog"
"os"
"strings"
"time"

"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/parquet"
@@ -49,8 +49,8 @@ func (s *Store) IdentifierFor(topicConfig kafkalib.TopicConfig, table string) sq
func (s *Store) ObjectPrefix(tableData *optimization.TableData) string {
tableID := s.IdentifierFor(tableData.TopicConfig(), tableData.Name())
fqTableName := tableID.FullyQualifiedName()
yyyyMMDDFormat := tableData.LatestCDCTs.Format(ext.PostgresDateFormat)

// Adding date= prefix so that it adheres to the partitioning format for Hive.
yyyyMMDDFormat := fmt.Sprintf("date=%s", time.Now().Format(ext.PostgresDateFormat))
if len(s.config.S3.FolderName) > 0 {
return strings.Join([]string{s.config.S3.FolderName, fqTableName, yyyyMMDDFormat}, "/")
}
@@ -78,7 +78,7 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er
}

cols := tableData.ReadOnlyInMemoryCols().ValidColumns()
schema, err := parquetutil.GenerateJSONSchema(cols)
schema, err := parquetutil.BuildCSVSchema(cols)
if err != nil {
return fmt.Errorf("failed to generate parquet schema: %w", err)
}
@@ -89,29 +89,24 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er
return fmt.Errorf("failed to create a local parquet file: %w", err)
}

pw, err := writer.NewJSONWriter(schema, fw, 4)
pw, err := writer.NewCSVWriter(schema, fw, 4)
if err != nil {
return fmt.Errorf("failed to instantiate parquet writer: %w", err)
}

pw.CompressionType = parquet.CompressionCodec_GZIP
for _, val := range tableData.Rows() {
row := make(map[string]any)
var row []any
for _, col := range cols {
value, err := parquetutil.ParseValue(val[col.Name()], col)
value, err := parquetutil.ParseValue(val[col.Name()], col.KindDetails)
if err != nil {
return fmt.Errorf("failed to parse value, err: %w, value: %v, column: %q", err, val[col.Name()], col.Name())
}

row[col.Name()] = value
}

rowBytes, err := json.Marshal(row)
if err != nil {
return fmt.Errorf("failed to marshal row: %w", err)
row = append(row, value)
}

if err = pw.Write(string(rowBytes)); err != nil {
if err = pw.Write(row); err != nil {
return fmt.Errorf("failed to write row: %w", err)
}
}
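
For context, the switch from writer.NewJSONWriter to writer.NewCSVWriter means the schema is now a slice of tag strings and each row is a positional []any rather than a marshaled JSON object. Below is a minimal, self-contained sketch of that write path; the column names, values, and file path are hypothetical, and the tag strings follow the parquet-go CSV convention rather than this repository's exact output.

package main

import (
	"log"

	"github.com/xitongsys/parquet-go-source/local"
	"github.com/xitongsys/parquet-go/parquet"
	"github.com/xitongsys/parquet-go/writer"
)

func main() {
	// Tag strings in the shape BuildCSVSchema now produces (illustrative columns).
	schema := []string{
		"name=id, type=INT64",
		"name=name, type=BYTE_ARRAY, convertedtype=UTF8",
	}

	fw, err := local.NewLocalFileWriter("/tmp/example.parquet")
	if err != nil {
		log.Fatalf("failed to create a local parquet file: %v", err)
	}
	defer fw.Close()

	pw, err := writer.NewCSVWriter(schema, fw, 4)
	if err != nil {
		log.Fatalf("failed to instantiate parquet writer: %v", err)
	}
	pw.CompressionType = parquet.CompressionCodec_GZIP

	// Each row is a positional []any ordered to match the schema above.
	for _, row := range [][]any{{int64(1), "alice"}, {int64(2), "bob"}} {
		if err := pw.Write(row); err != nil {
			log.Fatalf("failed to write row: %v", err)
		}
	}
	if err := pw.WriteStop(); err != nil {
		log.Fatalf("failed to finalize parquet file: %v", err)
	}
}
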
8 changes: 5 additions & 3 deletions clients/s3/s3_test.go
@@ -1,10 +1,13 @@
package s3

import (
"fmt"
"strings"
"testing"
"time"

"github.com/artie-labs/transfer/lib/typing/ext"

"github.com/stretchr/testify/assert"

"github.com/artie-labs/transfer/lib/config"
@@ -27,7 +30,6 @@ func TestObjectPrefix(t *testing.T) {
Schema: "public",
}, "table")

td.LatestCDCTs = time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)
testCases := []struct {
name string
tableData *optimization.TableData
@@ -49,7 +51,7 @@ func TestObjectPrefix(t *testing.T) {
AwsAccessKeyID: "bar",
OutputFormat: constants.ParquetFormat,
},
expectedFormat: "db.public.table/2020-01-01",
expectedFormat: fmt.Sprintf("db.public.table/date=%s", time.Now().Format(ext.PostgresDateFormat)),
},
{
name: "valid #2 w/ folder",
@@ -61,7 +63,7 @@ func TestObjectPrefix(t *testing.T) {
OutputFormat: constants.ParquetFormat,
FolderName: "foo",
},
expectedFormat: "foo/db.public.table/2020-01-01",
expectedFormat: fmt.Sprintf("foo/db.public.table/date=%s", time.Now().Format(ext.PostgresDateFormat)),
},
}

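
The new expected prefixes above follow Hive-style partition naming. A small illustrative sketch of the prefix shape (folder and table names are hypothetical; ext.PostgresDateFormat corresponds to the yyyy-MM-dd layout seen in the test expectations):

package main

import (
	"fmt"
	"strings"
	"time"
)

func main() {
	folderName := "foo"              // hypothetical S3 folder
	fqTableName := "db.public.table" // hypothetical fully-qualified table name
	datePartition := fmt.Sprintf("date=%s", time.Now().Format("2006-01-02"))

	// Prints something like "foo/db.public.table/date=2024-11-21"; the date=
	// segment matches Hive-style partitioning so downstream readers can prune by date.
	fmt.Println(strings.Join([]string{folderName, fqTableName, datePartition}, "/"))
}
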
24 changes: 5 additions & 19 deletions lib/parquetutil/generate_schema.go
@@ -1,34 +1,20 @@
package parquetutil

import (
"encoding/json"

"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
)

func GenerateJSONSchema(columns []columns.Column) (string, error) {
var fields []typing.Field
func BuildCSVSchema(columns []columns.Column) ([]string, error) {
var fields []string
for _, column := range columns {
// We don't need to escape the column name here.
field, err := column.KindDetails.ParquetAnnotation(column.Name())
if err != nil {
return "", err
return nil, err
}

fields = append(fields, *field)
}

schemaBytes, err := json.Marshal(
typing.Field{
Tag: typing.FieldTag{Name: "parquet-go-root", RepetitionType: typing.ToPtr("REQUIRED")}.String(),
Fields: fields,
},
)

if err != nil {
return "", err
fields = append(fields, field.Tag)
}

return string(schemaBytes), nil
return fields, nil
}
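
BuildCSVSchema now returns one parquet-go tag string per column instead of a single JSON schema document. A short sketch of how it might be called; the column names are hypothetical, and the exact tag text comes from each kind's ParquetAnnotation.

package main

import (
	"fmt"
	"log"

	"github.com/artie-labs/transfer/lib/parquetutil"
	"github.com/artie-labs/transfer/lib/typing"
	"github.com/artie-labs/transfer/lib/typing/columns"
)

func main() {
	cols := []columns.Column{
		columns.NewColumn("id", typing.Integer),
		columns.NewColumn("name", typing.String),
	}

	// Each element is a parquet-go tag string (roughly
	// "name=name, type=BYTE_ARRAY, convertedtype=UTF8" for the string column),
	// ready to be handed to writer.NewCSVWriter.
	tags, err := parquetutil.BuildCSVSchema(cols)
	if err != nil {
		log.Fatal(err)
	}
	for _, tag := range tags {
		fmt.Println(tag)
	}
}
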
7 changes: 3 additions & 4 deletions lib/parquetutil/parse_values.go
@@ -9,17 +9,16 @@ import (
"github.com/artie-labs/transfer/lib/array"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
"github.com/artie-labs/transfer/lib/typing/decimal"
"github.com/artie-labs/transfer/lib/typing/ext"
)

func ParseValue(colVal any, colKind columns.Column) (any, error) {
func ParseValue(colVal any, colKind typing.KindDetails) (any, error) {
if colVal == nil {
return nil, nil
}

switch colKind.KindDetails.Kind {
switch colKind.Kind {
case typing.Date.Kind:
_time, err := ext.ParseDateFromAny(colVal)
if err != nil {
@@ -51,7 +50,7 @@ func ParseValue(colVal any, colKind columns.Column) (any, error) {
case typing.String.Kind:
return colVal, nil
case typing.Struct.Kind:
if colKind.KindDetails == typing.Struct {
if colKind == typing.Struct {
if strings.Contains(fmt.Sprint(colVal), constants.ToastUnavailableValuePlaceholder) {
colVal = map[string]any{
"key": constants.ToastUnavailableValuePlaceholder,
19 changes: 9 additions & 10 deletions lib/parquetutil/parse_values_test.go
@@ -5,41 +5,40 @@ import (

"github.com/artie-labs/transfer/lib/numbers"
"github.com/artie-labs/transfer/lib/typing"
"github.com/artie-labs/transfer/lib/typing/columns"
"github.com/artie-labs/transfer/lib/typing/decimal"
"github.com/stretchr/testify/assert"
)

func TestParseValue(t *testing.T) {
{
// Nil
value, err := ParseValue(nil, columns.Column{})
value, err := ParseValue(nil, typing.KindDetails{})
assert.NoError(t, err)
assert.Nil(t, value)
}
{
// String
value, err := ParseValue("test", columns.NewColumn("", typing.String))
value, err := ParseValue("test", typing.String)
assert.NoError(t, err)
assert.Equal(t, "test", value)
}
{
// Struct
value, err := ParseValue(map[string]any{"foo": "bar"}, columns.NewColumn("", typing.Struct))
value, err := ParseValue(map[string]any{"foo": "bar"}, typing.Struct)
assert.NoError(t, err)
assert.Equal(t, `{"foo":"bar"}`, value)
}
{
// Arrays
{
// Arrays (numbers - converted to string)
value, err := ParseValue([]any{123, 456}, columns.NewColumn("", typing.Array))
value, err := ParseValue([]any{123, 456}, typing.Array)
assert.NoError(t, err)
assert.Equal(t, []string{"123", "456"}, value)
}
{
// Arrays (booleans - converted to string)
value, err := ParseValue([]any{false, true, false}, columns.NewColumn("", typing.Array))
value, err := ParseValue([]any{false, true, false}, typing.Array)
assert.NoError(t, err)
assert.Equal(t, []string{"false", "true", "false"}, value)
}
@@ -48,27 +47,27 @@ func TestParseValue(t *testing.T) {
// Decimal
value, err := ParseValue(decimal.NewDecimalWithPrecision(
numbers.MustParseDecimal("5000.22320"), 30),
columns.NewColumn("", typing.NewDecimalDetailsFromTemplate(typing.EDecimal, decimal.NewDetails(30, 5))),
typing.NewDecimalDetailsFromTemplate(typing.EDecimal, decimal.NewDetails(30, 5)),
)

assert.NoError(t, err)
assert.Equal(t, "5000.22320", value)
}
{
// Time
value, err := ParseValue("03:15:00", columns.NewColumn("", typing.Time))
value, err := ParseValue("03:15:00", typing.Time)
assert.NoError(t, err)
assert.Equal(t, "03:15:00Z", value)
}
{
// Date
value, err := ParseValue("2022-12-25", columns.NewColumn("", typing.Date))
value, err := ParseValue("2022-12-25", typing.Date)
assert.NoError(t, err)
assert.Equal(t, "2022-12-25", value)
}
{
// Timestamp TZ
value, err := ParseValue("2023-04-24T17:29:05.69944Z", columns.NewColumn("", typing.TimestampTZ))
value, err := ParseValue("2023-04-24T17:29:05.69944Z", typing.TimestampTZ)
assert.NoError(t, err)
assert.Equal(t, int64(1682357345699), value)
}
36 changes: 7 additions & 29 deletions lib/typing/parquet.go
@@ -9,7 +9,6 @@ import (

type FieldTag struct {
Name string
InName *string
Type *string
ConvertedType *string
ValueConvertedType *string
@@ -25,10 +24,6 @@ func (f FieldTag) String() string {
fmt.Sprintf("name=%s", f.Name),
}

if f.InName != nil {
parts = append(parts, fmt.Sprintf("inname=%s", *f.InName))
}

if f.Type != nil {
parts = append(parts, fmt.Sprintf("type=%s", *f.Type))
}
@@ -69,58 +64,43 @@ type Field struct {

func (k *KindDetails) ParquetAnnotation(colName string) (*Field, error) {
switch k.Kind {
case
String.Kind,
Struct.Kind,
Date.Kind,
Time.Kind:
// We could go further with struct, but it's very possible that it has inconsistent column headers across all the rows.
// It's much safer to just treat this as a string. When we do bring this data out into another destination,
// then just parse it as a JSON string, into a VARIANT column.
case String.Kind, Struct.Kind, Date.Kind, Time.Kind:
return &Field{
Tag: FieldTag{
Name: colName,
InName: &colName,
Type: ToPtr("BYTE_ARRAY"),
ConvertedType: ToPtr("UTF8"),
}.String(),
}, nil
case Float.Kind:
return &Field{
Tag: FieldTag{
Name: colName,
InName: &colName,
Type: ToPtr("FLOAT"),
Name: colName,
Type: ToPtr("FLOAT"),
}.String(),
}, nil
case Integer.Kind, TimestampNTZ.Kind, TimestampTZ.Kind:
// Parquet doesn't have native time types, so we are using int64 and casting the value as UNIX ts.
return &Field{
Tag: FieldTag{
Name: colName,
InName: &colName,
Type: ToPtr("INT64"),
Name: colName,
Type: ToPtr("INT64"),
}.String(),
}, nil
case EDecimal.Kind:
precision := k.ExtendedDecimalDetails.Precision()
if precision == decimal.PrecisionNotSpecified {
// This is a variable precision decimal, so we'll just treat it as a string.
return &Field{
Tag: FieldTag{
Name: colName,
InName: &colName,
Type: ToPtr("BYTE_ARRAY"),
ConvertedType: ToPtr("UTF8"),
}.String(),
}, nil
}

scale := k.ExtendedDecimalDetails.Scale()
return &Field{
Tag: FieldTag{
Name: colName,
InName: &colName,
Type: ToPtr("BYTE_ARRAY"),
ConvertedType: ToPtr("DECIMAL"),
Precision: ToPtr(int(precision)),
@@ -130,16 +110,14 @@ func (k *KindDetails) ParquetAnnotation(colName string) (*Field, error) {
case Boolean.Kind:
return &Field{
Tag: FieldTag{
Name: colName,
InName: &colName,
Type: ToPtr("BOOLEAN"),
Name: colName,
Type: ToPtr("BOOLEAN"),
}.String(),
}, nil
case Array.Kind:
return &Field{
Tag: FieldTag{
Name: colName,
InName: &colName,
Type: ToPtr("LIST"),
RepetitionType: ToPtr("REQUIRED"),
}.String(),
6 changes: 2 additions & 4 deletions lib/typing/parquet_test.go
@@ -16,7 +16,6 @@ func TestKindDetails_ParquetAnnotation(t *testing.T) {
Field{
Tag: FieldTag{
Name: "foo",
InName: ToPtr("foo"),
Type: ToPtr("BYTE_ARRAY"),
ConvertedType: ToPtr("UTF8"),
}.String(),
@@ -33,9 +32,8 @@ func TestKindDetails_ParquetAnnotation(t *testing.T) {
assert.Equal(t,
Field{
Tag: FieldTag{
Name: "foo",
InName: ToPtr("foo"),
Type: ToPtr("INT64"),
Name: "foo",
Type: ToPtr("INT64"),
}.String(),
},
*field,
