diff --git a/api/pkg/inference-logger/logger/mlobs_sink.go b/api/pkg/inference-logger/logger/mlobs_sink.go index 6cdf06d5b..53a3b4af8 100644 --- a/api/pkg/inference-logger/logger/mlobs_sink.go +++ b/api/pkg/inference-logger/logger/mlobs_sink.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "math/rand" "google.golang.org/protobuf/types/known/timestamppb" @@ -19,6 +20,10 @@ var ( ErrMalformedLogEntry = errors.New("malformed log entry") ) +const ( + SamplingRate = 0.01 +) + type MLObsSink struct { logger *zap.SugaredLogger producer KafkaProducer @@ -163,6 +168,9 @@ func (m *MLObsSink) buildNewKafkaMessage(predictionLog *upiv1.PredictionLog) (*k func (m *MLObsSink) Sink(rawLogEntries []*LogEntry) error { for _, rawLogEntry := range rawLogEntries { + if rand.Float64() >= SamplingRate { + continue + } predictionLog, err := m.newPredictionLog(rawLogEntry) if err != nil { m.logger.Errorf("unable to convert log entry: %v", err)