Skip to content

Commit

Permalink
Add 5 seconds wait when retrieving distributed locks for index operat…
Browse files Browse the repository at this point in the history
…ions (#999)
  • Loading branch information
papa99do authored Oct 22, 2024
1 parent 8aafb3b commit fc10e79
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 38 deletions.
1 change: 1 addition & 0 deletions src/marqo/api/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,6 @@ def default_env_vars() -> dict:
EnvVars.MARQO_BEST_AVAILABLE_DEVICE: "cpu", # on_start_script will determine this.
EnvVars.MARQO_MAX_TENSOR_FIELD_COUNT_UNSTRUCTURED: 100,
EnvVars.MARQO_MAX_LEXICAL_FIELD_COUNT_UNSTRUCTURED: 100,
EnvVars.MARQO_INDEX_DEPLOYMENT_LOCK_TIMEOUT: 5, # index operations acquire this distributed lock with a timeout
EnvVars.ZOOKEEPER_CONNECTION_TIMEOUT: 15,
}
6 changes: 5 additions & 1 deletion src/marqo/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ def __init__(
utils.read_env_vars_and_defaults(EnvVars.MARQO_BEST_AVAILABLE_DEVICE))

# Initialize Core layer dependencies
self.index_management = IndexManagement(vespa_client, zookeeper_client, enable_index_operations=True)
deployment_lock_timeout = utils.read_env_vars_and_defaults_ints(EnvVars.MARQO_INDEX_DEPLOYMENT_LOCK_TIMEOUT)
self.index_management = IndexManagement(vespa_client, zookeeper_client,
enable_index_operations=True,
deployment_lock_timeout_seconds=deployment_lock_timeout)

