Skip to content

Commit

Permalink
Fix cache key collision when 2 tables have same list of entity (#187)
Browse files Browse the repository at this point in the history
* Change in-memory cache options constructor arguments

* Restructure feature retrievaer initialisation

* Restructure feature retriever initialisation

* Restructure feature retriever initialisation

* Rework feature cache

* Rework feature cache

* Handle different column ordering in feature cache

* Update test after reworking cache

* Move helper functions into utils.go and converter.go

* Move internal type into types.go

* Move Benchmark_buildEntitiesRequest_geohashArrays to feature_retriever_bench_test.go

* Move batch call into batch_call.go

* Move extract feast value test to converter_test.go

* Add unit test got GetTableName

* Add batch_call_test.go

* Use entitySet instead of entityIndexMap

* Add ToInt32 conversion

* Ensure ordering should be maintained in feature cache

* Remove debugging code

* Refactor feature_retriever.go

* Refactor batch_call.go

* Rename batch call to call

* Move mergeColumnTypes to types.go
  • Loading branch information
aria authored Oct 5, 2021
1 parent b892e5b commit 9a98e57
Show file tree
Hide file tree
Showing 20 changed files with 2,187 additions and 2,323 deletions.
10 changes: 1 addition & 9 deletions api/cmd/transformer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"go.uber.org/zap"

"github.com/gojek/merlin/pkg/hystrix"
"github.com/gojek/merlin/pkg/transformer/cache"
"github.com/gojek/merlin/pkg/transformer/feast"
"github.com/gojek/merlin/pkg/transformer/jsonpath"
"github.com/gojek/merlin/pkg/transformer/pipeline"
Expand All @@ -39,7 +38,6 @@ func init() {
type AppConfig struct {
Server server.Options
Feast feast.Options
Cache cache.Options

StandardTransformerConfigJSON string `envconfig:"STANDARD_TRANSFORMER_CONFIG" required:"true"`
LogLevel string `envconfig:"LOG_LEVEL"`
Expand Down Expand Up @@ -98,7 +96,7 @@ func main() {
s.PreprocessHandler = feastTransformer.Enrich
} else {
// Standard Enricher
compiler := pipeline.NewCompiler(symbol.NewRegistry(), feastServingClients, &appConfig.Feast, &appConfig.Cache, logger)
compiler := pipeline.NewCompiler(symbol.NewRegistry(), feastServingClients, &appConfig.Feast, logger)
compiledPipeline, err := compiler.Compile(transformerConfig)
if err != nil {
logger.Fatal("Unable to compile standard transformer", zap.Error(err))
Expand Down Expand Up @@ -161,11 +159,6 @@ func initFeastTransformer(appCfg AppConfig,
transformerConfig *spec.StandardTransformerConfig,
logger *zap.Logger) (*feast.Enricher, error) {

var memoryCache cache.Cache
if appCfg.Feast.CacheEnabled {
memoryCache = cache.NewInMemoryCache(&appCfg.Cache)
}

compiledJSONPaths, err := feast.CompileJSONPaths(transformerConfig.TransformerConfig.Feast)
if err != nil {
return nil, err
Expand All @@ -185,7 +178,6 @@ func initFeastTransformer(appCfg AppConfig,
entityExtractor,
transformerConfig.TransformerConfig.Feast,
&appCfg.Feast,
memoryCache,
logger,
)

Expand Down
8 changes: 2 additions & 6 deletions api/pkg/transformer/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@ type Cache interface {
Fetch(key []byte) ([]byte, error)
}

type Options struct {
SizeInMB int `envconfig:"CACHE_SIZE_IN_MB" default:"100"`
}

type inMemoryCache struct {
cache *freecache.Cache
}
Expand All @@ -23,8 +19,8 @@ const (
MB = 1024 * 1024
)

func NewInMemoryCache(options *Options) *inMemoryCache {
executor := freecache.NewCache(options.SizeInMB * MB)
func NewInMemoryCache(sizeInMB int) *inMemoryCache {
executor := freecache.NewCache(sizeInMB * MB)
return &inMemoryCache{cache: executor}
}

Expand Down
4 changes: 2 additions & 2 deletions api/pkg/transformer/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestCache(t *testing.T) {
t.Run(tC.desc, func(t *testing.T) {
dataByte, err := json.Marshal(tC.data)
require.NoError(t, err)
cache := NewInMemoryCache(&Options{SizeInMB: 1})
cache := NewInMemoryCache(1)
cache.Insert(tC.key, dataByte, 1)
cachedValue, err := cache.Fetch(tC.key)
require.NoError(t, err)
Expand Down Expand Up @@ -77,7 +77,7 @@ func TestCache_Expiry(t *testing.T) {
t.Run(tC.desc, func(t *testing.T) {
dataByte, err := json.Marshal(tC.data)
require.NoError(t, err)
cache := NewInMemoryCache(&Options{SizeInMB: 1})
cache := NewInMemoryCache(1)
cache.Insert(tC.key, dataByte, 1*time.Second)

if tC.delayOfFetching > 0 {
Expand Down
144 changes: 144 additions & 0 deletions api/pkg/transformer/feast/call.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package feast

import (
"context"
"fmt"
"time"

feast "github.com/feast-dev/feast/sdk/go"
"github.com/feast-dev/feast/sdk/go/protos/feast/serving"
"github.com/feast-dev/feast/sdk/go/protos/feast/types"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"

"github.com/gojek/merlin/pkg/transformer/spec"
transTypes "github.com/gojek/merlin/pkg/transformer/types"
"github.com/gojek/merlin/pkg/transformer/types/converter"
)

type call struct {
featureTableSpec *spec.FeatureTable
columns []string
entitySet map[string]bool
defaultValues defaultValues

feastClient feast.Client
feastURL string

logger *zap.Logger

statusMonitoringEnabled bool
valueMonitoringEnabled bool
}

// do create request to feast and return the result as table
func (fc *call) do(ctx context.Context, entityList []feast.Row, features []string) callResult {
tableName := GetTableName(fc.featureTableSpec)

span, ctx := opentracing.StartSpanFromContext(ctx, "feast.doBatchCall")
span.SetTag("feast.url", fc.feastURL)
span.SetTag("table", tableName)
defer span.Finish()

feastRequest := feast.OnlineFeaturesRequest{
Project: fc.featureTableSpec.Project,
Entities: entityList,
Features: features,
}

startTime := time.Now()
feastResponse, err := fc.feastClient.GetOnlineFeatures(ctx, &feastRequest)
durationMs := time.Now().Sub(startTime).Milliseconds()
if err != nil {
feastLatency.WithLabelValues("error", fc.feastURL).Observe(float64(durationMs))
feastError.WithLabelValues(fc.feastURL).Inc()

return callResult{featureTable: nil, err: err}
}
feastLatency.WithLabelValues("success", fc.feastURL).Observe(float64(durationMs))

featureTable, err := fc.processResponse(feastResponse)
if err != nil {
return callResult{featureTable: nil, err: err}
}

return callResult{tableName: tableName, featureTable: featureTable, err: nil}
}

// processResponse process response from feast serving and create an internal feature table representation of it
func (fc *call) processResponse(feastResponse *feast.OnlineFeaturesResponse) (*internalFeatureTable, error) {
responseStatus := feastResponse.Statuses()
responseRows := feastResponse.Rows()
entities := make([]feast.Row, len(responseRows))
valueRows := make([]transTypes.ValueRow, len(responseRows))
columnTypes := make([]types.ValueType_Enum, len(fc.columns))

for rowIdx, feastRow := range responseRows {
valueRow := make(transTypes.ValueRow, len(fc.columns))

// create entity object, for cache key purpose
entity := feast.Row{}
for colIdx, column := range fc.columns {
var rawValue *types.Value

featureStatus := responseStatus[rowIdx][column]
switch featureStatus {
case serving.GetOnlineFeaturesResponse_PRESENT:
rawValue = feastRow[column]
// set value of entity
_, isEntity := fc.entitySet[column]
if isEntity {
entity[column] = rawValue
}
case serving.GetOnlineFeaturesResponse_NOT_FOUND, serving.GetOnlineFeaturesResponse_NULL_VALUE, serving.GetOnlineFeaturesResponse_OUTSIDE_MAX_AGE:
defVal, ok := fc.defaultValues.GetDefaultValue(fc.featureTableSpec.Project, column)
if !ok {
// no default value is specified, we populate with nil
valueRow[colIdx] = nil
continue
}
rawValue = defVal
default:
return nil, fmt.Errorf("unsupported feature retrieval status: %s", featureStatus)
}

val, valType, err := converter.ExtractFeastValue(rawValue)
if err != nil {
return nil, err
}

// if previously we detected that the column type is invalid then we set it to the correct type
if valType != types.ValueType_INVALID {
columnTypes[colIdx] = valType
}
valueRow[colIdx] = val

fc.recordMetrics(val, column, featureStatus)
}

entities[rowIdx] = entity
valueRows[rowIdx] = valueRow
}

return &internalFeatureTable{
entities: entities,
columnNames: fc.columns,
columnTypes: columnTypes,
valueRows: valueRows,
}, nil
}

func (fc *call) recordMetrics(val interface{}, column string, featureStatus serving.GetOnlineFeaturesResponse_FieldStatus) {
// put behind feature toggle since it will generate high cardinality metrics
if fc.valueMonitoringEnabled {
v, err := converter.ToFloat64(val)
if err == nil {
feastFeatureSummary.WithLabelValues(column).Observe(v)
}
}

// put behind feature toggle since it will generate high cardinality metrics
if fc.statusMonitoringEnabled {
feastFeatureStatus.WithLabelValues(column, featureStatus.String()).Inc()
}
}
Loading

0 comments on commit 9a98e57

Please sign in to comment.