Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge patch release 2.13.2 to mainline #1052

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/arm64_docker_marqo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ on:
push:
branches:
- mainline
- releases/*
paths-ignore:
- '**.md'

Expand Down
1 change: 1 addition & 0 deletions .github/workflows/cpu_docker_marqo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ on:
push:
branches:
- mainline
- releases/*
paths-ignore:
- '**.md'

Expand Down
6 changes: 4 additions & 2 deletions .github/workflows/cpu_local_marqo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,21 @@ on:
push:
branches:
- mainline
- releases/*
paths-ignore:
- '**.md'
pull_request_target:
pull_request:
branches:
- mainline
- releases/*
paths-ignore:
- '**.md'

permissions:
contents: read

concurrency:
group: cpu-local-api-tests-${{ github.head_ref || github.ref }}
group: cpu-local-api-tests-${{ github.ref }}
cancel-in-progress: true

jobs:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/cuda_docker_marqo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ on:
push:
branches:
- mainline
- releases/*
paths-ignore:
- '**.md'

Expand Down
8 changes: 5 additions & 3 deletions .github/workflows/largemodel_unit_test_CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ on:
push:
branches:
- mainline
- releases/*
paths-ignore:
- '**.md'
pull_request_target:
pull_request:
branches:
- mainline
- releases/*
paths-ignore:
- '**.md'

concurrency:
group: large-model-unit-tests-${{ github.head_ref || github.ref }}
group: large-model-unit-tests-${{ github.ref }}
cancel-in-progress: true

permissions:
Expand Down Expand Up @@ -82,7 +84,7 @@ jobs:

- name: Install dependencies
run: |
pip install -r marqo-base/requirements.txt
pip install -r marqo-base/requirements/amd64-gpu-requirements.txt
# override base requirements with marqo requirements, if needed:
pip install -r marqo/requirements.dev.txt
pip install pytest==7.4.0
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/unit_test_200gb_CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ on:
branches:
- mainline
- releases/*
pull_request_target:
pull_request:
branches:
- mainline
- releases/*

concurrency:
group: unit-tests-${{ github.head_ref || github.ref }}
group: unit-tests-${{ github.ref }}
cancel-in-progress: true

permissions:
Expand Down Expand Up @@ -80,7 +80,7 @@ jobs:

- name: Install dependencies
run: |
pip install -r marqo-base/requirements.txt
pip install -r marqo-base/requirements/amd64-gpu-requirements.txt
# override base requirements with marqo requirements, if needed:
pip install -r marqo/requirements.dev.txt

Expand Down
16 changes: 16 additions & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
# Release 2.13.2

## Bug fixes and minor changes

- Fix a bug where adding documents with numeric lists to an unstructured index results in a 500 error. Now, Marqo successfully processes the document batch, and returns a 400 error only for individual documents that contain numeric lists([1034](https://github.com/marqo-ai/marqo/pull/1034)).
- Fix validation of custom vector fields. Custom vector fields were silently ignored when not specified as tensor fields for an unstructured index. This will now trigger a 400 error. This helps guide users to properly define the field as a tensor field([1034](https://github.com/marqo-ai/marqo/pull/1034)).
- Improve the bootstrapping process to prevent Marqo from crashing during startup when the vector store takes longer to converge, especially with multiple indexes. This ensures a smoother startup process even if the vector store takes time to fully initialize([1036](https://github.com/marqo-ai/marqo/pull/1036)).

# Release 2.13.1

## Bug fixes and minor changes

- Fix a bug where Marqo returns a 500 error if an inaccessible private image is encountered in the query or embed endpoint. Marqo now correctly returns a 400 error with a helpful error message ([1027](https://github.com/marqo-ai/marqo/pull/1027)).
- Fix a bug preventing Marqo from warming up Languagebind models. Marqo now successfully warms up Languagebind models as expected ([1031](https://github.com/marqo-ai/marqo/pull/1031)).
- Fix a bug where Languagebind models always generate normalized embeddings for non-text content. These models now correctly produce unnormalized embeddings for video, audio, and image content ([1032](https://github.com/marqo-ai/marqo/pull/1032)).

# Release 2.13.0

## New features
Expand Down
43 changes: 31 additions & 12 deletions src/marqo/core/index_management/index_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,26 @@ def bootstrap_vespa(self) -> bool:
Returns:
True if Vespa was bootstrapped, False if it was already up-to-date
"""
with self._vespa_deployment_lock():
vespa_app = self._get_vespa_application(check_configured=False, need_binary_file_support=True)

to_version = version.get_version()
from_version = vespa_app.get_marqo_config().version if vespa_app.is_configured else None
# We skip the Vespa convergence check here so that Marqo instance can be bootstrapped even when Vespa is
# not converged.
to_version = version.get_version()
vespa_app_for_version_check = self._get_vespa_application(check_configured=False, need_binary_file_support=True,
check_for_application_convergence=False)
from_version = vespa_app_for_version_check.get_marqo_config().version \
if vespa_app_for_version_check.is_configured else None

if from_version and semver.VersionInfo.parse(from_version) >= semver.VersionInfo.parse(to_version):
# skip bootstrapping if already bootstrapped to this version or later
return False

if from_version and semver.VersionInfo.parse(from_version) >= semver.VersionInfo.parse(to_version):
# skip bootstrapping if already bootstrapped to this version or later
return False
with self._vespa_deployment_lock():
# Initialise another session based on the latest active Vespa session. The reason we do this again while
# holding the distributed lock is that the Vespa application might be changed by other operations when
# we wait for the lock. This time, we error out if the Vespa application is not converged, which reduces
# the chance of running into race conditions.
vespa_app = self._get_vespa_application(check_configured=False, need_binary_file_support=True,
check_for_application_convergence=True)

# Only retrieving existing index when the vespa app is not configured and the index settings schema exists
existing_indexes = self._get_existing_indexes() if not vespa_app.is_configured and \
Expand All @@ -105,8 +116,12 @@ def bootstrap_vespa(self) -> bool:
return True

def rollback_vespa(self) -> None:
"""
Roll back Vespa application package to the previous version backed up in the current app package.
"""
with self._vespa_deployment_lock():
self._get_vespa_application(need_binary_file_support=True).rollback(version.get_version())
vespa_app = self._get_vespa_application(need_binary_file_support=True)
vespa_app.rollback(version.get_version())

def create_index(self, marqo_index_request: MarqoIndexRequest) -> MarqoIndex:
"""
Expand Down Expand Up @@ -274,15 +289,17 @@ def get_marqo_version(self) -> str:
"""
return self._get_vespa_application().get_marqo_config().version

def _get_vespa_application(self, check_configured: bool = True, need_binary_file_support: bool = False) \
-> VespaApplicationPackage:
def _get_vespa_application(self, check_configured: bool = True, need_binary_file_support: bool = False,
check_for_application_convergence: bool = True) -> VespaApplicationPackage:
"""
Retrieve a Vespa application package. Depending on whether we need to handle binary files and the Vespa version,
it uses different implementation of VespaApplicationStore.

Args:
check_configured: if set to True, it checks whether the application package is configured or not.
need_binary_file_support: indicates whether the support for binary file is needed.
check_for_application_convergence: whether we check convergence of the Vespa app package. If set to true and
Vespa is not converged, this process will fail with a VespaError raised.

Returns:
The VespaApplicationPackage instance we can use to do bootstrapping/rollback and any index operations.
Expand Down Expand Up @@ -314,13 +331,15 @@ def _get_vespa_application(self, check_configured: bool = True, need_binary_file
application_package_store = VespaApplicationFileStore(
vespa_client=self.vespa_client,
deploy_timeout=self._deployment_timeout_seconds,
wait_for_convergence_timeout=self._convergence_timeout_seconds
wait_for_convergence_timeout=self._convergence_timeout_seconds,
check_for_application_convergence=check_for_application_convergence
)
else:
application_package_store = ApplicationPackageDeploymentSessionStore(
vespa_client=self.vespa_client,
deploy_timeout=self._deployment_timeout_seconds,
wait_for_convergence_timeout=self._convergence_timeout_seconds
wait_for_convergence_timeout=self._convergence_timeout_seconds,
check_for_application_convergence=check_for_application_convergence
)

application = VespaApplicationPackage(application_package_store)
Expand Down
12 changes: 8 additions & 4 deletions src/marqo/core/index_management/vespa_application_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,9 +425,11 @@ class VespaApplicationFileStore(VespaApplicationStore):
more details. This is the only viable option to deploy changes of binary files before Vespa version 8.382.22.
We implement this approach to support bootstrapping and rollback for Vespa version prior to 8.382.22.
"""
def __init__(self, vespa_client: VespaClient, deploy_timeout: int, wait_for_convergence_timeout: int):
def __init__(self, vespa_client: VespaClient, deploy_timeout: int, wait_for_convergence_timeout: int,
check_for_application_convergence: bool = True):
super().__init__(vespa_client, deploy_timeout, wait_for_convergence_timeout)
self._app_root_path = vespa_client.download_application(check_for_application_convergence=True)
self._app_root_path = vespa_client.download_application(
check_for_application_convergence=check_for_application_convergence)

def _full_path(self, *paths: str) -> str:
return os.path.join(self._app_root_path, *paths)
Expand Down Expand Up @@ -483,9 +485,11 @@ class ApplicationPackageDeploymentSessionStore(VespaApplicationStore):
See https://docs.vespa.ai/en/reference/deploy-rest-api-v2.html#create-session for more details.
However, this approach does not support binary files for Vespa version prior to 8.382.22.
"""
def __init__(self, vespa_client: VespaClient, deploy_timeout: int, wait_for_convergence_timeout: int):
def __init__(self, vespa_client: VespaClient, deploy_timeout: int, wait_for_convergence_timeout: int,
check_for_application_convergence: bool = True):
super().__init__(vespa_client, deploy_timeout, wait_for_convergence_timeout)
self._content_base_url, self._prepare_url = vespa_client.create_deployment_session()
self._content_base_url, self._prepare_url = vespa_client.create_deployment_session(
check_for_application_convergence)
self._all_contents = vespa_client.list_contents(self._content_base_url)

def file_exists(self, *paths: str) -> bool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class SemiStructuredVespaDocument(MarqoBaseModel):
@classmethod
def from_vespa_document(cls, document: Dict, marqo_index: SemiStructuredMarqoIndex) -> "SemiStructuredVespaDocument":
"""
Instantiate an UnstructuredVespaDocument from a Vespa document.
Instantiate an SemiStructuredVespaDocument from a Vespa document.
Used in get_document_by_id or get_documents_by_ids
"""
fields = document.get(cls._VESPA_DOC_FIELDS, {})
Expand Down Expand Up @@ -76,10 +76,11 @@ def extract_field(cls, fields, name: str, default: Any):

@classmethod
def from_marqo_document(cls, document: Dict, marqo_index: SemiStructuredMarqoIndex) -> "SemiStructuredVespaDocument":
"""Instantiate an UnstructuredVespaDocument from a valid Marqo document for feeding to Vespa"""
"""Instantiate an SemiStructuredVespaDocument from a valid Marqo document for feeding to Vespa"""

if index_constants.MARQO_DOC_ID not in document:
raise VespaDocumentParsingError(
# Please note we still use unstructured in the error message since it will be exposed to user
raise MarqoDocumentParsingError(
f"Unstructured Marqo document does not have a {index_constants.MARQO_DOC_ID} field. "
f"This should be assigned for a valid document")

Expand Down Expand Up @@ -116,7 +117,7 @@ def from_marqo_document(cls, document: Dict, marqo_index: SemiStructuredMarqoInd
instance.fixed_fields.float_fields[f"{field_name}.{k}"] = float(v)
instance.fixed_fields.score_modifiers_fields[f"{field_name}.{k}"] = v
else:
raise VespaDocumentParsingError(
raise MarqoDocumentParsingError(
f"In document {doc_id}, field {field_name} has an "
f"unsupported type {type(field_content)} which has not been validated in advance.")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,31 @@
from marqo.api import exceptions as api_errors
from marqo.core import constants
from marqo.core.constants import MARQO_DOC_ID
from marqo.core.vespa_index.add_documents_handler import AddDocumentsHandler, AddDocumentsError
from marqo.core.models.add_docs_params import AddDocsParams
from marqo.core.inference.tensor_fields_container import TensorFieldsContainer, MODALITY_FIELD_TYPE_MAP
from marqo.core.models import UnstructuredMarqoIndex
from marqo.core.models.add_docs_params import AddDocsParams
from marqo.core.models.marqo_index import FieldType
from marqo.core.unstructured_vespa_index.common import MARQO_DOC_MULTIMODAL_PARAMS
from marqo.core.unstructured_vespa_index.unstructured_validation import validate_tensor_fields, validate_field_name, \
validate_mappings_object_format, validate_coupling_of_mappings_and_doc
from marqo.core.unstructured_vespa_index.unstructured_vespa_index import UnstructuredVespaIndex
from marqo.core.vespa_index.add_documents_handler import AddDocumentsHandler, AddDocumentsError
from marqo.s2_inference.errors import MediaDownloadError
from marqo.s2_inference.multimodal_model_load import infer_modality, Modality

from marqo.vespa.models import VespaDocument
from marqo.vespa.models.get_document_response import Document
from marqo.vespa.vespa_client import VespaClient

# TODO deps to tensor_search needs to be removed
from marqo.tensor_search.constants import ALLOWED_UNSTRUCTURED_FIELD_TYPES
from marqo.tensor_search.validation import list_types_valid, validate_custom_vector, \
validate_multimodal_combination, validate_map_numeric_field
from marqo.vespa.vespa_client import VespaClient
from marqo.tensor_search.validation import validate_custom_vector, \
validate_map_numeric_field


class UnstructuredAddDocumentsHandler(AddDocumentsHandler):
_MINIMUM_MARQO_VERSION_SUPPORTS_MAP_NUMERIC_FIELDS = semver.VersionInfo.parse("2.9.0")

def __init__(self, marqo_index: UnstructuredMarqoIndex, add_docs_params: AddDocsParams, vespa_client: VespaClient):
self._validate_add_docs_params(add_docs_params)
super().__init__(marqo_index, add_docs_params, vespa_client)
Expand Down Expand Up @@ -100,27 +102,29 @@ def _validate_field(self, field_name: str, field_content: Any) -> None:
f"Allowed content types: {[ty.__name__ for ty in ALLOWED_UNSTRUCTURED_FIELD_TYPES]}"
)

if isinstance(field_content, list) and not list_types_valid(field_content):
raise AddDocumentsError(
f"Field content '{field_content}' "
f"of type {type(field_content).__name__} is not of valid content type! "
f"All list elements must be of the same type and that type must be int, float or string"
)
if isinstance(field_content, list):
for element in field_content:
if not isinstance(element, str):
# if the field content is a list, it should only contain strings.
raise AddDocumentsError(
f"Field content {field_content} includes an element of type {type(element).__name__} "
f"which is not a string. Unstructured Marqo index only supports string lists."
)

is_tensor_field = field_name in self.add_docs_params.tensor_fields

if isinstance(field_content, dict):
if self.tensor_fields_container.is_custom_tensor_field(field_name):
# TODO is_non_tensor_field check can be move out to AddDocsParams level
validate_custom_vector(field_content, False, self.marqo_index.model.get_dimension())
elif self.tensor_fields_container.is_multimodal_field(field_name):
# FIXME, multimodal field should not be present in the doc
# TODO This validation should be done at AddDocsParams level
validate_multimodal_combination(field_content, False,
self.tensor_fields_container.get_multimodal_field_mapping(field_name))
elif self.marqo_index.parsed_marqo_version() < semver.VersionInfo.parse("2.9.0"):
# TODO This check should not happen at root level
# TODO should is_non_tensor_field check be moved out to AddDocsParams validation?
# Please note that if one of the documents in the batch has a custom field which does not exist
# in the tensor field, the whole batch will fail and user will get a 400 pydantic.ValidationError.
# We keep this behaviour unchanged to be compatible with the legacy unstructured index.
validate_custom_vector(field_content, not is_tensor_field, self.marqo_index.model.get_dimension())
elif self.marqo_index.parsed_marqo_version() < self._MINIMUM_MARQO_VERSION_SUPPORTS_MAP_NUMERIC_FIELDS:
# We do not support map of numeric fields prior to 2.9.0
raise AddDocumentsError(
f"The field {field_name} is a map field and only supported for indexes created with Marqo 2.9.0 "
f"or later. See {marqo_docs.map_fields()} and {marqo_docs.mappings()}."
f"The field {field_name} is a map field and only supported for indexes created with Marqo 2.9.0"
f" or later. See {marqo_docs.map_fields()} and {marqo_docs.mappings()}."
)
else:
validate_map_numeric_field(field_content)
Expand Down
Loading
Loading