Skip to content

Commit

Permalink
feat: warehouse transformer
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Oct 17, 2024
1 parent bf2b4c6 commit 5ae4838
Show file tree
Hide file tree
Showing 55 changed files with 15,412 additions and 8 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ require (
github.com/databricks/databricks-sql-go v1.6.1
github.com/denisenkom/go-mssqldb v0.12.3
github.com/dgraph-io/badger/v4 v4.3.1
github.com/dlclark/regexp2 v1.11.4
github.com/docker/docker v27.3.1+incompatible
github.com/go-chi/chi/v5 v5.1.0
github.com/go-redis/redis v6.15.9+incompatible
Expand Down Expand Up @@ -76,7 +77,7 @@ require (
github.com/rudderlabs/analytics-go v3.3.3+incompatible
github.com/rudderlabs/bing-ads-go-sdk v0.2.3
github.com/rudderlabs/compose-test v0.1.3
github.com/rudderlabs/rudder-go-kit v0.43.0
github.com/rudderlabs/rudder-go-kit v0.43.1-0.20241017045502-08a98c5f8442
github.com/rudderlabs/rudder-observability-kit v0.0.3
github.com/rudderlabs/rudder-schemas v0.5.3
github.com/rudderlabs/rudder-transformer/go v0.0.0-20240910055720-f77d2ab4125a
Expand Down Expand Up @@ -187,7 +188,6 @@ require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgraph-io/ristretto v1.0.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dlclark/regexp2 v1.11.4 // indirect
github.com/dnephin/pflag v1.0.7 // indirect
github.com/docker/cli v27.2.1+incompatible // indirect
github.com/docker/cli-docs-tool v0.8.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1168,8 +1168,8 @@ github.com/rudderlabs/goqu/v10 v10.3.1 h1:rnfX+b4EwBWQ2UQfIGeEW299JBBkK5biEbnf7K
github.com/rudderlabs/goqu/v10 v10.3.1/go.mod h1:LH2vI5gGHBxEQuESqFyk5ZA2anGINc8o25hbidDWOYw=
github.com/rudderlabs/parquet-go v0.0.2 h1:ZXRdZdimB0PdJtmxeSSxfI0fDQ3kZjwzBxRi6Ut1J8k=
github.com/rudderlabs/parquet-go v0.0.2/go.mod h1:g6guum7o8uhj/uNhunnt7bw5Vabu/goI5i21/3fnxWQ=
github.com/rudderlabs/rudder-go-kit v0.43.0 h1:N6CAvQdjufitdiUl424+AcMebEmieB0TO5PhARwXvw8=
github.com/rudderlabs/rudder-go-kit v0.43.0/go.mod h1:NrHCi0KSzHSMFXQu0t2kgJcE4ClAKklVXfb2glADvQ4=
github.com/rudderlabs/rudder-go-kit v0.43.1-0.20241017045502-08a98c5f8442 h1:WAYL/6chiRSIeKwSNGd9sclWNWbKBwenGbUhiyxQIi4=
github.com/rudderlabs/rudder-go-kit v0.43.1-0.20241017045502-08a98c5f8442/go.mod h1:NrHCi0KSzHSMFXQu0t2kgJcE4ClAKklVXfb2glADvQ4=
github.com/rudderlabs/rudder-observability-kit v0.0.3 h1:vZtuZRkGX+6rjaeKtxxFE2YYP6QlmAcVcgecTOjvz+Q=
github.com/rudderlabs/rudder-observability-kit v0.0.3/go.mod h1:6UjAh3H6rkE0fFLh7z8ZGQEQbKtUkRfhWOf/OUhfqW8=
github.com/rudderlabs/rudder-schemas v0.5.3 h1:IWWjAo2TzsjwHNhS2EAr1+0MjvA8BoTpJvB2o/GFwNU=
Expand Down
20 changes: 16 additions & 4 deletions processor/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,25 @@ func WithClient(client *http.Client) Opt {
}
}

// Transformer provides methods to transform events
type Transformer interface {
Transform(ctx context.Context, clientEvents []TransformerEvent, batchSize int) Response
// UserTransformer is the single-method interface wrapping UserTransform.
// UserTransform receives a context, the events to process and a batch size,
// and returns a Response.
// NOTE(review): presumably clientEvents are sent in chunks of batchSize —
// confirm against the implementation.
type UserTransformer interface {
UserTransform(ctx context.Context, clientEvents []TransformerEvent, batchSize int) Response
}

// DestinationTransformer is the single-method interface wrapping Transform.
// Transform receives a context, the events to process and a batch size,
// and returns a Response.
// NOTE(review): presumably clientEvents are sent in chunks of batchSize —
// confirm against the implementation.
type DestinationTransformer interface {
Transform(ctx context.Context, clientEvents []TransformerEvent, batchSize int) Response
}

// TrackingPlanValidator is the single-method interface wrapping Validate.
// Validate receives a context, the events to check and a batch size,
// and returns a Response.
// NOTE(review): presumably clientEvents are sent in chunks of batchSize —
// confirm against the implementation.
type TrackingPlanValidator interface {
Validate(ctx context.Context, clientEvents []TransformerEvent, batchSize int) Response
}

// Transformer provides methods to transform events.
//
// It is the union of UserTransformer, DestinationTransformer and
// TrackingPlanValidator, so any implementation must offer UserTransform,
// Transform and Validate. Callers that need only one of these can depend on
// the corresponding narrower interface instead.
type Transformer interface {
UserTransformer
DestinationTransformer
TrackingPlanValidator
}

// handle is the handle for this class
type handle struct {
sentStat stats.Measurement
Expand Down Expand Up @@ -526,7 +538,7 @@ func (trans *handle) destTransformURL(destType string) string {
destinationEndPoint := fmt.Sprintf("%s/v0/destinations/%s", trans.config.destTransformationURL, strings.ToLower(destType))

if _, ok := warehouseutils.WarehouseDestinationMap[destType]; ok {
whSchemaVersionQueryParam := fmt.Sprintf("whSchemaVersion=%s&whIDResolve=%v", trans.conf.GetString("Warehouse.schemaVersion", "v1"), warehouseutils.IDResolutionEnabled())
whSchemaVersionQueryParam := fmt.Sprintf("whIDResolve=%v", trans.conf.GetBool("Warehouse.enableIDResolution", false))
switch destType {
case warehouseutils.RS:
return destinationEndPoint + "?" + whSchemaVersionQueryParam
Expand Down
58 changes: 58 additions & 0 deletions warehouse/transformer/alias.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package transformer

import (
"fmt"

"github.com/samber/lo"

"github.com/rudderlabs/rudder-server/warehouse/transformer/internal/rules"
)

// handleAliasEvent builds the warehouse output records for an alias event.
//
// A data map and a column-type map are passed, in order, through the input
// handlers for the message's "traits" and "context" entries, the rule handler
// (default rules merged with alias rules) and storeRudderEvent. The "aliases"
// table name and the column set are then resolved for the event's destination
// type. The alias record is returned first, followed by every record produced
// by handleMergeEvent for the same event.
//
// The first step that fails aborts processing; its error is returned wrapped
// and no records are returned.
func (t *transformer) handleAliasEvent(pi *processingInfo) ([]map[string]any, error) {
	data := map[string]any{}
	colTypes := map[string]string{}

	// Every step below receives the same two maps, so the call order is kept as is.
	err := t.setDataAndColumnTypeFromInput(
		pi, pi.event.Message["traits"], data, colTypes, "alias_traits_", 2, "", 0,
	)
	if err != nil {
		return nil, fmt.Errorf("setting data and column types from message: %w", err)
	}

	err = t.setDataAndColumnTypeFromInput(
		pi, pi.event.Message["context"], data, colTypes, "alias_context_", 2, "context_", 0,
	)
	if err != nil {
		return nil, fmt.Errorf("setting data and column types from message: %w", err)
	}

	aliasRules := lo.Assign(rules.DefaultRules, rules.AliasRules)
	err = t.setDataAndColumnTypeFromRules(pi, data, colTypes, aliasRules, rules.DefaultFunctionalRules)
	if err != nil {
		return nil, fmt.Errorf("setting data and column types from rules: %w", err)
	}

	err = storeRudderEvent(pi, data, colTypes)
	if err != nil {
		return nil, fmt.Errorf("storing rudder event: %w", err)
	}

	tableName, err := SafeTableName(pi.event.Metadata.DestinationType, pi.itrOpts, "aliases")
	if err != nil {
		return nil, fmt.Errorf("safe table name: %w", err)
	}

	cols, err := t.getColumns(pi.event.Metadata.DestinationType, data, colTypes)
	if err != nil {
		return nil, fmt.Errorf("getting columns: %w", err)
	}

	merged, err := t.handleMergeEvent(pi)
	if err != nil {
		return nil, fmt.Errorf("handling merge event: %w", err)
	}

	meta := map[string]any{
		"table":      tableName,
		"columns":    cols,
		"receivedAt": pi.event.Metadata.ReceivedAt,
	}
	record := map[string]any{
		"data":     data,
		"metadata": meta,
		// userId is always emitted as an empty string for the alias record.
		"userId": "",
	}

	// Alias record first, merge records after it.
	results := make([]map[string]any, 0, 1+len(merged))
	results = append(results, record)
	results = append(results, merged...)
	return results, nil
}
Loading

0 comments on commit 5ae4838

Please sign in to comment.