Skip to content

Commit

Permalink
fix: Removed and commented out DataDog related observability instrume…
Browse files Browse the repository at this point in the history
…ntation code.

Signed-off-by: Shuchu Han <[email protected]>
  • Loading branch information
shuchu committed Nov 1, 2024
1 parent 6cfb2d6 commit 90d32bf
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 232 deletions.
27 changes: 0 additions & 27 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,56 +17,29 @@ require (
github.com/stretchr/testify v1.9.0
google.golang.org/grpc v1.67.0
google.golang.org/protobuf v1.34.2
gopkg.in/DataDog/dd-trace-go.v1 v1.68.0
)

require (
github.com/DataDog/appsec-internal-go v1.7.0 // indirect
github.com/DataDog/datadog-agent/pkg/obfuscate v0.57.1 // indirect
github.com/DataDog/datadog-agent/pkg/remoteconfig/state v0.57.1 // indirect
github.com/DataDog/datadog-go/v5 v5.5.0 // indirect
github.com/DataDog/go-tuf v1.1.0-0.5.2 // indirect
github.com/DataDog/sketches-go v1.4.6 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/ebitengine/purego v0.7.1 // indirect
github.com/outcaste-io/ristretto v0.2.3 // indirect
github.com/philhofer/fwd v1.1.3-0.20240612014219-fbbf4953d986 // indirect
github.com/secure-systems-lab/go-securesystemslib v0.8.0 // indirect
github.com/tinylib/msgp v1.2.1 // indirect
golang.org/x/time v0.6.0 // indirect
)

