Implement OpenTelemetry integration - client/server and explicit token latency #296
base: main
@@ -36,6 +36,18 @@ import (
    mcv1alpha3 "github.com/opea-project/GenAIInfra/microservices-connector/api/v1alpha3"
    flag "github.com/spf13/pflag"

    // Prometheus and opentelemetry imports
    "github.com/prometheus/client_golang/prometheus/promhttp"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/exporters/prometheus"
    api "go.opentelemetry.io/otel/metric"
    sdkmetric "go.opentelemetry.io/otel/sdk/metric"

    "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"

    "go.opentelemetry.io/otel/metric"
)

const (
@@ -60,8 +72,8 @@ var (
        TLSHandshakeTimeout: time.Minute,
        ExpectContinueTimeout: 30 * time.Second,
    }
-   callClient = &http.Client{
-       Transport: transport,
+   callClient = http.Client{
+       Transport: otelhttp.NewTransport(transport),
        Timeout: 30 * time.Second,
    }
)
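Note: wrapping the shared transport in otelhttp.NewTransport instruments every outgoing request made through callClient, and the client-side spans and metrics attach to whatever span is carried in the request's context. A minimal sketch of how a caller could propagate that context (callDownstream is a hypothetical helper, not part of this PR; it assumes it lives in the same file, where context, io, and net/http are already imported):

// Hypothetical illustration: build the request with the incoming context so
// otelhttp.Transport can tie its client-side telemetry to the server span
// that is handling this request.
func callDownstream(ctx context.Context, url string, body io.Reader) (*http.Response, error) {
    req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, body)
    if err != nil {
        return nil, err
    }
    return callClient.Do(req)
}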
@@ -80,6 +92,69 @@ type ReadCloser struct {
    *bytes.Reader
}

var (
    firstTokenLatencyMeasure metric.Float64Histogram
    nextTokenLatencyMeasure  metric.Float64Histogram
    allTokenLatencyMeasure   metric.Float64Histogram
    pipelineLatencyMeasure   metric.Float64Histogram
)

func init() {

    // The exporter embeds a default OpenTelemetry Reader and
    // implements prometheus.Collector, allowing it to be used as
    // both a Reader and Collector.
    exporter, err := prometheus.New()
    if err != nil {
        log.Error(err, "metrics: cannot init prometheus collector")
    }
    provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(exporter))
    otel.SetMeterProvider(provider)

    // ppalucki: Own metrics defintion bellow
Review comment: Typos ("defintion" should be "definition", "bellow" should be "below").
Suggested change:
-   // ppalucki: Own metrics defintion bellow
+   // ppalucki: Own metrics definition below
Reply: nice catch!
    const meterName = "entrag-telemetry"
    meter := provider.Meter(meterName)

    firstTokenLatencyMeasure, err = meter.Float64Histogram(
        "llm.first.token.latency",
        metric.WithUnit("ms"),
        metric.WithDescription("Measures the duration of first token generation."),
        api.WithExplicitBucketBoundaries(1, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16364),
    )
    if err != nil {
        log.Error(err, "metrics: cannot register first token histogram measure")
    }
    nextTokenLatencyMeasure, err = meter.Float64Histogram(
        "llm.next.token.latency",
        metric.WithUnit("ms"),
        metric.WithDescription("Measures the duration of generating all but first tokens."),
        api.WithExplicitBucketBoundaries(1, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16364),
    )
    if err != nil {
        log.Error(err, "metrics: cannot register next token histogram measure")
    }

    allTokenLatencyMeasure, err = meter.Float64Histogram(
        "llm.all.token.latency",
        metric.WithUnit("ms"),
        metric.WithDescription("Measures the duration to generate response with all tokens."),
        api.WithExplicitBucketBoundaries(1, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16364),
    )
    if err != nil {
        log.Error(err, "metrics: cannot register all token histogram measure")
    }

    pipelineLatencyMeasure, err = meter.Float64Histogram(
        "llm.pipeline.latency",
        metric.WithUnit("ms"),
        metric.WithDescription("Measures the duration to going through pipeline steps until first token is being generated (including read data time from client)."),
        api.WithExplicitBucketBoundaries(1, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16364),
    )
    if err != nil {
        log.Error(err, "metrics: cannot register pipeline histogram measure")
    }
Review comment on lines +118 to +155: There's a bit of duplication. Maybe this could iterate over a slice, like (untested code):
Reply: nice catch!
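Note: the reviewer's snippet is not shown above. A rough sketch of the slice-driven registration described there (equally untested; structure and field names are illustrative) could replace the four Float64Histogram blocks inside init():

// Hypothetical refactor: describe each instrument once, register in a loop.
// Note that api and metric alias the same package in this file.
buckets := api.WithExplicitBucketBoundaries(1, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16364)
for _, h := range []struct {
    dst   *metric.Float64Histogram
    name  string
    descr string
}{
    {&firstTokenLatencyMeasure, "llm.first.token.latency", "Measures the duration of first token generation."},
    {&nextTokenLatencyMeasure, "llm.next.token.latency", "Measures the duration of generating all but first tokens."},
    {&allTokenLatencyMeasure, "llm.all.token.latency", "Measures the duration to generate response with all tokens."},
    {&pipelineLatencyMeasure, "llm.pipeline.latency", "Measures the duration to going through pipeline steps until first token is being generated (including read data time from client)."},
} {
    hist, err := meter.Float64Histogram(h.name, metric.WithUnit("ms"), metric.WithDescription(h.descr), buckets)
    if err != nil {
        log.Error(err, "metrics: cannot register histogram measure", "name", h.name)
    }
    *h.dst = hist
}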
}

func (ReadCloser) Close() error {
    // Typically, you would release resources here, but for bytes.Reader, there's nothing to do.
    return nil
@@ -536,6 +611,7 @@ func mcGraphHandler(w http.ResponseWriter, req *http.Request) {
    go func() {
        defer close(done)

        allTokensStartTime := time.Now()
        inputBytes, err := io.ReadAll(req.Body)
        if err != nil {
            log.Error(err, "failed to read request body")
@@ -544,6 +620,9 @@ func mcGraphHandler(w http.ResponseWriter, req *http.Request) {
        }

        responseBody, statusCode, err := routeStep(defaultNodeName, *mcGraph, inputBytes, inputBytes, req.Header)

        pipelineLatencyMeasure.Record(ctx, float64(time.Since(allTokensStartTime))/float64(time.Millisecond))

        if err != nil {
            log.Error(err, "failed to process request")
            w.Header().Set("Content-Type", "application/json")
@@ -561,9 +640,22 @@ func mcGraphHandler(w http.ResponseWriter, req *http.Request) {
        }()

        w.Header().Set("Content-Type", "application/json")
        firstTokenCollected := false
        buffer := make([]byte, BufferSize)
        for {

            // measure time of reading another portion of response
            tokenStartTime := time.Now()
            n, err := responseBody.Read(buffer)
            elapsedTimeMilisecond := float64(time.Since(tokenStartTime)) / float64(time.Millisecond)

            if !firstTokenCollected {
                firstTokenCollected = true
                firstTokenLatencyMeasure.Record(ctx, elapsedTimeMilisecond)
            } else {
                nextTokenLatencyMeasure.Record(ctx, elapsedTimeMilisecond)
            }

            if err != nil && err != io.EOF {
                log.Error(err, "failed to read from response body")
                http.Error(w, "failed to read from response body", http.StatusInternalServerError)

@@ -586,6 +678,10 @@ func mcGraphHandler(w http.ResponseWriter, req *http.Request) {
                return
            }
        }

        allTokensElapsedTimeMilisecond := float64(time.Since(allTokensStartTime)) / float64(time.Millisecond)
        allTokenLatencyMeasure.Record(ctx, allTokensElapsedTimeMilisecond)

    }()

    select {
@@ -729,8 +825,23 @@ func handleMultipartError(writer *multipart.Writer, err error) {

 func initializeRoutes() *http.ServeMux {
    mux := http.NewServeMux()
-   mux.HandleFunc("/", mcGraphHandler)
-   mux.HandleFunc("/dataprep", mcDataHandler)
+
+   // Wrap connector handlers with otelhttp wrappers
+   // "http.server.request.size" - Int64Counter - "Measures the size of HTTP request messages" (Incoming request bytes total)
+   // "http.server.response.size" - Int64Counter - "Measures the size of HTTP response messages" (Incoming response bytes total)
+   // "http.server.duration" - Float64histogram "Measures the duration of inbound HTTP requests." (Incoming end to end duration, milliseconds)
+   handleFunc := func(pattern string, handlerFunc func(http.ResponseWriter, *http.Request), operation string) {
+       handler := otelhttp.NewHandler(otelhttp.WithRouteTag(pattern, http.HandlerFunc(handlerFunc)), operation)
+       mux.Handle(pattern, handler)
+   }
+
+   handleFunc("/", mcGraphHandler, "mcGraphHandler")
+   handleFunc("/dataprep", mcDataHandler, "mcDataHandler")
+
+   promHandler := promhttp.Handler()
+   handleFunc("/metrics", promHandler.ServeHTTP, "metrics")
+   log.Info("Metrics exposed on /metrics.")

    return mux
 }
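Note: as a quick sanity check (not part of this PR), an httptest-based smoke test against the returned mux could confirm that the Prometheus scrape endpoint is still reachable through the otelhttp wrappers; the test file, test name, and package name below are assumptions:

// metrics_smoke_test.go (hypothetical), assumed to sit in the same package as initializeRoutes.
package main

import (
    "net/http"
    "net/http/httptest"
    "testing"
)

func TestMetricsEndpoint(t *testing.T) {
    // Serve the fully wired mux, including the otelhttp-wrapped /metrics route.
    srv := httptest.NewServer(initializeRoutes())
    defer srv.Close()

    resp, err := http.Get(srv.URL + "/metrics")
    if err != nil {
        t.Fatalf("GET /metrics failed: %v", err)
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        t.Fatalf("expected 200 from /metrics, got %d", resp.StatusCode)
    }
}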
Review comment: IMHO this would look nicer as:
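Note: the reviewer's preferred form is not shown. Purely as a guess at what a tidier version might look like (hypothetical, not the reviewer's actual suggestion), the routes could be declared once in a table and wrapped in a single loop:

// Hypothetical alternative to the handleFunc closure inside initializeRoutes:
// declare the routes once and wrap each one with otelhttp in a single loop.
routes := map[string]struct {
    handler   http.HandlerFunc
    operation string
}{
    "/":         {mcGraphHandler, "mcGraphHandler"},
    "/dataprep": {mcDataHandler, "mcDataHandler"},
    "/metrics":  {promhttp.Handler().ServeHTTP, "metrics"},
}
for pattern, r := range routes {
    mux.Handle(pattern, otelhttp.NewHandler(otelhttp.WithRouteTag(pattern, r.handler), r.operation))
}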