Fix the vectorise perf regression in 2.13 (#1009)
papa99do authored Oct 21, 2024
1 parent a2c4ff4 commit 8aafb3b
Showing 4 changed files with 51 additions and 55 deletions.
40 changes: 13 additions & 27 deletions src/marqo/core/inference/tensor_fields_container.py
@@ -127,7 +127,7 @@ class ModelConfig(BaseModel):

class Vectoriser(ABC):
    @abstractmethod
-    def vectorise(self, content_chunks: List[ContentChunkType]) -> List[List[float]]:
+    def vectorise(self, content_chunks: List[ContentChunkType], key_prefix: str = None) -> List[List[float]]:
        """
        Generate embeddings from a list of content chunks.
@@ -168,7 +168,7 @@ def single_vectorisers_by_modality(cls, model_config: ModelConfig) -> Dict[FieldType, 'Vectoriser']:

    @classmethod
    def batch_vectorisers_by_modality(cls, model_config: ModelConfig,
-                                      chunks_to_vectorise: Dict[FieldType, List[ContentChunkType]]
+                                      chunks_to_vectorise: Dict[FieldType, List[Tuple[str, ContentChunkType]]]
                                      ) -> Dict[FieldType, 'Vectoriser']:
        return {field_type: BatchCachingVectoriser(modality, chunks_to_vectorise[field_type], model_config)
                for modality, field_type in MODALITY_FIELD_TYPE_MAP.items()
@@ -183,7 +183,7 @@ def __init__(self, modality: Modality, model_config: ModelConfig):
        self.modality = modality
        self.model_config = model_config

-    def vectorise(self, content_chunks: List[ContentChunkType]) -> List[List[float]]:
+    def vectorise(self, content_chunks: List[ContentChunkType], key_prefix: str = None) -> List[List[float]]:
        with RequestMetricsStore.for_request().time(f"add_documents.create_vectors"):
            if self.modality in [Modality.AUDIO, Modality.VIDEO]:
                # audio and video fields have to be vectorised chunk by chunk due to a limitation of languagebind
@@ -197,27 +197,12 @@ class BatchCachingVectoriser(Vectoriser):
    Generate embeddings when the class is initialised and cache them. When the vectorise method is called, just
    return the cached embeddings.
    """
-    def __init__(self, modality: Modality, chunks_to_vectorise: List[ContentChunkType], model_config: ModelConfig):
+    def __init__(self, modality: Modality, chunks_to_vectorise: List[Tuple[str, ContentChunkType]], model_config: ModelConfig):
        self.modality = modality
        self.model_config = model_config
        self.embedding_cache = self._vectorise_and_cache(chunks_to_vectorise)

-    def _dict_key(self, chunk: ContentChunkType):
-        if isinstance(chunk, Image):
-            chunk = chunk.convert('RGB')
-            pixel_bytes = chunk.tobytes()
-            # Use md5 hash for faster hashing.
-            return hashlib.md5(pixel_bytes).hexdigest()
-        elif isinstance(chunk, dict):
-            # Generate sorted key-value pairs to ensure consistency.
-            return frozenset((k, self._dict_key(v)) for k, v in chunk.items())
-        elif isinstance(chunk, Tensor):
-            # Convert to a tuple to be hashable  # TODO find a more memory efficient way, maybe hashlib.md5?
-            return tuple(chunk.flatten().tolist())
-        else:
-            return chunk
-
-    def _vectorise_and_cache(self, chunks_to_vectorise: List[ContentChunkType]) -> dict:
+    def _vectorise_and_cache(self, chunks_to_vectorise: List[Tuple[str, ContentChunkType]]) -> dict:
        if not chunks_to_vectorise:
            return dict()

@@ -226,14 +211,15 @@ def _vectorise_and_cache(self, chunks_to_vectorise: List[ContentChunkType]) -> dict:
        with RequestMetricsStore.for_request().time(f"add_documents.create_vectors"):
            if self.modality in [Modality.AUDIO, Modality.VIDEO]:
                # audio and video fields have to be vectorised chunk by chunk due to a limitation of languagebind
-                embeddings = [vector for content_chunk in chunks_to_vectorise for vector in
+                embeddings = [vector for _, content_chunk in chunks_to_vectorise for vector in
                              self._s2inference_vectorise([content_chunk], self.modality, self.model_config)]
            else:
-                embeddings = self._s2inference_vectorise(chunks_to_vectorise, self.modality, self.model_config)
-        return {self._dict_key(chunk): embeddings[i] for i, chunk in enumerate(chunks_to_vectorise)}
+                embeddings = self._s2inference_vectorise([content_chunk for _, content_chunk in chunks_to_vectorise],
+                                                         self.modality, self.model_config)
+        return {f'{key}': embeddings[i] for i, (key, _) in enumerate(chunks_to_vectorise)}

-    def vectorise(self, content_chunks: List[ContentChunkType]) -> List[List[float]]:
-        return [self.embedding_cache[self._dict_key(chunk)] for chunk in content_chunks]
+    def vectorise(self, content_chunks: List[ContentChunkType], key_prefix: str = None) -> List[List[float]]:
+        return [self.embedding_cache[f'{key_prefix}_{i}'] for i, _ in enumerate(content_chunks)]


class TensorFieldContent(BaseModel):
@@ -299,14 +285,14 @@ def chunk(self, chunkers: Dict[FieldType, Chunker]):
        self.chunks.extend(chunks)
        self.content_chunks.extend(content_chunks)

-    def vectorise(self, vectorisers: Dict[FieldType, Vectoriser]) -> None:
+    def vectorise(self, vectorisers: Dict[FieldType, Vectoriser], key_prefix: str = None) -> None:
        if self.field_type not in vectorisers:
            raise AddDocumentsError(f'Vectorisation is not supported for field type: {self.field_type.name}')

        if not self.content_chunks:
            return

-        embeddings = vectorisers[self.field_type].vectorise(self.content_chunks)
+        embeddings = vectorisers[self.field_type].vectorise(self.content_chunks, key_prefix)
        self.embeddings.extend(embeddings)
        self.content_chunks = []  # drop it after vectorisation so memory can be freed
        self.is_resolved = True
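The fix in this file is the keying scheme. Previously `_dict_key` hashed every chunk's content on both insert and lookup: MD5 over raw pixel bytes for images, a flattened Python tuple for tensors, a frozenset for dicts — expensive for large media, and paid once per chunk. The new code keys the cache by a caller-supplied positional key, `{key_prefix}_{index}`, so no content is hashed at all. A minimal standalone sketch of the new contract (simplified and hypothetical — the real class calls s2_inference and special-cases audio/video):

from typing import Any, Callable, Dict, List, Optional, Tuple

class KeyedBatchCache:
    """Simplified model of BatchCachingVectoriser's new keying scheme (illustrative only)."""

    def __init__(self, keyed_chunks: List[Tuple[str, Any]],
                 embed_fn: Callable[[List[Any]], List[List[float]]]):
        # Vectorise every chunk once, in a single batch, at construction time.
        embeddings = embed_fn([chunk for _, chunk in keyed_chunks])
        # Cache by the caller-supplied key -- no MD5, tobytes() or tolist() involved.
        self.cache: Dict[str, List[float]] = {
            key: embeddings[i] for i, (key, _) in enumerate(keyed_chunks)
        }

    def vectorise(self, content_chunks: List[Any], key_prefix: Optional[str] = None) -> List[List[float]]:
        # Lookup is purely positional: the caller must pass the same chunks,
        # in the same order, under the same prefix used to build the cache.
        return [self.cache[f'{key_prefix}_{i}'] for i, _ in enumerate(content_chunks)]

embed = lambda chunks: [[float(i), float(i) + 1.0] for i in range(len(chunks))]
cache = KeyedBatchCache([('title_0', 'hello'), ('title_1', 'world')], embed)
assert cache.vectorise(['hello', 'world'], key_prefix='title') == [[0.0, 1.0], [1.0, 2.0]]

One behavioural difference: identical chunks no longer collapse to a single cache entry, but the old scheme vectorised duplicates anyway — only the hashing cost, which dominated for images and tensors, is removed.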
28 changes: 17 additions & 11 deletions src/marqo/core/vespa_index/add_documents_handler.py
@@ -8,7 +8,7 @@
from marqo.core.constants import MARQO_DOC_ID, MARQO_CUSTOM_VECTOR_NORMALIZATION_MINIMUM_VERSION
from marqo.core.models.add_docs_params import AddDocsParams, BatchVectorisationMode
from marqo.core.inference.tensor_fields_container import Chunker, TensorFieldsContainer, TensorFieldContent, \
-    TextChunker, ImageChunker, AudioVideoChunker, ModelConfig, Vectoriser
+    TextChunker, ImageChunker, AudioVideoChunker, ModelConfig, Vectoriser, ContentChunkType
from marqo.core.exceptions import AddDocumentsError, DuplicateDocumentError, MarqoDocumentParsingError, InternalError, \
    UnsupportedFeatureError
from marqo.core.models import MarqoIndex
@@ -308,17 +308,20 @@ def _vectorise_tensor_fields_in_batch_per_doc(self, model_config: ModelConfig) -> None:
        with ExitStack() as exit_stack:
            media_repo = self._download_media_contents(exit_stack)
            chunkers = self._field_type_chunker_map(media_repo)

-            doc_chunks_map: Dict[str, Dict[FieldType, List[str]]] = dict()
-            doc_field_map: Dict[str, List[TensorFieldContent]] = dict()
+            # sample doc_chunks_map: {'doc_id1': {'image_pointer': [('field_a_0', content_chunk)]}}
+            doc_chunks_map: Dict[str, Dict[FieldType, List[Tuple[str, ContentChunkType]]]] = dict()
+            # sample doc_field_map: {'doc_id1': {'field_name_1': tensor_field_content}}
+            doc_field_map: Dict[str, Dict[str, TensorFieldContent]] = dict()

            for doc_id, field_name, tensor_field_content in (
                    self.tensor_fields_container.tensor_fields_to_vectorise(*chunkers.keys())):
                try:
                    tensor_field_content.chunk(chunkers)
+                    content_chunks_with_key = [(f'{field_name}_{index}', chunk) for index, chunk in
+                                               enumerate(tensor_field_content.content_chunks)]
                    doc_chunks_map.setdefault(doc_id, {}).setdefault(
-                        tensor_field_content.field_type, []).extend(tensor_field_content.content_chunks)
-                    doc_field_map.setdefault(doc_id, []).append(tensor_field_content)
+                        tensor_field_content.field_type, []).extend(content_chunks_with_key)
+                    doc_field_map.setdefault(doc_id, {})[field_name] = tensor_field_content

                except AddDocumentsError as err:
                    self.add_docs_response_collector.collect_error_response(doc_id, err)
@@ -331,8 +334,8 @@ def _vectorise_tensor_fields_in_batch_per_doc(self, model_config: ModelConfig) -> None:
                try:
                    vectorisers = Vectoriser.batch_vectorisers_by_modality(model_config, chunks_to_vectorise)

-                    for tensor_field_content in doc_field_map[doc_id]:
-                        tensor_field_content.vectorise(vectorisers)
+                    for field_name, tensor_field_content in doc_field_map[doc_id].items():
+                        tensor_field_content.vectorise(vectorisers, key_prefix=field_name)

                except AddDocumentsError as err:
                    self.add_docs_response_collector.collect_error_response(doc_id, err)
@@ -342,14 +345,17 @@ def _vectorise_tensor_fields_in_batch_per_add_doc_batch(self, model_config: ModelConfig) -> None:
        with ExitStack() as exit_stack:
            media_repo = self._download_media_contents(exit_stack)
            chunkers = self._field_type_chunker_map(media_repo)
-            chunks_map = dict()
+            # sample chunks_map: {'image_pointer': [('doc_id_1_field_a_0', content_chunk)]}
+            chunks_map: Dict[FieldType, List[Tuple[str, ContentChunkType]]] = dict()

            for doc_id, field_name, tensor_field_content in (
                    self.tensor_fields_container.tensor_fields_to_vectorise(*chunkers.keys())):
                try:
                    tensor_field_content.chunk(chunkers)
                    field_type = tensor_field_content.field_type
+                    content_chunks_with_key = [(f'{doc_id}_{field_name}_{index}', chunk) for index, chunk in
+                                               enumerate(tensor_field_content.content_chunks)]
-                    chunks_map.setdefault(field_type, []).extend(tensor_field_content.content_chunks)
+                    chunks_map.setdefault(field_type, []).extend(content_chunks_with_key)
                except AddDocumentsError as err:
                    self.add_docs_response_collector.collect_error_response(doc_id, err)
                    self.tensor_fields_container.remove_doc(doc_id)
@@ -364,7 +370,7 @@ def _vectorise_tensor_fields_in_batch_per_add_doc_batch(self, model_config: ModelConfig) -> None:

            for doc_id, field_name, tensor_field_content in (
                    self.tensor_fields_container.tensor_fields_to_vectorise(*chunkers.keys())):
-                tensor_field_content.vectorise(vectorisers)
+                tensor_field_content.vectorise(vectorisers, key_prefix=f'{doc_id}_{field_name}')

    def _download_media_contents(self, exit_stack):
        url_doc_id_map = dict()
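Both batching modes must build and look up keys with the same scheme for the cache hits to line up. In per-doc mode the vectorisers (and their caches) are created per document, so `field_name` alone is a unique prefix; in per-add-doc-batch mode one cache serves the whole batch, so the prefix also carries `doc_id`. A sketch of the invariant (hypothetical helper, not the handler's API):

from typing import Any, List, Tuple

def keyed_chunks(prefix: str, chunks: List[Any]) -> List[Tuple[str, Any]]:
    # Key construction at cache-build time: '{prefix}_{index}' per chunk,
    # mirroring content_chunks_with_key in both handler methods above.
    return [(f'{prefix}_{i}', chunk) for i, chunk in enumerate(chunks)]

# Per-doc mode: one cache per document, so the field name is a unique prefix.
assert keyed_chunks('field_a', ['c0', 'c1']) == [('field_a_0', 'c0'), ('field_a_1', 'c1')]

# Per-add-doc-batch mode: one cache for the whole batch, so the doc id is
# folded in to keep keys unique across documents.
assert keyed_chunks('doc1_field_a', ['c0', 'c1']) == [('doc1_field_a_0', 'c0'), ('doc1_field_a_1', 'c1')]

# At lookup time, vectorise(chunks, key_prefix) rebuilds f'{key_prefix}_{i}',
# so both sides must enumerate a field's chunks in the same order.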
2 changes: 1 addition & 1 deletion tests/core/inference/test_tensor_field_content.py
@@ -26,7 +26,7 @@ class DummyVectoriser(Vectoriser):
    def __init__(self):
        self.vectorise_call_count = 0

-    def vectorise(self, content_chunks: Union[List[str], List[Image]]) -> List[List[float]]:
+    def vectorise(self, content_chunks: Union[List[str], List[Image]], key_prefix: str = None) -> List[List[float]]:
        self.vectorise_call_count += 1
        return [[1.0 * (i + 1), 2.0 * (i + 1)] for i in range(len(content_chunks))]
36 changes: 20 additions & 16 deletions tests/core/inference/test_tensor_field_vectorisers.py
@@ -1,3 +1,4 @@
+import unittest
from typing import Dict
from unittest.mock import patch

@@ -10,17 +11,20 @@
from marqo.s2_inference.clip_utils import load_image_from_path
from marqo.s2_inference.errors import ModelDownloadError
from marqo.s2_inference.multimodal_model_load import Modality
-from tests.marqo_test import MarqoTestCase, TestImageUrls
+from marqo.tensor_search.telemetry import RequestMetricsStore
+from tests.marqo_test import TestImageUrls
from marqo.s2_inference import errors as s2_inference_errors


@pytest.mark.unittest
-class TestTensorFieldVectorisers(MarqoTestCase):
+class TestTensorFieldVectorisers(unittest.TestCase):
    def setUp(self):
        self.model_config = ModelConfig(
            model_name='random',
            normalize_embeddings=True
        )
+        RequestMetricsStore._set_request('request')
+        RequestMetricsStore.set_in_request()

    @patch('marqo.s2_inference.s2_inference.vectorise')
    def test_single_vectoriser_should_vectorise_chunks_passed_in_in_one_go(self, mock_vectorise):
@@ -56,26 +60,26 @@ def test_single_vectoriser_should_vectorise_audio_and_video_chunks_one_at_a_time(self, mock_vectorise):
    def test_batch_vectoriser_should_vectorise_chunks_in_one_batch(self, mock_vectorise):
        for modality in [Modality.TEXT, Modality.IMAGE]:
            with self.subTest(modality=modality):
-                all_chunks = ['chunk1', 'chunk2']
+                all_chunks = [('key_0', 'chunk1'), ('key_1', 'chunk2')]
                mock_vectorise.reset_mock()
                mock_vectorise.side_effect = [[[1.0, 2.0], [3.0, 4.0]]]

                batch_vectoriser = BatchCachingVectoriser(modality, all_chunks, self.model_config)

                self.assertEquals(1, mock_vectorise.call_count)
-                self.assertEquals(all_chunks, mock_vectorise.call_args_list[0].kwargs['content'])
+                self.assertEquals(['chunk1', 'chunk2'], mock_vectorise.call_args_list[0].kwargs['content'])

                # verify if the embeddings are cached
                mock_vectorise.reset_mock()
-                embeddings = batch_vectoriser.vectorise(['chunk2', 'chunk1'])
-                self.assertEquals([[3.0, 4.0], [1.0, 2.0]], embeddings)
+                embeddings = batch_vectoriser.vectorise(['chunk1', 'chunk2'], key_prefix='key')
+                self.assertEquals([[1.0, 2.0], [3.0, 4.0]], embeddings)
                self.assertEquals(0, mock_vectorise.call_count)

    @patch('marqo.s2_inference.s2_inference.vectorise')
    def test_batch_vectoriser_should_vectorise_audio_and_video_chunks_one_at_a_time(self, mock_vectorise):
        for modality in [Modality.AUDIO, Modality.VIDEO]:
            with self.subTest(modality=modality):
-                all_chunks = ['chunk1', 'chunk2']
+                all_chunks = [('key_0_0', 'chunk1'), ('key_1_0', 'chunk2')]
                mock_vectorise.reset_mock()
                mock_vectorise.side_effect = [[[1.0, 2.0]], [[3.0, 4.0]]]

@@ -87,7 +91,7 @@ def test_batch_vectoriser_should_vectorise_audio_and_video_chunks_one_at_a_time(self, mock_vectorise):

                # verify if the embeddings are cached
                mock_vectorise.reset_mock()
-                embeddings = batch_vectoriser.vectorise(['chunk2'])
+                embeddings = batch_vectoriser.vectorise(['chunk2'], key_prefix='key_1')
                self.assertEquals([[3.0, 4.0]], embeddings)
                self.assertEquals(0, mock_vectorise.call_count)

@@ -97,20 +101,20 @@ def test_batch_vectoriser_should_support_different_content_chunk_types(self, mock_vectorise):
        image1 = load_image_from_path(TestImageUrls.IMAGE1.value, {})

        for modality, chunk_type, content_chunks, side_effect in [
-            (Modality.TEXT, str, ['chunk1', 'chunk2'], [[[1.0, 2.0], [3.0, 4.0]]]),
-            (Modality.IMAGE, Image, [image0, image1], [[[1.0, 2.0], [3.0, 4.0]]]),
-            (Modality.IMAGE, torch.Tensor, [torch.tensor([1., 2.]), torch.tensor([2., 3.])], [[[1.0, 2.0], [3.0, 4.0]]]),
-            (Modality.AUDIO, Dict[str, torch.Tensor], [{'audio_chunk1': torch.tensor([1., 2.])},
-                                                       {'audio_chunk2': torch.tensor([2., 3.])}], [[[1.0, 2.0]], [[3.0, 4.0]]]),
-            (Modality.VIDEO, Dict[str, torch.Tensor], [{'video_chunk1': torch.tensor([1., 2.])},
-                                                       {'video_chunk2': torch.tensor([2., 3.])}], [[[1.0, 2.0]], [[3.0, 4.0]]]),
+            (Modality.TEXT, str, [('key_0_0', 'chunk1'), ('key_1_0', 'chunk2')], [[[1.0, 2.0], [3.0, 4.0]]]),
+            (Modality.IMAGE, Image, [('key_0_0', image0), ('key_1_0', image1)], [[[1.0, 2.0], [3.0, 4.0]]]),
+            (Modality.IMAGE, torch.Tensor, [('key_0_0', torch.tensor([1., 2.])), ('key_1_0', torch.tensor([2., 3.]))], [[[1.0, 2.0], [3.0, 4.0]]]),
+            (Modality.AUDIO, Dict[str, torch.Tensor], [('key_0_0', {'audio_chunk1': torch.tensor([1., 2.])}),
+                                                       ('key_1_0', {'audio_chunk2': torch.tensor([2., 3.])})], [[[1.0, 2.0]], [[3.0, 4.0]]]),
+            (Modality.VIDEO, Dict[str, torch.Tensor], [('key_0_0', {'video_chunk1': torch.tensor([1., 2.])}),
+                                                       ('key_1_0', {'video_chunk2': torch.tensor([2., 3.])})], [[[1.0, 2.0]], [[3.0, 4.0]]]),
        ]:
            with self.subTest(modality=modality, chunk_type=chunk_type):
                mock_vectorise.reset_mock()
                mock_vectorise.side_effect = side_effect
                batch_vectoriser = BatchCachingVectoriser(modality, content_chunks, self.model_config)

-                embeddings = batch_vectoriser.vectorise([content_chunks[1]])
+                embeddings = batch_vectoriser.vectorise([content_chunks[1]], key_prefix='key_1')
                self.assertEquals([[3.0, 4.0]], embeddings)

    @patch('marqo.s2_inference.s2_inference.vectorise')
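Two harness details in this file: the suite now extends plain `unittest.TestCase` rather than `MarqoTestCase`, so `setUp` registers a request context by hand (`RequestMetricsStore._set_request(...)` and `set_in_request()`) because `BatchCachingVectoriser` times vectorisation via `RequestMetricsStore.for_request()`. Also worth noting is what the positional lookup does and does not catch: an unknown prefix fails loudly with a `KeyError`, whereas reordering chunks under a valid prefix would silently mis-assign embeddings — hence the tests replay chunks in insertion order. A small sketch of the failure mode (assumed behaviour, simplified):

# Cache built from [('key_1_0', 'chunk2')], as in the audio/video test above.
cache = {'key_1_0': [3.0, 4.0]}

def lookup(chunks, key_prefix):
    return [cache[f'{key_prefix}_{i}'] for i, _ in enumerate(chunks)]

assert lookup(['chunk2'], 'key_1') == [[3.0, 4.0]]
try:
    lookup(['chunk2'], 'key_2')  # wrong prefix: no silent wrong answer
except KeyError as e:
    print(f'missing cache key: {e}')  # -> missing cache key: 'key_2_0'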