require (
github.com/DataDog/go-libddwaf/v3 v3.4.0 // indirect
github.com/DataDog/go-sqllexer v0.0.14 // indirect
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
github.com/andybalholm/brotli v1.1.0 // indirect
github.com/apache/thrift v0.21.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/eapache/queue/v2 v2.0.0-20230407133247-75960ed334e4 // indirect
github.com/goccy/go-json v0.10.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v24.3.25+incompatible // indirect
github.com/hashicorp/go-secure-stdlib/parseutil v0.1.8 // indirect
github.com/hashicorp/go-secure-stdlib/strutil v0.1.2 // indirect
github.com/hashicorp/go-sockaddr v1.0.6 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/ryanuber/go-glob v1.0.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.uber.org/atomic v1.11.0 // indirect
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect
golang.org/x/mod v0.21.0 // indirect
golang.org/x/net v0.29.0 // indirect
Expand Down
137 changes: 1 addition & 136 deletions go.sum

Large diffs are not rendered by default.

17 changes: 9 additions & 8 deletions go/embedded/online_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import (
"net"
"os"
"os/signal"
"strings"
//"strings"
"syscall"
"time"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
//"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"

"github.com/apache/arrow/go/v17/arrow"
"github.com/apache/arrow/go/v17/arrow/array"
Expand All @@ -32,7 +32,7 @@ import (
jsonlog "github.com/rs/zerolog/log"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
grpctrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/google.golang.org/grpc"
//grpctrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/google.golang.org/grpc"
)

type OnlineFeatureService struct {
Expand Down Expand Up @@ -302,10 +302,10 @@ func (s *OnlineFeatureService) constructLoggingService(writeLoggedFeaturesCallba
// StartGrpcServerWithLogging starts gRPC server with enabled feature logging
// Caller of this function must provide Python callback to flush buffered logs as well as logging configuration (loggingOpts)
func (s *OnlineFeatureService) StartGrpcServerWithLogging(host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts LoggingOptions) error {
if strings.ToLower(os.Getenv("ENABLE_DATADOG_TRACING")) == "true" {
tracer.Start(tracer.WithRuntimeMetrics())
defer tracer.Stop()
}
//if strings.ToLower(os.Getenv("ENABLE_DATADOG_TRACING")) == "true" {
// tracer.Start(tracer.WithRuntimeMetrics())
// defer tracer.Stop()
//}

loggingService, err := s.constructLoggingService(writeLoggedFeaturesCallback, loggingOpts)
if err != nil {
Expand All @@ -318,7 +318,8 @@ func (s *OnlineFeatureService) StartGrpcServerWithLogging(host string, port int,
return err
}

grpcServer := grpc.NewServer(grpc.UnaryInterceptor(grpctrace.UnaryServerInterceptor()))
//grpcServer := grpc.NewServer(grpc.UnaryInterceptor(grpctrace.UnaryServerInterceptor()))
grpcServer := grpc.NewServer()

serving.RegisterServingServiceServer(grpcServer, ser)
healthService := health.NewServer()
Expand Down
6 changes: 3 additions & 3 deletions go/internal/feast/featurestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"strings"

"github.com/apache/arrow/go/v17/arrow/memory"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
//"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"

"github.com/feast-dev/feast/go/internal/feast/model"
"github.com/feast-dev/feast/go/internal/feast/onlineserving"
Expand Down Expand Up @@ -316,8 +316,8 @@ func (fs *FeatureStore) readFromOnlineStore(ctx context.Context, entityRows []*p
requestedFeatureNames []string,
) ([][]onlinestore.FeatureData, error) {
// Create a Datadog span from context
span, _ := tracer.StartSpanFromContext(ctx, "fs.readFromOnlineStore")
defer span.Finish()
//span, _ := tracer.StartSpanFromContext(ctx, "fs.readFromOnlineStore")
//defer span.Finish()

numRows := len(entityRows)
entityRowsValue := make([]*prototypes.EntityKey, numRows)
Expand Down
30 changes: 15 additions & 15 deletions go/internal/feast/onlinestore/redisonlinestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
"encoding/binary"
"errors"
"fmt"
"os"
//"os"
"sort"
"strconv"
"strings"

"github.com/feast-dev/feast/go/internal/feast/registry"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
//"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"

"github.com/redis/go-redis/v9"
"github.com/spaolacci/murmur3"
Expand All @@ -22,7 +22,7 @@ import (
"github.com/feast-dev/feast/go/protos/feast/serving"
"github.com/feast-dev/feast/go/protos/feast/types"
"github.com/rs/zerolog/log"
redistrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/redis/go-redis.v9"
//redistrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/redis/go-redis.v9"
)

type redisType int
Expand Down Expand Up @@ -107,10 +107,10 @@ func NewRedisOnlineStore(project string, config *registry.RepoConfig, onlineStor
}

// Metrics are not showing up when the service name is set to DD_SERVICE
redisTraceServiceName := os.Getenv("DD_SERVICE") + "-redis"
if redisTraceServiceName == "" {
redisTraceServiceName = "redis.client" // default service name if DD_SERVICE is not set
}
//redisTraceServiceName := os.Getenv("DD_SERVICE") + "-redis"
//if redisTraceServiceName == "" {
// redisTraceServiceName = "redis.client" // default service name if DD_SERVICE is not set
//}

if redisStoreType == redisNode {
log.Info().Msgf("Using Redis: %s", address[0])
Expand All @@ -120,9 +120,9 @@ func NewRedisOnlineStore(project string, config *registry.RepoConfig, onlineStor
DB: db,
TLSConfig: tlsConfig,
})
if strings.ToLower(os.Getenv("ENABLE_DATADOG_REDIS_TRACING")) == "true" {
redistrace.WrapClient(store.client, redistrace.WithServiceName(redisTraceServiceName))
}
//if strings.ToLower(os.Getenv("ENABLE_DATADOG_REDIS_TRACING")) == "true" {
// redistrace.WrapClient(store.client, redistrace.WithServiceName(redisTraceServiceName))
//}
} else if redisStoreType == redisCluster {
log.Info().Msgf("Using Redis Cluster: %s", address)
store.clusterClient = redis.NewClusterClient(&redis.ClusterOptions{
Expand All @@ -131,9 +131,9 @@ func NewRedisOnlineStore(project string, config *registry.RepoConfig, onlineStor
TLSConfig: tlsConfig,
ReadOnly: true,
})
if strings.ToLower(os.Getenv("ENABLE_DATADOG_REDIS_TRACING")) == "true" {
redistrace.WrapClient(store.clusterClient, redistrace.WithServiceName(redisTraceServiceName))
}
//if strings.ToLower(os.Getenv("ENABLE_DATADOG_REDIS_TRACING")) == "true" {
// redistrace.WrapClient(store.clusterClient, redistrace.WithServiceName(redisTraceServiceName))
//}
}

return &store, nil
Expand Down Expand Up @@ -212,8 +212,8 @@ func (r *RedisOnlineStore) buildRedisKeys(entityKeys []*types.EntityKey) ([]*[]b
}

func (r *RedisOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types.EntityKey, featureViewNames []string, featureNames []string) ([][]FeatureData, error) {
span, _ := tracer.StartSpanFromContext(ctx, "redis.OnlineRead")
defer span.Finish()
//span, _ := tracer.StartSpanFromContext(ctx, "redis.OnlineRead")
//defer span.Finish()

featureCount := len(featureNames)
featureViewIndices, indicesFeatureView, index := r.buildFeatureViewIndices(featureViewNames, featureNames)
Expand Down
19 changes: 9 additions & 10 deletions go/internal/feast/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
prototypes "github.com/feast-dev/feast/go/protos/feast/types"
"github.com/feast-dev/feast/go/types"
"github.com/google/uuid"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
//"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

const feastServerVersion = "0.0.1"
Expand All @@ -34,17 +34,16 @@ func (s *grpcServingServiceServer) GetFeastServingInfo(ctx context.Context, requ
// Metadata contains feature names that corresponds to the number of rows in response.Results.
// Results contains values including the value of the feature, the event timestamp, and feature status in a columnar format.
func (s *grpcServingServiceServer) GetOnlineFeatures(ctx context.Context, request *serving.GetOnlineFeaturesRequest) (*serving.GetOnlineFeaturesResponse, error) {
//span, ctx := tracer.StartSpanFromContext(ctx, "getOnlineFeatures", tracer.ResourceName("ServingService/GetOnlineFeatures"))
//defer span.Finish()

span, ctx := tracer.StartSpanFromContext(ctx, "getOnlineFeatures", tracer.ResourceName("ServingService/GetOnlineFeatures"))
defer span.Finish()

logSpanContext := LogWithSpanContext(span)
//logSpanContext := LogWithSpanContext(span)

requestId := GenerateRequestId()
featuresOrService, err := s.fs.ParseFeatures(request.GetKind())

if err != nil {
logSpanContext.Error().Err(err).Msg("Error parsing feature service or feature list from request")
//logSpanContext.Error().Err(err).Msg("Error parsing feature service or feature list from request")
return nil, err
}

Expand All @@ -57,7 +56,7 @@ func (s *grpcServingServiceServer) GetOnlineFeatures(ctx context.Context, reques
request.GetFullFeatureNames())

if err != nil {
logSpanContext.Error().Err(err).Msg("Error getting online features")
//logSpanContext.Error().Err(err).Msg("Error getting online features")
return nil, err
}

Expand All @@ -76,7 +75,7 @@ func (s *grpcServingServiceServer) GetOnlineFeatures(ctx context.Context, reques
featureNames[idx] = vector.Name
values, err := types.ArrowValuesToProtoValues(vector.Values)
if err != nil {
logSpanContext.Error().Err(err).Msg("Error converting Arrow values to proto values")
//logSpanContext.Error().Err(err).Msg("Error converting Arrow values to proto values")
return nil, err
}
if _, ok := request.Entities[vector.Name]; ok {
Expand All @@ -94,13 +93,13 @@ func (s *grpcServingServiceServer) GetOnlineFeatures(ctx context.Context, reques
if featureService != nil && featureService.LoggingConfig != nil && s.loggingService != nil {
logger, err := s.loggingService.GetOrCreateLogger(featureService)
if err != nil {
logSpanContext.Error().Err(err).Msg("Error to instantiating logger for feature service: " + featuresOrService.FeatureService.Name)
//logSpanContext.Error().Err(err).Msg("Error to instantiating logger for feature service: " + featuresOrService.FeatureService.Name)
fmt.Printf("Couldn't instantiate logger for feature service %s: %+v", featuresOrService.FeatureService.Name, err)
}

err = logger.Log(request.Entities, resp.Results[len(request.Entities):], resp.Metadata.FeatureNames.Val[len(request.Entities):], request.RequestContext, requestId)
if err != nil {
logSpanContext.Error().Err(err).Msg("Error to logging to feature service: " + featuresOrService.FeatureService.Name)
//logSpanContext.Error().Err(err).Msg("Error to logging to feature service: " + featuresOrService.FeatureService.Name)
fmt.Printf("LoggerImpl error[%s]: %+v", featuresOrService.FeatureService.Name, err)
}
}
Expand Down
40 changes: 21 additions & 19 deletions go/internal/feast/server/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"encoding/json"
"fmt"
"net/http"
"os"
//"os"
"runtime"
"strconv"
"strings"
//"strings"
"time"

"github.com/feast-dev/feast/go/internal/feast"
Expand All @@ -19,8 +19,8 @@ import (
prototypes "github.com/feast-dev/feast/go/protos/feast/types"
"github.com/feast-dev/feast/go/types"
"github.com/rs/zerolog/log"
httptrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/net/http"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
//httptrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/net/http"
//"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

type httpServer struct {
Expand Down Expand Up @@ -149,10 +149,11 @@ func NewHttpServer(fs *feast.FeatureStore, loggingService *logging.LoggingServic
func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) {
var err error

span, ctx := tracer.StartSpanFromContext(r.Context(), "getOnlineFeatures", tracer.ResourceName("/get-online-features"))
defer span.Finish(tracer.WithError(err))
ctx := r.Context()
//span, ctx := tracer.StartSpanFromContext(r.Context(), "getOnlineFeatures", tracer.ResourceName("/get-online-features"))
//defer span.Finish(tracer.WithError(err))

logSpanContext := LogWithSpanContext(span)
//logSpanContext := LogWithSpanContext(span)

if r.Method != "POST" {
http.NotFound(w, r)
Expand All @@ -165,7 +166,7 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) {
if statusQuery != "" {
status, err = strconv.ParseBool(statusQuery)
if err != nil {
logSpanContext.Error().Err(err).Msg("Error parsing status query parameter")
//logSpanContext.Error().Err(err).Msg("Error parsing status query parameter")
writeJSONError(w, fmt.Errorf("Error parsing status query parameter: %+v", err), http.StatusBadRequest)
return
}
Expand All @@ -175,15 +176,15 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) {
var request getOnlineFeaturesRequest
err = decoder.Decode(&request)
if err != nil {
logSpanContext.Error().Err(err).Msg("Error decoding JSON request data")
//logSpanContext.Error().Err(err).Msg("Error decoding JSON request data")
writeJSONError(w, fmt.Errorf("Error decoding JSON request data: %+v", err), http.StatusInternalServerError)
return
}
var featureService *model.FeatureService
if request.FeatureService != nil {
featureService, err = s.fs.GetFeatureService(*request.FeatureService)
if err != nil {
logSpanContext.Error().Err(err).Msg("Error getting feature service from registry")
//logSpanContext.Error().Err(err).Msg("Error getting feature service from registry")
writeJSONError(w, fmt.Errorf("Error getting feature service from registry: %+v", err), http.StatusInternalServerError)
return
}
Expand All @@ -206,7 +207,7 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) {
request.FullFeatureNames)

if err != nil {
logSpanContext.Error().Err(err).Msg("Error getting feature vector")
//logSpanContext.Error().Err(err).Msg("Error getting feature vector")
writeJSONError(w, fmt.Errorf("Error getting feature vector: %+v", err), http.StatusInternalServerError)
return
}
Expand Down Expand Up @@ -248,15 +249,15 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) {
err = json.NewEncoder(w).Encode(response)

if err != nil {
logSpanContext.Error().Err(err).Msg("Error encoding response")
//logSpanContext.Error().Err(err).Msg("Error encoding response")
writeJSONError(w, fmt.Errorf("Error encoding response: %+v", err), http.StatusInternalServerError)
return
}

if featureService != nil && featureService.LoggingConfig != nil && s.loggingService != nil {
logger, err := s.loggingService.GetOrCreateLogger(featureService)
if err != nil {
logSpanContext.Error().Err(err).Msgf("Couldn't instantiate logger for feature service %s", featureService.Name)
//logSpanContext.Error().Err(err).Msgf("Couldn't instantiate logger for feature service %s", featureService.Name)
writeJSONError(w, fmt.Errorf("Couldn't instantiate logger for feature service %s: %+v", featureService.Name, err), http.StatusInternalServerError)
return
}
Expand All @@ -269,7 +270,7 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) {
for _, vector := range featureVectors[len(request.Entities):] {
values, err := types.ArrowValuesToProtoValues(vector.Values)
if err != nil {
logSpanContext.Error().Err(err).Msg("Couldn't convert arrow values into protobuf")
//logSpanContext.Error().Err(err).Msg("Couldn't convert arrow values into protobuf")
writeJSONError(w, fmt.Errorf("Couldn't convert arrow values into protobuf: %+v", err), http.StatusInternalServerError)
return
}
Expand Down Expand Up @@ -339,11 +340,12 @@ func recoverMiddleware(next http.Handler) http.Handler {
}

func (s *httpServer) Serve(host string, port int) error {
if strings.ToLower(os.Getenv("ENABLE_DATADOG_TRACING")) == "true" {
tracer.Start(tracer.WithRuntimeMetrics())
defer tracer.Stop()
}
mux := httptrace.NewServeMux()
// DD
//if strings.ToLower(os.Getenv("ENABLE_DATADOG_TRACING")) == "true" {
// tracer.Start(tracer.WithRuntimeMetrics())
// defer tracer.Stop()
//}
mux := http.NewServeMux()
mux.Handle("/get-online-features", recoverMiddleware(http.HandlerFunc(s.getOnlineFeatures)))
mux.HandleFunc("/health", healthCheckHandler)
s.server = &http.Server{Addr: fmt.Sprintf("%s:%d", host, port), Handler: mux, ReadTimeout: 5 * time.Second, WriteTimeout: 10 * time.Second, IdleTimeout: 15 * time.Second}
Expand Down
Loading

0 comments on commit 90d32bf

Please sign in to comment.