feat: warehouse transformer #5205

Open · wants to merge 3 commits into base: master
go.mod (2 changes: 1 addition & 1 deletion)
@@ -41,6 +41,7 @@ require (
github.com/databricks/databricks-sql-go v1.6.1
github.com/denisenkom/go-mssqldb v0.12.3
github.com/dgraph-io/badger/v4 v4.3.1
github.com/dlclark/regexp2 v1.11.4
github.com/docker/docker v27.3.1+incompatible
github.com/go-chi/chi/v5 v5.1.0
github.com/go-redis/redis v6.15.9+incompatible
@@ -187,7 +188,6 @@ require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgraph-io/ristretto v1.0.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dlclark/regexp2 v1.11.4 // indirect
github.com/dnephin/pflag v1.0.7 // indirect
github.com/docker/cli v27.2.1+incompatible // indirect
github.com/docker/cli-docs-tool v0.8.0 // indirect
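Note on the go.mod hunk: github.com/dlclark/regexp2 moves from the indirect to the direct require block, which suggests (though this hunk does not show the call sites) that the new warehouse transformer code imports it directly. A minimal, hypothetical usage sketch of the library, for readers unfamiliar with it; the pattern and input are illustrative only:

```go
package main

import (
	"fmt"

	"github.com/dlclark/regexp2"
)

func main() {
	// regexp2 supports features the standard library's regexp lacks,
	// e.g. lookahead assertions.
	re := regexp2.MustCompile(`^(?=.*\d)[A-Za-z0-9_]+$`, regexp2.None)

	ok, err := re.MatchString("column_1")
	if err != nil {
		panic(err)
	}
	fmt.Println(ok) // true
}
```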
processor/processor.go (80 changes: 76 additions & 4 deletions)
@@ -5,14 +5,18 @@ import (
"encoding/json"
"errors"
"fmt"
"reflect"
"runtime/trace"
"slices"
"strconv"
"strings"
"sync"
"time"

obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"

"github.com/rudderlabs/rudder-server/enterprise/trackedusers"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"

"golang.org/x/sync/errgroup"

@@ -55,6 +59,7 @@ import (
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
"github.com/rudderlabs/rudder-server/utils/types"
"github.com/rudderlabs/rudder-server/utils/workerpool"
wtrans "github.com/rudderlabs/rudder-server/warehouse/transformer"
)

const (
@@ -84,10 +89,12 @@ type trackedUsersReporter interface {

// Handle is a handle to the processor module
type Handle struct {
conf *config.Config
tracer stats.Tracer
backendConfig backendconfig.BackendConfig
transformer transformer.Transformer
conf *config.Config
tracer stats.Tracer
backendConfig backendconfig.BackendConfig
transformer transformer.Transformer
warehouseTransformer transformer.DestinationTransformer
warehouseDebugLogger *wtrans.DebugLogger

gatewayDB jobsdb.JobsDB
routerDB jobsdb.JobsDB
@@ -156,6 +163,7 @@ type Handle struct {
eventAuditEnabled map[string]bool
credentialsMap map[string][]transformer.Credential
nonEventStreamSources map[string]bool
enableWarehouseTransformations config.ValueLoader[bool]
}

drainConfig struct {
@@ -615,6 +623,9 @@ func (proc *Handle) Setup(
"partition": partition,
})
}
proc.warehouseTransformer = wtrans.New(proc.conf, proc.logger, proc.statsFactory)
proc.warehouseDebugLogger = wtrans.NewDebugLogger(proc.conf, proc.logger)

if proc.config.enableDedup {
var err error
proc.dedup, err = dedup.New(proc.conf, proc.statsFactory)
@@ -815,6 +826,7 @@ func (proc *Handle) loadReloadableConfig(defaultPayloadLimit int64, defaultMaxEv
proc.config.archivalEnabled = config.GetReloadableBoolVar(true, "archival.Enabled")
// Capture event name as a tag in event level stats
proc.config.captureEventNameStats = config.GetReloadableBoolVar(false, "Processor.Stats.captureEventName")
proc.config.enableWarehouseTransformations = config.GetReloadableBoolVar(false, "Processor.enableWarehouseTransformations")
}
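The comparison path added in this PR is gated behind the reloadable Processor.enableWarehouseTransformations flag, which defaults to false, so the warehouse transformer is opt-in. A minimal sketch of flipping it programmatically via the rudder-go-kit config package (the New/Set/GetReloadableBoolVar surface is assumed here; in deployments the value would normally come from the config file or environment):

```go
package main

import (
	"fmt"

	"github.com/rudderlabs/rudder-go-kit/config"
)

func main() {
	// Hypothetical wiring: enable the warehouse comparison path at runtime.
	c := config.New()
	c.Set("Processor.enableWarehouseTransformations", true)

	enableWarehouseTransformations := c.GetReloadableBoolVar(false, "Processor.enableWarehouseTransformations")
	fmt.Println(enableWarehouseTransformations.Load()) // true
}
```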

type connection struct {
@@ -2765,6 +2777,7 @@ func (proc *Handle) transformSrcDest(
proc.logger.Debug("Dest Transform input size", len(eventsToTransform))
s := time.Now()
response = proc.transformer.Transform(ctx, eventsToTransform, proc.config.transformBatchSize.Load())
proc.handleResponseForWarehouseTransformation(ctx, eventsToTransform, response, commonMetaData, eventsByMessageID)

destTransformationStat := proc.newDestinationTransformationStat(sourceID, workspaceID, transformAt, destination)
destTransformationStat.transformTime.Since(s)
@@ -2923,6 +2936,65 @@
}
}

// handleResponseForWarehouseTransformation runs the in-process warehouse transformer alongside the
// legacy destination transformer response for warehouse destinations and logs any differing events
// for debugging. It is a no-op unless the destination is a warehouse and the feature flag is enabled.
func (proc *Handle) handleResponseForWarehouseTransformation(
	ctx context.Context,
	eventsToTransform []transformer.TransformerEvent,
	pResponse transformer.Response,
	commonMetaData *transformer.Metadata,
	eventsByMessageID map[string]types.SingularEventWithReceivedAt,
) {
	if _, ok := warehouseutils.WarehouseDestinationMap[commonMetaData.DestinationType]; !ok {
		return
	}
	if len(eventsToTransform) == 0 || !proc.config.enableWarehouseTransformations.Load() {
		return
	}
	defer proc.statsFactory.NewStat("proc_warehouse_transformations_time", stats.TimerType).RecordDuration()()

	wResponse := proc.warehouseTransformer.Transform(ctx, eventsToTransform, proc.config.transformBatchSize.Load())
	differingEvents := proc.responsesDiffer(eventsToTransform, pResponse, wResponse, eventsByMessageID)
	if err := proc.warehouseDebugLogger.LogEvents(differingEvents, commonMetaData); err != nil {
		proc.logger.Warnn("Failed to log events for warehouse transformation debugging", obskit.Error(err))
	}
}

// responsesDiffer compares the legacy and warehouse transformer responses and returns the singular
// events to log for debugging, while recording the mismatch count as a stat.
func (proc *Handle) responsesDiffer(
	eventsToTransform []transformer.TransformerEvent,
	pResponse, wResponse transformer.Response,
	eventsByMessageID map[string]types.SingularEventWithReceivedAt,
) []types.SingularEventT {
	// If the event or failed-event counts differ, flag every event in the transformation
	if len(pResponse.Events) != len(wResponse.Events) || len(pResponse.FailedEvents) != len(wResponse.FailedEvents) {
		events := lo.Map(eventsToTransform, func(e transformer.TransformerEvent, _ int) types.SingularEventT {
			return eventsByMessageID[e.Metadata.MessageID].SingularEvent
		})
		proc.statsFactory.NewStat("proc_warehouse_transformations_mismatches", stats.CountType).Count(len(events))
		return events
	}

	var (
		differedSampleEvents []types.SingularEventT
		differedEventsCount  int
		collectedSampleEvent bool
	)

	for i := range pResponse.Events {
		if !reflect.DeepEqual(pResponse.Events[i], wResponse.Events[i]) {
			differedEventsCount++
			if !collectedSampleEvent {
				// Collect the mismatched messages of the first differing event only (a sample); keep counting the rest
				differedSampleEvents = append(differedSampleEvents, lo.Map(pResponse.Events[i].Metadata.GetMessagesIDs(), func(msgID string, _ int) types.SingularEventT {
					return eventsByMessageID[msgID].SingularEvent
				})...)
				collectedSampleEvent = true
			}
		}
	}
	proc.statsFactory.NewStat("proc_warehouse_transformations_mismatches", stats.CountType).Count(differedEventsCount)

	return differedSampleEvents
}

func (proc *Handle) saveDroppedJobs(ctx context.Context, droppedJobs []*jobsdb.JobT, tx *Tx) error {
if len(droppedJobs) > 0 {
for i := range droppedJobs { // each dropped job should have a unique jobID in the scope of the batch
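The comparison above works on two levels: if the two responses disagree on event or failed-event counts, every input event is flagged; otherwise events are compared pairwise, the full mismatch count is recorded, and only the messages of the first differing event are kept as a sample. A simplified, self-contained sketch of that decision using toy types (not the production structs):

```go
package main

import (
	"fmt"
	"reflect"
)

// toy stand-in for a transformer response
type response struct{ events []map[string]any }

// differingSample mirrors the two-level comparison: a count mismatch flags
// everything, a content mismatch records the count but samples only the first
// differing event.
func differingSample(inputs []string, p, w response) (sample []string, mismatches int) {
	if len(p.events) != len(w.events) {
		return inputs, len(inputs)
	}
	for i := range p.events {
		if !reflect.DeepEqual(p.events[i], w.events[i]) {
			mismatches++
			if len(sample) == 0 {
				sample = append(sample, inputs[i])
			}
		}
	}
	return sample, mismatches
}

func main() {
	p := response{events: []map[string]any{{"id": 1}, {"id": 2}}}
	w := response{events: []map[string]any{{"id": 1}, {"id": 99}}}
	sample, n := differingSample([]string{"msg-1", "msg-2"}, p, w)
	fmt.Println(sample, n) // [msg-2] 1
}
```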
processor/transformer/transformer.go (20 changes: 16 additions & 4 deletions)
@@ -140,13 +140,25 @@ func WithClient(client *http.Client) Opt {
}
}

// Transformer provides methods to transform events
type Transformer interface {
	Transform(ctx context.Context, clientEvents []TransformerEvent, batchSize int) Response
type UserTransformer interface {
	UserTransform(ctx context.Context, clientEvents []TransformerEvent, batchSize int) Response
}

type DestinationTransformer interface {
	Transform(ctx context.Context, clientEvents []TransformerEvent, batchSize int) Response
}

type TrackingPlanValidator interface {
	Validate(ctx context.Context, clientEvents []TransformerEvent, batchSize int) Response
}

// Transformer provides methods to transform events
type Transformer interface {
	UserTransformer
	DestinationTransformer
	TrackingPlanValidator
}
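Splitting the previous monolithic Transformer interface into UserTransformer, DestinationTransformer, and TrackingPlanValidator lets call sites such as the new warehouse comparison depend only on the capability they use. A hypothetical consumer sketch (not part of this PR) that accepts just the narrow interface:

```go
// Hypothetical consumer: only destination transformation is needed, so it
// depends on DestinationTransformer instead of the full Transformer, which
// also bundles UserTransformer and TrackingPlanValidator.
package example

import (
	"context"

	"github.com/rudderlabs/rudder-server/processor/transformer"
)

type destOnly struct {
	dt transformer.DestinationTransformer
}

func (d *destOnly) run(ctx context.Context, events []transformer.TransformerEvent, batchSize int) transformer.Response {
	return d.dt.Transform(ctx, events, batchSize)
}
```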

// handle is the handle for this class
type handle struct {
sentStat stats.Measurement
@@ -526,7 +538,7 @@ func (trans *handle) destTransformURL(destType string) string {
destinationEndPoint := fmt.Sprintf("%s/v0/destinations/%s", trans.config.destTransformationURL, strings.ToLower(destType))

if _, ok := warehouseutils.WarehouseDestinationMap[destType]; ok {
whSchemaVersionQueryParam := fmt.Sprintf("whSchemaVersion=%s&whIDResolve=%v", trans.conf.GetString("Warehouse.schemaVersion", "v1"), warehouseutils.IDResolutionEnabled())
whSchemaVersionQueryParam := fmt.Sprintf("whIDResolve=%t", trans.conf.GetBool("Warehouse.enableIDResolution", false))
switch destType {
case warehouseutils.RS:
return destinationEndPoint + "?" + whSchemaVersionQueryParam
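With this change the warehouse destination URL drops the whSchemaVersion parameter entirely, and whIDResolve is driven by the Warehouse.enableIDResolution bool (default false) rather than warehouseutils.IDResolutionEnabled(). An illustrative sketch of the resulting URL shape; host, port, and destination type are assumptions:

```go
package main

import "fmt"

func main() {
	// Illustrative only: a local transformer and the rs destination type are assumed.
	base := "http://localhost:9090/v0/destinations/rs"
	idResolve := false // value of Warehouse.enableIDResolution (default false)
	fmt.Printf("%s?whIDResolve=%t\n", base, idResolve)
	// prints: http://localhost:9090/v0/destinations/rs?whIDResolve=false
}
```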
warehouse/transformer/alias.go (new file: 62 additions)
@@ -0,0 +1,62 @@
package transformer

import (
"fmt"

"github.com/samber/lo"

"github.com/rudderlabs/rudder-server/warehouse/transformer/internal/rules"
)

// handleAliasEvent flattens an alias message into a single record for the aliases table
// (traits plus context_-prefixed context fields), applies the default and alias-specific
// rules, stores the raw rudder event, and appends any merge records.
func (t *transformer) handleAliasEvent(pi *processingInfo) ([]map[string]any, error) {
	event := make(map[string]any)
	columnTypes := make(map[string]string)

	if err := t.setDataAndColumnTypeFromInput(pi, pi.event.Message["traits"], event, columnTypes, &prefixInfo{
		completePrefix: "alias_traits_",
		completeLevel:  2,
	}); err != nil {
		return nil, fmt.Errorf("alias: setting data and column types from input: %w", err)
	}
	if err := t.setDataAndColumnTypeFromInput(pi, pi.event.Message["context"], event, columnTypes, &prefixInfo{
		completePrefix: "alias_context_",
		completeLevel:  2,
		prefix:         "context_",
	}); err != nil {
		return nil, fmt.Errorf("alias: setting data and column types from input: %w", err)
	}
	if err := t.setDataAndColumnTypeFromRules(pi, event, columnTypes,
		lo.Assign(rules.DefaultRules, rules.AliasRules), rules.DefaultFunctionalRules,
	); err != nil {
		return nil, fmt.Errorf("alias: setting data and column types from rules: %w", err)
	}

	if err := storeRudderEvent(pi, event, columnTypes); err != nil {
		return nil, fmt.Errorf("alias: storing rudder event: %w", err)
	}

	tableName, err := SafeTableName(pi.event.Metadata.DestinationType, pi.itrOpts, "aliases")
	if err != nil {
		return nil, fmt.Errorf("alias: safe table name: %w", err)
	}
	columns, err := t.getColumns(pi.event.Metadata.DestinationType, event, columnTypes)
	if err != nil {
		return nil, fmt.Errorf("alias: getting columns: %w", err)
	}

	mergeEvents, err := t.handleMergeEvent(pi)
	if err != nil {
		return nil, fmt.Errorf("merge event: %w", err)
	}

	aliasOutput := map[string]any{
		"data": event,
		"metadata": map[string]any{
			"table":      tableName,
			"columns":    columns,
			"receivedAt": pi.event.Metadata.ReceivedAt,
		},
		"userId": "",
	}
	return append([]map[string]any{aliasOutput}, mergeEvents...), nil
}
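For a simple alias message the handler should therefore emit one record targeting the aliases table, followed by any merge records. An illustrative sketch of that output shape, with hypothetical values; the exact set of columns depends on the destination-specific default and alias rules, so treat it as a sketch only:

```go
package main

import "fmt"

func main() {
	// Illustrative output shape for an alias event with traits
	// {"email": "a@b.com"} and context {"ip": "1.2.3.4"}. Rule-driven columns
	// (id, user_id, previous_id, received_at, ...) are omitted here.
	aliasOutput := map[string]any{
		"data": map[string]any{
			"email":      "a@b.com",
			"context_ip": "1.2.3.4",
		},
		"metadata": map[string]any{
			"table":      "aliases",
			"columns":    map[string]any{"email": "string", "context_ip": "string"},
			"receivedAt": "2024-01-01T00:00:00.000Z",
		},
		"userId": "",
	}
	fmt.Println(aliasOutput)
}
```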