diff --git a/clients/bigquery/bigquery.go b/clients/bigquery/bigquery.go index ed1f70a12..c0c23d079 100644 --- a/clients/bigquery/bigquery.go +++ b/clients/bigquery/bigquery.go @@ -5,6 +5,7 @@ import ( "fmt" "os" + "cloud.google.com/go/bigquery" _ "github.com/viant/bigquery" "github.com/artie-labs/transfer/lib/config" @@ -33,6 +34,24 @@ func (s *Store) Label() constants.DestinationKind { return constants.BigQuery } +func (s *Store) GetClient(ctx context.Context) *bigquery.Client { + settings := config.FromContext(ctx) + client, err := bigquery.NewClient(ctx, settings.Config.BigQuery.ProjectID) + if err != nil { + logger.FromContext(ctx).WithError(err).Fatalf("failed to get bigquery client") + } + + return client +} + +func (s *Store) PutTable(ctx context.Context, dataset, tableName string, rows []*Row) error { + client := s.GetClient(ctx) + defer client.Close() + + inserter := client.Dataset(dataset).Table(tableName).Inserter() + return inserter.Put(ctx, rows) +} + func LoadBigQuery(ctx context.Context, _store *db.Store) *Store { if _store != nil { // Used for tests. diff --git a/clients/bigquery/cast.go b/clients/bigquery/cast.go index 0fe5aeedc..2cb84f4f1 100644 --- a/clients/bigquery/cast.go +++ b/clients/bigquery/cast.go @@ -3,62 +3,54 @@ package bigquery import ( "fmt" "strings" - "time" "github.com/artie-labs/transfer/lib/array" "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/stringutil" "github.com/artie-labs/transfer/lib/typing/ext" "github.com/artie-labs/transfer/lib/typing" ) -func CastColVal(colVal interface{}, colKind typing.Column) (string, error) { +func CastColVal(colVal interface{}, colKind typing.Column) (interface{}, error) { if colVal != nil { switch colKind.KindDetails.Kind { case typing.ETime.Kind: extTime, err := ext.ParseFromInterface(colVal) if err != nil { - return "", fmt.Errorf("failed to cast colVal as time.Time, colVal: %v, err: %v", colVal, err) + return nil, fmt.Errorf("failed to cast colVal as time.Time, colVal: %v, err: %v", colVal, err) } switch extTime.NestedKind.Type { + // https://cloud.google.com/bigquery/docs/streaming-data-into-bigquery#sending_datetime_data case ext.DateTimeKindType: - colVal = fmt.Sprintf("PARSE_DATETIME('%s', '%v')", RFC3339Format, extTime.String(time.RFC3339Nano)) + colVal = extTime.StringUTC(ext.BigQueryDateTimeFormat) case ext.DateKindType: - colVal = fmt.Sprintf("PARSE_DATE('%s', '%v')", PostgresDateFormat, extTime.String(ext.Date.Format)) + colVal = extTime.String(ext.PostgresDateFormat) case ext.TimeKindType: - colVal = fmt.Sprintf("PARSE_TIME('%s', '%v')", PostgresTimeFormatNoTZ, extTime.String(ext.PostgresTimeFormatNoTZ)) + colVal = extTime.String(typing.StreamingTimeFormat) } // All the other types do not need string wrapping. case typing.String.Kind, typing.Struct.Kind: - colVal = stringutil.Wrap(colVal) - colVal = stringutil.LineBreaksToCarriageReturns(fmt.Sprint(colVal)) if colKind.KindDetails == typing.Struct { if strings.Contains(fmt.Sprint(colVal), constants.ToastUnavailableValuePlaceholder) { - colVal = typing.BigQueryJSON(fmt.Sprintf(`{"key": "%s"}`, constants.ToastUnavailableValuePlaceholder)) - } else { - // This is how you cast string -> JSON - colVal = fmt.Sprintf("JSON %s", colVal) + colVal = map[string]interface{}{ + "key": constants.ToastUnavailableValuePlaceholder, + } } } case typing.Array.Kind: var err error - colVal, err = array.InterfaceToArrayStringEscaped(colVal) + colVal, err = array.InterfaceToArrayString(colVal) if err != nil { - return "", err + return nil, err } + + return colVal, nil } - } else { - if colKind.KindDetails == typing.String { - // BigQuery does not like null as a string for CTEs. - // It throws this error: Value of type INT64 cannot be assigned to column name, which has type STRING - colVal = "''" - } else { - colVal = "null" - } + + return fmt.Sprint(colVal), nil } - return fmt.Sprint(colVal), nil + return nil, nil } diff --git a/clients/bigquery/cast_test.go b/clients/bigquery/cast_test.go index 79e97c6fc..a7495cee4 100644 --- a/clients/bigquery/cast_test.go +++ b/clients/bigquery/cast_test.go @@ -2,6 +2,9 @@ package bigquery import ( "testing" + "time" + + "github.com/artie-labs/transfer/lib/typing/ext" "github.com/stretchr/testify/assert" @@ -14,40 +17,76 @@ func TestCastColVal(t *testing.T) { colVal interface{} colKind typing.Column - expectedErr error - expectedString string + expectedErr error + expectedValue interface{} } + tsKind := typing.ETime + tsKind.ExtendedTimeDetails = &ext.DateTime + + dateKind := typing.ETime + dateKind.ExtendedTimeDetails = &ext.Date + + birthday := time.Date(2022, time.September, 6, 3, 19, 24, 942000000, time.UTC) + birthdayTSExt, err := ext.NewExtendedTime(birthday, tsKind.ExtendedTimeDetails.Type, "") + assert.NoError(t, err) + + birthdayDateExt, err := ext.NewExtendedTime(birthday, dateKind.ExtendedTimeDetails.Type, "") + assert.NoError(t, err) + + timeKind := typing.ETime + timeKind.ExtendedTimeDetails = &ext.Time + birthdayTimeExt, err := ext.NewExtendedTime(birthday, timeKind.ExtendedTimeDetails.Type, "") + assert.NoError(t, err) + testCases := []_testCase{ { - name: "escaping string", - colVal: "foo", - colKind: typing.Column{KindDetails: typing.String}, - expectedString: "'foo'", + name: "escaping string", + colVal: "foo", + colKind: typing.Column{KindDetails: typing.String}, + expectedValue: "foo", + }, + { + name: "123 as int", + colVal: 123, + colKind: typing.Column{KindDetails: typing.Integer}, + expectedValue: "123", + }, + { + name: "struct", + colVal: `{"hello": "world"}`, + colKind: typing.Column{KindDetails: typing.Struct}, + expectedValue: `{"hello": "world"}`, + }, + { + name: "array", + colVal: []int{1, 2, 3, 4, 5}, + colKind: typing.Column{KindDetails: typing.Array}, + expectedValue: []string{"1", "2", "3", "4", "5"}, }, { - name: "123 as int", - colVal: 123, - colKind: typing.Column{KindDetails: typing.Integer}, - expectedString: "123", + name: "timestamp", + colVal: birthdayTSExt, + colKind: typing.Column{KindDetails: tsKind}, + expectedValue: "2022-09-06 03:19:24.942", }, { - name: "struct", - colVal: `{"hello": "world"}`, - colKind: typing.Column{KindDetails: typing.Struct}, - expectedString: `JSON '{"hello": "world"}'`, + name: "date", + colVal: birthdayDateExt, + colKind: typing.Column{KindDetails: dateKind}, + expectedValue: "2022-09-06", }, { - name: "array", - colVal: []int{1, 2, 3, 4, 5}, - colKind: typing.Column{KindDetails: typing.Array}, - expectedString: `['1','2','3','4','5']`, + name: "time", + colVal: birthdayTimeExt, + colKind: typing.Column{KindDetails: timeKind}, + expectedValue: "03:19:24", }, } for _, testCase := range testCases { actualString, actualErr := CastColVal(testCase.colVal, testCase.colKind) assert.Equal(t, testCase.expectedErr, actualErr, testCase.name) - assert.Equal(t, testCase.expectedString, actualString, testCase.name) + assert.Equal(t, testCase.expectedValue, actualString, testCase.name) } } diff --git a/clients/bigquery/constants.go b/clients/bigquery/constants.go deleted file mode 100644 index 19c23c87a..000000000 --- a/clients/bigquery/constants.go +++ /dev/null @@ -1,8 +0,0 @@ -package bigquery - -// https://cloud.google.com/bigquery/docs/reference/standard-sql/format-elements#format_elements_date_time - -const RFC3339Format = "%Y-%m-%dT%H:%M:%E*SZ" -const PostgresDateFormat = "%F" // YYYY-MM-DD -// PostgresTimeFormatNoTZ does not contain TZ for BigQuery because BigQuery's `Time` type does not like time zones. -const PostgresTimeFormatNoTZ = "%H:%M:%E*S" // HH:MM:SS (micro-seconds) diff --git a/clients/bigquery/merge.go b/clients/bigquery/merge.go index de96df676..6603989e2 100644 --- a/clients/bigquery/merge.go +++ b/clients/bigquery/merge.go @@ -3,66 +3,57 @@ package bigquery import ( "context" "fmt" - "strings" + + "cloud.google.com/go/bigquery" + + "github.com/artie-labs/transfer/lib/dwh/dml" "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/dwh/ddl" - "github.com/artie-labs/transfer/lib/dwh/dml" "github.com/artie-labs/transfer/lib/logger" "github.com/artie-labs/transfer/lib/optimization" "github.com/artie-labs/transfer/lib/typing" ) -func merge(tableData *optimization.TableData) (string, error) { - var cols []string - // Given all the columns, diff this against SFLK. - for _, col := range tableData.ReadOnlyInMemoryCols().GetColumns() { - if col.KindDetails == typing.Invalid { - // Don't update BQ - continue - } +type Row struct { + data map[string]bigquery.Value +} - cols = append(cols, col.Name) +func NewRow(data map[string]bigquery.Value) *Row { + return &Row{ + data: data, } +} - var rowValues []string - firstRow := true +func (r *Row) Save() (map[string]bigquery.Value, string, error) { + return r.data, bigquery.NoDedupeID, nil +} +func merge(tableData *optimization.TableData) ([]*Row, error) { + var rows []*Row for _, value := range tableData.RowsData() { - var colVals []string - for _, col := range cols { + data := make(map[string]bigquery.Value) + for _, col := range tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate() { colKind, _ := tableData.ReadOnlyInMemoryCols().GetColumn(col) colVal, err := CastColVal(value[col], colKind) if err != nil { - return "", err + return nil, err } - if firstRow { - colVal = fmt.Sprintf("%v as %s", colVal, col) + if colVal != nil { + data[col] = colVal } - - colVals = append(colVals, colVal) } - firstRow = false - rowValues = append(rowValues, fmt.Sprintf("SELECT %s", strings.Join(colVals, ","))) + rows = append(rows, NewRow(data)) } - subQuery := strings.Join(rowValues, " UNION ALL ") - - return dml.MergeStatement(dml.MergeArgument{ - FqTableName: tableData.ToFqName(constants.BigQuery), - SubQuery: subQuery, - IdempotentKey: tableData.IdempotentKey, - PrimaryKeys: tableData.PrimaryKeys, - Columns: cols, - ColumnsToTypes: *tableData.ReadOnlyInMemoryCols(), - SoftDelete: tableData.SoftDelete, - BigQueryTypeCasting: true, - }) + return rows, nil } func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) error { + // TODO - write test for this. + if tableData.Rows() == 0 || tableData.ReadOnlyInMemoryCols() == nil { // There's no rows or columns. Let's skip. return nil @@ -85,7 +76,7 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er createAlterTableArgs := ddl.AlterTableArgs{ Dwh: s, Tc: tableConfig, - FqTableName: tableData.ToFqName(constants.BigQuery), + FqTableName: tableData.ToFqName(s.Label()), CreateTable: tableConfig.CreateTable, ColumnOp: constants.Add, CdcTime: tableData.LatestCDCTs, @@ -104,7 +95,7 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er deleteAlterTableArgs := ddl.AlterTableArgs{ Dwh: s, Tc: tableConfig, - FqTableName: tableData.ToFqName(constants.BigQuery), + FqTableName: tableData.ToFqName(s.Label()), CreateTable: false, ColumnOp: constants.Delete, CdcTime: tableData.LatestCDCTs, @@ -133,14 +124,54 @@ func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) er } } + // Start temporary table creation + tempAlterTableArgs := ddl.AlterTableArgs{ + Dwh: s, + Tc: tableConfig, + FqTableName: fmt.Sprintf("%s_%s", tableData.ToFqName(s.Label()), tableData.TempTableSuffix()), + CreateTable: true, + TemporaryTable: true, + ColumnOp: constants.Add, + } + + if err = ddl.AlterTable(ctx, tempAlterTableArgs, tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil { + return fmt.Errorf("failed to create temp table, error: %v", err) + } + // End temporary table creation + tableData.UpdateInMemoryColumnsFromDestination(tableConfig.Columns().GetColumns()...) - query, err := merge(tableData) + rows, err := merge(tableData) if err != nil { log.WithError(err).Warn("failed to generate the merge query") return err } - log.WithField("query", query).Debug("executing...") - _, err = s.Exec(query) - return err + tableName := fmt.Sprintf("%s_%s", tableData.TableName, tableData.TempTableSuffix()) + err = s.PutTable(ctx, tableData.Database, tableName, rows) + if err != nil { + return fmt.Errorf("failed to insert into temp table: %s, error: %v", tableName, err) + } + + mergeQuery, err := dml.MergeStatement(dml.MergeArgument{ + FqTableName: tableData.ToFqName(constants.BigQuery), + SubQuery: tempAlterTableArgs.FqTableName, + IdempotentKey: tableData.IdempotentKey, + PrimaryKeys: tableData.PrimaryKeys, + Columns: tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(), + ColumnsToTypes: *tableData.ReadOnlyInMemoryCols(), + SoftDelete: tableData.SoftDelete, + BigQuery: true, + }) + + if err != nil { + return err + } + + _, err = s.Exec(mergeQuery) + if err != nil { + return err + } + + ddl.DropTemporaryTable(ctx, s, tempAlterTableArgs.FqTableName) + return nil } diff --git a/clients/bigquery/merge_test.go b/clients/bigquery/merge_test.go index 15bbef416..cbea0568c 100644 --- a/clients/bigquery/merge_test.go +++ b/clients/bigquery/merge_test.go @@ -1,298 +1 @@ package bigquery - -import ( - "fmt" - "github.com/artie-labs/transfer/lib/config/constants" - "github.com/artie-labs/transfer/lib/kafkalib" - "github.com/artie-labs/transfer/lib/optimization" - "github.com/artie-labs/transfer/lib/typing" - "github.com/stretchr/testify/assert" - "strings" -) - -func (b *BigQueryTestSuite) TestMergeNoDeleteFlag() { - var cols typing.Columns - cols.AddColumn(typing.Column{ - Name: "id", - KindDetails: typing.Integer, - }) - - tableData := optimization.NewTableData(&cols, []string{"id"}, kafkalib.TopicConfig{}) - _, err := merge(tableData) - assert.Error(b.T(), err, "merge failed") -} - -func (b *BigQueryTestSuite) TestMerge() { - primaryKeys := []string{"id"} - - var cols typing.Columns - for colName, kindDetails := range map[string]typing.KindDetails{ - "id": typing.Integer, - "name": typing.String, - "multiline": typing.String, - constants.DeleteColumnMarker: typing.Boolean, - } { - cols.AddColumn(typing.Column{ - Name: colName, - KindDetails: kindDetails, - }) - } - - rowData := make(map[string]map[string]interface{}) - for idx, name := range []string{"robin", "jacqueline", "dusty"} { - pk := fmt.Sprint(idx + 1) - rowData[pk] = map[string]interface{}{ - "id": pk, - "name": name, - "multiline": `artie -dusty -robin -jacqueline -charlie`, - constants.DeleteColumnMarker: false, - } - } - - topicConfig := kafkalib.TopicConfig{ - Database: "shop", - TableName: "customer", - Schema: "public", - } - - tableData := optimization.NewTableData(&cols, primaryKeys, topicConfig) - for pk, row := range rowData { - tableData.InsertRow(pk, row) - } - - mergeSQL, err := merge(tableData) - - assert.NoError(b.T(), err, "merge failed") - // Check if MERGE INTO FQ Table exists. - assert.True(b.T(), strings.Contains(mergeSQL, "MERGE INTO shop.customer c"), mergeSQL) - // Check for equality merge - for _, pk := range primaryKeys { - assert.True(b.T(), strings.Contains(mergeSQL, fmt.Sprintf("c.%s = cc.%s", pk, pk))) - } - - for _, rowData := range tableData.RowsData() { - for col, val := range rowData { - switch _col, _ := cols.GetColumn(col); _col.KindDetails { - case typing.String, typing.Array, typing.Struct: - if col == "multiline" { - // Check the multiline string was escaped properly - val = strings.Join([]string{"artie", "dusty", "robin", "jacqueline", "charlie"}, `\n`) - } else { - val = fmt.Sprintf("'%v'", val) - } - - } - - assert.True(b.T(), strings.Contains(mergeSQL, fmt.Sprint(val)), map[string]interface{}{ - "merge": mergeSQL, - "val": val, - }) - } - } -} - -func (b *BigQueryTestSuite) TestMergeJSONKey() { - var cols typing.Columns - for colName, kindDetails := range map[string]typing.KindDetails{ - "id": typing.Struct, - "name": typing.String, - constants.DeleteColumnMarker: typing.Boolean, - } { - cols.AddColumn(typing.Column{ - Name: colName, - KindDetails: kindDetails, - }) - } - - rowData := make(map[string]map[string]interface{}) - for idx, name := range []string{"robin", "jacqueline", "dusty"} { - pkVal := fmt.Sprint(map[string]interface{}{ - "$oid": fmt.Sprintf("640127e4beeb1ccfc821c25c++%v", idx), - }) - - rowData[pkVal] = map[string]interface{}{ - "id": pkVal, - "name": name, - constants.DeleteColumnMarker: false, - } - } - - topicConfig := kafkalib.TopicConfig{ - Database: "shop", - TableName: "customer", - Schema: "public", - } - - primaryKeys := []string{"id"} - tableData := optimization.NewTableData(&cols, primaryKeys, topicConfig) - for pk, row := range rowData { - tableData.InsertRow(pk, row) - } - - mergeSQL, err := merge(tableData) - assert.NoError(b.T(), err, "merge failed") - // Check if MERGE INTO FQ Table exists. - assert.True(b.T(), strings.Contains(mergeSQL, "MERGE INTO shop.customer c"), mergeSQL) - // Check for equality merge - - for _, primaryKey := range primaryKeys { - assert.True(b.T(), strings.Contains(mergeSQL, fmt.Sprintf("TO_JSON_STRING(c.%s) = TO_JSON_STRING(cc.%s)", primaryKey, primaryKey))) - } - - for _, rowData := range tableData.RowsData() { - for col, val := range rowData { - switch _col, _ := cols.GetColumn(col); _col.KindDetails { - case typing.String, typing.Array, typing.Struct: - val = fmt.Sprintf("'%v'", val) - } - - assert.True(b.T(), strings.Contains(mergeSQL, fmt.Sprint(val)), map[string]interface{}{ - "merge": mergeSQL, - "val": val, - }) - } - } -} - -func (b *BigQueryTestSuite) TestMergeSimpleCompositeKey() { - var cols typing.Columns - for colName, kindDetails := range map[string]typing.KindDetails{ - "id": typing.String, - "idA": typing.String, - "name": typing.String, - "nullable_string": typing.String, - constants.DeleteColumnMarker: typing.Boolean, - } { - cols.AddColumn(typing.Column{ - Name: colName, - KindDetails: kindDetails, - }) - } - - rowData := make(map[string]map[string]interface{}) - for idx, name := range []string{"robin", "jacqueline", "dusty"} { - pkVal := fmt.Sprint(map[string]interface{}{ - "$oid": fmt.Sprintf("640127e4beeb1ccfc821c25c++%v", idx), - }) - - rowData[pkVal] = map[string]interface{}{ - "id": pkVal, - "name": name, - constants.DeleteColumnMarker: false, - } - } - - topicConfig := kafkalib.TopicConfig{ - Database: "shop", - TableName: "customer", - Schema: "public", - } - - primaryKeys := []string{"id", "idA"} - tableData := optimization.NewTableData(&cols, primaryKeys, topicConfig) - for pk, row := range rowData { - tableData.InsertRow(pk, row) - } - - mergeSQL, err := merge(tableData) - assert.NoError(b.T(), err, "merge failed") - // Check if MERGE INTO FQ Table exists. - assert.True(b.T(), strings.Contains(mergeSQL, "MERGE INTO shop.customer c"), mergeSQL) - // Check for equality merge - for _, primaryKey := range primaryKeys { - assert.True(b.T(), strings.Contains(mergeSQL, fmt.Sprintf("c.%s = cc.%s", primaryKey, primaryKey))) - } - - assert.True(b.T(), strings.Contains(mergeSQL, fmt.Sprintf("c.%s = cc.%s and c.%s = cc.%s", "id", "id", "idA", "idA")), mergeSQL) - for _, rowData := range tableData.RowsData() { - for col, val := range rowData { - switch _col, _ := cols.GetColumn(col); _col.KindDetails { - case typing.String, typing.Array, typing.Struct: - val = fmt.Sprintf("'%v'", val) - } - - assert.True(b.T(), strings.Contains(mergeSQL, fmt.Sprint(val)), map[string]interface{}{ - "merge": mergeSQL, - "val": val, - }) - } - } - - // Check null string fix. - assert.True(b.T(), strings.Contains(mergeSQL, fmt.Sprintf(`'' as nullable_string`)), mergeSQL) -} - -func (b *BigQueryTestSuite) TestMergeJSONKeyAndCompositeHybrid() { - var cols typing.Columns - - for colName, kindDetails := range map[string]typing.KindDetails{ - "id": typing.Struct, - "idA": typing.String, - "idB": typing.String, - "idC": typing.Struct, - "name": typing.String, - constants.DeleteColumnMarker: typing.Boolean, - } { - cols.AddColumn(typing.Column{ - Name: colName, - KindDetails: kindDetails, - }) - } - - rowData := make(map[string]map[string]interface{}) - for idx, name := range []string{"robin", "jacqueline", "dusty"} { - pkVal := fmt.Sprint(map[string]interface{}{ - "$oid": fmt.Sprintf("640127e4beeb1ccfc821c25c++%v", idx), - }) - - rowData[pkVal] = map[string]interface{}{ - "id": pkVal, - "name": name, - constants.DeleteColumnMarker: false, - } - } - - topicConfig := kafkalib.TopicConfig{ - Database: "shop", - TableName: "customer", - Schema: "public", - } - - primaryKeys := []string{"id", "idA", "idB", "idC"} - - tableData := optimization.NewTableData(&cols, primaryKeys, topicConfig) - for pk, row := range rowData { - tableData.InsertRow(pk, row) - } - - mergeSQL, err := merge(tableData) - assert.NoError(b.T(), err, "merge failed") - // Check if MERGE INTO FQ Table exists. - assert.True(b.T(), strings.Contains(mergeSQL, "MERGE INTO shop.customer c"), mergeSQL) - // Check for equality merge - for _, primaryKey := range []string{"id", "idC"} { - assert.True(b.T(), strings.Contains(mergeSQL, fmt.Sprintf("TO_JSON_STRING(c.%s) = TO_JSON_STRING(cc.%s)", primaryKey, primaryKey)), mergeSQL) - } - - for _, primaryKey := range []string{"idA", "idB"} { - assert.True(b.T(), strings.Contains(mergeSQL, fmt.Sprintf("c.%s = cc.%s", primaryKey, primaryKey))) - } - - for _, rowData := range tableData.RowsData() { - for col, val := range rowData { - switch _col, _ := cols.GetColumn(col); _col.KindDetails { - case typing.String, typing.Array, typing.Struct: - val = fmt.Sprintf("'%v'", val) - } - - assert.True(b.T(), strings.Contains(mergeSQL, fmt.Sprint(val)), map[string]interface{}{ - "merge": mergeSQL, - "val": val, - }) - } - } -} diff --git a/go.mod b/go.mod index 4ec8fd1b6..1dc45b271 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,8 @@ module github.com/artie-labs/transfer go 1.19 require ( - cloud.google.com/go/pubsub v1.28.0 + cloud.google.com/go/bigquery v1.51.2 + cloud.google.com/go/pubsub v1.30.0 github.com/DataDog/datadog-go v4.8.3+incompatible github.com/aws/aws-sdk-go-v2/config v1.17.8 github.com/evalphobia/logrus_sentry v0.8.2 @@ -16,19 +17,22 @@ require ( github.com/stretchr/testify v1.8.1 github.com/viant/bigquery v0.2.1-0.20230129024722-24ed6fd5555f go.mongodb.org/mongo-driver v1.11.0 - google.golang.org/api v0.103.0 + google.golang.org/api v0.118.0 gopkg.in/yaml.v3 v3.0.1 ) require ( - cloud.google.com/go v0.105.0 // indirect - cloud.google.com/go/compute v1.13.0 // indirect - cloud.google.com/go/compute/metadata v0.2.1 // indirect - cloud.google.com/go/iam v0.7.0 // indirect + cloud.google.com/go v0.110.0 // indirect + cloud.google.com/go/compute v1.19.0 // indirect + cloud.google.com/go/compute/metadata v0.2.3 // indirect + cloud.google.com/go/iam v0.13.0 // indirect github.com/Azure/azure-pipeline-go v0.2.3 // indirect github.com/Azure/azure-storage-blob-go v0.15.0 // indirect github.com/Microsoft/go-winio v0.6.0 // indirect + github.com/andybalholm/brotli v1.0.4 // indirect github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 // indirect + github.com/apache/arrow/go/v12 v12.0.0 // indirect + github.com/apache/thrift v0.16.0 // indirect github.com/aws/aws-sdk-go-v2 v1.16.16 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.8 // indirect github.com/aws/aws-sdk-go-v2/credentials v1.12.21 // indirect @@ -55,16 +59,20 @@ require ( github.com/gabriel-vasile/mimetype v1.4.1 // indirect github.com/getsentry/raven-go v0.2.0 // indirect github.com/go-errors/errors v1.4.2 // indirect - github.com/goccy/go-json v0.9.7 // indirect + github.com/goccy/go-json v0.9.11 // indirect github.com/golang-jwt/jwt/v4 v4.4.1 // indirect - github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect - github.com/golang/protobuf v1.5.2 // indirect + github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect + github.com/golang/protobuf v1.5.3 // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/google/flatbuffers v22.9.29+incompatible // indirect github.com/google/go-cmp v0.5.9 // indirect - github.com/googleapis/enterprise-certificate-proxy v0.2.0 // indirect - github.com/googleapis/gax-go/v2 v2.7.0 // indirect + github.com/google/s2a-go v0.1.0 // indirect + github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect + github.com/googleapis/gax-go/v2 v2.8.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect + github.com/klauspost/asmfmt v1.3.2 // indirect github.com/klauspost/compress v1.15.11 // indirect + github.com/klauspost/cpuid/v2 v2.0.9 // indirect github.com/lestrrat-go/backoff/v2 v2.0.8 // indirect github.com/lestrrat-go/blackmagic v1.0.0 // indirect github.com/lestrrat-go/httpcc v1.0.1 // indirect @@ -72,6 +80,8 @@ require ( github.com/lestrrat-go/jwx v1.2.25 // indirect github.com/lestrrat-go/option v1.0.0 // indirect github.com/mattn/go-ieproxy v0.0.9 // indirect + github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect + github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect github.com/pierrec/lz4/v4 v4.1.17 // indirect github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect github.com/pkg/errors v0.9.1 // indirect @@ -81,19 +91,20 @@ require ( github.com/viant/scy v0.3.2-0.20220825213848-acc5c59cde78 // indirect github.com/viant/toolbox v0.34.5 // indirect github.com/viant/xunsafe v0.8.2 // indirect + github.com/zeebo/xxh3 v1.0.2 // indirect go.opencensus.io v0.24.0 // indirect - golang.org/x/crypto v0.0.0-20221012134737-56aed061732a // indirect - golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect - golang.org/x/net v0.7.0 // indirect - golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 // indirect + golang.org/x/crypto v0.7.0 // indirect + golang.org/x/mod v0.8.0 // indirect + golang.org/x/net v0.9.0 // indirect + golang.org/x/oauth2 v0.7.0 // indirect golang.org/x/sync v0.1.0 // indirect - golang.org/x/sys v0.5.0 // indirect - golang.org/x/text v0.7.0 // indirect - golang.org/x/tools v0.1.12 // indirect + golang.org/x/sys v0.7.0 // indirect + golang.org/x/text v0.9.0 // indirect + golang.org/x/tools v0.6.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/appengine v1.6.7 // indirect - google.golang.org/genproto v0.0.0-20221202195650-67e5cbc046fd // indirect - google.golang.org/grpc v1.51.0 // indirect - google.golang.org/protobuf v1.28.1 // indirect + google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect + google.golang.org/grpc v1.55.0 // indirect + google.golang.org/protobuf v1.30.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 610880545..2e206801d 100644 --- a/go.sum +++ b/go.sum @@ -2,18 +2,22 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT cloud.google.com/go v0.31.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.37.0/go.mod h1:TS1dMSSfndXH133OKGwekG838Om/cQT0BUHV3HcBgoo= -cloud.google.com/go v0.105.0 h1:DNtEKRBAAzeS4KyIory52wWHuClNaXJ5x1F7xa4q+5Y= -cloud.google.com/go v0.105.0/go.mod h1:PrLgOJNe5nfE9UMxKxgXj4mD3voiP+YQ6gdt6KMFOKM= -cloud.google.com/go/compute v1.13.0 h1:AYrLkB8NPdDRslNp4Jxmzrhdr03fUAIDbiGFjLWowoU= -cloud.google.com/go/compute v1.13.0/go.mod h1:5aPTS0cUNMIc1CE546K+Th6weJUNQErARyZtRXDJ8GE= -cloud.google.com/go/compute/metadata v0.2.1 h1:efOwf5ymceDhK6PKMnnrTHP4pppY5L22mle96M1yP48= -cloud.google.com/go/compute/metadata v0.2.1/go.mod h1:jgHgmJd2RKBGzXqF5LR2EZMGxBkeanZ9wwa75XHJgOM= -cloud.google.com/go/iam v0.7.0 h1:k4MuwOsS7zGJJ+QfZ5vBK8SgHBAvYN/23BWsiihJ1vs= -cloud.google.com/go/iam v0.7.0/go.mod h1:H5Br8wRaDGNc8XP3keLc4unfUUZeyH3Sfl9XpQEYOeg= -cloud.google.com/go/kms v1.6.0 h1:OWRZzrPmOZUzurjI2FBGtgY2mB1WaJkqhw6oIwSj0Yg= -cloud.google.com/go/longrunning v0.3.0 h1:NjljC+FYPV3uh5/OwWT6pVU+doBqMg2x/rZlE+CamDs= -cloud.google.com/go/pubsub v1.28.0 h1:XzabfdPx/+eNrsVVGLFgeUnQQKPGkMb8klRCeYK52is= -cloud.google.com/go/pubsub v1.28.0/go.mod h1:vuXFpwaVoIPQMGXqRyUQigu/AX1S3IWugR9xznmcXX8= +cloud.google.com/go v0.110.0 h1:Zc8gqp3+a9/Eyph2KDmcGaPtbKRIoqq4YTlL4NMD0Ys= +cloud.google.com/go v0.110.0/go.mod h1:SJnCLqQ0FCFGSZMUNUf84MV3Aia54kn7pi8st7tMzaY= +cloud.google.com/go/bigquery v1.51.2 h1:p6SZQJBh64rNJB/9V5O0jvMBI8O/XV5rJKlhmmCU+2o= +cloud.google.com/go/bigquery v1.51.2/go.mod h1:6YYSJ37dAY1HyMDq/+XByPmzsC52MgzNXhxjlTzIVCM= +cloud.google.com/go/compute v1.19.0 h1:+9zda3WGgW1ZSTlVppLCYFIr48Pa35q1uG2N1itbCEQ= +cloud.google.com/go/compute v1.19.0/go.mod h1:rikpw2y+UMidAe9tISo04EHNOIf42RLYF/q8Bs93scU= +cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= +cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= +cloud.google.com/go/datacatalog v1.13.0 h1:4H5IJiyUE0X6ShQBqgFFZvGGcrwGVndTwUSLP4c52gw= +cloud.google.com/go/iam v0.13.0 h1:+CmB+K0J/33d0zSQ9SlFWUeCCEn5XJA0ZMZ3pHE9u8k= +cloud.google.com/go/iam v0.13.0/go.mod h1:ljOg+rcNfzZ5d6f1nAUJ8ZIxOaZUVoS14bKCtaLZ/D0= +cloud.google.com/go/kms v1.10.1 h1:7hm1bRqGCA1GBRQUrp831TwJ9TWhP+tvLuP497CQS2g= +cloud.google.com/go/longrunning v0.4.1 h1:v+yFJOfKC3yZdY6ZUI933pIYdhyhV8S3NpWrXWmg7jM= +cloud.google.com/go/pubsub v1.30.0 h1:vCge8m7aUKBJYOgrZp7EsNDf6QMd2CAlXZqWTn3yq6s= +cloud.google.com/go/pubsub v1.30.0/go.mod h1:qWi1OPS0B+b5L+Sg6Gmc9zD1Y+HaM0MdUr7LsupY1P4= +cloud.google.com/go/storage v1.29.0 h1:6weCgzRvMg7lzuUurI4697AqIRPU1SvzHhynwpW31jI= dmitri.shuralyov.com/app/changes v0.0.0-20180602232624-0a106ad413e3/go.mod h1:Yl+fi1br7+Rr3LqpNJf1/uxUdtRUV+Tnj0o93V2B9MU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/html/belt v0.0.0-20180602232347-f7d459c86be0/go.mod h1:JLBrvjyP0v+ecvNYvCpyZgu5/xkfAUhi6wJj28eUfSU= @@ -40,13 +44,20 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/DataDog/datadog-go v4.8.3+incompatible h1:fNGaYSuObuQb5nzeTQqowRAd9bpDIRRV4/gUtIBjh8Q= github.com/DataDog/datadog-go v4.8.3+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= +github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU= github.com/Microsoft/go-winio v0.6.0 h1:slsWYD/zyx7lCXoZVlvQrj0hPTM1HI4+v1sIda2yDvg= github.com/Microsoft/go-winio v0.6.0/go.mod h1:cTAf44im0RAYeL23bpB+fzCyDH2MJiz2BO69KH/soAE= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= +github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= +github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 h1:q4dksr6ICHXqG5hm0ZW5IHyeEJXoIJSOZeBLmWPNeIQ= github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40/go.mod h1:Q7yQnSMnLvcXlZ8RV+jwz/6y1rQTqbX6C82SndT52Zs= +github.com/apache/arrow/go/v12 v12.0.0 h1:xtZE63VWl7qLdB0JObIXvvhGjoVNrQ9ciIHG2OK5cmc= +github.com/apache/arrow/go/v12 v12.0.0/go.mod h1:d+tV/eHZZ7Dz7RPrFKtPK02tpr+c9/PEd/zm8mDS9Vg= +github.com/apache/thrift v0.16.0 h1:qEy6UW60iVOlUy+b9ZR0d5WzUWYGOo4HfopoyBaNmoY= +github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= github.com/aws/aws-sdk-go-v2 v1.16.12/go.mod h1:C+Ym0ag2LIghJbXhfXZ0YEEp49rBWowxKzJLUoob0ts= github.com/aws/aws-sdk-go-v2 v1.16.16 h1:M1fj4FE2lB4NzRb9Y0xdWsn2P0+2UHVxwKyOa4YJNjk= github.com/aws/aws-sdk-go-v2 v1.16.16/go.mod h1:SwiyXi/1zTUZ6KIAmLK5V5ll8SiURNUYOqTerZPaF9k= @@ -104,10 +115,15 @@ github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d h1:S2NE3iHSwP0XV47EEXL8mWmRdEfGscSJ+7EgePNgt0s= github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA= +github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI= github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -122,6 +138,7 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= +github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/evalphobia/logrus_sentry v0.8.2 h1:dotxHq+YLZsT1Bb45bB5UQbfCh3gM/nFFetyN46VoDQ= github.com/evalphobia/logrus_sentry v0.8.2/go.mod h1:pKcp+vriitUqu9KiWj/VRFbRfFNUwz95/UkgG8a6MNc= @@ -149,18 +166,21 @@ github.com/go-fonts/liberation v0.1.1/go.mod h1:K6qoJYypsmfVjWg8KOVDQhLc8UDgIK2H github.com/go-fonts/stix v0.1.0/go.mod h1:w/c1f0ldAUlJmLBvlbkvVXLAD+tAMqobIIQpmnUIzUY= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-latex/latex v0.0.0-20210118124228-b3d85cf34e07/go.mod h1:CO1AlKB2CSIqUrmQPqA0gdRIlnLEY0gK5JGjh37zN5U= -github.com/goccy/go-json v0.9.7 h1:IcB+Aqpx/iMHu5Yooh7jEzJk1JZ7Pjtmys2ukPr7EeM= github.com/goccy/go-json v0.9.7/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/goccy/go-json v0.9.11 h1:/pAaQDLHEoCq/5FFmSKBswWmK6H0e8g4159Kc/X/nqk= +github.com/goccy/go-json v0.9.11/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/golang-jwt/jwt/v4 v4.4.1 h1:pC5DB52sCeK48Wlb9oPcdhnjkz1TKt1D/P7WKJ0kUcQ= github.com/golang-jwt/jwt/v4 v4.4.1/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= -github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:tluoj9z5200jBnyusfRPU2LqT6J+DAorxEvtC7LHB+E= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -174,10 +194,13 @@ github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/flatbuffers v2.0.0+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/flatbuffers v22.9.29+incompatible h1:3UBb679lq3V/O9rgzoJmnkP1jJzmC9OdFzITUBkLU/A= @@ -196,18 +219,22 @@ github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= +github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= +github.com/google/martian/v3 v3.3.2 h1:IqNFLAmvJOgVlpdEBiQbDc2EwKW77amAycfTuWKdfvw= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= +github.com/google/s2a-go v0.1.0 h1:3Qm0liEiCErViKERO2Su5wp+9PfMRiuS6XB5FvpKnYQ= +github.com/google/s2a-go v0.1.0/go.mod h1:OJpEgntRZo8ugHpF9hkoLJbS5dSI20XZeXJ9JVywLlM= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/googleapis/enterprise-certificate-proxy v0.2.0 h1:y8Yozv7SZtlU//QXbezB6QkpuE6jMD2/gfzk4AftXjs= -github.com/googleapis/enterprise-certificate-proxy v0.2.0/go.mod h1:8C0jb7/mgJe/9KK8Lm7X9ctZC2t60YyIpYEI16jx0Qg= +github.com/googleapis/enterprise-certificate-proxy v0.2.3 h1:yk9/cqRKtT9wXZSsRH9aurXEpJX+U6FLtpYTdC3R06k= +github.com/googleapis/enterprise-certificate-proxy v0.2.3/go.mod h1:AwSRAtLfXpU5Nm3pW+v7rGDHp09LsPtGY9MduiEsR9k= github.com/googleapis/gax-go v2.0.0+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk2eWhX2bMyUtNHzFKcPA9HY= github.com/googleapis/gax-go/v2 v2.0.3/go.mod h1:LLvjysVCY1JZeum8Z6l8qUty8fiNwE08qbEPm1M08qg= -github.com/googleapis/gax-go/v2 v2.7.0 h1:IcsPKeInNvYi7eqSaDjiZqDDKu5rsmunY0Y1YupQSSQ= -github.com/googleapis/gax-go/v2 v2.7.0/go.mod h1:TEop28CZZQ2y+c0VxMUmu1lV+fQx57QpBWsYpwqHJx8= +github.com/googleapis/gax-go/v2 v2.8.0 h1:UBtEZqx1bjXtOQ5BVTkuYghXrr3N4V123VKJK67vJZc= +github.com/googleapis/gax-go/v2 v2.8.0/go.mod h1:4orTrqY6hXxxaUL4LHIPl6lGo8vAE38/qKbhSAKP6QI= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw= @@ -224,15 +251,19 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1 github.com/jung-kurt/gofpdf v1.0.0/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= +github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.15.7/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c= github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= +github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= +github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.3/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= @@ -256,6 +287,10 @@ github.com/mattn/go-ieproxy v0.0.9 h1:RvVbLiMv/Hbjf1gRaC2AQyzwbdVhdId7D2vPnXIml4 github.com/mattn/go-ieproxy v0.0.9/go.mod h1:eF30/rfdQUO9EnzNIZQr0r9HiLMlZNCpJkHbmMuOAE0= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/microcosm-cc/bluemonday v1.0.1/go.mod h1:hsXNsILzKxV+sX77C5b8FSuKF00vh2OMYv+xgHpAMF4= +github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= +github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= +github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= +github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= @@ -281,6 +316,7 @@ github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1: github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/ruudk/golang-pdf417 v0.0.0-20181029194003-1af4ab5afa58/go.mod h1:6lfFZQK844Gfx8o5WFuvpxWRwnSoipWe/p622j1v06w= github.com/segmentio/kafka-go v0.4.34/go.mod h1:GAjxBQJdQMB5zfNA21AhpaqOB2Mu+w3De4ni3Gbm8y0= @@ -356,6 +392,9 @@ github.com/xdg/stringprep v1.0.3 h1:cmL5Enob4W83ti/ZHuZLuKD/xqJfus4fVPwE+/BDm+4= github.com/xdg/stringprep v1.0.3/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= +github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= +github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= go.mongodb.org/mongo-driver v1.11.0 h1:FZKhBSTydeuffHj9CBjXlR8vQLee1cQyTWYPA6/tqiE= go.mongodb.org/mongo-driver v1.11.0/go.mod h1:s7p5vEtfbeR1gYi6pnj3c3/urpbLv2T5Sfd6Rp2HBB8= go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA= @@ -372,17 +411,18 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.0.0-20221012134737-56aed061732a h1:NmSIgad6KjE6VvHciPZuNRTKxGhlPfD6OA87W/PLkqg= -golang.org/x/crypto v0.0.0-20221012134737-56aed061732a/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A= +golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/exp v0.0.0-20191002040644-a1355ae1e2c3 h1:n9HxLrNxWWtEb1cA950nuEEj3QnKbtsCJ6KjcgisNUs= golang.org/x/exp v0.0.0-20191002040644-a1355ae1e2c3/go.mod h1:NOZ3BPKG0ec/BKJQgnvsSFpcKLM5xXVWnvZS97DWHgE= +golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 h1:tnebWN09GYg9OLPss1KXj8txwZc6X6uMr6VFdcGNbHw= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= @@ -400,9 +440,10 @@ golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPI golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o= golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 h1:6zppjxzCulZykYSLyVDYbneBfbaBIQPYMevg0bEwv2s= -golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -418,22 +459,24 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.0.0-20220630215102-69896b714898/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.0.0-20220706163947-c90051bbdb60/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= -golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= +golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= -golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 h1:nt+Q6cXKz4MosCSpnbMtqiQ8Oz0pxTef2B4Vca2lvfk= -golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783/go.mod h1:h4gKUeWbJ4rQPri7E0u6Gs4e9Ri2zaLxzw5DI5XGrYg= +golang.org/x/oauth2 v0.7.0 h1:qe6s0zUXlPX80/dITx3440hWZ7GwMwgDDyrSGTPJG/g= +golang.org/x/oauth2 v0.7.0/go.mod h1:hPLQkd9LyjfXTiRohC/41GhcFqxisoUQ99sCUOHO9x4= golang.org/x/perf v0.0.0-20180704124530-6e6d33e29852/go.mod h1:JLpeXjPJfIyPr5TlbXLkXWLhP8nz10XfvxElABhCtcw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -462,14 +505,15 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210616045830-e2b7044e8c71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= -golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= +golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY= +golang.org/x/term v0.7.0 h1:BEvjmm5fURWqcfbSKTdpkDXYBrUS1c0m8agp14W48vQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= @@ -477,8 +521,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= -golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -494,8 +538,8 @@ golang.org/x/tools v0.0.0-20190927191325-030b2cf1153e/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU= -golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -504,16 +548,16 @@ golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3j golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= gonum.org/v1/gonum v0.8.2/go.mod h1:oe/vMfY3deqTw+1EZJhuvEW2iwGF1bW9wwu7XCu0+v0= -gonum.org/v1/gonum v0.9.3 h1:DnoIG+QAMaF5NvxnGe/oKsgKcAc6PcUyl8q0VetfQ8s= gonum.org/v1/gonum v0.9.3/go.mod h1:TZumC3NeyVQskjXqmyWt4S3bINhy7B4eYwW69EbyX+0= +gonum.org/v1/gonum v0.11.0 h1:f1IJhK4Km5tBJmaiJXtk/PkL4cdVX6J+tGiM187uT5E= gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw= gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b/go.mod h1:Wt8AAjI+ypCyYX3nZBvf6cAIx93T+c/OS2HFAYskSZc= gonum.org/v1/plot v0.9.0/go.mod h1:3Pcqqmp6RHvJI72kgb8fThyUnav364FOsdDo2aGW5lY= google.golang.org/api v0.0.0-20180910000450-7ca32eb868bf/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0= google.golang.org/api v0.0.0-20181030000543-1d582fd0359e/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0= google.golang.org/api v0.1.0/go.mod h1:UGEZY7KEX120AnNLIHFMKIo4obdJhkp2tPbaPlQx13Y= -google.golang.org/api v0.103.0 h1:9yuVqlu2JCvcLg9p8S3fcFLZij8EPSyvODIY1rkMizQ= -google.golang.org/api v0.103.0/go.mod h1:hGtW6nK1AC+d9si/UBhw8Xli+QMOf6xyNAyJw4qU9w0= +google.golang.org/api v0.118.0 h1:FNfHq9Z2GKULxu7cEhCaB0wWQHg43UpomrrN+24ZRdE= +google.golang.org/api v0.118.0/go.mod h1:76TtD3vkgmZ66zZzp72bUUklpmQmKlhh6sYtIjYK+5E= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.2.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.3.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= @@ -529,8 +573,8 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98 google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto v0.0.0-20210630183607-d20f26d13c79/go.mod h1:yiaVoXHpRzHGyxV3o4DktVWY4mSUErTKaeEOq6C3t3U= -google.golang.org/genproto v0.0.0-20221202195650-67e5cbc046fd h1:OjndDrsik+Gt+e6fs45z9AxiewiKyLKYpA45W5Kpkks= -google.golang.org/genproto v0.0.0-20221202195650-67e5cbc046fd/go.mod h1:cTsE614GARnxrLsqKREzmNYJACSWWpAWdNMwnD7c2BE= +google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A= +google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU= google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.16.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= @@ -543,8 +587,9 @@ google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/grpc v1.39.0/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE= -google.golang.org/grpc v1.51.0 h1:E1eGv1FTqoLIdnBCZufiSHgKjlqG6fKFf6pPWtMTh8U= -google.golang.org/grpc v1.51.0/go.mod h1:wgNDFcnuBGmxLKI/qn4T+m5BtEBYXJPvibbUPsAIPww= +google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ= +google.golang.org/grpc v1.55.0 h1:3Oj82/tFSCeUrRTg/5E/7d/W5A1tj6Ky1ABAuZuv5ag= +google.golang.org/grpc v1.55.0/go.mod h1:iYEXKGkEBhg1PjZQvoYEVPTDkHo1/bjTnfwTeGONTY8= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -557,8 +602,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= -google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= +google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= diff --git a/lib/array/strings.go b/lib/array/strings.go index 7f661129c..0ae8d0a47 100644 --- a/lib/array/strings.go +++ b/lib/array/strings.go @@ -12,14 +12,14 @@ import ( "github.com/artie-labs/transfer/lib/typing" ) -func InterfaceToArrayStringEscaped(val interface{}) (string, error) { +func InterfaceToArrayString(val interface{}) ([]string, error) { if val == nil { - return "", nil + return nil, nil } list := reflect.ValueOf(val) if list.Kind() != reflect.Slice { - return "", fmt.Errorf("wrong data type") + return nil, fmt.Errorf("wrong data type") } var vals []string @@ -39,16 +39,16 @@ func InterfaceToArrayStringEscaped(val interface{}) (string, error) { if kind == reflect.Map || kind == reflect.Struct || shouldParse { bytes, err := json.Marshal(value) if err != nil { - return "", err + return nil, err } - vals = append(vals, stringutil.Wrap(string(bytes))) + vals = append(vals, string(bytes)) } else { - vals = append(vals, stringutil.Wrap(value)) + vals = append(vals, stringutil.WrapNoQuotes(value)) } } - return fmt.Sprintf("[%s]", strings.Join(vals, ",")), nil + return vals, nil } type StringsJoinAddPrefixArgs struct { diff --git a/lib/array/strings_test.go b/lib/array/strings_test.go index 2e8687070..351363a78 100644 --- a/lib/array/strings_test.go +++ b/lib/array/strings_test.go @@ -15,7 +15,7 @@ func TestToArrayString(t *testing.T) { name string val interface{} - expectedList string + expectedList []string expectedErr error } @@ -26,23 +26,23 @@ func TestToArrayString(t *testing.T) { { name: "wrong data type", val: true, - expectedList: "", + expectedList: nil, expectedErr: fmt.Errorf("wrong data type"), }, { name: "list of numbers", val: []int{1, 2, 3, 4, 5}, - expectedList: "['1','2','3','4','5']", + expectedList: []string{"1", "2", "3", "4", "5"}, }, { name: "list of strings", val: []string{"abc", "def", "ghi"}, - expectedList: "['abc','def','ghi']", + expectedList: []string{"abc", "def", "ghi"}, }, { name: "list of bools", val: []bool{true, false, true}, - expectedList: "['true','false','true']", + expectedList: []string{"true", "false", "true"}, }, { name: "array of nested objects", @@ -54,7 +54,7 @@ func TestToArrayString(t *testing.T) { "hello": "world", }, }, - expectedList: `['{"foo":"bar"}','{"hello":"world"}']`, + expectedList: []string{`{"foo":"bar"}`, `{"hello":"world"}`}, }, { name: "array of nested lists", @@ -66,12 +66,12 @@ func TestToArrayString(t *testing.T) { "abc", "def", }, }, - expectedList: `['[foo bar]','[abc def]']`, + expectedList: []string{"[foo bar]", "[abc def]"}, }, } for _, testCase := range testCases { - actualString, actualErr := InterfaceToArrayStringEscaped(testCase.val) + actualString, actualErr := InterfaceToArrayString(testCase.val) assert.Equal(t, testCase.expectedList, actualString, testCase.name) assert.Equal(t, testCase.expectedErr, actualErr, testCase.name) } diff --git a/lib/config/config.go b/lib/config/config.go index ef346d0eb..0b1ff507e 100644 --- a/lib/config/config.go +++ b/lib/config/config.go @@ -161,9 +161,12 @@ func (c *Config) Validate() error { c.FlushIntervalSeconds, flushIntervalSecondsStart, flushIntervalSecondsEnd) } - if !numbers.BetweenEq(bufferPoolSizeStart, bufferPoolSizeEnd, int(c.BufferRows)) { - return fmt.Errorf("config is invalid, buffer pool is outside of our range: %v, expected start: %v, end: %v", - c.BufferRows, bufferPoolSizeStart, bufferPoolSizeEnd) + if bufferPoolSizeStart > int(c.BufferRows) { + return fmt.Errorf("config is invalid, buffer pool is too small, min value: %v, actual: %v", bufferPoolSizeStart, int(c.BufferRows)) + } + + if c.Output == constants.Snowflake && int(c.BufferRows) > bufferPoolSizeEnd { + return fmt.Errorf("snowflake does not allow more than 15k rows, actual: %v", int(c.BufferRows)) } if !constants.IsValidDestination(c.Output) { diff --git a/lib/config/config_test.go b/lib/config/config_test.go index 7eebf4984..63daae01c 100644 --- a/lib/config/config_test.go +++ b/lib/config/config_test.go @@ -434,17 +434,26 @@ func TestConfig_Validate(t *testing.T) { pubsub.PathToCredentials = "/tmp/abc" assert.Nil(t, cfg.Validate()) + // Check Snowflake and BigQuery for large rows + // Snowflake should error, BigQuery will not. + cfg.Output = constants.Snowflake + cfg.BufferRows = bufferPoolSizeEnd + 1 + assert.Contains(t, cfg.Validate().Error(), "snowflake does not allow more than 15k rows") + + cfg.Output = constants.BigQuery + assert.Nil(t, cfg.Validate()) + // Test the various flush error settings. - for _, count := range []int{0, 5000000} { + for i := 0; i < bufferPoolSizeStart; i++ { // Reset buffer rows. cfg.BufferRows = 500 - cfg.FlushIntervalSeconds = count + cfg.FlushIntervalSeconds = i assert.Contains(t, cfg.Validate().Error(), "flush interval is outside of our range") // Reset Flush cfg.FlushIntervalSeconds = 20 - cfg.BufferRows = uint(count) - assert.Contains(t, cfg.Validate().Error(), "buffer pool is outside of our range") + cfg.BufferRows = uint(i) + assert.Contains(t, cfg.Validate().Error(), "buffer pool is too small") } cfg.BufferRows = 500 diff --git a/lib/config/constants/constants.go b/lib/config/constants/constants.go index defb1a92d..aee4ba346 100644 --- a/lib/config/constants/constants.go +++ b/lib/config/constants/constants.go @@ -14,6 +14,8 @@ const ( DBZPostgresAltFormat = "debezium.postgres.wal2json" DBZMongoFormat = "debezium.mongodb" DBZMySQLFormat = "debezium.mysql" + + BigQueryTempTableTTL = 6 * time.Hour ) // ExporterKind is used for the Telemetry package diff --git a/lib/dwh/ddl/ddl.go b/lib/dwh/ddl/ddl.go index e490c14eb..8e5aa9f64 100644 --- a/lib/dwh/ddl/ddl.go +++ b/lib/dwh/ddl/ddl.go @@ -3,28 +3,47 @@ package ddl import ( "context" "fmt" + "strings" "time" "github.com/artie-labs/transfer/lib/config/constants" "github.com/artie-labs/transfer/lib/dwh" "github.com/artie-labs/transfer/lib/dwh/types" + "github.com/artie-labs/transfer/lib/logger" "github.com/artie-labs/transfer/lib/typing" ) type AlterTableArgs struct { - Dwh dwh.DataWarehouse - Tc *types.DwhTableConfig - FqTableName string - CreateTable bool - ColumnOp constants.ColumnOperation + Dwh dwh.DataWarehouse + Tc *types.DwhTableConfig + FqTableName string + CreateTable bool + TemporaryTable bool + ColumnOp constants.ColumnOperation CdcTime time.Time } +func DropTemporaryTable(ctx context.Context, dwh dwh.DataWarehouse, fqTableName string) { + if strings.Contains(fqTableName, constants.ArtiePrefix) { + // https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language#drop_table_statement + _, err := dwh.Exec(fmt.Sprintf("DROP TABLE IF EXISTS %s", fqTableName)) + if err != nil { + logger.FromContext(ctx).WithError(err).Warn("failed to drop temporary table, it will get garbage collected by the TTL...") + } + } + return +} + func AlterTable(_ context.Context, args AlterTableArgs, cols ...typing.Column) error { + // You can't DROP a column and try to create a table at the same time. + if args.ColumnOp == constants.Delete && args.CreateTable { + return fmt.Errorf("incompatiable operation - cannot drop columns and create table at the asme time, args: %v", args) + } + var mutateCol []typing.Column - var colSQLPart string - var err error + // It's okay to combine since args.ColumnOp only takes one of: `Delete` or `Add` + var colSQLParts []string for _, col := range cols { if col.KindDetails == typing.Invalid { // Let's not modify the table if the column kind is invalid @@ -39,28 +58,48 @@ func AlterTable(_ context.Context, args AlterTableArgs, cols ...typing.Column) e mutateCol = append(mutateCol, col) switch args.ColumnOp { case constants.Add: - colSQLPart = fmt.Sprintf("%s %s", col.Name, typing.KindToDWHType(col.KindDetails, args.Dwh.Label())) + colSQLParts = append(colSQLParts, fmt.Sprintf("%s %s", col.Name, typing.KindToDWHType(col.KindDetails, args.Dwh.Label()))) case constants.Delete: - colSQLPart = fmt.Sprintf("%s", col.Name) + colSQLParts = append(colSQLParts, fmt.Sprintf("%s", col.Name)) } + } - // If the table does not exist, create it. - sqlQuery := fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", args.FqTableName, args.ColumnOp, colSQLPart) - if args.CreateTable { - sqlQuery = fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s)", args.FqTableName, colSQLPart) - args.CreateTable = false + var err error + if args.CreateTable { + var sqlQuery string + if args.TemporaryTable { + // Snowflake has this feature too, but we don't need it as our CTE approach with Snowflake is extremely performant. + if args.Dwh.Label() != constants.BigQuery { + return fmt.Errorf("unexpected temporary table for destination: %v", args.Dwh.Label()) + } + expiry := time.Now().UTC().Add(constants.BigQueryTempTableTTL) + sqlQuery = fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (%s) OPTIONS (expiration_timestamp = TIMESTAMP("%s"))`, + args.FqTableName, strings.Join(colSQLParts, ","), typing.BigQueryDate(expiry)) + } else { + sqlQuery = fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (%s)", args.FqTableName, strings.Join(colSQLParts, ",")) } _, err = args.Dwh.Exec(sqlQuery) - if err != nil && ColumnAlreadyExistErr(err, args.Dwh.Label()) { + if ColumnAlreadyExistErr(err, args.Dwh.Label()) { err = nil } else if err != nil { return err } + } else { + for _, colSQLPart := range colSQLParts { + sqlQuery := fmt.Sprintf("ALTER TABLE %s %s COLUMN %s", args.FqTableName, args.ColumnOp, colSQLPart) + _, err = args.Dwh.Exec(sqlQuery) + if ColumnAlreadyExistErr(err, args.Dwh.Label()) { + err = nil + } else if err != nil { + return err + } + } } if err == nil { - args.Tc.MutateInMemoryColumns(args.CreateTable, args.ColumnOp, mutateCol...) + // createTable = false since it all successfully updated. + args.Tc.MutateInMemoryColumns(false, args.ColumnOp, mutateCol...) } return nil diff --git a/lib/dwh/ddl/ddl_bq_test.go b/lib/dwh/ddl/ddl_bq_test.go index ba0265ed4..ad5edff41 100644 --- a/lib/dwh/ddl/ddl_bq_test.go +++ b/lib/dwh/ddl/ddl_bq_test.go @@ -17,6 +17,32 @@ import ( "github.com/artie-labs/transfer/lib/typing" ) +func (d *DDLTestSuite) TestDropTemporaryTable() { + doNotDropTables := []string{ + "foo", + "bar", + "abcd", + "customers.customers", + } + + for _, doNotDropTable := range doNotDropTables { + ddl.DropTemporaryTable(d.ctx, d.bigQueryStore, doNotDropTable) + assert.Equal(d.T(), 0, d.fakeBigQueryStore.ExecCallCount()) + } + + for index, table := range doNotDropTables { + fullTableName := fmt.Sprintf("%s_%s", table, constants.ArtiePrefix) + ddl.DropTemporaryTable(d.ctx, d.bigQueryStore, fullTableName) + + count := index + 1 + assert.Equal(d.T(), count, d.fakeBigQueryStore.ExecCallCount()) + + query, _ := d.fakeBigQueryStore.ExecArgsForCall(index) + assert.Equal(d.T(), fmt.Sprintf("DROP TABLE IF EXISTS %s", fullTableName), query) + + } +} + func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuery() { ctx := context.Background() ts := time.Now() @@ -284,7 +310,7 @@ func (d *DDLTestSuite) TestAlterTableDropColumnsBigQuerySafety() { FqTableName: fqName, CreateTable: tc.CreateTable, ColumnOp: constants.Delete, - CdcTime: ts.Add(2*constants.DeletionConfidencePadding), + CdcTime: ts.Add(2 * constants.DeletionConfidencePadding), } err := ddl.AlterTable(ctx, alterTableArgs, column) diff --git a/lib/dwh/ddl/ddl_sflk_test.go b/lib/dwh/ddl/ddl_sflk_test.go index 782457e69..b98479fce 100644 --- a/lib/dwh/ddl/ddl_sflk_test.go +++ b/lib/dwh/ddl/ddl_sflk_test.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "strings" "time" "github.com/stretchr/testify/assert" @@ -17,39 +16,93 @@ import ( ) func (d *DDLTestSuite) TestCreateTable() { - ctx := context.Background() - fqTable := "demo.public.experiments" - d.snowflakeStore.GetConfigMap().AddTableToConfig(fqTable, types.NewDwhTableConfig(typing.Columns{}, nil, true, true)) - tc := d.snowflakeStore.GetConfigMap().TableConfig(fqTable) + type _testCase struct { + name string + cols []typing.Column - cols := []typing.Column{ + expectedQuery string + } + + var ( + happyPathCols = []typing.Column{ + { + Name: "user_id", + KindDetails: typing.String, + }, + } + twoCols = []typing.Column{ + { + Name: "user_id", + KindDetails: typing.String, + }, + { + Name: "enabled", + KindDetails: typing.Boolean, + }, + } + bunchOfCols = []typing.Column{ + { + Name: "user_id", + KindDetails: typing.String, + }, + { + Name: "enabled_boolean", + KindDetails: typing.Boolean, + }, + { + Name: "array", + KindDetails: typing.Array, + }, + { + Name: "struct", + KindDetails: typing.Struct, + }, + } + ) + + testCases := []_testCase{ { - Name: "key", - KindDetails: typing.String, + name: "happy path", + cols: happyPathCols, + expectedQuery: "CREATE TABLE IF NOT EXISTS demo.public.experiments (user_id string)", }, { - Name: "enabled", - KindDetails: typing.Boolean, + name: "happy path + enabled", + cols: twoCols, + expectedQuery: "CREATE TABLE IF NOT EXISTS demo.public.experiments (user_id string,enabled boolean)", + }, + { + name: "complex table creation", + cols: bunchOfCols, + expectedQuery: "CREATE TABLE IF NOT EXISTS demo.public.experiments (user_id string,enabled_boolean boolean,array array,struct variant)", }, } - alterTableArgs := ddl.AlterTableArgs{ - Dwh: d.snowflakeStore, - Tc: tc, - FqTableName: fqTable, - CreateTable: tc.CreateTable, - ColumnOp: constants.Add, - CdcTime: time.Now().UTC(), - } - err := ddl.AlterTable(ctx, alterTableArgs, cols...) - assert.NoError(d.T(), err) + for index, testCase := range testCases { + ctx := context.Background() + fqTable := "demo.public.experiments" + d.snowflakeStore.GetConfigMap().AddTableToConfig(fqTable, types.NewDwhTableConfig(typing.Columns{}, nil, true, true)) + tc := d.snowflakeStore.GetConfigMap().TableConfig(fqTable) + + alterTableArgs := ddl.AlterTableArgs{ + Dwh: d.snowflakeStore, + Tc: tc, + FqTableName: fqTable, + CreateTable: tc.CreateTable, + ColumnOp: constants.Add, + CdcTime: time.Now().UTC(), + } - execQuery, _ := d.fakeSnowflakeStore.ExecArgsForCall(0) - assert.Equal(d.T(), strings.Contains(execQuery, "CREATE TABLE IF NOT EXISTS"), true, execQuery) + err := ddl.AlterTable(ctx, alterTableArgs, testCase.cols...) + assert.NoError(d.T(), err, testCase.name) - execQuery, _ = d.fakeSnowflakeStore.ExecArgsForCall(1) - assert.Equal(d.T(), fmt.Sprintf("ALTER TABLE %s add COLUMN enabled boolean", fqTable), execQuery, execQuery) - assert.Equal(d.T(), d.snowflakeStore.GetConfigMap().TableConfig(fqTable).CreateTable, false, d.snowflakeStore.GetConfigMap().TableConfig(fqTable)) + execQuery, _ := d.fakeSnowflakeStore.ExecArgsForCall(index) + assert.Equal(d.T(), testCase.expectedQuery, execQuery, testCase.name) + + // Check if the table is now marked as created where `CreateTable = false`. + assert.Equal(d.T(), d.snowflakeStore.GetConfigMap().TableConfig(fqTable).CreateTable, + false, d.snowflakeStore.GetConfigMap().TableConfig(fqTable), testCase.name) + } } func (d *DDLTestSuite) TestAlterComplexObjects() { ctx := context.Background() diff --git a/lib/dwh/dml/merge.go b/lib/dwh/dml/merge.go index 4c96031b1..61aaebcf8 100644 --- a/lib/dwh/dml/merge.go +++ b/lib/dwh/dml/merge.go @@ -19,10 +19,11 @@ type MergeArgument struct { Columns []string ColumnsToTypes typing.Columns - // BigQueryTypeCasting - This is used for columns that have JSON value. This is required for BigQuery - // We will be casting the value in this column as such: `TO_JSON_STRING()` - BigQueryTypeCasting bool - SoftDelete bool + // BigQuery is used to: + // 1) escape JSON columns + // 2) merge temp table vs. subquery + BigQuery bool + SoftDelete bool } func MergeStatement(m MergeArgument) (string, error) { @@ -46,7 +47,7 @@ func MergeStatement(m MergeArgument) (string, error) { return "", fmt.Errorf("error: column: %s does not exist in columnToType: %v", primaryKey, m.ColumnsToTypes) } - if m.BigQueryTypeCasting && pkCol.KindDetails.Kind == typing.Struct.Kind { + if m.BigQuery && pkCol.KindDetails.Kind == typing.Struct.Kind { // BigQuery requires special casting to compare two JSON objects. equalitySQL = fmt.Sprintf("TO_JSON_STRING(c.%s) = TO_JSON_STRING(cc.%s)", primaryKey, primaryKey) } @@ -54,9 +55,14 @@ func MergeStatement(m MergeArgument) (string, error) { equalitySQLParts = append(equalitySQLParts, equalitySQL) } + subQuery := fmt.Sprintf("( %s )", m.SubQuery) + if m.BigQuery { + subQuery = m.SubQuery + } + if m.SoftDelete { return fmt.Sprintf(` - MERGE INTO %s c using (%s) as cc on %s + MERGE INTO %s c using %s as cc on %s when matched %sthen UPDATE SET %s when not matched AND IFNULL(cc.%s, false) = false then INSERT @@ -67,9 +73,9 @@ func MergeStatement(m MergeArgument) (string, error) { ( %s ); - `, m.FqTableName, m.SubQuery, strings.Join(equalitySQLParts, " and "), + `, m.FqTableName, subQuery, strings.Join(equalitySQLParts, " and "), // Update + Soft Deletion - idempotentClause, array.ColumnsUpdateQuery(m.Columns, m.ColumnsToTypes, m.BigQueryTypeCasting), + idempotentClause, array.ColumnsUpdateQuery(m.Columns, m.ColumnsToTypes, m.BigQuery), // Insert constants.DeleteColumnMarker, strings.Join(m.Columns, ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ @@ -94,7 +100,7 @@ func MergeStatement(m MergeArgument) (string, error) { } return fmt.Sprintf(` - MERGE INTO %s c using (%s) as cc on %s + MERGE INTO %s c using %s as cc on %s when matched AND cc.%s then DELETE when matched AND IFNULL(cc.%s, false) = false %sthen UPDATE SET %s @@ -106,11 +112,11 @@ func MergeStatement(m MergeArgument) (string, error) { ( %s ); - `, m.FqTableName, m.SubQuery, strings.Join(equalitySQLParts, " and "), + `, m.FqTableName, subQuery, strings.Join(equalitySQLParts, " and "), // Delete constants.DeleteColumnMarker, // Update - constants.DeleteColumnMarker, idempotentClause, array.ColumnsUpdateQuery(m.Columns, m.ColumnsToTypes, m.BigQueryTypeCasting), + constants.DeleteColumnMarker, idempotentClause, array.ColumnsUpdateQuery(m.Columns, m.ColumnsToTypes, m.BigQuery), // Insert constants.DeleteColumnMarker, strings.Join(m.Columns, ","), array.StringsJoinAddPrefix(array.StringsJoinAddPrefixArgs{ diff --git a/lib/dwh/dml/merge_bigquery_test.go b/lib/dwh/dml/merge_bigquery_test.go new file mode 100644 index 000000000..51fd3a980 --- /dev/null +++ b/lib/dwh/dml/merge_bigquery_test.go @@ -0,0 +1,72 @@ +package dml + +import ( + "testing" + + "github.com/artie-labs/transfer/lib/config/constants" + + "github.com/stretchr/testify/assert" + + "github.com/artie-labs/transfer/lib/typing" +) + +func TestMergeStatement_TempTable(t *testing.T) { + var cols typing.Columns + cols.AddColumn(typing.Column{ + Name: "order_id", + KindDetails: typing.Integer, + }) + cols.AddColumn(typing.Column{ + Name: "name", + KindDetails: typing.String, + }) + cols.AddColumn(typing.Column{ + Name: constants.DeleteColumnMarker, + KindDetails: typing.Boolean, + }) + + mergeArg := MergeArgument{ + FqTableName: "customers.orders", + SubQuery: "customers.orders_tmp", + PrimaryKeys: []string{"order_id"}, + Columns: []string{"order_id", "name", constants.DeleteColumnMarker}, + ColumnsToTypes: cols, + BigQuery: true, + SoftDelete: false, + } + + mergeSQL, err := MergeStatement(mergeArg) + assert.NoError(t, err) + + assert.Contains(t, mergeSQL, "MERGE INTO customers.orders c using customers.orders_tmp as cc on c.order_id = cc.order_id", mergeSQL) +} + +func TestMergeStatement_JSONKey(t *testing.T) { + var cols typing.Columns + cols.AddColumn(typing.Column{ + Name: "order_oid", + KindDetails: typing.Struct, + }) + cols.AddColumn(typing.Column{ + Name: "name", + KindDetails: typing.String, + }) + cols.AddColumn(typing.Column{ + Name: constants.DeleteColumnMarker, + KindDetails: typing.Boolean, + }) + + mergeArg := MergeArgument{ + FqTableName: "customers.orders", + SubQuery: "customers.orders_tmp", + PrimaryKeys: []string{"order_oid"}, + Columns: []string{"order_oid", "name", constants.DeleteColumnMarker}, + ColumnsToTypes: cols, + BigQuery: true, + SoftDelete: false, + } + + mergeSQL, err := MergeStatement(mergeArg) + assert.NoError(t, err) + assert.Contains(t, mergeSQL, "MERGE INTO customers.orders c using customers.orders_tmp as cc on TO_JSON_STRING(c.order_oid) = TO_JSON_STRING(cc.order_oid)", mergeSQL) +} diff --git a/lib/dwh/dml/merge_test.go b/lib/dwh/dml/merge_test.go index df943948e..5f632e5be 100644 --- a/lib/dwh/dml/merge_test.go +++ b/lib/dwh/dml/merge_test.go @@ -39,14 +39,14 @@ func TestMergeStatementSoftDelete(t *testing.T) { for _, idempotentKey := range []string{"", "updated_at"} { mergeSQL, err := MergeStatement(MergeArgument{ - FqTableName: fqTable, - SubQuery: subQuery, - IdempotentKey: idempotentKey, - PrimaryKeys: []string{"id"}, - Columns: cols, - ColumnsToTypes: _cols, - BigQueryTypeCasting: false, - SoftDelete: true, + FqTableName: fqTable, + SubQuery: subQuery, + IdempotentKey: idempotentKey, + PrimaryKeys: []string{"id"}, + Columns: cols, + ColumnsToTypes: _cols, + BigQuery: false, + SoftDelete: true, }) assert.NoError(t, err) assert.True(t, strings.Contains(mergeSQL, fmt.Sprintf("MERGE INTO %s", fqTable)), mergeSQL) @@ -84,14 +84,14 @@ func TestMergeStatement(t *testing.T) { subQuery := fmt.Sprintf("SELECT %s from (values %s) as %s(%s)", strings.Join(cols, ","), strings.Join(tableValues, ","), "_tbl", strings.Join(cols, ",")) mergeSQL, err := MergeStatement(MergeArgument{ - FqTableName: fqTable, - SubQuery: subQuery, - IdempotentKey: "", - PrimaryKeys: []string{"id"}, - Columns: cols, - ColumnsToTypes: _cols, - BigQueryTypeCasting: false, - SoftDelete: false, + FqTableName: fqTable, + SubQuery: subQuery, + IdempotentKey: "", + PrimaryKeys: []string{"id"}, + Columns: cols, + ColumnsToTypes: _cols, + BigQuery: false, + SoftDelete: false, }) assert.NoError(t, err) assert.True(t, strings.Contains(mergeSQL, fmt.Sprintf("MERGE INTO %s", fqTable)), mergeSQL) @@ -124,14 +124,14 @@ func TestMergeStatementIdempotentKey(t *testing.T) { }) mergeSQL, err := MergeStatement(MergeArgument{ - FqTableName: fqTable, - SubQuery: subQuery, - IdempotentKey: "updated_at", - PrimaryKeys: []string{"id"}, - Columns: cols, - ColumnsToTypes: _cols, - BigQueryTypeCasting: false, - SoftDelete: false, + FqTableName: fqTable, + SubQuery: subQuery, + IdempotentKey: "updated_at", + PrimaryKeys: []string{"id"}, + Columns: cols, + ColumnsToTypes: _cols, + BigQuery: false, + SoftDelete: false, }) assert.NoError(t, err) assert.True(t, strings.Contains(mergeSQL, fmt.Sprintf("MERGE INTO %s", fqTable)), mergeSQL) @@ -169,14 +169,14 @@ func TestMergeStatementCompositeKey(t *testing.T) { }) mergeSQL, err := MergeStatement(MergeArgument{ - FqTableName: fqTable, - SubQuery: subQuery, - IdempotentKey: "updated_at", - PrimaryKeys: []string{"id", "another_id"}, - Columns: cols, - ColumnsToTypes: _cols, - BigQueryTypeCasting: false, - SoftDelete: false, + FqTableName: fqTable, + SubQuery: subQuery, + IdempotentKey: "updated_at", + PrimaryKeys: []string{"id", "another_id"}, + Columns: cols, + ColumnsToTypes: _cols, + BigQuery: false, + SoftDelete: false, }) assert.NoError(t, err) assert.True(t, strings.Contains(mergeSQL, fmt.Sprintf("MERGE INTO %s", fqTable)), mergeSQL) diff --git a/lib/kafkalib/topic.go b/lib/kafkalib/topic.go index 578a0c753..e3f08617c 100644 --- a/lib/kafkalib/topic.go +++ b/lib/kafkalib/topic.go @@ -2,6 +2,7 @@ package kafkalib import ( "fmt" + "github.com/artie-labs/transfer/lib/array" "github.com/artie-labs/transfer/lib/config/constants" ) diff --git a/lib/optimization/event.go b/lib/optimization/event.go index c7fe3da96..23c654a69 100644 --- a/lib/optimization/event.go +++ b/lib/optimization/event.go @@ -2,9 +2,12 @@ package optimization import ( "context" + "fmt" "strings" "time" + "github.com/artie-labs/transfer/lib/stringutil" + "github.com/artie-labs/transfer/lib/artie" "github.com/artie-labs/transfer/lib/config" "github.com/artie-labs/transfer/lib/config/constants" @@ -28,6 +31,9 @@ type TableData struct { // This is used for the automatic schema detection LatestCDCTs time.Time approxSize int + + // BigQuery specific. We are creating a temporary table to execute a merge, in order to avoid in-memory tables via UNION ALL. + temporaryTableSuffix string } func (t *TableData) SetInMemoryColumns(columns *typing.Columns) { @@ -60,6 +66,7 @@ func NewTableData(inMemoryColumns *typing.Columns, primaryKeys []string, topicCo PrimaryKeys: primaryKeys, TopicConfig: topicConfig, PartitionsToLastMessage: map[string][]artie.Message{}, + temporaryTableSuffix: fmt.Sprintf("%s_%s", constants.ArtiePrefix, stringutil.Random(10)), } } @@ -110,6 +117,10 @@ func (t *TableData) Rows() uint { return uint(len(t.rowsData)) } +func (t *TableData) TempTableSuffix() string { + return t.temporaryTableSuffix +} + func (t *TableData) ShouldFlush(ctx context.Context) bool { settings := config.FromContext(ctx) return t.Rows() > settings.Config.BufferRows || t.approxSize > settings.Config.FlushSizeKb*1024 diff --git a/lib/stringutil/strings.go b/lib/stringutil/strings.go index e4bcb6332..ca1b6b20d 100644 --- a/lib/stringutil/strings.go +++ b/lib/stringutil/strings.go @@ -2,7 +2,9 @@ package stringutil import ( "fmt" + "math/rand" "strings" + "time" ) func Reverse(val string) string { @@ -15,6 +17,13 @@ func Reverse(val string) string { return string(reverseParts) } +func WrapNoQuotes(colVal interface{}) string { + // Escape line breaks, JSON_PARSE does not like it. + colVal = strings.ReplaceAll(fmt.Sprint(colVal), `\`, `\\`) + // The normal string escape is to do for O'Reilly is O\\'Reilly, but Snowflake escapes via \' + return strings.ReplaceAll(fmt.Sprint(colVal), "'", `\'`) +} + func Wrap(colVal interface{}) string { // Escape line breaks, JSON_PARSE does not like it. colVal = strings.ReplaceAll(fmt.Sprint(colVal), `\`, `\\`) @@ -40,3 +49,16 @@ func EscapeSpaces(col string) (escaped bool, newString string) { func LineBreaksToCarriageReturns(paragraph string) string { return strings.ReplaceAll(paragraph, "\n", `\n`) } + +func stringWithCharset(length int, charset string) string { + b := make([]byte, length) + for i := range b { + b[i] = charset[rand.Intn(len(charset))] + } + return string(b) +} + +func Random(length int) string { + rand.Seed(time.Now().UnixNano()) + return stringWithCharset(length, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789") +} diff --git a/lib/typing/bigquery.go b/lib/typing/bigquery.go index 6d84dae06..282289334 100644 --- a/lib/typing/bigquery.go +++ b/lib/typing/bigquery.go @@ -1,15 +1,13 @@ package typing import ( - "fmt" "strings" + "time" "github.com/artie-labs/transfer/lib/typing/ext" ) -func BigQueryJSON(json interface{}) string { - return fmt.Sprintf(`JSON '%v'`, json) -} +const StreamingTimeFormat = "15:04:05" func BigQueryTypeToKind(bqType string) KindDetails { bqType = strings.ToLower(bqType) @@ -84,3 +82,10 @@ func kindToBigQuery(kindDetails KindDetails) string { return kindDetails.Kind } + +func BigQueryDate(time time.Time) string { + // BigQuery expects the timestamp to look in this format: 2023-01-01 00:00:00 UTC + // This is used as part of table options. + layout := "2006-01-02 15:04:05 MST" + return time.Format(layout) +} diff --git a/lib/typing/bigquery_test.go b/lib/typing/bigquery_test.go index fa8e001b1..c1de517b6 100644 --- a/lib/typing/bigquery_test.go +++ b/lib/typing/bigquery_test.go @@ -8,11 +8,6 @@ import ( "github.com/stretchr/testify/assert" ) -func TestBigQueryJSON(t *testing.T) { - jsonString := `{"foo": "bar"}` - assert.Equal(t, `JSON '{"foo": "bar"}'`, BigQueryJSON(jsonString)) -} - func TestBigQueryTypeToKind(t *testing.T) { bqColToExpectedKind := map[string]KindDetails{ // Integer diff --git a/lib/typing/columns.go b/lib/typing/columns.go index 149c673c0..dbab387ad 100644 --- a/lib/typing/columns.go +++ b/lib/typing/columns.go @@ -57,6 +57,24 @@ func (c *Columns) GetColumn(name string) (Column, bool) { return Column{}, false } +// GetColumnsToUpdate will filter all the `Invalid` columns so that we do not update it. +func (c *Columns) GetColumnsToUpdate() []string { + if c == nil { + return []string{} + } + + var cols []string + for _, col := range c.columns { + if col.KindDetails == Invalid { + continue + } + + cols = append(cols, col.Name) + } + + return cols +} + func (c *Columns) GetColumns() []Column { if c == nil { return []Column{} diff --git a/lib/typing/columns_test.go b/lib/typing/columns_test.go index 71fbcd46e..71d683f90 100644 --- a/lib/typing/columns_test.go +++ b/lib/typing/columns_test.go @@ -1,11 +1,62 @@ package typing import ( + "fmt" "testing" "github.com/stretchr/testify/assert" ) +func TestColumns_GetColumnsToUpdate(t *testing.T) { + type _testCase struct { + name string + cols []Column + expectedCols []string + } + + var ( + happyPathCols = []Column{ + { + Name: "hi", + KindDetails: String, + }, + { + Name: "bye", + KindDetails: String, + }, + } + ) + + extraCols := happyPathCols + for i := 0; i < 100; i++ { + extraCols = append(extraCols, Column{ + Name: fmt.Sprintf("hello_%v", i), + KindDetails: Invalid, + }) + } + + testCases := []_testCase{ + { + name: "happy path", + cols: happyPathCols, + expectedCols: []string{"hi", "bye"}, + }, + { + name: "happy path + extra col", + cols: extraCols, + expectedCols: []string{"hi", "bye"}, + }, + } + + for _, testCase := range testCases { + cols := &Columns{ + columns: testCase.cols, + } + + assert.Equal(t, testCase.expectedCols, cols.GetColumnsToUpdate(), testCase.name) + } +} + func TestColumns_UpsertColumns(t *testing.T) { keys := []string{"a", "b", "c", "d", "e"} var cols Columns diff --git a/lib/typing/ext/time.go b/lib/typing/ext/time.go index 62f3efcd2..4bc0533bd 100644 --- a/lib/typing/ext/time.go +++ b/lib/typing/ext/time.go @@ -70,3 +70,11 @@ func (e *ExtendedTime) String(overrideFormat string) string { return e.Time.Format(e.NestedKind.Format) } + +func (e *ExtendedTime) StringUTC(overrideFormat string) string { + if overrideFormat != "" { + return e.Time.In(time.UTC).Format(overrideFormat) + } + + return e.Time.In(time.UTC).Format(e.NestedKind.Format) +} diff --git a/lib/typing/ext/variables.go b/lib/typing/ext/variables.go index 1e66adff8..8857e4b01 100644 --- a/lib/typing/ext/variables.go +++ b/lib/typing/ext/variables.go @@ -3,6 +3,7 @@ package ext import "time" const ( + BigQueryDateTimeFormat = "2006-01-02 15:04:05.999999" ISO8601 = "2006-01-02T15:04:05-07:00" PostgresDateFormat = "2006-01-02" PostgresTimeFormat = "15:04:05.999999-07" // microsecond precision