Skip to content

Commit

Permalink
[Opik-534] Implement TLS/SSL configuration (#827)
Browse files Browse the repository at this point in the history
* Implement configuration for TLS certificate

* Fix lint errors

* Add retries for anthropic tests if anthropic raises errors due to overloaded services

* Fix lint errors

* Add internal server error to retry list

* Adjust retrying rule

* Fix lint errors

* Increase multiplier in retry rule from 1 to 2

* Add missing return type hint

* Update wait rule

* Update model name in anthropic tests
  • Loading branch information
alexkuzmik authored Dec 5, 2024
1 parent 326a3e5 commit 3ebc00d
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 25 deletions.
8 changes: 7 additions & 1 deletion sdks/python/src/opik/api_objects/opik_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ def __init__(
base_url=config_.url_override,
workers=config_.background_workers,
api_key=config_.api_key,
check_tls_certificate=config_.check_tls_certificate,
use_batching=_use_batching,
)
atexit.register(self.end, timeout=self._flush_timeout)
Expand All @@ -83,9 +84,14 @@ def _initialize_streamer(
base_url: str,
workers: int,
api_key: Optional[str],
check_tls_certificate: bool,
use_batching: bool,
) -> None:
httpx_client_ = httpx_client.get(workspace=self._workspace, api_key=api_key)
httpx_client_ = httpx_client.get(
workspace=self._workspace,
api_key=api_key,
check_tls_certificate=check_tls_certificate,
)
self._rest_client = rest_api_client.OpikApi(
base_url=base_url,
httpx_client=httpx_client_,
Expand Down
5 changes: 5 additions & 0 deletions sdks/python/src/opik/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ def settings_customise_sources(
If enabled, tests decorated with `llm_unit` will log data to Opik experiments
"""

check_tls_certificate: bool = True
"""
If True, TLS certificate verification is performed for all HTTP requests.
"""

@property
def config_file_fullpath(self) -> pathlib.Path:
config_file_path = os.getenv("OPIK_CONFIG_PATH", CONFIG_FILE_PATH_DEFAULT)
Expand Down
25 changes: 18 additions & 7 deletions sdks/python/src/opik/configurator/opik_rest_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,24 @@
import httpx

from opik.exceptions import ConfigurationError
from opik import url_helpers
from opik import url_helpers, config, httpx_client

LOGGER = logging.getLogger(__name__)

HEALTH_CHECK_TIMEOUT: Final[float] = 1.0


def _get_httpx_client(api_key: Optional[str] = None) -> httpx.Client:
    """Build an HTTP client for configurator health/auth checks.

    Reads the current Opik configuration so the TLS-verification setting
    is honoured, and attaches the API key header when one is provided.
    """
    current_config = config.OpikConfig()
    return httpx_client.get(
        workspace=None,
        api_key=api_key,
        check_tls_certificate=current_config.check_tls_certificate,
    )


def is_instance_active(url: str) -> bool:
"""
Returns True if the given Opik URL responds to an HTTP GET request.
Expand All @@ -22,8 +33,10 @@ def is_instance_active(url: str) -> bool:
bool: True if the instance responds with HTTP status 200, otherwise False.
"""
try:
with httpx.Client(timeout=HEALTH_CHECK_TIMEOUT) as http_client:
response = http_client.get(url=url_helpers.get_is_alive_ping_url(url))
with _get_httpx_client() as http_client:
response = http_client.get(
url=url_helpers.get_is_alive_ping_url(url), timeout=HEALTH_CHECK_TIMEOUT
)
return response.status_code == 200
except httpx.ConnectTimeout:
return False
Expand All @@ -43,8 +56,7 @@ def is_api_key_correct(api_key: str, url: str) -> bool:
"""

try:
with httpx.Client() as client:
client.headers.update({"Authorization": f"{api_key}"})
with _get_httpx_client(api_key) as client:
response = client.get(url=url_helpers.get_account_details_url(url))
if response.status_code == 200:
return True
Expand Down Expand Up @@ -75,8 +87,7 @@ def is_workspace_name_correct(api_key: Optional[str], workspace: str, url: str)
raise ConfigurationError("API key must be set to check workspace name.")

try:
with httpx.Client() as client:
client.headers.update({"Authorization": f"{api_key}"})
with _get_httpx_client(api_key) as client:
response = client.get(url=url_helpers.get_workspace_list_url(url))
except httpx.RequestError as e:
# Raised for network-related errors such as timeouts
Expand Down
28 changes: 22 additions & 6 deletions sdks/python/src/opik/httpx_client.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,25 @@
from typing import Optional, Dict, Any
from typing import Optional, Dict, Any, Union
import httpx

import os
from . import hooks, package_version
import platform


def get(workspace: str, api_key: Optional[str]) -> httpx.Client:
CABundlePath = str


def get(
workspace: Optional[str], api_key: Optional[str], check_tls_certificate: bool
) -> httpx.Client:
limits = httpx.Limits(keepalive_expiry=30)
client = httpx.Client(limits=limits)

verify: Union[bool, CABundlePath] = (
os.environ["SSL_CERT_FILE"]
if check_tls_certificate is True and "SSL_CERT_FILE" in os.environ
else check_tls_certificate
)

client = httpx.Client(limits=limits, verify=verify)

headers = _prepare_headers(workspace=workspace, api_key=api_key)
client.headers.update(headers)
Expand All @@ -17,13 +29,17 @@ def get(workspace: str, api_key: Optional[str]) -> httpx.Client:
return client


def _prepare_headers(workspace: str, api_key: Optional[str]) -> Dict[str, Any]:
def _prepare_headers(
workspace: Optional[str], api_key: Optional[str]
) -> Dict[str, Any]:
result = {
"Comet-Workspace": workspace,
"X-OPIK-DEBUG-SDK-VERSION": package_version.VERSION,
"X-OPIK-DEBUG-PY-VERSION": platform.python_version(),
}

if workspace is not None:
result["Comet-Workspace"] = workspace

if api_key is not None:
result["Authorization"] = api_key

Expand Down
49 changes: 38 additions & 11 deletions sdks/python/tests/library_integration/anthropic/test_anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,21 @@
)

import anthropic
import tenacity


def _is_internal_server_error(exception: Exception) -> bool:
    """Return True when *exception* is an Anthropic 5xx (server-side) error."""
    if not isinstance(exception, anthropic.APIStatusError):
        return False
    # Only status codes in the 500-599 range count as server errors.
    return 500 <= exception.status_code < 600


# Decorator that re-runs a test when the Anthropic API responds with a
# 5xx error (e.g. overloaded/internal server errors). At most 3 attempts
# total, waiting 5s before the first retry and 10s before the second
# (wait_incrementing: start=5, increment=5).
retry_on_internal_server_errors = tenacity.retry(
    stop=tenacity.stop_after_attempt(3),
    wait=tenacity.wait_incrementing(start=5, increment=5),
    retry=tenacity.retry_if_exception(_is_internal_server_error),
)


@pytest.fixture(autouse=True)
Expand All @@ -33,6 +48,7 @@ def ensure_anthropic_configured():
("anthropic-integration-test", "anthropic-integration-test"),
],
)
@retry_on_internal_server_errors
def test_anthropic_messages_create__happyflow(
fake_backend, project_name, expected_project_name
):
Expand All @@ -44,7 +60,7 @@ def test_anthropic_messages_create__happyflow(
messages = [{"role": "user", "content": "Tell a short fact"}]

response = wrapped_client.messages.create(
model="claude-3-opus-20240229",
model="claude-3-5-haiku-latest",
messages=messages,
max_tokens=10,
system="You are a helpful assistant",
Expand Down Expand Up @@ -88,6 +104,7 @@ def test_anthropic_messages_create__happyflow(
assert_equal(EXPECTED_TRACE_TREE, fake_backend.trace_trees[0])


@retry_on_internal_server_errors
def test_anthropic_messages_create__create_raises_an_error__span_and_trace_finished_gracefully(
fake_backend,
):
Expand Down Expand Up @@ -139,6 +156,7 @@ def test_anthropic_messages_create__create_raises_an_error__span_and_trace_finis
assert_equal(EXPECTED_TRACE_TREE, fake_backend.trace_trees[0])


@retry_on_internal_server_errors
def test_anthropic_messages_create__create_call_made_in_another_tracked_function__anthropic_span_attached_to_existing_trace(
fake_backend,
):
Expand All @@ -161,7 +179,7 @@ def f():
]

_ = wrapped_client.messages.create(
model="claude-3-opus-20240229",
model="claude-3-5-haiku-latest",
messages=messages,
max_tokens=10,
system="You are a helpful assistant",
Expand Down Expand Up @@ -216,6 +234,7 @@ def f():
assert_equal(EXPECTED_TRACE_TREE, fake_backend.trace_trees[0])


@retry_on_internal_server_errors
def test_async_anthropic_messages_create_call_made_in_another_tracked_async_function__anthropic_span_attached_to_existing_trace(
fake_backend,
):
Expand All @@ -228,7 +247,7 @@ async def async_f():
client = anthropic.AsyncAnthropic()
wrapped_client = track_anthropic(client)
_ = await wrapped_client.messages.create(
model="claude-3-opus-20240229",
model="claude-3-5-haiku-latest",
messages=messages,
max_tokens=10,
system="You are a helpful assistant",
Expand Down Expand Up @@ -283,6 +302,7 @@ async def async_f():
assert_equal(EXPECTED_TRACE_TREE, fake_backend.trace_trees[0])


@retry_on_internal_server_errors
def test_anthropic_messages_stream__generator_tracked_correctly(
fake_backend,
):
Expand All @@ -296,7 +316,7 @@ def test_anthropic_messages_stream__generator_tracked_correctly(
]

message_stream_manager = wrapped_client.messages.stream(
model="claude-3-opus-20240229",
model="claude-3-5-haiku-latest",
messages=messages,
max_tokens=10,
system="You are a helpful assistant",
Expand Down Expand Up @@ -341,12 +361,13 @@ def test_anthropic_messages_stream__generator_tracked_correctly(
assert_equal(EXPECTED_TRACE_TREE, fake_backend.trace_trees[0])


@retry_on_internal_server_errors
def test_anthropic_messages_stream__stream_called_2_times__generator_tracked_correctly(
fake_backend,
):
def run_stream(client, messages):
message_stream_manager = wrapped_client.messages.stream(
model="claude-3-opus-20240229",
model="claude-3-5-haiku-latest",
messages=messages,
max_tokens=10,
system="You are a helpful assistant",
Expand Down Expand Up @@ -441,6 +462,7 @@ def run_stream(client, messages):
assert_equal(EXPECTED_TRACE_TREE_WITH_JOKE, fake_backend.trace_trees[1])


@retry_on_internal_server_errors
def test_anthropic_messages_stream__get_final_message_called__generator_tracked_correctly(
fake_backend,
):
Expand All @@ -454,7 +476,7 @@ def test_anthropic_messages_stream__get_final_message_called__generator_tracked_
]

message_stream_manager = wrapped_client.messages.stream(
model="claude-3-opus-20240229",
model="claude-3-5-haiku-latest",
messages=messages,
max_tokens=10,
system="You are a helpful assistant",
Expand Down Expand Up @@ -498,6 +520,7 @@ def test_anthropic_messages_stream__get_final_message_called__generator_tracked_
assert_equal(EXPECTED_TRACE_TREE, fake_backend.trace_trees[0])


@retry_on_internal_server_errors
def test_anthropic_messages_stream__get_final_message_called_after_stream_iteration_loop__generator_tracked_correctly_only_once(
fake_backend,
):
Expand All @@ -511,7 +534,7 @@ def test_anthropic_messages_stream__get_final_message_called_after_stream_iterat
]

message_stream_manager = wrapped_client.messages.stream(
model="claude-3-opus-20240229",
model="claude-3-5-haiku-latest",
messages=messages,
max_tokens=10,
system="You are a helpful assistant",
Expand Down Expand Up @@ -557,6 +580,7 @@ def test_anthropic_messages_stream__get_final_message_called_after_stream_iterat
assert_equal(EXPECTED_TRACE_TREE, fake_backend.trace_trees[0])


@retry_on_internal_server_errors
def test_async_anthropic_messages_stream__data_tracked_correctly(
fake_backend,
):
Expand All @@ -571,7 +595,7 @@ def test_async_anthropic_messages_stream__data_tracked_correctly(

async def async_f():
message_stream_manager = wrapped_client.messages.stream(
model="claude-3-opus-20240229",
model="claude-3-5-haiku-latest",
messages=messages,
max_tokens=10,
system="You are a helpful assistant",
Expand Down Expand Up @@ -618,6 +642,7 @@ async def async_f():
assert_equal(EXPECTED_TRACE_TREE, fake_backend.trace_trees[0])


@retry_on_internal_server_errors
def test_async_anthropic_messages_stream__get_final_message_called_twice__data_tracked_correctly_once(
fake_backend,
):
Expand All @@ -632,7 +657,7 @@ def test_async_anthropic_messages_stream__get_final_message_called_twice__data_t

async def async_f():
message_stream_manager = wrapped_client.messages.stream(
model="claude-3-opus-20240229",
model="claude-3-5-haiku-latest",
messages=messages,
max_tokens=10,
system="You are a helpful assistant",
Expand Down Expand Up @@ -679,6 +704,7 @@ async def async_f():
assert_equal(EXPECTED_TRACE_TREE, fake_backend.trace_trees[0])


@retry_on_internal_server_errors
def test_anthropic_messages_create__stream_argument_is_True__Stream_object_returned__generations_tracked_correctly(
fake_backend,
):
Expand All @@ -692,7 +718,7 @@ def test_anthropic_messages_create__stream_argument_is_True__Stream_object_retur
]

stream = wrapped_client.messages.create(
model="claude-3-opus-20240229",
model="claude-3-5-haiku-latest",
messages=messages,
max_tokens=10,
system="You are a helpful assistant",
Expand Down Expand Up @@ -737,6 +763,7 @@ def test_anthropic_messages_create__stream_argument_is_True__Stream_object_retur
assert_equal(EXPECTED_TRACE_TREE, fake_backend.trace_trees[0])


@retry_on_internal_server_errors
def test_async_anthropic_messages_create__stream_argument_is_True__AsyncStream_object_returned__generations_tracked_correctly(
fake_backend,
):
Expand All @@ -751,7 +778,7 @@ async def async_f():
]

stream = await wrapped_client.messages.create(
model="claude-3-opus-20240229",
model="claude-3-5-haiku-latest",
messages=messages,
max_tokens=10,
system="You are a helpful assistant",
Expand Down

0 comments on commit 3ebc00d

Please sign in to comment.