cleaned up logging, exception handling around streaming disconnects, fixed bug around completion tokens missing on embedding endpoints for logging workers
anevjes committed Jul 26, 2024
1 parent e022dd8 commit 3619010
Showing 12 changed files with 145 additions and 63 deletions.
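
The largest change is in the facade's request forwarding: connection timeouts now surface a clean error, a 429 from an endpoint bumps its connection_errors_count and rolls over to the next endpoint in the pool, and api-key values are stripped before pool details are logged. A minimal sketch of that failover pattern, with assumed endpoint and pool shapes (the real facade is async, streams responses, and tracks per-endpoint rate-limit stats):

import logging
import httpcore
import httpx

logger = logging.getLogger(__name__)

def forward_with_failover(pool_endpoints, path, payload, max_retries=3):
    # Try endpoints in turn; 429s and connection failures move on to the next one.
    current_retry = 0
    while current_retry <= max_retries:
        endpoint = pool_endpoints[current_retry % len(pool_endpoints)]
        url = endpoint["url"].rstrip("/") + "/" + path.lstrip("/")
        try:
            response = httpx.post(url, json=payload,
                                  headers={"api-key": endpoint["api-key"]},
                                  timeout=30.0)
            if response.status_code == 429:
                logger.info("Received 429 response from endpoint, retrying next available endpoint")
                endpoint["connection_errors_count"] = endpoint.get("connection_errors_count", 0) + 1
                current_retry += 1
                continue
            response.raise_for_status()
            return response
        except (httpx.ConnectTimeout, httpcore.ConnectTimeout) as timeout_err:
            logger.error(f"Connection timed out: {timeout_err}")
            raise
        except Exception as e:
            logger.error(f"An unexpected error occurred: {e}")
            endpoint["connection_errors_count"] = endpoint.get("connection_errors_count", 0) + 1
            current_retry += 1
    raise RuntimeError(f"Request failed to process after {current_retry} attempts")
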
83 changes: 44 additions & 39 deletions aisentry/facade/app.py
@@ -3,7 +3,7 @@
import uuid
from datetime import datetime
from dapr.clients import DaprClient
import aiohttp
import httpcore
from enum import Enum
from typing import Tuple
from quart import Quart, jsonify, request, make_response
@@ -12,6 +12,7 @@
from urllib.parse import urljoin
from datetime import datetime, timezone
import httpx
from requests.exceptions import HTTPError
import jwt
import json
from dotenv import load_dotenv
@@ -107,15 +108,17 @@ async def catch_all(path):
logger.info(f"ai-sentry adapters used: {ai_sentry_adapters}")

ai_sentry_adapters_json = json.loads(ai_sentry_adapters)

logger.info(f"Selected pool name: {pool_name}")

# Create a new set of headers that exclude the ai-sentry specific headers which we will forward onto openAI endpoints
exclude_headers = ['host', 'content-length']+list(ai_sentry_headers_used)
openAI_request_headers = {k: v for k, v in original_headers.items() if k.lower() not in exclude_headers}

pool_endpoints = select_pool(open_ai_endpoint_availability_stats, pool_name)
logger.info(f"Selected pool: {pool_endpoints}")

#strip api-key value if it is in use
pool_endpoints_without_api_key = [{k: v for k, v in endpoint.items() if k != 'api-key'} for endpoint in pool_endpoints]
logger.info(f"Selected pool: {pool_endpoints_without_api_key}")

while not request_processed and current_retry <= max_retries:
logger.info(f"Processing request retry#: {current_retry}")
@@ -178,20 +181,32 @@ async def catch_all(path):
# potentially recieve a timeout or a HTTP > 499
response.raise_for_status()
current_retry += 1

except httpcore.ConnectTimeout as timeout_err:
logger.error(f"Connection timed out: {timeout_err}")
return jsonify(error=str(timeout_err)), 500

except HTTPError as http_err:
logger.info(f"HTTP error occurred: {http_err}")
if http_err.response.status_code == 429: # 429 is the status code for Too Many Requests
logger.info(f"Received 429 response from endpoint, retrying next available endpoint")
current_retry += 1
endpoint_info["connection_errors_count"]+=1
request_processed = False
continue

except Exception as e:
# Connection Failures
logger.error(f"An unexpected error occurred: {e}")
# increment connection errors count for the endpoint
endpoint_info["connection_errors_count"]+=1
current_retry += 1
continue

if response.status_code == 429:
logger.info(f"Received 429 response from endpoint, retrying next avilable endpoint")
current_retry += 1
continue
# Connection Failures
logger.error(f"An unexpected error occurred: {e}")

if "429 Too Many Requests" in str(e):
logger.info(f"Received 429 response from endpoint, retrying next available endpoint")
current_retry += 1
endpoint_info["connection_errors_count"]+=1
request_processed = False
continue

return jsonify(error=str(e)), 500


@stream_with_context
@@ -204,7 +219,7 @@ async def stream_response(response):
response_stream = []
global model_name
global openai_response_id

async for line in response.aiter_lines():
yield f"{line}\r\n"
if line.startswith("data: "):
@@ -223,6 +238,7 @@
if delta.get('content') is not None:
content_buffered.append(delta['content'])


content_buffered_string = "".join(content_buffered)

# Calculate the token count using tiktok library
@@ -234,13 +250,15 @@

logger.info(f"Streamed completion total Token count: {streaming_completion_token_count}")
logger.info(f"Streamed prompt total Token count: {streaming_prompt_token_count}")

proxy_streaming_response = await make_response( stream_response(response))
proxy_streaming_response_body = await proxy_streaming_response.data
proxy_streaming_response.timeout = None
proxy_streaming_response.status_code = response.status_code
proxy_streaming_response.headers = {k: str(v) for k, v in response.headers.items()}

try:
proxy_streaming_response = await make_response( stream_response(response))
proxy_streaming_response_body = await proxy_streaming_response.data
proxy_streaming_response.timeout = None
proxy_streaming_response.status_code = response.status_code
proxy_streaming_response.headers = {k: str(v) for k, v in response.headers.items()}
except Exception as e:
logger.error(f"An error occurred while streaming response: {e}")
return jsonify(error=str(e)), 500

# Record the stats for openAi endpoints
if proxy_streaming_response.headers.get("x-ratelimit-remaining-tokens") is not None:
@@ -251,6 +269,7 @@
if proxy_streaming_response.headers.get("x-ratelimit-remaining-requests") is not None:
endpoint_info["x-ratelimit-remaining-requests"]=response.headers["x-ratelimit-remaining-requests"]
else:
endpoint_info["x-ratelimit-remaining-tokens"]=0
endpoint_info["x-ratelimit-remaining-requests"]=0

utc_now = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
@@ -300,19 +319,6 @@ async def stream_response(response):
try:
response = await client.send(req, stream=False)
response.raise_for_status()
# except httpx.TimeoutException:
# logger.error("A TimeoutException occurred.")
# except httpx.ConnectTimeout:
# logger.error("A ConnectTimeout occurred.")
# except httpx.ReadTimeout:
# logger.error("A ReadTimeout occurred.")
# except httpx.HTTPStatusError:
# logger.error("A TooManyRequests occurred.")
# #increment connection errors count for the endpoint
# endpoint_info["connection_errors_count"]+=1
# current_retry += 1
# request_processed = False
# break

except Exception as e:
# Connection Failures
@@ -325,12 +331,13 @@ async def stream_response(response):
request_processed = False
continue

# If response is a 429 Incremet retry count - to pick next aviable endpoint
# If response is a 429 Increment retry count - to pick next aviable endpoint
if response.status_code == 429:

logger.info(f"Received 429 response from endpoint, retrying next available endpoint")
#endpoint_info["x-retry-after-ms"]=response.headers["x-retry-after-ms"]
current_retry += 1
endpoint_info["connection_errors_count"]+=1
request_processed = False
continue

@@ -396,6 +403,4 @@ async def stream_response(response):

return proxy_response

return jsonify(message=f"Request failed to process. Attempted to run: {current_retry}, against AI endpoint configuration unsucessfully"), 500


return jsonify(message=f"Request failed to process. Attempted to run: {current_retry}, against AI endpoint configuration unsucessfully"), 500
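
Streamed chat completions carry no usage block, so stream_response buffers each delta's content and derives the token counts itself. A minimal sketch of that counting step with tiktoken, assuming the cl100k_base encoding (the facade counts the buffered completion content and the original prompt the same way):

import tiktoken

def count_streamed_tokens(content_chunks, encoding_name="cl100k_base"):
    # Join the buffered delta contents and count tokens as a usage block would report them.
    encoding = tiktoken.get_encoding(encoding_name)
    return len(encoding.encode("".join(content_chunks)))

# chunks as buffered from the "data: {...}" lines of a streamed response
print(count_streamed_tokens(["The Toyota", " Yaris GR", " is a hot hatch."]))
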
14 changes: 7 additions & 7 deletions aisentry/utils/ai_sentry_helpers.py
@@ -49,9 +49,9 @@ def select_pool(pool_endpoints, pool_name):
return pool_endpoints.get(pool_name, None)

async def getNextAvailableEndpointInfo(open_ai_endpoint_availability_stats):
logger.info(f"open_ai_endpoint_availability_stats: {open_ai_endpoint_availability_stats}")
remaining_requests = sorted(open_ai_endpoint_availability_stats ,key=lambda x: int(x['x-ratelimit-remaining-requests'], reverse=True))[0]
remaining_tokens = sorted(open_ai_endpoint_availability_stats ,key=lambda x: int(x['x-ratelimit-remaining-tokens'], reverse=True))[0]
logger.debug(f"open_ai_endpoint_availability_stats: {open_ai_endpoint_availability_stats}")
remaining_requests = sorted(open_ai_endpoint_availability_stats ,key=lambda x: int(x['x-ratelimit-remaining-requests']), reverse=True)[0]
remaining_tokens = sorted(open_ai_endpoint_availability_stats ,key=lambda x: int(x['x-ratelimit-remaining-tokens']), reverse=True)[0]
logger.info(f"Next available endpoint: {remaining_requests['url']}")

# Add a new key 'max_limit' to each dictionary that is the maximum of 'x-ratelimit-remaining-requests' and 'x-ratelimit-remaining-tokens'
@@ -143,9 +143,9 @@ def to_dict(self):
'model': self.model,
'timestamp': self.timestamp,
'ProductId': self.product_id,
'promptTokens': self.usage['prompt_tokens'],
'responseTokens': self.usage['completion_tokens'],
'totalTokens': self.usage['total_tokens'],
'promptTokens': self.usage['prompt_tokens'] if self.usage is not None else None,
'responseTokens': self.usage.get('completion_tokens', None) if self.usage is not None else None,
'totalTokens': self.usage.get('total_tokens', None) if self.usage is not None else None ,
'month_year': self.month_year
}

@@ -179,7 +179,7 @@ def to_dict(self):
}

class Usage:
def __init__(self, completion_tokens, prompt_tokens, total_tokens):
def __init__(self, completion_tokens=None, prompt_tokens=None, total_tokens=None):
self.completion_tokens = completion_tokens
self.prompt_tokens = prompt_tokens
self.total_tokens = total_tokens
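
Two fixes above deserve a note: getNextAvailableEndpointInfo now passes reverse=True to sorted() instead of to int(), so the endpoints are actually sorted in descending order of remaining requests/tokens, and to_dict() tolerates a missing or partial usage block. Embedding responses report prompt_tokens and total_tokens but no completion_tokens, which is the logging-worker bug named in the commit message. A minimal sketch of the null-safe extraction, with an assumed usage payload:

def usage_to_dict(usage):
    # Tolerate missing usage fields; embeddings never return completion_tokens.
    if usage is None:
        return {"promptTokens": None, "responseTokens": None, "totalTokens": None}
    return {
        "promptTokens": usage.get("prompt_tokens"),
        "responseTokens": usage.get("completion_tokens"),  # None for embedding endpoints
        "totalTokens": usage.get("total_tokens"),
    }

# typical embedding usage payload (assumed shape)
print(usage_to_dict({"prompt_tokens": 5, "total_tokens": 5}))
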
14 changes: 9 additions & 5 deletions aisentry/worker/cosmos_logger/cosmos_logger.py
@@ -53,24 +53,28 @@ async def oairequests_subscriber():

logger.debug(f"Received a request to log data{data}")

response_body = data.get('response_body')
response_body = data.get('response_body', None)

consumer_id=data['sentry_ai_headers'].get('ai-sentry-consumer')
model_name=data.get('model','null')
openai_response_id=data.get('openai_response_id','null')
consumer_id=data['sentry_ai_headers'].get('ai-sentry-consumer', None)
log_level=data['sentry_ai_headers'].get('ai-sentry-log-level', None)
model_name=data.get('model',None)
openai_response_id=data.get('openai_response_id',None)
date = datetime.datetime.fromisoformat(data.get('date_time_utc'))
month_date = date.strftime("%m_%Y")

logger.info('Consumer Product Id: %s', consumer_id)
logger.info('LogId: %s', model_name)
logger.info('LogLevel: %s', log_level)

data['LogId']=f"{model_name}_{consumer_id}_{month_date}"
data['id']=openai_response_id
data['logLevel']=log_level

output_binding_data = json.dumps(data)

data = {key.lower(): value for key, value in data.items()}

headers = data['sentry_ai_headers']
headers = data.get('sentry_ai_headers', None)


if headers['ai-sentry-log-level'] == 'PII_STRIPPING_ENABLED':
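
The Cosmos logging worker now reads the ai-sentry-log-level header alongside the consumer id and stamps both onto the document, which is keyed by model, consumer, and month. For a payload matching the chat test further down, the composed fields would look roughly like this (illustrative values, assumed response id):

import datetime

data = {
    "model": "gpt-4o",
    "openai_response_id": "chatcmpl-123",  # assumed response id
    "date_time_utc": "2024-07-26T03:15:00+00:00",
    "sentry_ai_headers": {
        "ai-sentry-consumer": "Product-car-review",
        "ai-sentry-log-level": "PII_STRIPPING_ENABLED",
    },
}

month_date = datetime.datetime.fromisoformat(data["date_time_utc"]).strftime("%m_%Y")
consumer_id = data["sentry_ai_headers"].get("ai-sentry-consumer")

data["LogId"] = f"{data.get('model')}_{consumer_id}_{month_date}"  # gpt-4o_Product-car-review_07_2024
data["id"] = data.get("openai_response_id")
data["logLevel"] = data["sentry_ai_headers"].get("ai-sentry-log-level")
print(data["LogId"], data["id"], data["logLevel"])
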
10 changes: 5 additions & 5 deletions aisentry/worker/usage_summary/usage_logger.py
@@ -55,11 +55,11 @@ async def oairequests_subscriber():

logger.debug(f"Received a request to log data{data}")

consumer_id=data['sentry_ai_headers'].get('ai-sentry-consumer')
model_name=data['model']
usage_info=data['usage']
openai_response_id=data['openai_response_id']
date = datetime.datetime.fromisoformat(data['date_time_utc'])
consumer_id=data['sentry_ai_headers'].get('ai-sentry-consumer', None)
model_name = data.get('model', None)
usage_info=data.get('usage', None)
openai_response_id=data.get('openai_response_id', None)
date = datetime.datetime.fromisoformat(data.get('date_time_utc', None))
month_date = date.strftime("%m_%Y")

logger.info('Consumer Product Id: %s', consumer_id)
Binary file added content/images/cosmosdb_request_logs.png
Binary file modified content/images/cosmosdb_summary_logs.png
26 changes: 26 additions & 0 deletions tests/http/adapter_test-ai-sentry.http
@@ -0,0 +1,26 @@
POST http://4.200.49.22/openai/deployments/gpt-4o/chat/completions?api-version=2024-02-15-preview HTTP/1.1
ai-sentry-consumer: Product-car-review
ai-sentry-log-level: PII_STRIPPING_ENABLED
ai-sentry-backend-pool: pool1
ai-sentry-adapters: ["SampleApiRequestTransformer"]
Content-Type: application/json

{
"messages": [
{
"role": "system",
"content": "you are a car reviewer studying japenese cars"
},
{
"role":"user",
"content":"Write a review on toyota yaris gr"
}
],
"stream":true,
"max_tokens": 800,
"temperature": 0.7,
"frequency_penalty": 0,
"presence_penalty": 0,
"top_p": 0.95,
"stop": null
}
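
The same request can be driven through the openai SDK by pointing the AzureOpenAI client at the facade and attaching the ai-sentry headers as default headers, in the same spirit as tests/sdk/embedding.py further down. A sketch, with the facade address and key assumed:

import os
from openai import AzureOpenAI

client = AzureOpenAI(
    api_key=os.getenv("api_key"),
    api_version="2024-02-15-preview",
    azure_endpoint=os.getenv("azure_endpoint", "http://4.200.49.22/"),  # assumed facade address from the test above
    default_headers={
        "ai-sentry-consumer": "Product-car-review",
        "ai-sentry-log-level": "PII_STRIPPING_ENABLED",
        "ai-sentry-backend-pool": "pool1",
        "ai-sentry-adapters": '["SampleApiRequestTransformer"]',
    },
)

response = client.chat.completions.create(
    model="gpt-4o",  # deployment name
    messages=[
        {"role": "system", "content": "you are a car reviewer studying japanese cars"},
        {"role": "user", "content": "Write a review on toyota yaris gr"},
    ],
    max_tokens=800,
    temperature=0.7,
)
print(response.choices[0].message.content)
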
22 changes: 22 additions & 0 deletions tests/http/adapter_test-apim.http
@@ -0,0 +1,22 @@
POST https://anevjes-apim-001.azure-api.net/openai-2024-02-01/deployments/gpt-4o/chat/completions?api-version=2024-02-15-preview HTTP/1.1
Content-Type: application/json

{
"messages": [
{
"role": "system",
"content": "you are a car reviewer studying japenese cars"
},
{
"role":"user",
"content":"Write a review on honda Integra DC2 Type R"
}
],
"stream":false,
"max_tokens": 800,
"temperature": 0.7,
"frequency_penalty": 0,
"presence_penalty": 0,
"top_p": 0.95,
"stop": null
}
5 changes: 3 additions & 2 deletions tests/http/non_streaming_embedding.http
@@ -1,14 +1,15 @@
@resource_name = localhost:6124
@resource_name = 4.200.52.148
@deployment_name = text-embedding-ada-002
@api_key = {{$dotenv AOAI_API_KEY}}

###
POST http://{{resource_name}}/openai/deployments/{{deployment_name}}/embeddings?api-version=2024-02-01 HTTP/1.1
POST http://{{resource_name}}/openai/deployments/{{deployment_name}}/embeddings?api-version=2024-06-01 HTTP/1.1
Content-Type: application/json
api-key: {{api_key}}
ai-sentry-backend-pool:pool1
ai-sentry-consumer:embedding-automated-test1
ai-sentry-log-level:PII_STRIPPING_ENABLED
ai-sentry-adapters: []

{
"input": "Sample Document goes here"
22 changes: 22 additions & 0 deletions tests/loadTests/embeddings/locust.py
@@ -0,0 +1,22 @@
from locust import HttpUser, task, between

class EmbeddingTestUser(HttpUser):
wait_time = between(1, 5) # Simulated users will wait 1-5 seconds between tasks

@task
def post_embedding(self):
headers = {
"Content-Type": "application/json",
"api-key": "your_api_key_here", # Replace with your actual API key
"ai-sentry-backend-pool": "pool1",
"ai-sentry-consumer": "embedding-automated-test1",
"ai-sentry-log-level": "PII_STRIPPING_ENABLED",
"ai-sentry-adapters": "[]"
}
payload = {
"input": "Sample Document goes here"
}
self.client.post("/openai/deployments/text-embedding-ada-002/embeddings?api-version=2024-06-01", json=payload, headers=headers)

# Note: Ensure you replace "your_api_key_here" with the actual API key.
# You might also need to adjust the host in the Locust command-line or within the script if it's dynamic.
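
The new embeddings load test leaves the target to the --host flag on the locust command line; if a fixed target is preferred, Locust also accepts a host class attribute (address assumed from the non_streaming_embedding.http test above):

from locust import HttpUser

class EmbeddingTestUser(HttpUser):
    # Assumed ai-sentry facade address; equivalent to passing --host to locust.
    host = "http://4.200.52.148"
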
10 changes: 6 additions & 4 deletions tests/loadTests/streaming/locustfile.py
@@ -4,10 +4,10 @@

load_dotenv(".env", override=True)

azure_endpoint = os.getenv("azure_endpoint", "http://ai-sentry-url/"),
api_key=os.getenv("api_key", "defaultkey"),
azure_endpoint = os.getenv("azure_endpoint", "http://4.195.10.69/"),
#api_key=os.getenv("api_key", "defaultkey"),
api_version=os.getenv("api_version", "2023-07-01-preview"),
aoai_deployment_name = os.getenv("aoai_deployment_name", "chat")
aoai_deployment_name = os.getenv("aoai_deployment_name", "gpt4o")

# Non-Streaming Load Test
class OpenAIUser(HttpUser):
@@ -18,7 +18,9 @@ class OpenAIUser(HttpUser):
"Content-Type": "application/json",
"ai-sentry-consumer": "locustloadtest",
"ai-sentry-log-level": "PII_STRIPPING_ENABLED",
"api-key": "\"{}\"".format(api_key)
"ai-sentry-backend-pool": "pool1",
"ai-sentry-adapters": "[]"
#"api-key": "\"{}\"".format(api_key)
}

body = {
2 changes: 1 addition & 1 deletion tests/sdk/embedding.py
@@ -7,7 +7,7 @@
client = AzureOpenAI(
api_key = os.getenv("api_key"),
api_version = "2024-02-01",
azure_endpoint = os.getenv("azure_endpoint", "http://localhost:6124/"),
azure_endpoint = os.getenv("azure_endpoint", "http://4.147.128.191/openai"),
)

response = client.embeddings.create(
