diff --git a/clients/bigquery/dialect/dialect.go b/clients/bigquery/dialect/dialect.go index 9fe08cfa5..2d1ae108c 100644 --- a/clients/bigquery/dialect/dialect.go +++ b/clients/bigquery/dialect/dialect.go @@ -43,11 +43,13 @@ func (BigQueryDialect) IsTableDoesNotExistErr(_ error) bool { func (bd BigQueryDialect) BuildIsNotToastValueExpression(tableAlias constants.TableAlias, column columns.Column) string { colName := sql.QuoteTableAliasColumn(tableAlias, column, bd) - if column.KindDetails == typing.Struct { - return fmt.Sprintf(`COALESCE(TO_JSON_STRING(%s) != '{"key":"%s"}', true)`, - colName, constants.ToastUnavailableValuePlaceholder) + + switch column.KindDetails { + case typing.Struct, typing.Array: + return fmt.Sprintf(`TO_JSON_STRING(%s) NOT LIKE '%s'`, colName, "%"+constants.ToastUnavailableValuePlaceholder+"%") + default: + return fmt.Sprintf("COALESCE(%s != '%s', true)", colName, constants.ToastUnavailableValuePlaceholder) } - return fmt.Sprintf("COALESCE(%s != '%s', true)", colName, constants.ToastUnavailableValuePlaceholder) } func (bd BigQueryDialect) BuildDedupeTableQuery(tableID sql.TableIdentifier, primaryKeys []string) string { diff --git a/clients/bigquery/storagewrite.go b/clients/bigquery/storagewrite.go index 828a9060d..e8e84b7c8 100644 --- a/clients/bigquery/storagewrite.go +++ b/clients/bigquery/storagewrite.go @@ -243,6 +243,12 @@ func rowToMessage(row map[string]any, columns []columns.Column, messageDescripto if err != nil { return nil, err } + + fmt.Println("Type of values", fmt.Sprintf("%T", values)) + for _, val := range values { + fmt.Println("value", val) + } + list := message.Mutable(field).List() for _, val := range values { list.Append(protoreflect.ValueOfString(val)) diff --git a/clients/shared/merge.go b/clients/shared/merge.go index 743c7e317..51392e31f 100644 --- a/clients/shared/merge.go +++ b/clients/shared/merge.go @@ -151,6 +151,8 @@ func Merge(ctx context.Context, dwh destination.DataWarehouse, tableData *optimi return fmt.Errorf("failed to generate merge statements: %w", err) } + fmt.Println("mergeStatements", mergeStatements) + if err = destination.ExecStatements(dwh, mergeStatements); err != nil { return fmt.Errorf("failed to execute merge statements: %w", err) }