From e1a18e7ff2192f15cfe3417773173035740df3f0 Mon Sep 17 00:00:00 2001
From: teisnp
Date: Tue, 11 Jul 2023 11:53:36 +0200
Subject: [PATCH 1/6] added normalize option to multiprocess encode

---
 sentence_transformers/SentenceTransformer.py | 25 +++++++++++++++-----
 1 file changed, 19 insertions(+), 6 deletions(-)

diff --git a/sentence_transformers/SentenceTransformer.py b/sentence_transformers/SentenceTransformer.py
index e44e573a5..098372d42 100644
--- a/sentence_transformers/SentenceTransformer.py
+++ b/sentence_transformers/SentenceTransformer.py
@@ -250,7 +250,13 @@ def stop_multi_process_pool(pool):
         pool['output'].close()
 
 
-    def encode_multi_process(self, sentences: List[str], pool: Dict[str, object], batch_size: int = 32, chunk_size: int = None):
+    def encode_multi_process(
+        self,
+        sentences: List[str],
+        pool: Dict[str, object],
+        batch_size: int = 32,
+        chunk_size: int = None,
+        normalize_embeddings: bool = False):
         """
         This method allows to run encode() on multiple GPUs. The sentences are chunked into smaller packages
         and sent to individual processes, which encode these on the different GPUs. This method is only suitable
@@ -275,12 +281,12 @@ def encode_multi_process(self, sentences: List[str], pool: Dict[str, object], ba
         for sentence in sentences:
             chunk.append(sentence)
             if len(chunk) >= chunk_size:
-                input_queue.put([last_chunk_id, batch_size, chunk])
+                input_queue.put([last_chunk_id, batch_size, chunk, normalize_embeddings])
                 last_chunk_id += 1
                 chunk = []
 
         if len(chunk) > 0:
-            input_queue.put([last_chunk_id, batch_size, chunk])
+            input_queue.put([last_chunk_id, batch_size, chunk, normalize_embeddings])
             last_chunk_id += 1
 
         output_queue = pool['output']
@@ -295,8 +301,15 @@ def _encode_multi_process_worker(target_device: str, model, input_queue, results
         """
         while True:
             try:
-                id, batch_size, sentences = input_queue.get()
-                embeddings = model.encode(sentences, device=target_device, show_progress_bar=False, convert_to_numpy=True, batch_size=batch_size)
+                id, batch_size, sentences, normalize = input_queue.get()
+                embeddings = model.encode(
+                    sentences,
+                    device=target_device,
+                    show_progress_bar=False,
+                    convert_to_numpy=True,
+                    batch_size=batch_size,
+                    normalize_embeddings=normalize)
+
                 results_queue.put([id, embeddings])
             except queue.Empty:
                 break
@@ -911,4 +924,4 @@ def max_seq_length(self, value):
         """
         Property to set the maximal input sequence length for the model. Longer inputs will be truncated.
         """
-        self._first_module().max_seq_length = value
+        self._first_module().max_seq_length = value
\ No newline at end of file

From f7b37f4fa44eb34a047013a15343a31f79cdd472 Mon Sep 17 00:00:00 2001
From: Tom Aarsen
Date: Wed, 13 Dec 2023 15:03:44 +0100
Subject: [PATCH 2/6] Rename some variables

---
 sentence_transformers/SentenceTransformer.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/sentence_transformers/SentenceTransformer.py b/sentence_transformers/SentenceTransformer.py
index 2a4752f66..2dbb7e990 100644
--- a/sentence_transformers/SentenceTransformer.py
+++ b/sentence_transformers/SentenceTransformer.py
@@ -320,16 +320,17 @@ def _encode_multi_process_worker(target_device: str, model, input_queue, results
         """
         while True:
             try:
-                id, batch_size, sentences, normalize = input_queue.get()
+                chunk_id, batch_size, sentences, normalize_embeddings = input_queue.get()
                 embeddings = model.encode(
                     sentences,
                     device=target_device,
                     show_progress_bar=False,
                     convert_to_numpy=True,
                     batch_size=batch_size,
-                    normalize_embeddings=normalize)
+                    normalize_embeddings=normalize_embeddings,
+                )
 
-                results_queue.put([id, embeddings])
+                results_queue.put([chunk_id, embeddings])
             except queue.Empty:
                 break
 

From 8186bc48d08982d591e7f249f0ae33a0295a3281 Mon Sep 17 00:00:00 2001
From: Tom Aarsen
Date: Wed, 13 Dec 2023 15:03:56 +0100
Subject: [PATCH 3/6] Add missing docstring

---
 sentence_transformers/SentenceTransformer.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sentence_transformers/SentenceTransformer.py b/sentence_transformers/SentenceTransformer.py
index 2dbb7e990..4a33df122 100644
--- a/sentence_transformers/SentenceTransformer.py
+++ b/sentence_transformers/SentenceTransformer.py
@@ -285,6 +285,8 @@ def encode_multi_process(
     :param pool: A pool of workers started with SentenceTransformer.start_multi_process_pool
     :param batch_size: Encode sentences with batch size
     :param chunk_size: Sentences are chunked and sent to the individual processes. If none, it determine a sensible size.
+    :param normalize_embeddings: Whether to normalize returned vectors to have length 1. In that case,
+        the faster dot-product (util.dot_score) instead of cosine similarity can be used.
     :return: Numpy matrix with all embeddings
     """
 

From 4561191c7bea0b7be0677a03057b65ee85a33f3d Mon Sep 17 00:00:00 2001
From: Tom Aarsen
Date: Wed, 13 Dec 2023 15:04:13 +0100
Subject: [PATCH 4/6] Update logger text

---
 sentence_transformers/SentenceTransformer.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sentence_transformers/SentenceTransformer.py b/sentence_transformers/SentenceTransformer.py
index 4a33df122..3d6549da5 100644
--- a/sentence_transformers/SentenceTransformer.py
+++ b/sentence_transformers/SentenceTransformer.py
@@ -235,7 +235,7 @@ def start_multi_process_pool(self, target_devices: List[str] = None):
         if torch.cuda.is_available():
             target_devices = ['cuda:{}'.format(i) for i in range(torch.cuda.device_count())]
         else:
-            logger.info("CUDA is not available. Start 4 CPU worker")
+            logger.info("CUDA is not available. Starting 4 CPU workers")
             target_devices = ['cpu']*4
 
         logger.info("Start multi-process pool on devices: {}".format(', '.join(map(str, target_devices))))

From 05d96a57575bbbf842bc528a0a33b00bdb5495ee Mon Sep 17 00:00:00 2001
From: Tom Aarsen
Date: Wed, 13 Dec 2023 15:04:30 +0100
Subject: [PATCH 5/6] Moving to CPU is required, otherwise all weights become 0.0

---
 sentence_transformers/SentenceTransformer.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sentence_transformers/SentenceTransformer.py b/sentence_transformers/SentenceTransformer.py
index 3d6549da5..234b5807e 100644
--- a/sentence_transformers/SentenceTransformer.py
+++ b/sentence_transformers/SentenceTransformer.py
@@ -240,6 +240,8 @@ def start_multi_process_pool(self, target_devices: List[str] = None):
 
         logger.info("Start multi-process pool on devices: {}".format(', '.join(map(str, target_devices))))
 
+        self.to("cpu")
+        self.share_memory()
         ctx = mp.get_context('spawn')
         input_queue = ctx.Queue()
         output_queue = ctx.Queue()

From bdf06df2565f34ca06b830543575df7ea637869d Mon Sep 17 00:00:00 2001
From: Tom Aarsen
Date: Wed, 13 Dec 2023 15:13:12 +0100
Subject: [PATCH 6/6] Update test_encode_multi_process tests

---
 sentence_transformers/SentenceTransformer.py |  4 +--
 tests/conftest.py                            |  8 +++++
 tests/test_multi_process.py                  | 35 ++++++++++----------
 3 files changed, 27 insertions(+), 20 deletions(-)
 create mode 100644 tests/conftest.py

diff --git a/sentence_transformers/SentenceTransformer.py b/sentence_transformers/SentenceTransformer.py
index 234b5807e..ad642b853 100644
--- a/sentence_transformers/SentenceTransformer.py
+++ b/sentence_transformers/SentenceTransformer.py
@@ -247,8 +247,8 @@ def start_multi_process_pool(self, target_devices: List[str] = None):
         output_queue = ctx.Queue()
         processes = []
 
-        for cuda_id in target_devices:
-            p = ctx.Process(target=SentenceTransformer._encode_multi_process_worker, args=(cuda_id, self, input_queue, output_queue), daemon=True)
+        for device_id in target_devices:
+            p = ctx.Process(target=SentenceTransformer._encode_multi_process_worker, args=(device_id, self, input_queue, output_queue), daemon=True)
             p.start()
             processes.append(p)
 
diff --git a/tests/conftest.py b/tests/conftest.py
new file mode 100644
index 000000000..0839508ae
--- /dev/null
+++ b/tests/conftest.py
@@ -0,0 +1,8 @@
+
+from sentence_transformers import SentenceTransformer
+import pytest
+
+
+@pytest.fixture()
+def model() -> SentenceTransformer:
+    return SentenceTransformer("sentence-transformers-testing/stsb-bert-tiny-safetensors")
diff --git a/tests/test_multi_process.py b/tests/test_multi_process.py
index 62f343e12..103c124d8 100644
--- a/tests/test_multi_process.py
+++ b/tests/test_multi_process.py
@@ -3,31 +3,30 @@
 """
 
-import unittest
 from sentence_transformers import SentenceTransformer
 import numpy as np
+import pytest
 
 
-class ComputeMultiProcessTest(unittest.TestCase):
-    def setUp(self):
-        self.model = SentenceTransformer('paraphrase-distilroberta-base-v1')
+@pytest.mark.parametrize("normalize_embeddings", (False, True))
+def test_encode_multi_process(model: SentenceTransformer, normalize_embeddings: bool) -> None:
+    sentences = ["This is sentence {}".format(i) for i in range(40)]
 
-    def test_multi_gpu_encode(self):
-        # Start the multi-process pool on all available CUDA devices
-        pool = self.model.start_multi_process_pool(['cpu', 'cpu'])
+    # Start the multi-process pool on e.g. two CPU devices & compute the embeddings using the pool
+    pool = model.start_multi_process_pool(['cpu', 'cpu'])
+    emb = model.encode_multi_process(sentences, pool, chunk_size=10, normalize_embeddings=normalize_embeddings)
+    model.stop_multi_process_pool(pool)
+    assert emb.shape == (len(sentences), 128)
 
-        sentences = ["This is sentence {}".format(i) for i in range(1000)]
+    # Make sure the embeddings aren't just all 0
+    assert emb.sum() != 0.0
 
-        # Compute the embeddings using the multi-process pool
-        emb = self.model.encode_multi_process(sentences, pool, chunk_size=50)
-        self.model.stop_multi_process_pool(pool)
-        assert emb.shape == (len(sentences), 768)
+    # Compare against normal embeddings
+    emb_normal = model.encode(sentences, normalize_embeddings=normalize_embeddings)
+    diff = np.max(np.abs(emb - emb_normal))
+    assert diff < 0.001
 
-        emb_normal = self.model.encode(sentences)
-
-
-        diff = np.max(np.abs(emb - emb_normal))
-        print("Max multi proc diff", diff)
-        assert diff < 0.001
+    # Ensure that after normalizing, the means are all almost 0, and otherwise not
+    assert np.all(np.abs(emb.mean(1)) < 0.01) == normalize_embeddings
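
For illustration, here is how the feature introduced by this series fits together end to end. This is a minimal sketch, not part of the patches themselves: the checkpoint name is borrowed from the old test, and calling start_multi_process_pool() with no arguments would use all available CUDA devices instead of the two CPU workers shown here.

    from sentence_transformers import SentenceTransformer
    import numpy as np

    if __name__ == '__main__':  # required, since the pool uses the 'spawn' start method
        model = SentenceTransformer('paraphrase-distilroberta-base-v1')
        sentences = ["This is sentence {}".format(i) for i in range(1000)]

        # Start two CPU workers, encode in chunks, then shut the pool down
        pool = model.start_multi_process_pool(['cpu', 'cpu'])
        emb = model.encode_multi_process(sentences, pool, chunk_size=50, normalize_embeddings=True)
        model.stop_multi_process_pool(pool)

        # Normalized embeddings have unit length, so the faster dot-product
        # (util.dot_score) can stand in for cosine similarity
        assert np.allclose(np.linalg.norm(emb, axis=1), 1.0, atol=1e-3)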