Skip to content

Commit

Permalink
Embedding Runtime on NeuralSpeed (opea-project#448)
Browse files Browse the repository at this point in the history
* neural-speed release 0.1

* added docker compose & modified neuralspeed

* updated neural-speed dynamic batching

* updated neural-speed dynamic batching

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* bge done

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* update doc & cleancode

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update client_multibatch.py

* release update

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: chen, suyue <[email protected]>
  • Loading branch information
3 people authored Aug 26, 2024
1 parent 9b8798a commit 0292355
Show file tree
Hide file tree
Showing 11 changed files with 377 additions and 0 deletions.
35 changes: 35 additions & 0 deletions comps/embeddings/neural-speed/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# build Mosec endpoint docker image

```
docker build --build-arg http_proxy=$http_proxy --build-arg https_proxy=$https_proxy -t langchain-mosec:neuralspeed -f comps/embeddings/neural-speed/neuralspeed-docker/Dockerfile .
```

# build embedding microservice docker image

```
docker build --build-arg http_proxy=$http_proxy --build-arg https_proxy=$https_proxy -t opea/embedding-langchain-mosec:neuralspeed -f comps/embeddings/neural-speed/docker/Dockerfile .
```

Note: Please contact us to request model files before building images.

# launch Mosec endpoint docker container

```
docker run -d --name="embedding-langchain-mosec-endpoint" -p 6001:8000 langchain-mosec:neuralspeed
```

# launch embedding microservice docker container

```
export MOSEC_EMBEDDING_ENDPOINT=http://{mosec_embedding_host_ip}:6001
docker run -d --name="embedding-langchain-mosec-server" -e http_proxy=$http_proxy -e https_proxy=$https_proxy -p 6000:6000 --ipc=host -e MOSEC_EMBEDDING_ENDPOINT=$MOSEC_EMBEDDING_ENDPOINT opea/embedding-langchain-mosec:neuralspeed
```

# run client test

```
curl localhost:6000/v1/embeddings \
-X POST \
-d '{"text":"Hello, world!"}' \
-H 'Content-Type: application/json'
```
2 changes: 2 additions & 0 deletions comps/embeddings/neural-speed/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
30 changes: 30 additions & 0 deletions comps/embeddings/neural-speed/docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@

# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

FROM langchain/langchain:latest

RUN apt-get update -y && apt-get install -y --no-install-recommends --fix-missing \
libgl1-mesa-glx \
libjemalloc-dev \
vim

RUN useradd -m -s /bin/bash user && \
mkdir -p /home/user && \
chown -R user /home/user/

USER user

COPY comps /home/user/comps

RUN pip install --no-cache-dir --upgrade pip && \
pip install --no-cache-dir -r /home/user/comps/embeddings/neural-speed/requirements.txt

RUN pip3 install llmspec mosec msgspec httpx requests

ENV PYTHONPATH=$PYTHONPATH:/home/user

WORKDIR /home/user/comps/embeddings/neural-speed

ENTRYPOINT ["python", "embedding_neuralspeed_svc.py"]

22 changes: 22 additions & 0 deletions comps/embeddings/neural-speed/docker/docker_compose_embedding.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

version: "3.8"

services:
embedding:
image: opea/embedding-langchain-mosec:neuralspeed
container_name: embedding-langchain-mosec-server
ports:
- "6000:6000"
ipc: host
environment:
http_proxy: ${http_proxy}
https_proxy: ${https_proxy}
MOSEC_EMBEDDING_ENDPOINT: ${MOSEC_EMBEDDING_ENDPOINT}
LANGCHAIN_API_KEY: ${LANGCHAIN_API_KEY}
restart: unless-stopped

networks:
default:
driver: bridge
83 changes: 83 additions & 0 deletions comps/embeddings/neural-speed/embedding_neuralspeed_svc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import os
import time
from typing import List, Optional

import httpx
import msgspec
import requests
from langchain_community.embeddings import OpenAIEmbeddings
from langsmith import traceable

from comps import (
EmbedDoc,
ServiceType,
TextDoc,
opea_microservices,
register_microservice,
register_statistics,
statistics_dict,
)


class MosecEmbeddings(OpenAIEmbeddings):

def _get_len_safe_embeddings(
self, texts: List[str], *, engine: str, chunk_size: Optional[int] = None
) -> List[List[float]]:
_chunk_size = chunk_size or self.chunk_size
batched_embeddings: List[List[float]] = []
response = self.client.create(input=texts, **self._invocation_params)
if not isinstance(response, dict):
response = response.model_dump()
batched_embeddings.extend(r["embedding"] for r in response["data"])

_cached_empty_embedding: Optional[List[float]] = None

def empty_embedding() -> List[float]:
nonlocal _cached_empty_embedding
if _cached_empty_embedding is None:
average_embedded = self.client.create(input="", **self._invocation_params)
if not isinstance(average_embedded, dict):
average_embedded = average_embedded.model_dump()
_cached_empty_embedding = average_embedded["data"][0]["embedding"]
return _cached_empty_embedding

return [e if e is not None else empty_embedding() for e in batched_embeddings]


@register_microservice(
name="opea_service@embedding_mosec",
service_type=ServiceType.EMBEDDING,
endpoint="/v1/embeddings",
host="0.0.0.0",
port=6000,
input_datatype=TextDoc,
output_datatype=EmbedDoc,
)
@traceable(run_type="embedding")
@register_statistics(names=["opea_service@embedding_mosec"])
def embedding(input: TextDoc) -> EmbedDoc:
start = time.time()
req = {
"query": input.text,
}
request_url = MOSEC_EMBEDDING_ENDPOINT + "/inference"
resp = requests.post(request_url, data=msgspec.msgpack.encode(req))

embed_vector = msgspec.msgpack.decode(resp.content)["embeddings"]
res = EmbedDoc(text=req["query"][0], embedding=embed_vector)
statistics_dict["opea_service@embedding_mosec"].append_latency(time.time() - start, None)
return res


if __name__ == "__main__":
MOSEC_EMBEDDING_ENDPOINT = os.environ.get("MOSEC_EMBEDDING_ENDPOINT", "http://127.0.0.1:6001")
os.environ["OPENAI_API_BASE"] = MOSEC_EMBEDDING_ENDPOINT
os.environ["OPENAI_API_KEY"] = "Dummy key"
MODEL_ID = os.environ.get("MODEL_ID", "BAAI/bge-base-en-v1.5")
embeddings = MosecEmbeddings(model=MODEL_ID)
print("NeuralSpeed Embedding Microservice Initialized.")
opea_microservices["opea_service@embedding_mosec"].start()
26 changes: 26 additions & 0 deletions comps/embeddings/neural-speed/neuralspeed-docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

From ubuntu:22.04
ARG DEBIAN_FRONTEND=noninteractive

ENV GLIBC_TUNABLES glibc.cpu.x86_shstk=permissive

COPY comps /root/comps
COPY neural_speed-0.1.dev117+gafc0030.d20240815-cp310-cp310-linux_x86_64.whl /root/
COPY bge-base-q8.bin /root/

RUN apt update && apt install -y python3 python3-pip
RUN pip3 install -r /root/comps/embeddings/neural-speed/neuralspeed-docker/requirements.txt
RUN pip3 install llmspec mosec msgspec httpx requests
RUN pip3 install /root/neural_speed-0.1.dev117+gafc0030.d20240815-cp310-cp310-linux_x86_64.whl

RUN cd /root/ && export HF_ENDPOINT=https://hf-mirror.com && huggingface-cli download --resume-download BAAI/bge-base-en-v1.5 --local-dir /root/bge-base-en-v1.5


ENV LD_PRELOAD=/root/libstdc++.so.6


WORKDIR /root/comps/embeddings/neural-speed/neuralspeed-docker

CMD ["python3", "server.py"]
31 changes: 31 additions & 0 deletions comps/embeddings/neural-speed/neuralspeed-docker/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import os
from http import HTTPStatus

import httpx
import msgspec
import requests

input_text = "what a nice day"
req = {
"query": input_text,
}

httpx_response = httpx.post("http://127.0.0.1:6001/inference", content=msgspec.msgpack.encode(req))

requests_response = requests.post("http://127.0.0.1:6001/inference", data=msgspec.msgpack.encode(req))

MOSEC_EMBEDDING_ENDPOINT = os.environ.get("MOSEC_EMBEDDING_ENDPOINT", "http://127.0.0.1:6001")

request_url = MOSEC_EMBEDDING_ENDPOINT + "/inference"
print(f"request_url = {request_url}")
resp_3 = requests.post(request_url, data=msgspec.msgpack.encode(req))

if httpx_response.status_code == HTTPStatus.OK and requests_response.status_code == HTTPStatus.OK:
print(f"OK: \n {msgspec.msgpack.decode(httpx_response.content)}")
print(f"OK: \n {msgspec.msgpack.decode(requests_response.content)}")
print(f"OK: \n {msgspec.msgpack.decode(resp_3.content)}")
else:
print(f"err[{httpx_response.status_code}] {httpx_response.text}")
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

from http import HTTPStatus
from threading import Thread

import httpx
import msgspec

req = {
"query": "Return the ‘thread identifier’ of the current thread. This is a nonzero integer. Its value has no direct meaning; it is intended as a magic cookie to be used e.g. to index a dictionary of thread-specific data. Thread identifiers may be recycled when a thread exits and another thread is created.",
}
reqs = []
BATCH = 32
for i in range(BATCH):
reqs.append(msgspec.msgpack.encode(req))


def post_func(threadIdx):
resp = httpx.post("http://127.0.0.1:6001/inference", content=reqs[threadIdx])
ret = f"thread {threadIdx} \n"
if resp.status_code == HTTPStatus.OK:
ret += f"OK: {msgspec.msgpack.decode(resp.content)['embeddings'][:16]}"
else:
ret += f"err[{resp.status_code}] {resp.text}"
print(ret)


threads = []
for i in range(BATCH):
t = Thread(
target=post_func,
args=[
i,
],
)
threads.append(t)

for i in range(BATCH):
threads[i].start()
16 changes: 16 additions & 0 deletions comps/embeddings/neural-speed/neuralspeed-docker/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
--extra-index-url https://download.pytorch.org/whl/cpu
accelerate
cmake
datasets
huggingface_hub
matplotlib
numpy
peft
protobuf<3.20
py-cpuinfo
sentencepiece
tiktoken
torch
transformers
transformers_stream_generator
zipfile38
81 changes: 81 additions & 0 deletions comps/embeddings/neural-speed/neuralspeed-docker/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import time
from typing import Any, List

import numpy
from mosec import Server, Worker, get_logger
from mosec.mixin import TypedMsgPackMixin
from msgspec import Struct
from neural_speed import Model
from transformers import AutoTokenizer

logger = get_logger()

INFERENCE_BATCH_SIZE = 32
INFERENCE_MAX_WAIT_TIME = 30
INFERENCE_WORKER_NUM = 1
INFERENCE_CONTEXT = 512

TorchModel = "/root/bge-base-en-v1.5"
NS_Bin = "/root/bge-base-q8.bin"

NS_Model = "bert"


class Request(Struct, kw_only=True):
query: str


class Response(Struct, kw_only=True):
embeddings: List[float]


class Inference(TypedMsgPackMixin, Worker):

def __init__(self):
super().__init__()
self.tokenizer = AutoTokenizer.from_pretrained(TorchModel)
self.model = Model()
self.model.init_from_bin(
NS_Model,
NS_Bin,
batch_size=INFERENCE_BATCH_SIZE,
n_ctx=INFERENCE_CONTEXT + 2,
)

def forward(self, data: List[Request]) -> List[Response]:
batch = len(data)
sequences = [d.query for d in data]
inputs = self.tokenizer(
sequences,
padding=True,
truncation=True,
max_length=INFERENCE_CONTEXT,
return_tensors="pt",
)
st = time.time()
ns_outputs = self.model(
**inputs,
reinit=True,
logits_all=True,
continuous_batching=False,
ignore_padding=True,
)
logger.info(f"batch {batch} input shape {inputs.input_ids.shape} time {time.time()-st}")
ns_outputs = ns_outputs[:, 0]
ns_outputs = ns_outputs / numpy.linalg.norm(ns_outputs, axis=1, keepdims=True)
resps = []
for i in range(batch):
resp = Response(embeddings=ns_outputs[i].tolist())
resps.append(resp)
return resps


if __name__ == "__main__":
server = Server()
server.append_worker(
Inference, max_batch_size=INFERENCE_BATCH_SIZE, max_wait_time=INFERENCE_MAX_WAIT_TIME, num=INFERENCE_WORKER_NUM
)
server.run()
11 changes: 11 additions & 0 deletions comps/embeddings/neural-speed/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
docarray[full]
fastapi
langchain
langchain_community
openai
opentelemetry-api
opentelemetry-exporter-otlp
opentelemetry-sdk
prometheus-fastapi-instrumentator
shortuuid
uvicorn

0 comments on commit 0292355

Please sign in to comment.