self.monitoring = Monitoring(vespa_client, self.index_management)
self.document = Document(vespa_client, self.index_management)
self.recommender = Recommender(vespa_client, self.index_management)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def get_deployment_lock(zookeeper_client: ZookeeperClient, acquire_timeout: floa
Args:
zookeeper_client: The Zookeeper client.
acquire_timeout: The timeout to acquire the lock. Default is 0.
acquire_timeout: The timeout to acquire the lock, in seconds. Default is 0.
Returns:
ZookeeperDistributedLock: The deployment lock.
Expand Down
32 changes: 23 additions & 9 deletions src/marqo/core/index_management/index_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def __init__(self,
enable_index_operations: bool = False,
deployment_timeout_seconds: int = 60,
convergence_timeout_seconds: int = 120,
deployment_lock_timeout: int = 0,
deployment_lock_timeout_seconds: float = 5,
):
"""Instantiate an IndexManagement object.
Expand All @@ -47,9 +47,12 @@ def __init__(self,
zookeeper_client: ZookeeperClient object
enable_index_operations: A flag to enable index operations. If set to True,
the object can create/delete indexes, otherwise, it raises an InternalError during index operations.
deployment_timeout_seconds: Vespa deployment timeout in seconds
convergence_timeout_seconds: Vespa convergence timeout in seconds
deployment_lock_timeout_seconds: Vespa deployment lock timeout in seconds
"""
self.vespa_client = vespa_client
self._zookeeper_deployment_lock = get_deployment_lock(zookeeper_client, deployment_lock_timeout) \
self._zookeeper_deployment_lock = get_deployment_lock(zookeeper_client, deployment_lock_timeout_seconds) \
if zookeeper_client else None
self._enable_index_operations = enable_index_operations
self._deployment_timeout_seconds = deployment_timeout_seconds
Expand Down Expand Up @@ -198,11 +201,22 @@ def update_index(self, marqo_index: SemiStructuredMarqoIndex) -> None:
OperationConflictError: If another index creation/deletion operation is
in progress and the lock cannot be acquired
"""
if not isinstance(marqo_index, SemiStructuredMarqoIndex):
# This is just a sanity check, it should not happen since we do not expose this method to end user.
raise InternalError(f'Index {marqo_index.name} can not be updated.')

with self._vespa_deployment_lock():
existing_index = self.get_index(marqo_index.name)
if not isinstance(existing_index, SemiStructuredMarqoIndex):
# This is just a sanity check, it should not happen since we do not expose this method to end user.
raise InternalError(f'Index {marqo_index.name} created by Marqo version {marqo_index.marqo_version} '
f'can not be updated.')

def is_subset(dict_a, dict_b):
# check if dict_a is a subset of dict_b
return all(k in dict_b and dict_b[k] == v for k, v in dict_a.items())

if (is_subset(marqo_index.tensor_field_map, existing_index.tensor_field_map) and
is_subset(marqo_index.field_map, existing_index.field_map)):
logger.debug(f'Another thread has updated the index {marqo_index.name} already.')
return

schema = SemiStructuredVespaSchema.generate_vespa_schema(marqo_index)
self._get_vespa_application().update_index_setting_and_schema(marqo_index, schema)

Expand Down Expand Up @@ -342,8 +356,8 @@ def _vespa_deployment_lock(self):
else:
try:
with self._zookeeper_deployment_lock:
logger.info(f"Retrieved the distributed lock for index operations. ")
logger.debug(f"Retrieved the distributed lock for index operations. ")
yield
except ZookeeperLockNotAcquiredError:
raise OperationConflictError("Another index creation/deletion operation is in progress. "
"Your request is rejected. Please try again later")
# TODO add a doclink for troubleshooting this issue
raise OperationConflictError("Your indexes are being updated. Please try again shortly.")
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from marqo.core.semi_structured_vespa_index.semi_structured_vespa_index import SemiStructuredVespaIndex
from marqo.core.semi_structured_vespa_index.semi_structured_vespa_schema import SemiStructuredVespaSchema
from marqo.core.unstructured_vespa_index.unstructured_add_document_handler import UnstructuredAddDocumentsHandler
from marqo.core.vespa_index.add_documents_handler import logger
from marqo.tensor_search.enums import EnvVars
from marqo.tensor_search.telemetry import RequestMetricsStore
from marqo.tensor_search.utils import read_env_vars_and_defaults_ints
Expand Down Expand Up @@ -84,6 +85,8 @@ def _add_lexical_field_to_index(self, field_name):
f'limit in MARQO_MAX_LEXICAL_FIELD_COUNT_UNSTRUCTURED environment variable.')

# Add missing lexical fields to marqo index
logger.debug(f'Adding lexical field {field_name} to index {self.marqo_index.name}')

self.marqo_index.lexical_fields.append(
Field(name=field_name, type=FieldType.Text,
features=[FieldFeature.LexicalSearch],
Expand All @@ -104,6 +107,8 @@ def _add_tensor_field_to_index(self, field_name):
f'limit in MARQO_MAX_TENSOR_FIELD_COUNT_UNSTRUCTURED environment variable.')

# Add missing tensor fields to marqo index
logger.debug(f'Adding tensor field {field_name} to index {self.marqo_index.name}')

if field_name not in self.marqo_index.tensor_field_map:
self.marqo_index.tensor_fields.append(TensorField(
name=field_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import marqo.core.constants as index_constants
import marqo.core.search.search_filter as search_filter
from marqo import marqo_docs
from marqo.api import exceptions as errors
from marqo.core.models import MarqoQuery
from marqo.core.models.marqo_query import (MarqoTensorQuery, MarqoLexicalQuery, MarqoHybridQuery)
Expand Down Expand Up @@ -40,10 +41,10 @@ def to_marqo_document(self, vespa_document: Dict[str, Any], return_highlights: b

def to_vespa_query(self, marqo_query: MarqoQuery) -> Dict[str, Any]:
if marqo_query.searchable_attributes is not None:
# TODO Add a marqo doc link here on how to create a structured index
raise errors.InvalidArgError('searchable_attributes is not supported for an unstructured index. '
'You can create a structured index '
'by `mq.create_index("your_index_name", type="structured")`')
raise errors.InvalidArgError('Searchable attributes are not supported for unstructured indexes created '
'with Marqo versions prior to 2.13.0. To take advantage of this feature, '
'please create a new Marqo index. For more information, refer to the Create '
f'Index API reference: {marqo_docs.create_index()}.')

if isinstance(marqo_query, MarqoHybridQuery): # TODO: Rethink structure so order of checking doesn't matter
return self._to_vespa_hybrid_query(marqo_query)
Expand Down Expand Up @@ -298,7 +299,7 @@ def _to_vespa_lexical_query(self, marqo_query: MarqoLexicalQuery) -> Dict[str, A
return query

def _to_vespa_hybrid_query(self, marqo_query: MarqoHybridQuery) -> Dict[str, Any]:
# TODO: Add "fields to search" when searchable attributes get implemented
# This is for legacy unstructured index only. Searchable attributes is not supported
# Tensor term
tensor_term = self._get_tensor_search_term(marqo_query)
# Lexical term
Expand All @@ -320,7 +321,7 @@ def _to_vespa_hybrid_query(self, marqo_query: MarqoHybridQuery) -> Dict[str, Any
else:
filter_term = ''

select_attributes = "*" # TODO: Fix when searchable attributes are implemented
select_attributes = "*"

summary = unstructured_common.SUMMARY_ALL_VECTOR if marqo_query.expose_facets \
else unstructured_common.SUMMARY_ALL_NON_VECTOR
Expand All @@ -332,17 +333,6 @@ def _to_vespa_hybrid_query(self, marqo_query: MarqoHybridQuery) -> Dict[str, Any
unstructured_common.QUERY_INPUT_HYBRID_FIELDS_TO_RANK_TENSOR: {}
}

# TODO: add this back when searchable attributes are implemented
# Separate fields to rank (lexical and tensor)
#query_inputs.update({
# unstructured_common.QUERY_INPUT_HYBRID_FIELDS_TO_RANK_LEXICAL: {
# f: 1 for f in fields_to_search_lexical
# },
# unstructured_common.QUERY_INPUT_HYBRID_FIELDS_TO_RANK_TENSOR: {
# f: 1 for f in fields_to_search_tensor
# }
#})

# Extract score modifiers
hybrid_score_modifiers = self._get_hybrid_score_modifiers(marqo_query)
if hybrid_score_modifiers[constants.MARQO_SEARCH_METHOD_LEXICAL]:
Expand Down
1 change: 1 addition & 0 deletions src/marqo/tensor_search/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class EnvVars:
MARQO_INFERENCE_CACHE_TYPE = "MARQO_INFERENCE_CACHE_TYPE"
MARQO_MAX_TENSOR_FIELD_COUNT_UNSTRUCTURED = "MARQO_MAX_TENSOR_FIELD_COUNT_UNSTRUCTURED"
MARQO_MAX_LEXICAL_FIELD_COUNT_UNSTRUCTURED = "MARQO_MAX_LEXICAL_FIELD_COUNT_UNSTRUCTURED"
MARQO_INDEX_DEPLOYMENT_LOCK_TIMEOUT = "MARQO_INDEX_DEPLOYMENT_LOCK_TIMEOUT"
ZOOKEEPER_HOSTS = "ZOOKEEPER_HOSTS"
ZOOKEEPER_CONNECTION_TIMEOUT = "ZOOKEEPER_CONNECTION_TIMEOUT"

Expand Down
82 changes: 73 additions & 9 deletions tests/core/index_management/test_index_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,7 @@ def test_update_index_should_succeed(self):

# update this index
semi_structured_marqo_index = cast(SemiStructuredMarqoIndex, self.index_management.get_index(request.name))
semi_structured_marqo_index.tensor_fields.append(TensorField(
name='title', chunk_field_name='marqo__chunks_title', embeddings_field_name='marqo__embeddings_title'))
semi_structured_marqo_index.tensor_fields.append(self._tensor_field('title'))
vespa_schema = cast(SemiStructuredVespaSchema, vespa_schema_factory(request))
new_schema = vespa_schema.generate_vespa_schema(semi_structured_marqo_index)
self.index_management.update_index(semi_structured_marqo_index)
Expand All @@ -384,13 +383,11 @@ def test_update_index_should_fail_under_race_condition(self):
semi_structured_marqo_index = cast(SemiStructuredMarqoIndex, self.index_management.get_index(request.name))

change1 = semi_structured_marqo_index.copy(deep=True)
change1.tensor_fields.append(TensorField(
name='title', chunk_field_name='marqo__chunks_title', embeddings_field_name='marqo__embeddings_title'))
change1.tensor_fields.append(self._tensor_field('title'))
self.index_management.update_index(change1)

change2 = semi_structured_marqo_index.copy(deep=True)
change2.tensor_fields.append(TensorField(
name='desc', chunk_field_name='marqo__chunks_desc', embeddings_field_name='marqo__embeddings_desc'))
change2.tensor_fields.append(self._tensor_field('description'))

with self.assertRaisesStrict(OperationConflictError) as err:
self.index_management.update_index(change2)
Expand Down Expand Up @@ -420,11 +417,62 @@ def test_update_index_should_fail_for_wrong_index_types(self):
)
]:
with self.subTest(f'request_type={type(request)}'):
self.index_management.create_index(request)

_, index = vespa_schema_factory(request).generate_schema()

with self.assertRaisesStrict(InternalError):
with self.assertRaisesStrict(InternalError) as err:
self.index_management.update_index(index)

self.assertIn('can not be update', str(err.exception))

def test_update_index_should_skip_if_nothing_to_update(self):
request = self.unstructured_marqo_index_request(model=Model(name='hf/e5-small'))
self.index_management.bootstrap_vespa()
self.index_management.create_index(request)

semi_structured_marqo_index = cast(SemiStructuredMarqoIndex, self.index_management.get_index(request.name))

change1 = semi_structured_marqo_index.copy(deep=True)
change1.tensor_fields.extend([self._tensor_field('title'), self._tensor_field('description'), self._tensor_field('tags')])
change1.lexical_fields.extend([self._lexical_field('field1'), self._lexical_field('field2'), self._lexical_field('field3')])

change2 = semi_structured_marqo_index.copy(deep=True)
# Deliberately use a different order to see if the comparison is order-agnostic
# Also use a subset to see if it skips the update if all fields needed are already present
change2.tensor_fields.extend([self._tensor_field('description'), self._tensor_field('title')])
change2.lexical_fields.extend([self._lexical_field('field2'), self._lexical_field('field1')])

exception_list_1 = []
exception_list_2 = []
mock_update_index_and_schema = mock.MagicMock()

def worker1():
try:
self.index_management.update_index(change1)
except Exception as err:
exception_list_1.append(err)

@mock.patch("marqo.core.index_management.vespa_application_package.VespaApplicationPackage.update_index_setting_and_schema", mock_update_index_and_schema)
def worker2():
try:
self.index_management.update_index(change2)
except Exception as err:
exception_list_2.append(err)

thread1 = threading.Thread(target=worker1)
thread2 = threading.Thread(target=worker2)

thread1.start()
time.sleep(1)
thread2.start()
thread1.join()
thread2.join()

self.assertEqual(exception_list_1, [])
self.assertEqual(exception_list_2, [])
mock_update_index_and_schema.assert_not_called()

def test_create_index_should_fail_if_index_already_exists(self):
request = self.unstructured_marqo_index_request(name="test-index")
self.index_management.bootstrap_vespa()
Expand Down Expand Up @@ -502,15 +550,18 @@ def test_batch_delete_index_should_fail_atomically(self):
self._assert_index_is_not_present(app, index2.name, index2.schema_name)

def test_concurrent_updates_is_prevented_by_distributed_locking(self):
exception_list = []

def worker1():
request = self.unstructured_marqo_index_request(name="index1")
self.index_management.create_index(request)

def worker2():
with self.assertRaisesStrict(OperationConflictError) as e:
try:
request = self.unstructured_marqo_index_request(name="index2")
self.index_management.create_index(request)
self.assertEqual("Another index creation/deletion operation is in progress.", str(e.exception))
except Exception as err:
exception_list.append(err)

self.index_management.bootstrap_vespa()
thread1 = threading.Thread(target=worker1)
Expand All @@ -522,6 +573,9 @@ def worker2():
thread1.join()
thread2.join()

self.assertEqual(1, len(exception_list))
self.assertTrue(isinstance(exception_list[0], OperationConflictError))
self.assertEqual("Your indexes are being updated. Please try again shortly.", str(exception_list[0]))

@pytest.mark.skip(reason="This test case is just used to verify the optimistic locking mechanism works")
def test_race_condition(self):
Expand Down Expand Up @@ -650,3 +704,13 @@ def _save_index_settings_to_vespa(self, marqo_index: MarqoIndex) -> None:
),
schema=self.index_management._MARQO_SETTINGS_SCHEMA_NAME
)

def _tensor_field(self, field_name: str):
return TensorField(
name=field_name, chunk_field_name=f'marqo__chunks_{field_name}',
embeddings_field_name=f'marqo__embeddings_{field_name}')

def _lexical_field(self, field_name: str):
return Field(name=field_name, type=FieldType.Text,
features=[FieldFeature.LexicalSearch],
lexical_field_name=f'marqo__lexical_{field_name}')
3 changes: 2 additions & 1 deletion tests/marqo_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ def setUpClass(cls) -> None:
cls.configure_request_metrics()
cls.vespa_client = vespa_client
cls.zookeeper_client = zookeeper_client
cls.index_management = IndexManagement(cls.vespa_client, cls.zookeeper_client, enable_index_operations=True)
cls.index_management = IndexManagement(cls.vespa_client, cls.zookeeper_client, enable_index_operations=True,
deployment_lock_timeout_seconds=2)
cls.monitoring = Monitoring(cls.vespa_client, cls.index_management)
cls.config = config.Config(vespa_client=vespa_client, default_device="cpu",
zookeeper_client=cls.zookeeper_client)
Expand Down

0 comments on commit fc10e79

Please sign in to comment.