From 5663e168212ae5044ad18aa285033d8a2e7bf046 Mon Sep 17 00:00:00 2001
From: Eero Tamminen
Date: Fri, 6 Dec 2024 05:08:57 +0200
Subject: [PATCH 1/2] Exclude yield/reply time from first token latency metric
 (#973)

While the metrics are OK for a small number of requests, when the
megaservice is handling many (hundreds of) _parallel_ requests, it was
reporting a clearly (~10%) larger first token latency than the one seen
by the client receiving the tokens from the megaservice.

Taking the time before the token is yielded means that the reported first
token latency can be slightly shorter than it actually is. However, testing
with ChatQnA shows the latencies to be clearly closer to the ones seen by
the client (within a couple of percent) and typically smaller (i.e. logical).

PS. Doing the metrics timing after yielding the token meant that the time
for sending the reply to the client and waiting for that to complete was
also included in the token time. I suspect that with a lot of parallel
requests, processing had often switched to other megaservice request
processing threads, and getting control back to the yielding thread for
timing could be delayed much longer than sending the response to the
client took.

Signed-off-by: Eero Tamminen
---
 comps/cores/mega/orchestrator.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/comps/cores/mega/orchestrator.py b/comps/cores/mega/orchestrator.py
index 8a75f9cff..803965f6e 100644
--- a/comps/cores/mega/orchestrator.py
+++ b/comps/cores/mega/orchestrator.py
@@ -237,8 +237,8 @@ def generate():
                     )
                     token_start = time.time()
                 else:
-                    yield chunk
                     token_start = self.metrics.token_update(token_start, is_first)
+                    yield chunk
                 is_first = False
             self.metrics.request_update(req_start)
             self.metrics.pending_update(False)
@@ -306,7 +306,7 @@ def token_generator(self, sentence: str, token_start: float, is_first: bool, is_
         suffix = "\n\n"
         tokens = re.findall(r"\s?\S+\s?", sentence, re.UNICODE)
        for token in tokens:
-            yield prefix + repr(token.replace("\\n", "\n").encode("utf-8")) + suffix
             token_start = self.metrics.token_update(token_start, is_first)
+            yield prefix + repr(token.replace("\\n", "\n").encode("utf-8")) + suffix
         if is_last:
             yield "data: [DONE]\n\n"
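
Illustration (not part of the patch series): the first-token timing change in
PATCH 1/2 can be reproduced with a minimal, self-contained sketch. The function
names below are hypothetical and do not come from orchestrator.py; only the
ordering of the timestamp relative to the yield mirrors the change above.
Taking the timestamp before the yield measures how long the token took to
produce, while taking it after the yield also counts the time the consumer
spends handling the chunk, such as sending the reply to the client.

import time


def stream_tokens(tokens, request_start):
    # Yield tokens, timing the first one both before and after the yield.
    is_first = True
    for token in tokens:
        if is_first:
            before = time.time()  # patched behaviour: token is ready to be sent
        yield token
        if is_first:
            after = time.time()  # old behaviour: also counts consumer-side work
            print(f"first token latency: before yield {before - request_start:.3f}s, "
                  f"after yield {after - request_start:.3f}s")
            is_first = False


def send_to_client(chunks):
    # Stands in for the HTTP layer pushing each chunk to a slow client.
    for chunk in chunks:
        time.sleep(0.1)


send_to_client(stream_tokens(["a", "b", "c"], request_start=time.time()))

With a 0.1 s consumer delay per chunk, the sketch prints roughly 0.000 s for
the pre-yield measurement and 0.100 s for the post-yield one, which is the
kind of inflation the patch removes.
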
From fbf3017afb8d024007f7e3ca545eb9faa7d29399 Mon Sep 17 00:00:00 2001
From: Yao Qing
Date: Fri, 6 Dec 2024 13:18:51 +0800
Subject: [PATCH 2/2] Revert mosec embedding microservice to use synchronous
 interface. (#971)

* Revert mosec embedding microservice to use synchronous interface.

Signed-off-by: Yao, Qing

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Add dependency.

Signed-off-by: Yao, Qing

---------

Signed-off-by: Yao, Qing
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
---
 .../mosec/langchain/embedding_mosec.py | 38 +++++++++++++++----
 1 file changed, 31 insertions(+), 7 deletions(-)

diff --git a/comps/embeddings/mosec/langchain/embedding_mosec.py b/comps/embeddings/mosec/langchain/embedding_mosec.py
index 38e92b5a7..e422d92b6 100644
--- a/comps/embeddings/mosec/langchain/embedding_mosec.py
+++ b/comps/embeddings/mosec/langchain/embedding_mosec.py
@@ -7,6 +7,7 @@
 from typing import List, Optional, Union
 
 from langchain_community.embeddings import OpenAIEmbeddings
+from langchain_community.embeddings.openai import async_embed_with_retry
 
 from comps import (
     CustomLogger,
@@ -35,7 +36,7 @@ async def _aget_len_safe_embeddings(
     ) -> List[List[float]]:
         _chunk_size = chunk_size or self.chunk_size
         batched_embeddings: List[List[float]] = []
-        response = self.client.create(input=texts, **self._invocation_params)
+        response = await async_embed_with_retry(self, input=texts, **self._invocation_params)
         if not isinstance(response, dict):
             response = response.model_dump()
         batched_embeddings.extend(r["embedding"] for r in response["data"])
@@ -45,7 +46,7 @@
         async def empty_embedding() -> List[float]:
             nonlocal _cached_empty_embedding
             if _cached_empty_embedding is None:
-                average_embedded = self.client.create(input="", **self._invocation_params)
+                average_embedded = await async_embed_with_retry(self, input="", **self._invocation_params)
                 if not isinstance(average_embedded, dict):
                     average_embedded = average_embedded.model_dump()
                 _cached_empty_embedding = average_embedded["data"][0]["embedding"]
@@ -57,6 +58,29 @@ async def get_embedding(e: Optional[List[float]]) -> List[float]:
         embeddings = await asyncio.gather(*[get_embedding(e) for e in batched_embeddings])
         return embeddings
 
+    def _get_len_safe_embeddings(
+        self, texts: List[str], *, engine: str, chunk_size: Optional[int] = None
+    ) -> List[List[float]]:
+        _chunk_size = chunk_size or self.chunk_size
+        batched_embeddings: List[List[float]] = []
+        response = self.client.create(input=texts, **self._invocation_params)
+        if not isinstance(response, dict):
+            response = response.model_dump()
+        batched_embeddings.extend(r["embedding"] for r in response["data"])
+
+        _cached_empty_embedding: Optional[List[float]] = None
+
+        def empty_embedding() -> List[float]:
+            nonlocal _cached_empty_embedding
+            if _cached_empty_embedding is None:
+                average_embedded = self.client.create(input="", **self._invocation_params)
+                if not isinstance(average_embedded, dict):
+                    average_embedded = average_embedded.model_dump()
+                _cached_empty_embedding = average_embedded["data"][0]["embedding"]
+            return _cached_empty_embedding
+
+        return [e if e is not None else empty_embedding() for e in batched_embeddings]
+
 
 @register_microservice(
     name="opea_service@embedding_mosec",
@@ -68,18 +92,18 @@ async def get_embedding(e: Optional[List[float]]) -> List[float]:
     output_datatype=EmbedDoc,
 )
 @register_statistics(names=["opea_service@embedding_mosec"])
-async def embedding(
+def embedding(
     input: Union[TextDoc, EmbeddingRequest, ChatCompletionRequest]
 ) -> Union[EmbedDoc, EmbeddingResponse, ChatCompletionRequest]:
     if logflag:
         logger.info(input)
     start = time.time()
     if isinstance(input, TextDoc):
-        embed_vector = await get_embeddings(input.text)
+        embed_vector = get_embeddings(input.text)
         embedding_res = embed_vector[0] if isinstance(input.text, str) else embed_vector
         res = EmbedDoc(text=input.text, embedding=embedding_res)
     else:
-        embed_vector = await get_embeddings(input.input)
+        embed_vector = get_embeddings(input.input)
         if input.dimensions is not None:
             embed_vector = [embed_vector[i][: input.dimensions] for i in range(len(embed_vector))]
 
@@ -99,9 +123,9 @@ async def embedding(
     return res
 
 
-async def get_embeddings(text: Union[str, List[str]]) -> List[List[float]]:
+def get_embeddings(text: Union[str, List[str]]) -> List[List[float]]:
     texts = [text] if isinstance(text, str) else text
-    embed_vector = await embeddings.aembed_documents(texts)
+    embed_vector = embeddings.embed_documents(texts)
     return embed_vector
 
 
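
Illustration (not part of the patch series): PATCH 2/2 relies on LangChain's
OpenAIEmbeddings dispatching the synchronous embed_documents() entry point to
_get_len_safe_embeddings() and the asynchronous aembed_documents() to
_aget_len_safe_embeddings(), so a handler that calls embed_documents(), as the
reverted get_embeddings() does, needs the synchronous override added above.
A minimal sketch of that dispatch pattern, using a hypothetical stand-in class
rather than the real OpenAIEmbeddings subclass from embedding_mosec.py:

import asyncio
from typing import List


class DemoEmbeddings:
    # Hypothetical stand-in for the pattern in embedding_mosec.py: the sync
    # entry point uses the sync length-safe method, the async entry point
    # uses the async one.

    def _get_len_safe_embeddings(self, texts: List[str]) -> List[List[float]]:
        # Synchronous path, exercised by the reverted (sync) handler.
        return [[float(len(t))] for t in texts]

    async def _aget_len_safe_embeddings(self, texts: List[str]) -> List[List[float]]:
        # Asynchronous path, exercised when aembed_documents() is awaited.
        return self._get_len_safe_embeddings(texts)

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        return self._get_len_safe_embeddings(texts)

    async def aembed_documents(self, texts: List[str]) -> List[List[float]]:
        return await self._aget_len_safe_embeddings(texts)


demo = DemoEmbeddings()
print(demo.embed_documents(["hello world"]))                 # sync path, as in get_embeddings()
print(asyncio.run(demo.aembed_documents(["hello world"])))   # async path, as before the revert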