feat: Upload benchmark lineages from SDK to studio #1164

Merged
Changes from 6 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@
- Introduce `BenchmarkRepository` and `StudioBenchmarkRepository`
- Add `create_project` bool to `StudioClient.__init__()` to enable users to automatically create their Studio projects
- Add a progress bar to the `Runner` to track the `Run`
- Add `StudioClient.submit_benchmark_lineages` function and include it in `StudioClient.submit_benchmark_execution`

### Fixes
...
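The new `submit_benchmark_lineages` entry above is the core of this PR. A minimal usage sketch, assuming a `StudioClient` configured for an existing project (the project name, ids, and lineage values below are purely illustrative, and the exact constructor arguments may differ):

```python
from intelligence_layer.connectors.studio.studio import BenchmarkLineage, StudioClient

# Hypothetical client setup; replace with your own project and credentials.
client = StudioClient(project="my-project", create_project=True)

lineage = BenchmarkLineage(
    trace_id="trace-id",  # id returned by client.submit_trace(...)
    input="What is the capital of France?",
    expected_output="Paris",
    output="Paris",
    example_metadata={"source": "demo"},
    evaluation={"correct": True},
    run_latency=0,
    run_tokens=0,
)

# Placeholder ids returned by submit_benchmark and submit_benchmark_execution.
client.submit_benchmark_lineages(
    benchmark_lineages=[lineage],
    benchmark_id="benchmark-id",
    execution_id="execution-id",
)
```

Lineages are serialized once, split into batches that stay below `max_payload_size` (50 MiB by default), and uploaded as gzip-compressed requests.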
100 changes: 97 additions & 3 deletions src/intelligence_layer/connectors/studio/studio.py
@@ -1,3 +1,4 @@
import gzip
import json
import os
from collections import defaultdict, deque
@@ -8,7 +9,7 @@
from uuid import uuid4

import requests
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, RootModel
from requests.exceptions import ConnectionError, MissingSchema

from intelligence_layer.connectors import JsonSerializable
@@ -24,6 +25,8 @@

Input = TypeVar("Input", bound=PydanticSerializable)
ExpectedOutput = TypeVar("ExpectedOutput", bound=PydanticSerializable)
Output = TypeVar("Output", bound=PydanticSerializable)
Evaluation = TypeVar("Evaluation", bound=BaseModel, covariant=True)


class StudioProject(BaseModel):
@@ -140,6 +143,25 @@ class GetDatasetExamplesResponse(BaseModel, Generic[Input, ExpectedOutput]):
items: Sequence[StudioExample[Input, ExpectedOutput]]


class BenchmarkLineage(BaseModel, Generic[Input, Output, ExpectedOutput, Evaluation]):
trace_id: str
input: Input
expected_output: ExpectedOutput
output: Output
example_metadata: Optional[dict[str, Any]] = None
evaluation: Any
run_latency: int
run_tokens: int


class PostBenchmarkLineagesRequest(RootModel[Sequence[BenchmarkLineage]]):
pass


class PostBenchmarkLineagesResponse(RootModel[Sequence[str]]):
pass


class StudioClient:
"""Client for communicating with Studio.

@@ -403,7 +425,7 @@ def get_dataset_examples(
if page is None:
break

def create_benchmark(
def submit_benchmark(
self,
dataset_id: str,
eval_logic: EvaluationLogicIdentifier,
@@ -449,7 +471,7 @@ def get_benchmark(
return None
return GetBenchmarkResponse.model_validate(response_text)

def create_benchmark_execution(
def submit_benchmark_execution(
self, benchmark_id: str, data: PostBenchmarkExecution
) -> str:
url = urljoin(
@@ -464,6 +486,78 @@ def create_benchmark_execution(
self._raise_for_status(response)
return str(response.json())

def submit_benchmark_lineages(
self,
benchmark_lineages: Sequence[BenchmarkLineage],
benchmark_id: str,
execution_id: str,
max_payload_size: int = 50 * 1024 * 1024,
) -> PostBenchmarkLineagesResponse:
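"""Uploads benchmark lineages to Studio for a given benchmark execution.

Each lineage is serialized to JSON, the lineages are grouped into batches
that stay below `max_payload_size` bytes, and every batch is sent as a
gzip-compressed POST request. A single lineage that is larger than
`max_payload_size` on its own is skipped with a warning. The per-batch
responses are collected into one `PostBenchmarkLineagesResponse`.
"""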
all_responses = []
remaining_lineages = list(benchmark_lineages)

converted_lineage_sizes = [
len(lineage.model_dump_json().encode("utf-8"))
for lineage in benchmark_lineages
]

while remaining_lineages:
batch = []
current_size = 0
# Build batch while checking size
for lineage, size in zip(
remaining_lineages, converted_lineage_sizes, strict=True
):
if current_size + size <= max_payload_size:
batch.append(lineage)
current_size += size
else:
break

if batch:
# Send batch
response = self._send_compressed_batch(
batch, benchmark_id, execution_id
)
all_responses.extend(response)

else: # Only reached if a lineage is too big for the request
print("Lineage exceeds maximum of upload size", lineage)
batch.append(lineage)

remaining_lineages = remaining_lineages[len(batch) :]
converted_lineage_sizes = converted_lineage_sizes[len(batch) :]

return PostBenchmarkLineagesResponse(all_responses)

def _send_compressed_batch(
self, batch: list[BenchmarkLineage], benchmark_id: str, execution_id: str
) -> list[str]:
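"""Gzip-compresses one batch of lineages and POSTs it to the Studio lineage endpoint.

The body is the JSON dump of a `PostBenchmarkLineagesRequest`; the
`Content-Encoding: gzip` header tells the server to decompress it before parsing.
"""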
url = urljoin(
self.url,
f"/api/projects/{self.project_id}/evaluation/benchmarks/{benchmark_id}/executions/{execution_id}/lineages",
)

request_data = self._create_post_benchmark_lineages_request(batch)
json_data = request_data.model_dump_json()
compressed_data = gzip.compress(json_data.encode("utf-8"))

headers = {**self._headers, "Content-Encoding": "gzip"}

response = requests.post(
url,
headers=headers,
data=compressed_data,
)

self._raise_for_status(response)
return response.json()

def _create_post_benchmark_lineages_request(
self, benchmark_lineages: Sequence[BenchmarkLineage]
) -> PostBenchmarkLineagesRequest:
return PostBenchmarkLineagesRequest(root=benchmark_lineages)

def _raise_for_status(self, response: requests.Response) -> None:
try:
response.raise_for_status()
56 changes: 52 additions & 4 deletions src/intelligence_layer/evaluation/benchmark/studio_benchmark.py
@@ -1,4 +1,5 @@
import inspect
from collections.abc import Sequence
from datetime import datetime
from http import HTTPStatus
from typing import Any, Optional
@@ -9,6 +10,7 @@

from intelligence_layer.connectors.studio.studio import (
AggregationLogicIdentifier,
BenchmarkLineage,
EvaluationLogicIdentifier,
PostBenchmarkExecution,
StudioClient,
@@ -39,6 +41,9 @@
from intelligence_layer.evaluation.evaluation.in_memory_evaluation_repository import (
InMemoryEvaluationRepository,
)
from intelligence_layer.evaluation.infrastructure.repository_navigator import (
EvaluationLineage,
)
from intelligence_layer.evaluation.run.in_memory_run_repository import (
InMemoryRunRepository,
)
@@ -135,17 +140,60 @@ def execute(
statistics=aggregation_overview.statistics.model_dump_json(),
)

benchmark_execution_id = self.client.create_benchmark_execution(
benchmark_execution_id = self.client.submit_benchmark_execution(
benchmark_id=self.id, data=data
)

evaluation_lineages = self.evaluator.evaluation_lineages(evaluation_overview.id)
evaluation_lineages = list(
self.evaluator.evaluation_lineages(evaluation_overview.id)
)
trace_ids = []
for lineage in tqdm(evaluation_lineages, desc="Submitting traces to Studio"):
trace = lineage.tracers[0]
assert trace
self.client.submit_trace(trace.export_for_viewing())
trace_id = self.client.submit_trace(trace.export_for_viewing())
trace_ids.append(trace_id)

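# Pair each evaluation lineage with the trace id returned above and
# upload the resulting benchmark lineages for this execution.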
benchmark_lineages = self._create_benchmark_lineages(
eval_lineages=evaluation_lineages,
trace_ids=trace_ids,
)
self.client.submit_benchmark_lineages(
benchmark_lineages=benchmark_lineages,
execution_id=benchmark_execution_id,
benchmark_id=self.id,
)

return benchmark_execution_id

def _create_benchmark_lineages(
self,
eval_lineages: list[
EvaluationLineage[Input, ExpectedOutput, Output, Evaluation]
],
trace_ids: list[str],
) -> Sequence[BenchmarkLineage[Input, Output, ExpectedOutput, Evaluation]]:
return [
self._create_benchmark_lineage(eval_lineage, trace_id)
for eval_lineage, trace_id in zip(eval_lineages, trace_ids, strict=True)
]

def _create_benchmark_lineage(
self,
eval_lineage: EvaluationLineage[Input, ExpectedOutput, Output, Evaluation],
trace_id: str,
) -> BenchmarkLineage:
return BenchmarkLineage(
trace_id=trace_id,
input=eval_lineage.example.input,
expected_output=eval_lineage.example.expected_output,
example_metadata=eval_lineage.example.metadata,
output=eval_lineage.outputs[0].output,
evaluation=eval_lineage.evaluation.result,
run_latency=0, # TODO: Implement this
run_tokens=0, # TODO: Implement this
)


class StudioBenchmarkRepository(BenchmarkRepository):
def __init__(self, studio_client: StudioClient):
Expand All @@ -161,7 +209,7 @@ def create_benchmark(
description: Optional[str] = None,
) -> StudioBenchmark:
try:
benchmark_id = self.client.create_benchmark(
benchmark_id = self.client.submit_benchmark(
dataset_id,
create_evaluation_logic_identifier(eval_logic),
create_aggregation_logic_identifier(aggregation_logic),