Feat.warehouse transformer fuzz (#5206)

lvrach authored and achettyiitr committed Oct 21, 2024
1 parent 98f7827 commit a1fd52e
Showing 36 changed files with 5,280 additions and 7,735 deletions.
96 changes: 92 additions & 4 deletions processor/processor.go
@@ -5,14 +5,18 @@ import (
"encoding/json"
"errors"
"fmt"
"reflect"
"runtime/trace"
"slices"
"strconv"
"strings"
"sync"
"time"

obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"

"github.com/rudderlabs/rudder-server/enterprise/trackedusers"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"

"golang.org/x/sync/errgroup"

@@ -55,6 +59,7 @@ import (
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
"github.com/rudderlabs/rudder-server/utils/types"
"github.com/rudderlabs/rudder-server/utils/workerpool"
wtrans "github.com/rudderlabs/rudder-server/warehouse/transformer"
)

const (
@@ -84,10 +89,12 @@ type trackedUsersReporter interface {

// Handle is a handle to the processor module
type Handle struct {
conf *config.Config
tracer stats.Tracer
backendConfig backendconfig.BackendConfig
transformer transformer.Transformer
conf *config.Config
tracer stats.Tracer
backendConfig backendconfig.BackendConfig
transformer transformer.Transformer
warehouseTransformer transformer.DestinationTransformer
warehouseDebugLogger *wtrans.DebugLogger

gatewayDB jobsdb.JobsDB
routerDB jobsdb.JobsDB
@@ -156,6 +163,7 @@ type Handle struct {
eventAuditEnabled map[string]bool
credentialsMap map[string][]transformer.Credential
nonEventStreamSources map[string]bool
enableWarehouseTransformations config.ValueLoader[bool]
}

drainConfig struct {
@@ -615,6 +623,9 @@ func (proc *Handle) Setup(
"partition": partition,
})
}
proc.warehouseTransformer = wtrans.New(proc.conf, proc.logger, proc.statsFactory)
proc.warehouseDebugLogger = wtrans.NewDebugLogger(proc.conf, proc.logger)

if proc.config.enableDedup {
var err error
proc.dedup, err = dedup.New(proc.conf, proc.statsFactory)
@@ -815,6 +826,7 @@ func (proc *Handle) loadReloadableConfig(defaultPayloadLimit int64, defaultMaxEv
proc.config.archivalEnabled = config.GetReloadableBoolVar(true, "archival.Enabled")
// Capture event name as a tag in event level stats
proc.config.captureEventNameStats = config.GetReloadableBoolVar(false, "Processor.Stats.captureEventName")
proc.config.enableWarehouseTransformations = config.GetReloadableBoolVar(false, "Processor.enableWarehouseTransformations")
}
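For context: the flag registered above is a reloadable boolean, so it can be flipped at runtime. A minimal, purely illustrative sketch of reading it, assuming the rudder-go-kit config package already used in this file (the standalone program below is not part of the commit):

```go
package main

import (
	"fmt"

	"github.com/rudderlabs/rudder-go-kit/config"
)

func main() {
	// Same key as registered in loadReloadableConfig; defaults to false.
	// Load() reflects configuration changes without a restart because the
	// value is a reloadable loader.
	enableWHTransformations := config.GetReloadableBoolVar(false, "Processor.enableWarehouseTransformations")
	fmt.Println("warehouse transformations enabled:", enableWHTransformations.Load())
}
```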

type connection struct {
@@ -2765,6 +2777,7 @@ func (proc *Handle) transformSrcDest(
proc.logger.Debug("Dest Transform input size", len(eventsToTransform))
s := time.Now()
response = proc.transformer.Transform(ctx, eventsToTransform, proc.config.transformBatchSize.Load())
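// Shadow-compare the service response against the embedded warehouse transformer (no-op unless enabled for a warehouse destination).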
proc.handleResponseForWarehouseTransformation(ctx, eventsToTransform, response, commonMetaData, eventsByMessageID)

destTransformationStat := proc.newDestinationTransformationStat(sourceID, workspaceID, transformAt, destination)
destTransformationStat.transformTime.Since(s)
@@ -2923,6 +2936,81 @@
}
}

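// handleResponseForWarehouseTransformation shadow-runs the embedded warehouse
// transformer on the same input events, compares its output with the response from
// the transformer service, and hands any differing events to the debug logger.
// It is a no-op for non-warehouse destinations, for empty batches, or when
// Processor.enableWarehouseTransformations is disabled.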
func (proc *Handle) handleResponseForWarehouseTransformation(
ctx context.Context,
eventsToTransform []transformer.TransformerEvent,
pResponse transformer.Response,
commonMetaData *transformer.Metadata,
eventsByMessageID map[string]types.SingularEventWithReceivedAt,
) {
if _, ok := warehouseutils.WarehouseDestinationMap[commonMetaData.DestinationType]; !ok {
return
}
if len(eventsToTransform) == 0 || !proc.config.enableWarehouseTransformations.Load() {
return
}
defer proc.statsFactory.NewStat("proc_warehouse_transformations_time", stats.TimerType).RecordDuration()()

wResponse := proc.warehouseTransformer.Transform(ctx, eventsToTransform, proc.config.transformBatchSize.Load())
differingEvents := proc.responsesDiffer(eventsToTransform, pResponse, wResponse, eventsByMessageID)
if err := proc.warehouseDebugLogger.LogEvents(differingEvents, commonMetaData); err != nil {
proc.logger.Warnn("Failed to log events for warehouse transformation debugging", obskit.Error(err))
}
}

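// responsesDiffer compares the two transformer responses. If the successful or
// failed event counts differ, every input event is counted as a mismatch and
// returned. Otherwise events are deep-compared pairwise: all mismatches are
// counted, but only one sample per slice (events and failed events) is collected
// for debug logging.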
func (proc *Handle) responsesDiffer(
eventsToTransform []transformer.TransformerEvent,
pResponse, wResponse transformer.Response,
eventsByMessageID map[string]types.SingularEventWithReceivedAt,
) []types.SingularEventT {
// If the successful or failed event counts differ, report every input event as a mismatch
if len(pResponse.Events) != len(wResponse.Events) || len(pResponse.FailedEvents) != len(wResponse.FailedEvents) {
events := lo.Map(eventsToTransform, func(e transformer.TransformerEvent, _ int) types.SingularEventT {
return eventsByMessageID[e.Metadata.MessageID].SingularEvent
})
proc.statsFactory.NewStat("proc_warehouse_transformations_mismatches", stats.CountType).Count(len(events))
return events
}

var (
differedSampleEvents []types.SingularEventT
differedEventsCount int
collectedSampleEvent bool
collectedSampleFailedEvent bool
)

for i := range pResponse.Events {
if !reflect.DeepEqual(pResponse.Events[i], wResponse.Events[i]) {
differedEventsCount++
if !collectedSampleEvent {
// Collect the mismatched messages only for the first differing event (sample)
differedSampleEvents = append(differedSampleEvents, lo.Map(pResponse.Events[i].Metadata.GetMessagesIDs(), func(msgID string, _ int) types.SingularEventT {
return eventsByMessageID[msgID].SingularEvent
})...)
collectedSampleEvent = true
}
}
}
for i := range pResponse.FailedEvents {
wResponse.FailedEvents[i].Error = pResponse.FailedEvents[i].Error // Copy the error so differing error messages alone don't count as a mismatch

if !reflect.DeepEqual(pResponse.FailedEvents[i], wResponse.FailedEvents[i]) {
differedEventsCount++
if !collectedSampleFailedEvent {
// Collect the mismatched messages only for the first differing failed event (sample)
differedSampleEvents = append(differedSampleEvents, lo.Map(pResponse.FailedEvents[i].Metadata.GetMessagesIDs(), func(msgID string, _ int) types.SingularEventT {
return eventsByMessageID[msgID].SingularEvent
})...)
collectedSampleFailedEvent = true
}
}
}

proc.statsFactory.NewStat("proc_warehouse_transformations_mismatches", stats.CountType).Count(differedEventsCount)

return differedSampleEvents
}

func (proc *Handle) saveDroppedJobs(ctx context.Context, droppedJobs []*jobsdb.JobT, tx *Tx) error {
if len(droppedJobs) > 0 {
for i := range droppedJobs { // each dropped job should have a unique jobID in the scope of the batch