feat: show latency and token count in benchmark execution detail view (#1185)

* feat: add per-lineage generated token count & run duration
* feat: add average successful latency and token count to benchmark execution
* refactor: Change internal creation functions to include calculated token and latency values

---------

Co-authored-by: Merlin Kallenborn <[email protected]>
NiklasKoehneckeAA and MerlinKallenbornAA authored Dec 18, 2024
1 parent 45fae03 commit b4212d2
Showing 5 changed files with 445 additions and 32 deletions.
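At a high level, the commit replaces the hard-coded `run_success_avg_latency=0` / `run_success_avg_token_count=0` placeholders with values computed from the exported traces of each run. The sketch below (not part of the diff) mirrors that flow under stated assumptions: `run_traces` stands in for the per-lineage exported spans assembled in `studio_benchmark.py`, and the two `extract_*` helpers are the ones added in `trace_information.py`.

```python
# Hedged sketch of the new averaging flow, not part of the diff itself.
# Assumes `run_traces` is a list of exported traces (Sequence[ExportedSpan]),
# one per evaluation lineage, as built in execute() below.
from intelligence_layer.evaluation.benchmark.trace_information import (
    extract_latency_from_trace,
    extract_token_count_from_trace,
)


def average_or_zero(values: list) -> float:
    # Avoid ZeroDivisionError when no run succeeded.
    return sum(values) / len(values) if values else 0


tokens_per_trace = [extract_token_count_from_trace(trace) for trace in run_traces]
latency_per_trace = [extract_latency_from_trace(trace) for trace in run_traces]

# In the real code these lists are first filtered down to successful runs
# before averaging (see _filter_for_succesful_runs below).
run_success_avg_token_count = average_or_zero(tokens_per_trace)
run_success_avg_latency = average_or_zero(latency_per_trace)
```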
6 changes: 3 additions & 3 deletions src/intelligence_layer/connectors/studio/studio.py
@@ -122,8 +122,8 @@ class PostBenchmarkExecution(BaseModel):
run_end: datetime
run_successful_count: int
run_failed_count: int
- run_success_avg_latency: int
- run_success_avg_token_count: int
+ run_success_avg_latency: float
+ run_success_avg_token_count: float
# Eval Overview
eval_start: datetime
eval_end: datetime
@@ -143,7 +143,7 @@ class GetDatasetExamplesResponse(BaseModel, Generic[Input, ExpectedOutput]):
items: Sequence[StudioExample[Input, ExpectedOutput]]


- class BenchmarkLineage(BaseModel, Generic[Input, Output, ExpectedOutput, Evaluation]):
+ class BenchmarkLineage(BaseModel, Generic[Input, ExpectedOutput, Output, Evaluation]):
trace_id: str
input: Input
expected_output: ExpectedOutput
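Two things change in this file: the average fields become `float`, and the `BenchmarkLineage` generic parameters are reordered to `[Input, ExpectedOutput, Output, Evaluation]`, matching the order `EvaluationLineage` already uses in the evaluation module. The type change matters because, depending on the pydantic version, a fractional average would either be rejected (v2 refuses floats with a fractional part for `int` fields) or silently truncated (v1). A minimal, illustrative stand-in (not the real `PostBenchmarkExecution`, which has many more required fields):

```python
# Illustrative stand-in for the changed fields; not the real model.
from pydantic import BaseModel


class RunStats(BaseModel):
    run_success_avg_latency: float
    run_success_avg_token_count: float


stats = RunStats(run_success_avg_latency=1234.5, run_success_avg_token_count=87.3)
print(stats)  # fractional averages are preserved instead of rejected or truncated
```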
104 changes: 88 additions & 16 deletions src/intelligence_layer/evaluation/benchmark/studio_benchmark.py
@@ -1,3 +1,4 @@
+ import itertools
from collections.abc import Sequence
from datetime import datetime
from http import HTTPStatus
@@ -16,6 +17,7 @@
)
from intelligence_layer.core import Input, Output
from intelligence_layer.core.task import Task
+ from intelligence_layer.core.tracer.tracer import ExportedSpan
from intelligence_layer.evaluation.aggregation.aggregator import (
AggregationLogic,
Aggregator,
@@ -29,6 +31,10 @@
BenchmarkRepository,
)
from intelligence_layer.evaluation.benchmark.get_code import get_source_notebook_safe
+ from intelligence_layer.evaluation.benchmark.trace_information import (
+ extract_latency_from_trace,
+ extract_token_count_from_trace,
+ )
from intelligence_layer.evaluation.dataset.domain import ExpectedOutput
from intelligence_layer.evaluation.dataset.studio_dataset_repository import (
StudioDatasetRepository,
@@ -118,7 +124,30 @@ def execute(

end = datetime.now()

- data = PostBenchmarkExecution(
+ evaluation_lineages = list(
+ self.evaluator.evaluation_lineages(evaluation_overview.id)
+ )
+
+ run_traces = [
+ self._trace_from_lineage(lineage) for lineage in evaluation_lineages
+ ]
+ tokens_per_trace = [
+ extract_token_count_from_trace(trace) for trace in run_traces
+ ]
+ latency_per_trace = [extract_latency_from_trace(trace) for trace in run_traces]
+
+ tokens_per_successful_trace, latency_per_successful_trace = (
+ self._filter_for_succesful_runs(
+ (tokens_per_trace, latency_per_trace),
+ source_lineage_list=evaluation_lineages,
+ run_id=run_overview.id,
+ )
+ )
+
+ def average_or_zero(list: list) -> float:
+ return sum(list) / len(list) if len(list) > 0 else 0
+
+ benchmark_execution_data = PostBenchmarkExecution(
name=name,
description=description,
labels=labels,
@@ -129,8 +158,8 @@ def execute(
run_end=run_overview.end,
run_successful_count=run_overview.successful_example_count,
run_failed_count=run_overview.failed_example_count,
- run_success_avg_latency=0, # TODO: Implement this
- run_success_avg_token_count=0, # TODO: Implement this
+ run_success_avg_latency=average_or_zero(latency_per_successful_trace),
+ run_success_avg_token_count=average_or_zero(tokens_per_successful_trace),
eval_start=evaluation_overview.start_date,
eval_end=evaluation_overview.end_date,
eval_successful_count=evaluation_overview.successful_evaluation_count,
@@ -141,23 +170,21 @@
)

benchmark_execution_id = self.client.submit_benchmark_execution(
- benchmark_id=self.id, data=data
+ benchmark_id=self.id, data=benchmark_execution_data
)

- evaluation_lineages = list(
- self.evaluator.evaluation_lineages(evaluation_overview.id)
- )
trace_ids = []
- for lineage in tqdm(evaluation_lineages, desc="Submitting traces to Studio"):
- trace = lineage.tracers[0]
- assert trace
- trace_id = self.client.submit_trace(trace.export_for_viewing())
+ for trace in tqdm(run_traces, desc="Submitting traces to Studio"):
+ trace_id = self.client.submit_trace(trace)
trace_ids.append(trace_id)

benchmark_lineages = self._create_benchmark_lineages(
eval_lineages=evaluation_lineages,
trace_ids=trace_ids,
+ latencies_per_trace=latency_per_trace,
+ tokens_per_trace=tokens_per_trace,
)

self.client.submit_benchmark_lineages(
benchmark_lineages=benchmark_lineages,
execution_id=benchmark_execution_id,
@@ -166,22 +193,67 @@

return benchmark_execution_id

+ def _filter_for_succesful_runs(
+ self,
+ lists_to_filter: tuple[list, ...],
+ source_lineage_list: list[
+ EvaluationLineage[Input, ExpectedOutput, Output, Evaluation]
+ ],
+ run_id: str,
+ ) -> tuple[list, ...]:
+ """This method assumes that lists_to_filter and source_lineage_list all have equal length."""
+ failed_example_output_ids = [
+ example_output.example_id
+ for example_output in self.run_repository.failed_example_outputs(
+ run_id=run_id, output_type=self.evaluator.output_type()
+ )
+ ]
+
+ is_successful_run = [
+ lineage.example.id not in failed_example_output_ids
+ for lineage in source_lineage_list
+ ]
+ return tuple(
+ list(itertools.compress(sublist, is_successful_run))
+ for sublist in lists_to_filter
+ )
+
+ def _trace_from_lineage(
+ self, eval_lineage: EvaluationLineage[Input, ExpectedOutput, Output, Evaluation]
+ ) -> Sequence[ExportedSpan]:
+ # since we have only 1 output in this scenario, we expect exactly 1 tracer
+ trace = eval_lineage.tracers[0]
+ assert trace, "eval lineages should always have at least 1 tracer"
+ return trace.export_for_viewing()

def _create_benchmark_lineages(
self,
eval_lineages: list[
EvaluationLineage[Input, ExpectedOutput, Output, Evaluation]
],
trace_ids: list[str],
- ) -> Sequence[BenchmarkLineage[Input, Output, ExpectedOutput, Evaluation]]:
+ latencies_per_trace: list[int],
+ tokens_per_trace: list[int],
+ ) -> Sequence[BenchmarkLineage[Input, ExpectedOutput, Output, Evaluation]]:
return [
- self._create_benchmark_lineage(eval_lineage, trace_id)
- for eval_lineage, trace_id in zip(eval_lineages, trace_ids, strict=True)
+ self._create_benchmark_lineage(
+ eval_lineage, trace_id, run_latency, run_tokens
+ )
+ for eval_lineage, trace_id, run_latency, run_tokens in zip(
+ eval_lineages,
+ trace_ids,
+ latencies_per_trace,
+ tokens_per_trace,
+ strict=True,
+ )
]

def _create_benchmark_lineage(
self,
eval_lineage: EvaluationLineage[Input, ExpectedOutput, Output, Evaluation],
trace_id: str,
+ run_latency: int,
+ run_tokens: int,
) -> BenchmarkLineage:
return BenchmarkLineage(
trace_id=trace_id,
@@ -190,8 +262,8 @@ def _create_benchmark_lineage(
example_metadata=eval_lineage.example.metadata,
output=eval_lineage.outputs[0].output,
evaluation=eval_lineage.evaluation.result,
- run_latency=0, # TODO: Implement this
- run_tokens=0, # TODO: Implement this
+ run_latency=run_latency,
+ run_tokens=run_tokens,
)


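The per-trace token and latency lists above are reduced to successful runs by `_filter_for_succesful_runs`, which applies a single boolean mask to several parallel lists at once via `itertools.compress`. A standalone sketch of that pattern with made-up values:

```python
import itertools

# Illustrative values only: one entry per evaluation lineage, in the same order.
tokens_per_trace = [42, 0, 73]
latency_per_trace = [1_200_000, 900_000, 3_100_000]  # microseconds
is_successful_run = [True, False, True]  # derived from failed_example_outputs in the real code

tokens_ok, latency_ok = (
    list(itertools.compress(values, is_successful_run))
    for values in (tokens_per_trace, latency_per_trace)
)
print(tokens_ok)   # [42, 73]
print(latency_ok)  # [1200000, 3100000]
```

Zipping with `strict=True` in `_create_benchmark_lineages` then keeps lineages, trace ids, latencies, and token counts aligned, raising instead of silently truncating if the lists ever diverge in length.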
66 changes: 66 additions & 0 deletions src/intelligence_layer/evaluation/benchmark/trace_information.py
@@ -0,0 +1,66 @@
from collections.abc import Sequence
from datetime import timedelta
from typing import cast

from aleph_alpha_client import CompletionResponse

from intelligence_layer.core import ExportedSpan
from intelligence_layer.core.model import _Complete
from intelligence_layer.core.tracer.tracer import SpanType


def _get_root(trace: Sequence[ExportedSpan]) -> ExportedSpan | None:
root_spans = [span for span in trace if span.parent_id is None]
if len(root_spans) != 1:
return None
return root_spans[0]


def extract_latency_from_trace(trace: Sequence[ExportedSpan]) -> int:
"""Extract the total duration of a given trace based on its root trace.
Args:
trace: trace to analyze
Returns:
The duration of the trace in microseconds
"""
root_span = _get_root(trace)
if root_span is None:
raise ValueError("No root span found in the trace")
latency = (root_span.end_time - root_span.start_time) / timedelta(microseconds=1)
return int(latency)


def _is_complete_request(span: ExportedSpan) -> bool:
# Assuming that LLM requests have a specific name or attribute
return span.name == _Complete.__name__


def _extract_tokens_from_complete_request(span: ExportedSpan) -> int:
if not hasattr(span.attributes, "output"):
raise ValueError(
"Function expects a complete span with attributes.output. Output was not present."
)
completion_output = cast(CompletionResponse, span.attributes.output)
return completion_output.num_tokens_generated


def extract_token_count_from_trace(trace: Sequence[ExportedSpan]) -> int:
"""Extract the number of tokens generated in a trace based on its completion requests.
Note: Does not support traces of streamed responses.
Args:
trace: trace to analyze.
Returns:
The sum of newly generated tokens across all spans in the given trace.
"""
token_count = 0
for span in trace:
if span.attributes.type != SpanType.TASK_SPAN:
continue
if _is_complete_request(span):
token_count += _extract_tokens_from_complete_request(span)
return token_count
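For reference, a hedged usage sketch of the two new helpers outside the benchmark flow: run a task against an in-memory tracer, export the spans, and feed them in. `task` and `task_input` are assumed to exist, and the sketch assumes `intelligence_layer.core.InMemoryTracer` with the same `export_for_viewing()` API that `_trace_from_lineage` relies on above.

```python
# Usage sketch, assuming `task` (an intelligence_layer Task) and `task_input` exist.
from intelligence_layer.core import InMemoryTracer

from intelligence_layer.evaluation.benchmark.trace_information import (
    extract_latency_from_trace,
    extract_token_count_from_trace,
)

tracer = InMemoryTracer()
task.run(task_input, tracer)

trace = tracer.export_for_viewing()  # Sequence[ExportedSpan]
print(extract_latency_from_trace(trace))      # total root-span duration in microseconds
print(extract_token_count_from_trace(trace))  # generated tokens summed over Complete spans
```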