From 14890bd7591ec361ff57d136e65bbfa6e078cee4 Mon Sep 17 00:00:00 2001 From: lyt122 <2747177214@qq.com> Date: Fri, 1 Mar 2024 22:26:45 +0800 Subject: [PATCH 1/7] feature:add Jaeger to provide distributed tracing --- app/search_vector/consts/consts.py | 4 ++ app/search_vector/service/search_vector.py | 7 +++ app/search_vector/tracing/tracing.py | 55 ++++++++++++++++++++++ requirements.txt | 8 ++++ 4 files changed, 74 insertions(+) create mode 100644 app/search_vector/tracing/tracing.py diff --git a/app/search_vector/consts/consts.py b/app/search_vector/consts/consts.py index 0fec1c0..1471271 100644 --- a/app/search_vector/consts/consts.py +++ b/app/search_vector/consts/consts.py @@ -21,3 +21,7 @@ KAFKA_CONSUMER_VECTOR_INDEX_TOPIC = "search-engine-csv-loader-topic" VECTOR_RECALL_TOPK = 20 + +OTEL_ENDPOINT = "127.0.0.1:4371" + +SERVICE_NAME = "tangseng" diff --git a/app/search_vector/service/search_vector.py b/app/search_vector/service/search_vector.py index e4a7959..9e56d27 100644 --- a/app/search_vector/service/search_vector.py +++ b/app/search_vector/service/search_vector.py @@ -22,12 +22,16 @@ import asyncio from ..consts.consts import VECTOR_RECALL_TOPK +from ..consts.consts import OTEL_ENDPOINT +from ..consts.consts import SERVICE_NAME from idl.pb.search_vector import search_vector_pb2 from ..config.config import DEFAULT_MILVUS_TABLE_NAME, VECTOR_ADDR from ..milvus.operators import do_search from ..etcd_operate.etcd import etcd_client from ..milvus.milvus import milvus_client from idl.pb.search_vector import search_vector_pb2_grpc +from ..tracing.tracing import init_tracer_provider +from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient class SearchVectorService(search_vector_pb2_grpc.SearchVectorServiceServicer): @@ -57,6 +61,9 @@ def SearchVector(self, request, async def serve() -> None: + init_tracer_provider(url=OTEL_ENDPOINT, service_name=SERVICE_NAME) + # 初始化 gRPC 客户端追踪器 + GrpcInstrumentorClient().instrument() server = grpc.aio.server() search_vector_pb2_grpc.add_SearchVectorServiceServicer_to_server( SearchVectorService(), server) diff --git a/app/search_vector/tracing/tracing.py b/app/search_vector/tracing/tracing.py new file mode 100644 index 0000000..be9d03d --- /dev/null +++ b/app/search_vector/tracing/tracing.py @@ -0,0 +1,55 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.resources import SERVICE_NAME, Resource, SERVICE_NAMESPACE +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk.resources import Resource +from opentelemetry.propagate import set_global_textmap +from opentelemetry.propagators.b3 import B3MultiFormat + + +def init_tracer_provider(url, service_name): + # 创建一个新的 OTLP 导出器 + exporter = OTLPSpanExporter( + insecure=True, + endpoint=url, + ) + + resource = Resource(attributes={ + SERVICE_NAME: service_name, + }) + + # 设置全局tracer + provider = TracerProvider( + resource=resource, + ) + provider.add_span_processor(BatchSpanProcessor(exporter)) + trace.set_tracer_provider(provider) + + # 设置全局Propagator + b3_propagator = B3MultiFormat() + set_global_textmap(b3_propagator) + + +def get_trace_id(context): + span_context = trace.get_current_span(context).get_span_context() + if span_context.is_valid: + return span_context.trace_id + return None diff --git a/requirements.txt b/requirements.txt index 5a39e02..4e8cef3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -64,3 +64,11 @@ ujson==5.8.0 urllib3==2.0.4 Werkzeug==2.3.7 zipp==3.17.0 +opentelemetry-api==1.23.0 +opentelemetry-sdk==1.23.0 +opentelemetry-exporter-otlp-proto-grpc==1.23.0 +opentelemetry-propagator-b3==1.23.0 +opentelemetry-instrumentation-grpc==0.44b0 + + + From 1809abe320bf6423c80a5fd4fca27f66917850e3 Mon Sep 17 00:00:00 2001 From: lyt122 <2747177214@qq.com> Date: Sat, 2 Mar 2024 19:04:11 +0800 Subject: [PATCH 2/7] feature:add Jaeger to provide distributed tracing --- app/search_vector/consts/consts.py | 2 +- app/search_vector/service/search_vector.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/app/search_vector/consts/consts.py b/app/search_vector/consts/consts.py index 1471271..ce28e2e 100644 --- a/app/search_vector/consts/consts.py +++ b/app/search_vector/consts/consts.py @@ -22,6 +22,6 @@ VECTOR_RECALL_TOPK = 20 -OTEL_ENDPOINT = "127.0.0.1:4371" +OTEL_ENDPOINT = "127.0.0.1:4317" SERVICE_NAME = "tangseng" diff --git a/app/search_vector/service/search_vector.py b/app/search_vector/service/search_vector.py index 9e56d27..b32f1d6 100644 --- a/app/search_vector/service/search_vector.py +++ b/app/search_vector/service/search_vector.py @@ -31,7 +31,7 @@ from ..milvus.milvus import milvus_client from idl.pb.search_vector import search_vector_pb2_grpc from ..tracing.tracing import init_tracer_provider -from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient +from opentelemetry.instrumentation.grpc import GrpcInstrumentorServer class SearchVectorService(search_vector_pb2_grpc.SearchVectorServiceServicer): @@ -62,8 +62,8 @@ def SearchVector(self, request, async def serve() -> None: init_tracer_provider(url=OTEL_ENDPOINT, service_name=SERVICE_NAME) - # 初始化 gRPC 客户端追踪器 - GrpcInstrumentorClient().instrument() + # 初始化 gRPC 追踪器 + GrpcInstrumentorServer().instrument() server = grpc.aio.server() search_vector_pb2_grpc.add_SearchVectorServiceServicer_to_server( SearchVectorService(), server) From 8a3810670ae43494618b7bc78ee85c9e2aed35c5 Mon Sep 17 00:00:00 2001 From: lyt122 <2747177214@qq.com> Date: Sat, 2 Mar 2024 20:00:30 +0800 Subject: [PATCH 3/7] feature:add Jaeger to provide distributed tracing --- app/search_vector/service/search_vector.py | 4 ---- app/search_vector/tracing/__init__.py | 17 +++++++++++++++++ main.py | 8 
+++++++- requirements.txt | 2 ++ 4 files changed, 26 insertions(+), 5 deletions(-) create mode 100644 app/search_vector/tracing/__init__.py diff --git a/app/search_vector/service/search_vector.py b/app/search_vector/service/search_vector.py index b32f1d6..89135e8 100644 --- a/app/search_vector/service/search_vector.py +++ b/app/search_vector/service/search_vector.py @@ -22,15 +22,12 @@ import asyncio from ..consts.consts import VECTOR_RECALL_TOPK -from ..consts.consts import OTEL_ENDPOINT -from ..consts.consts import SERVICE_NAME from idl.pb.search_vector import search_vector_pb2 from ..config.config import DEFAULT_MILVUS_TABLE_NAME, VECTOR_ADDR from ..milvus.operators import do_search from ..etcd_operate.etcd import etcd_client from ..milvus.milvus import milvus_client from idl.pb.search_vector import search_vector_pb2_grpc -from ..tracing.tracing import init_tracer_provider from opentelemetry.instrumentation.grpc import GrpcInstrumentorServer @@ -61,7 +58,6 @@ def SearchVector(self, request, async def serve() -> None: - init_tracer_provider(url=OTEL_ENDPOINT, service_name=SERVICE_NAME) # 初始化 gRPC 追踪器 GrpcInstrumentorServer().instrument() server = grpc.aio.server() diff --git a/app/search_vector/tracing/__init__.py b/app/search_vector/tracing/__init__.py new file mode 100644 index 0000000..2456923 --- /dev/null +++ b/app/search_vector/tracing/__init__.py @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ diff --git a/main.py b/main.py index 166f0b9..595d166 100644 --- a/main.py +++ b/main.py @@ -28,14 +28,17 @@ from PIL import Image from flask import Flask, request from torchvision import transforms +from opentelemetry.instrumentation.flask import FlaskInstrumentor from app.search_vector.service.search_vector import serve - +from app.search_vector.consts.consts import OTEL_ENDPOINT +from app.search_vector.consts.consts import SERVICE_NAME from app.search_vector.config.config import DEFAULT_MILVUS_TABLE_NAME, NETWORK_MODEL_NAME from app.search_vector.cirtorch.datasets.datahelpers import imresize from app.search_vector.cirtorch.networks.imageretrievalnet import init_network from app.search_vector.milvus.milvus import milvus_client from app.search_vector.milvus.operators import do_upload, do_search from app.search_vector.utils.logs import LOGGER +from app.search_vector.tracing.tracing import init_tracer_provider app = Flask(__name__) @@ -199,6 +202,9 @@ def init_model(): net, lsh, transform = init_model() if __name__ == "__main__": + init_tracer_provider(url=OTEL_ENDPOINT, service_name=SERVICE_NAME) + # 这个FlaskInstrumentor用于监视http的 + # FlaskInstrumentor().instrument_app(app) # app.run(host=WEBSITE_HOST, port=WEBSITE_PORT, debug=True) # print("start server {}:{}".format(WEBSITE_HOST, WEBSITE_PORT)) asyncio.run(serve()) diff --git a/requirements.txt b/requirements.txt index 4e8cef3..131b74b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -69,6 +69,8 @@ opentelemetry-sdk==1.23.0 opentelemetry-exporter-otlp-proto-grpc==1.23.0 opentelemetry-propagator-b3==1.23.0 opentelemetry-instrumentation-grpc==0.44b0 +opentelemetry-instrumentation-flask==0.44b0 + From 3f25742d827ec1f75a5015c70ce10ab82e67f4a1 Mon Sep 17 00:00:00 2001 From: lyt122 <2747177214@qq.com> Date: Sat, 2 Mar 2024 20:49:49 +0800 Subject: [PATCH 4/7] feature:add Jaeger to provide distributed tracing --- app/search_vector/consts/consts.py | 2 +- app/search_vector/service/search_vector.py | 40 +++++++++++----------- requirements.txt | 1 - 3 files changed, 21 insertions(+), 22 deletions(-) diff --git a/app/search_vector/consts/consts.py b/app/search_vector/consts/consts.py index ce28e2e..d049a66 100644 --- a/app/search_vector/consts/consts.py +++ b/app/search_vector/consts/consts.py @@ -24,4 +24,4 @@ OTEL_ENDPOINT = "127.0.0.1:4317" -SERVICE_NAME = "tangseng" +SERVICE_NAME = "tangseng-python" diff --git a/app/search_vector/service/search_vector.py b/app/search_vector/service/search_vector.py index 89135e8..cf19cc2 100644 --- a/app/search_vector/service/search_vector.py +++ b/app/search_vector/service/search_vector.py @@ -21,6 +21,7 @@ import logging import asyncio +from opentelemetry import trace from ..consts.consts import VECTOR_RECALL_TOPK from idl.pb.search_vector import search_vector_pb2 from ..config.config import DEFAULT_MILVUS_TABLE_NAME, VECTOR_ADDR @@ -28,7 +29,6 @@ from ..etcd_operate.etcd import etcd_client from ..milvus.milvus import milvus_client from idl.pb.search_vector import search_vector_pb2_grpc -from opentelemetry.instrumentation.grpc import GrpcInstrumentorServer class SearchVectorService(search_vector_pb2_grpc.SearchVectorServiceServicer): @@ -38,28 +38,28 @@ class SearchVectorService(search_vector_pb2_grpc.SearchVectorServiceServicer): def SearchVector(self, request, context) -> search_vector_pb2.SearchVectorResponse: - try: - queryies = request.query - doc_ids = [] - for query in queryies: - ids, distants = do_search(DEFAULT_MILVUS_TABLE_NAME, query, - VECTOR_RECALL_TOPK, milvus_client) - print("search 
vector ids", ids) - doc_ids += ids - print("search vector data", doc_ids) - return search_vector_pb2.SearchVectorResponse(code=200, - doc_ids=doc_ids, - msg='ok', - error='') - except Exception as e: - print("search vector error", e) - return search_vector_pb2.SearchVectorResponse(code=500, - error=str(e)) + tracer = trace.get_tracer(__name__) + with tracer.start_as_current_span("SearchVector"): + try: + queryies = request.query + doc_ids = [] + for query in queryies: + ids, distants = do_search(DEFAULT_MILVUS_TABLE_NAME, query, + VECTOR_RECALL_TOPK, milvus_client) + print("search vector ids", ids) + doc_ids += ids + print("search vector data", doc_ids) + return search_vector_pb2.SearchVectorResponse(code=200, + doc_ids=doc_ids, + msg='ok', + error='') + except Exception as e: + print("search vector error", e) + return search_vector_pb2.SearchVectorResponse(code=500, + error=str(e)) async def serve() -> None: - # 初始化 gRPC 追踪器 - GrpcInstrumentorServer().instrument() server = grpc.aio.server() search_vector_pb2_grpc.add_SearchVectorServiceServicer_to_server( SearchVectorService(), server) diff --git a/requirements.txt b/requirements.txt index 131b74b..0afecce 100644 --- a/requirements.txt +++ b/requirements.txt @@ -68,7 +68,6 @@ opentelemetry-api==1.23.0 opentelemetry-sdk==1.23.0 opentelemetry-exporter-otlp-proto-grpc==1.23.0 opentelemetry-propagator-b3==1.23.0 -opentelemetry-instrumentation-grpc==0.44b0 opentelemetry-instrumentation-flask==0.44b0 From 7fa3fa3fa54d3c1f9895884f74900b01e2cbe774 Mon Sep 17 00:00:00 2001 From: lyt122 <2747177214@qq.com> Date: Sat, 2 Mar 2024 22:30:53 +0800 Subject: [PATCH 5/7] feature:add Jaeger to provide distributed tracing --- app/search_vector/kafka_operate/consumer.py | 7 ++- .../kafka_operate/kafka_operate.py | 50 +++++++++++-------- app/search_vector/service/search_vector.py | 3 +- app/search_vector/tracing/tracing.py | 7 --- main.py | 2 +- vector_index.py | 13 +++-- 6 files changed, 44 insertions(+), 38 deletions(-) diff --git a/app/search_vector/kafka_operate/consumer.py b/app/search_vector/kafka_operate/consumer.py index f584f9e..8b991c1 100644 --- a/app/search_vector/kafka_operate/consumer.py +++ b/app/search_vector/kafka_operate/consumer.py @@ -16,6 +16,7 @@ # under the License. 
"""store vector index from kafka""" +from opentelemetry import trace from ..kafka_operate.kafka_operate import kafka_helper @@ -23,5 +24,7 @@ def store_data_from_kafka(kafka_topic, milvus_table_name): """ store data to mivlus from kakfa for building inverted index """ - kafka_helper.connect_consumer(kafka_topic) - kafka_helper.consume_messages_store_milvus(milvus_table_name) + tracer = trace.get_tracer(__name__) + with tracer.start_as_current_span("store_data_from_kafka"): + kafka_helper.connect_consumer(kafka_topic) + kafka_helper.consume_messages_store_milvus(milvus_table_name) diff --git a/app/search_vector/kafka_operate/kafka_operate.py b/app/search_vector/kafka_operate/kafka_operate.py index 5ca6ae6..eb07c8c 100644 --- a/app/search_vector/kafka_operate/kafka_operate.py +++ b/app/search_vector/kafka_operate/kafka_operate.py @@ -19,10 +19,13 @@ import json from kafka import KafkaProducer, KafkaConsumer from kafka.errors import KafkaError +from opentelemetry import trace from ..config.config import KAFKA_CLUSTER from ..milvus import milvus from ..milvus.operators import do_upload +tracer = trace.get_tracer(__name__) + class KafkaHelper: """ @@ -39,25 +42,27 @@ def connect_producer(self): """ connect kafka producer """ - try: - self.producer = KafkaProducer( - bootstrap_servers=self.bootstrap_servers) - print("Connected to Kafka producer successfully.") - except KafkaError as e: - print(f"Failed to connect to Kafka producer: {e}") + with tracer.start_as_current_span("connect_producer"): + try: + self.producer = KafkaProducer( + bootstrap_servers=self.bootstrap_servers) + print("Connected to Kafka producer successfully.") + except KafkaError as e: + print(f"Failed to connect to Kafka producer: {e}") def connect_consumer(self, topic): """ connect kafka consumer """ - try: - self.consumer = KafkaConsumer( - topic, bootstrap_servers=self.bootstrap_servers) - print( - f"Connected to Kafka consumer successfully. Listening to topic: {topic}" - ) - except KafkaError as e: - print(f"Failed to connect to Kafka consumer: {e}") + with tracer.start_as_current_span("connect_consumer"): + try: + self.consumer = KafkaConsumer( + topic, bootstrap_servers=self.bootstrap_servers) + print( + f"Connected to Kafka consumer successfully. 
Listening to topic: {topic}" + ) + except KafkaError as e: + print(f"Failed to connect to Kafka consumer: {e}") def send_message(self, topic, msg): """ @@ -88,14 +93,15 @@ def consume_messages_store_milvus(self, milvus_table): """ consume messages from kafka and store in milvus """ - if not self.consumer: - print("No Kafka consumer connected.") - return - print("Consuming messages...") - for msg in self.consumer: - data = json.loads(msg.value.decode('utf-8')) - do_upload(milvus_table, int(data["doc_id"]), data["title"], - data["body"], self.milvus_client) + with tracer.start_as_current_span("connect_consumer"): + if not self.consumer: + print("No Kafka consumer connected.") + return + print("Consuming messages...") + for msg in self.consumer: + data = json.loads(msg.value.decode('utf-8')) + do_upload(milvus_table, int(data["doc_id"]), data["title"], + data["body"], self.milvus_client) def on_send_success(self, record_metadata): """ diff --git a/app/search_vector/service/search_vector.py b/app/search_vector/service/search_vector.py index cf19cc2..0c7f3ce 100644 --- a/app/search_vector/service/search_vector.py +++ b/app/search_vector/service/search_vector.py @@ -30,6 +30,8 @@ from ..milvus.milvus import milvus_client from idl.pb.search_vector import search_vector_pb2_grpc +tracer = trace.get_tracer(__name__) + class SearchVectorService(search_vector_pb2_grpc.SearchVectorServiceServicer): """ @@ -38,7 +40,6 @@ class SearchVectorService(search_vector_pb2_grpc.SearchVectorServiceServicer): def SearchVector(self, request, context) -> search_vector_pb2.SearchVectorResponse: - tracer = trace.get_tracer(__name__) with tracer.start_as_current_span("SearchVector"): try: queryies = request.query diff --git a/app/search_vector/tracing/tracing.py b/app/search_vector/tracing/tracing.py index be9d03d..5f73a34 100644 --- a/app/search_vector/tracing/tracing.py +++ b/app/search_vector/tracing/tracing.py @@ -46,10 +46,3 @@ def init_tracer_provider(url, service_name): # 设置全局Propagator b3_propagator = B3MultiFormat() set_global_textmap(b3_propagator) - - -def get_trace_id(context): - span_context = trace.get_current_span(context).get_span_context() - if span_context.is_valid: - return span_context.trace_id - return None diff --git a/main.py b/main.py index 595d166..fcffccd 100644 --- a/main.py +++ b/main.py @@ -203,7 +203,7 @@ def init_model(): if __name__ == "__main__": init_tracer_provider(url=OTEL_ENDPOINT, service_name=SERVICE_NAME) - # 这个FlaskInstrumentor用于监视http的 + # FlaskInstrumentor is to trace http # FlaskInstrumentor().instrument_app(app) # app.run(host=WEBSITE_HOST, port=WEBSITE_PORT, debug=True) # print("start server {}:{}".format(WEBSITE_HOST, WEBSITE_PORT)) diff --git a/vector_index.py b/vector_index.py index 9a42c0f..244a76b 100644 --- a/vector_index.py +++ b/vector_index.py @@ -17,6 +17,7 @@ """the script file is to handle vector index from kafka""" import threading +from opentelemetry import trace from app.search_vector.consts.consts import KAFKA_CONSUMER_VECTOR_INDEX_TOPIC from app.search_vector.config.config import DEFAULT_MILVUS_TABLE_NAME from app.search_vector.kafka_operate.consumer import store_data_from_kafka @@ -28,11 +29,13 @@ def consume_inverted_index(): """ topic = KAFKA_CONSUMER_VECTOR_INDEX_TOPIC table_name = DEFAULT_MILVUS_TABLE_NAME - thread = threading.Thread(target=store_data_from_kafka( - topic, table_name)) # 创建线程对象 - thread.start() # 启动线程 - print("start consume inverted index") - thread.join() # 等待线程结束 + tracer = trace.get_tracer(__name__) + with 
tracer.start_as_current_span("consume_inverted_index"): + thread = threading.Thread(target=store_data_from_kafka( + topic, table_name)) # 创建线程对象 + thread.start() # 启动线程 + print("start consume inverted index") + thread.join() # 等待线程结束 if __name__ == "__main__": From 9c6bc611d536772c45f2415607c64d29dfd1afef Mon Sep 17 00:00:00 2001 From: lyt122 <2747177214@qq.com> Date: Sat, 2 Mar 2024 22:53:59 +0800 Subject: [PATCH 6/7] feature:add Jaeger to provide distributed tracing --- app/search_vector/tracing/tracing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/search_vector/tracing/tracing.py b/app/search_vector/tracing/tracing.py index 5f73a34..41a23d5 100644 --- a/app/search_vector/tracing/tracing.py +++ b/app/search_vector/tracing/tracing.py @@ -17,7 +17,7 @@ from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.resources import SERVICE_NAME, Resource, SERVICE_NAMESPACE +from opentelemetry.sdk.resources import SERVICE_NAME from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.resources import Resource From da557efee756550794274c6f827eb11a3f67666d Mon Sep 17 00:00:00 2001 From: lyt122 <2747177214@qq.com> Date: Sun, 3 Mar 2024 14:36:07 +0800 Subject: [PATCH 7/7] feature:add Jaeger to provide distributed tracing --- app/search_vector/kafka_operate/consumer.py | 3 ++- app/search_vector/kafka_operate/kafka_operate.py | 7 ++++--- app/search_vector/service/search_vector.py | 3 ++- vector_index.py | 3 ++- 4 files changed, 10 insertions(+), 6 deletions(-) diff --git a/app/search_vector/kafka_operate/consumer.py b/app/search_vector/kafka_operate/consumer.py index 8b991c1..8ce3542 100644 --- a/app/search_vector/kafka_operate/consumer.py +++ b/app/search_vector/kafka_operate/consumer.py @@ -16,6 +16,7 @@ # under the License. """store vector index from kafka""" +import inspect from opentelemetry import trace from ..kafka_operate.kafka_operate import kafka_helper @@ -25,6 +26,6 @@ def store_data_from_kafka(kafka_topic, milvus_table_name): store data to mivlus from kakfa for building inverted index """ tracer = trace.get_tracer(__name__) - with tracer.start_as_current_span("store_data_from_kafka"): + with tracer.start_as_current_span(inspect.getframeinfo(inspect.currentframe()).function): kafka_helper.connect_consumer(kafka_topic) kafka_helper.consume_messages_store_milvus(milvus_table_name) diff --git a/app/search_vector/kafka_operate/kafka_operate.py b/app/search_vector/kafka_operate/kafka_operate.py index eb07c8c..ff20078 100644 --- a/app/search_vector/kafka_operate/kafka_operate.py +++ b/app/search_vector/kafka_operate/kafka_operate.py @@ -16,6 +16,7 @@ # under the License. 
"""kafka operate""" +import inspect import json from kafka import KafkaProducer, KafkaConsumer from kafka.errors import KafkaError @@ -42,7 +43,7 @@ def connect_producer(self): """ connect kafka producer """ - with tracer.start_as_current_span("connect_producer"): + with tracer.start_as_current_span(inspect.getframeinfo(inspect.currentframe()).function): try: self.producer = KafkaProducer( bootstrap_servers=self.bootstrap_servers) @@ -54,7 +55,7 @@ def connect_consumer(self, topic): """ connect kafka consumer """ - with tracer.start_as_current_span("connect_consumer"): + with tracer.start_as_current_span(inspect.getframeinfo(inspect.currentframe()).function): try: self.consumer = KafkaConsumer( topic, bootstrap_servers=self.bootstrap_servers) @@ -93,7 +94,7 @@ def consume_messages_store_milvus(self, milvus_table): """ consume messages from kafka and store in milvus """ - with tracer.start_as_current_span("connect_consumer"): + with tracer.start_as_current_span(inspect.getframeinfo(inspect.currentframe()).function): if not self.consumer: print("No Kafka consumer connected.") return diff --git a/app/search_vector/service/search_vector.py b/app/search_vector/service/search_vector.py index 0c7f3ce..dd96ac9 100644 --- a/app/search_vector/service/search_vector.py +++ b/app/search_vector/service/search_vector.py @@ -16,6 +16,7 @@ # under the License. """search vector grpc service""" +import inspect import json import grpc import logging @@ -40,7 +41,7 @@ class SearchVectorService(search_vector_pb2_grpc.SearchVectorServiceServicer): def SearchVector(self, request, context) -> search_vector_pb2.SearchVectorResponse: - with tracer.start_as_current_span("SearchVector"): + with tracer.start_as_current_span(inspect.getframeinfo(inspect.currentframe()).function): try: queryies = request.query doc_ids = [] diff --git a/vector_index.py b/vector_index.py index 244a76b..20cd0d9 100644 --- a/vector_index.py +++ b/vector_index.py @@ -16,6 +16,7 @@ # under the License. """the script file is to handle vector index from kafka""" +import inspect import threading from opentelemetry import trace from app.search_vector.consts.consts import KAFKA_CONSUMER_VECTOR_INDEX_TOPIC @@ -30,7 +31,7 @@ def consume_inverted_index(): topic = KAFKA_CONSUMER_VECTOR_INDEX_TOPIC table_name = DEFAULT_MILVUS_TABLE_NAME tracer = trace.get_tracer(__name__) - with tracer.start_as_current_span("consume_inverted_index"): + with tracer.start_as_current_span(inspect.getframeinfo(inspect.currentframe()).function): thread = threading.Thread(target=store_data_from_kafka( topic, table_name)) # 创建线程对象 thread.start() # 启动线程