feat: Upload benchmark lineages from SDK to studio #1164

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@
- Introduce `BenchmarkRepository` and `StudioBenchmarkRepository`
- Add `create_project` bool to `StudioClient.__init__()` to enable users to automatically create their Studio projects
- Add a progress bar to the `Runner` to track the `Run`
- Add `StudioClient.submit_benchmark_lineages` function and include it in `StudioClient.submit_benchmark_execution`

### Fixes
...
133 changes: 130 additions & 3 deletions src/intelligence_layer/connectors/studio/studio.py
@@ -1,3 +1,4 @@
import gzip
import json
import os
from collections import defaultdict, deque
@@ -8,7 +9,7 @@
from uuid import uuid4

import requests
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, RootModel
from requests.exceptions import ConnectionError, MissingSchema

from intelligence_layer.connectors import JsonSerializable
@@ -24,6 +25,8 @@

Input = TypeVar("Input", bound=PydanticSerializable)
ExpectedOutput = TypeVar("ExpectedOutput", bound=PydanticSerializable)
Output = TypeVar("Output", bound=PydanticSerializable)
Evaluation = TypeVar("Evaluation", bound=BaseModel, covariant=True)


class StudioProject(BaseModel):
@@ -140,6 +143,38 @@ class GetDatasetExamplesResponse(BaseModel, Generic[Input, ExpectedOutput]):
items: Sequence[StudioExample[Input, ExpectedOutput]]


class BenchmarkLineage(BaseModel, Generic[Input, Output, ExpectedOutput, Evaluation]):
trace_id: str
input: Input
expected_output: ExpectedOutput
output: Output
example_metadata: Optional[dict[str, Any]] = None
evaluation: Any
run_latency: int
run_tokens: int


class PostBenchmarkLineagesRequest(RootModel[Sequence[BenchmarkLineage]]):
pass


class PostBenchmarkLineagesResponse(RootModel[Sequence[str]]):
pass


class GetBenchmarkLineageResponse(BaseModel):
id: str
trace_id: str
benchmark_execution_id: str
input: Any
expected_output: Any
example_metadata: Optional[dict[str, Any]] = None
output: Any
evaluation: Any
run_latency: int
run_tokens: int
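
A quick construction-and-serialization sketch for the new models (the concrete values below are illustrative and not taken from this PR; `example_metadata` is left at its default):

from intelligence_layer.connectors.studio.studio import (
    BenchmarkLineage,
    PostBenchmarkLineagesRequest,
)

lineage = BenchmarkLineage(
    trace_id="trace-123",  # hypothetical trace id
    input="What is the capital of France?",
    expected_output="Paris",
    output="Paris",
    evaluation={"correct": True},
    run_latency=0,
    run_tokens=0,
)

# The RootModel wrapper serializes the batch as a bare JSON array, which is
# exactly the body that _send_compressed_batch gzips before posting.
payload = PostBenchmarkLineagesRequest(root=[lineage]).model_dump_json()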


class StudioClient:
"""Client for communicating with Studio.

@@ -403,7 +438,7 @@ def get_dataset_examples(
if page is None:
break

def create_benchmark(
def submit_benchmark(
self,
dataset_id: str,
eval_logic: EvaluationLogicIdentifier,
@@ -449,7 +484,7 @@ def get_benchmark(
return None
return GetBenchmarkResponse.model_validate(response_text)

def create_benchmark_execution(
def submit_benchmark_execution(
self, benchmark_id: str, data: PostBenchmarkExecution
) -> str:
url = urljoin(
@@ -464,6 +499,98 @@ def create_benchmark_execution(
self._raise_for_status(response)
return str(response.json())

def submit_benchmark_lineages(
self,
benchmark_lineages: Sequence[BenchmarkLineage],
benchmark_id: str,
execution_id: str,
max_payload_size: int = 50
* 1024
* 1024, # Maximum request size handled by Studio
) -> PostBenchmarkLineagesResponse:
"""Submit benchmark lineages in batches to avoid exceeding the maximum payload size.

Args:
benchmark_lineages: Sequence of :class:`BenchmarkLineage` objects to submit.
benchmark_id: ID of the benchmark.
execution_id: ID of the execution.
max_payload_size: Maximum size of a single request payload in bytes. Defaults to 50 MB, the maximum request size handled by Studio.

Returns:
Response containing the results of the submissions.
"""
all_responses = []
remaining_lineages = list(benchmark_lineages)
lineage_sizes = [
len(lineage.model_dump_json().encode("utf-8"))
for lineage in benchmark_lineages
]

while remaining_lineages:
batch = []
current_size = 0
# Build batch while checking size
for lineage, size in zip(remaining_lineages, lineage_sizes, strict=True):
if current_size + size <= max_payload_size:
batch.append(lineage)
current_size += size
else:
break

if batch:
# Send batch
response = self._send_compressed_batch(
batch, benchmark_id, execution_id
)
all_responses.extend(response)

else: # Only reached if a lineage is too big for the request
print("Lineage exceeds maximum of upload size", lineage)
batch.append(lineage)
remaining_lineages = remaining_lineages[len(batch) :]
lineage_sizes = lineage_sizes[len(batch) :]

return PostBenchmarkLineagesResponse(all_responses)
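
A hedged usage sketch of the method above; `client`, `lineages`, `benchmark_id`, and `execution_id` are assumed to exist already (for example from `submit_benchmark` and `submit_benchmark_execution`), and the tiny `max_payload_size` is passed only to force several batches:

lineage_ids = client.submit_benchmark_lineages(
    benchmark_lineages=lineages,  # Sequence[BenchmarkLineage]
    benchmark_id=benchmark_id,
    execution_id=execution_id,
    max_payload_size=1024,  # deliberately small; the default is 50 MB
)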

def get_benchmark_lineage(
self, benchmark_id: str, execution_id: str, lineage_id: str
) -> GetBenchmarkLineageResponse | None:
url = urljoin(
self.url,
f"/api/projects/{self.project_id}/evaluation/benchmarks/{benchmark_id}/executions/{execution_id}/lineages/{lineage_id}",
)
response = requests.get(
url,
headers=self._headers,
)
self._raise_for_status(response)
response_text = response.json()
if response_text is None:
return None
return GetBenchmarkLineageResponse.model_validate(response_text)

def _send_compressed_batch(
self, batch: list[BenchmarkLineage], benchmark_id: str, execution_id: str
) -> list[str]:
url = urljoin(
self.url,
f"/api/projects/{self.project_id}/evaluation/benchmarks/{benchmark_id}/executions/{execution_id}/lineages",
)

json_data = PostBenchmarkLineagesRequest(root=batch).model_dump_json()
compressed_data = gzip.compress(json_data.encode("utf-8"))

headers = {**self._headers, "Content-Encoding": "gzip"}

response = requests.post(
url,
headers=headers,
data=compressed_data,
)

self._raise_for_status(response)
return response.json()
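
For reference, undoing the compression above (say, in a test double standing in for the Studio endpoint) is plain gzip plus JSON decoding; this is a sketch, not Studio's actual server code:

import gzip
import json

def decode_lineage_request(body: bytes) -> list[dict]:
    # Reverses _send_compressed_batch: gunzip the body, then parse the JSON
    # array produced by PostBenchmarkLineagesRequest.model_dump_json().
    return json.loads(gzip.decompress(body).decode("utf-8"))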

def _raise_for_status(self, response: requests.Response) -> None:
try:
response.raise_for_status()
56 changes: 52 additions & 4 deletions src/intelligence_layer/evaluation/benchmark/studio_benchmark.py
@@ -1,4 +1,5 @@
import inspect
from collections.abc import Sequence
from datetime import datetime
from http import HTTPStatus
from typing import Any, Optional
@@ -9,6 +10,7 @@

from intelligence_layer.connectors.studio.studio import (
AggregationLogicIdentifier,
BenchmarkLineage,
EvaluationLogicIdentifier,
PostBenchmarkExecution,
StudioClient,
@@ -39,6 +41,9 @@
from intelligence_layer.evaluation.evaluation.in_memory_evaluation_repository import (
InMemoryEvaluationRepository,
)
from intelligence_layer.evaluation.infrastructure.repository_navigator import (
EvaluationLineage,
)
from intelligence_layer.evaluation.run.in_memory_run_repository import (
InMemoryRunRepository,
)
@@ -135,17 +140,60 @@ def execute(
statistics=aggregation_overview.statistics.model_dump_json(),
)

benchmark_execution_id = self.client.create_benchmark_execution(
benchmark_execution_id = self.client.submit_benchmark_execution(
benchmark_id=self.id, data=data
)

evaluation_lineages = self.evaluator.evaluation_lineages(evaluation_overview.id)
evaluation_lineages = list(
self.evaluator.evaluation_lineages(evaluation_overview.id)
)
trace_ids = []
for lineage in tqdm(evaluation_lineages, desc="Submitting traces to Studio"):
trace = lineage.tracers[0]
assert trace
self.client.submit_trace(trace.export_for_viewing())
trace_id = self.client.submit_trace(trace.export_for_viewing())
trace_ids.append(trace_id)

benchmark_lineages = self._create_benchmark_lineages(
eval_lineages=evaluation_lineages,
trace_ids=trace_ids,
)
self.client.submit_benchmark_lineages(
benchmark_lineages=benchmark_lineages,
execution_id=benchmark_execution_id,
benchmark_id=self.id,
)

return benchmark_execution_id

def _create_benchmark_lineages(
self,
eval_lineages: list[
EvaluationLineage[Input, ExpectedOutput, Output, Evaluation]
],
trace_ids: list[str],
) -> Sequence[BenchmarkLineage[Input, Output, ExpectedOutput, Evaluation]]:
return [
self._create_benchmark_lineage(eval_lineage, trace_id)
for eval_lineage, trace_id in zip(eval_lineages, trace_ids, strict=True)
]
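
A toy illustration of the strict pairing above: `zip(..., strict=True)` fails loudly instead of silently truncating if trace ids and evaluation lineages ever get out of sync (Python 3.10+, placeholder values):

trace_ids = ["trace-a", "trace-b"]
lineages = ["only-one-lineage"]
try:
    list(zip(lineages, trace_ids, strict=True))
except ValueError:
    print("mismatched lengths are rejected instead of being truncated")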

def _create_benchmark_lineage(
self,
eval_lineage: EvaluationLineage[Input, ExpectedOutput, Output, Evaluation],
trace_id: str,
) -> BenchmarkLineage:
return BenchmarkLineage(
trace_id=trace_id,
input=eval_lineage.example.input,
expected_output=eval_lineage.example.expected_output,
example_metadata=eval_lineage.example.metadata,
output=eval_lineage.outputs[0].output,
evaluation=eval_lineage.evaluation.result,
run_latency=0, # TODO: Implement this
run_tokens=0, # TODO: Implement this
)


class StudioBenchmarkRepository(BenchmarkRepository):
def __init__(self, studio_client: StudioClient):
@@ -161,7 +209,7 @@ def create_benchmark(
description: Optional[str] = None,
) -> StudioBenchmark:
try:
benchmark_id = self.client.create_benchmark(
benchmark_id = self.client.submit_benchmark(
dataset_id,
create_evaluation_logic_identifier(eval_logic),
create_aggregation_logic_identifier(aggregation_logic),