From ade2d927fab069b0f17e185f98a014d66fb8652d Mon Sep 17 00:00:00 2001 From: lkk Date: Tue, 10 Dec 2024 06:49:07 +0000 Subject: [PATCH] refine service code. --- comps/cores/mega/http_service.py | 43 ++++++++++++- comps/cores/mega/micro_service.py | 102 ++++++------------------------ 2 files changed, 59 insertions(+), 86 deletions(-) diff --git a/comps/cores/mega/http_service.py b/comps/cores/mega/http_service.py index 283540f49..6109dfdd9 100644 --- a/comps/cores/mega/http_service.py +++ b/comps/cores/mega/http_service.py @@ -8,6 +8,8 @@ from fastapi import FastAPI from prometheus_fastapi_instrumentator import Instrumentator from uvicorn import Config, Server +import asyncio +import multiprocessing from .base_service import BaseService from .base_statistics import collect_all_statistics @@ -83,6 +85,11 @@ async def _get_statistics(): return app + def add_startup_event(self, func): + @self.app.on_event("startup") + async def startup_event(): + asyncio.create_task(func) + async def initialize_server(self): """Initialize and return HTTP server.""" self.logger.info("Setting up HTTP server") @@ -110,11 +117,9 @@ async def start_server(self, **kwargs): """ await self.main_loop() - app = self.app - self.server = UviServer( config=Config( - app=app, + app=self.app, host=self.host_address, port=self.primary_port, log_level="info", @@ -137,6 +142,35 @@ async def terminate_server(self): await self.server.shutdown() self.logger.info("Server termination completed") + def _async_setup(self): + self.event_loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.event_loop) + self.event_loop.run_until_complete(self.initialize_server()) + + def run(self): + """Running method to block the main thread. + + This method runs the event loop until a Future is done. It is designed to be called in the main thread to keep it busy. + """ + self.event_loop.run_until_complete(self.execute_server()) + + def start(self, in_single_process=False): + print(in_single_process) + if in_single_process: + # Resolve HPU segmentation fault and potential tokenizer issues by limiting to same process + self.run() + else: + self.process = multiprocessing.Process(target=self.run, daemon=False, name=self.name) + self.process.start() + + def stop(self): + self.event_loop.run_until_complete(self.terminate_server()) + self.event_loop.stop() + self.event_loop.close() + self.server.logger.close() + if self.process.is_alive(): + self.process.terminate() + @staticmethod def check_server_readiness(ctrl_address: str, timeout: float = 1.0, logger=None, **kwargs) -> bool: """Check if server status is ready. @@ -170,3 +204,6 @@ async def async_check_server_readiness(ctrl_address: str, timeout: float = 1.0, :return: True if status is ready else False. """ return HTTPService.check_server_readiness(ctrl_address, timeout, logger=logger) + + def add_route(self, endpoint, handler, methods=["POST"]): + self.app.router.add_api_route(endpoint, handler, methods=methods) diff --git a/comps/cores/mega/micro_service.py b/comps/cores/mega/micro_service.py index 33d90aa64..d7448bd6f 100644 --- a/comps/cores/mega/micro_service.py +++ b/comps/cores/mega/micro_service.py @@ -2,7 +2,6 @@ # SPDX-License-Identifier: Apache-2.0 import asyncio -import multiprocessing import os from collections import defaultdict, deque from enum import Enum @@ -12,6 +11,7 @@ from .constants import ServiceRoleType, ServiceType from .logger import CustomLogger from .utils import check_ports_availability +from .http_service import HTTPService opea_microservices = {} @@ -19,7 +19,7 @@ logflag = os.getenv("LOGFLAG", False) -class MicroService: +class MicroService(HTTPService): """MicroService class to create a microservice.""" def __init__( @@ -67,24 +67,32 @@ def __init__( self.uvicorn_kwargs["ssl_certfile"] = ssl_certfile if not use_remote_service: + + if self.protocol.lower() == "http": + if not (check_ports_availability(self.host, self.port)): + raise RuntimeError(f"port:{self.port}") + self.provider = provider self.provider_endpoint = provider_endpoint self.endpoints = [] - self.server = self._get_server() - self.app = self.server.app + runtime_args = { + "protocol": self.protocol, + "host": self.host, + "port": self.port, + "title": self.name, + "description": "OPEA Microservice Infrastructure", + } + + super().__init__(uvicorn_kwargs=self.uvicorn_kwargs, runtime_args=runtime_args) + # create a batch request processor loop if using dynamic batching if self.dynamic_batching: self.buffer_lock = asyncio.Lock() self.request_buffer = defaultdict(deque) + self.add_startup_event(self._dynamic_batch_processor()) - @self.app.on_event("startup") - async def startup_event(): - asyncio.create_task(self._dynamic_batch_processor()) - - self.event_loop = asyncio.new_event_loop() - asyncio.set_event_loop(self.event_loop) - self.event_loop.run_until_complete(self._async_setup()) + self._async_setup() async def _dynamic_batch_processor(self): if logflag: @@ -125,82 +133,10 @@ def _validate_env(self): "set use_remote_service to False if you want to use a local micro service!" ) - def _get_server(self): - """Get the server instance based on the protocol. - - This method currently only supports HTTP services. It creates an instance of the HTTPService class with the - necessary arguments. - In the future, it will also support gRPC services. - """ - self._validate_env() - from .http_service import HTTPService - - runtime_args = { - "protocol": self.protocol, - "host": self.host, - "port": self.port, - "title": self.name, - "description": "OPEA Microservice Infrastructure", - } - - return HTTPService(uvicorn_kwargs=self.uvicorn_kwargs, runtime_args=runtime_args) - - async def _async_setup(self): - """The async method setup the runtime. - - This method is responsible for setting up the server. It first checks if the port is available, then it gets - the server instance and initializes it. - """ - self._validate_env() - if self.protocol.lower() == "http": - if not (check_ports_availability(self.host, self.port)): - raise RuntimeError(f"port:{self.port}") - - await self.server.initialize_server() - - async def _async_run_forever(self): - """Running method of the server.""" - self._validate_env() - await self.server.execute_server() - - def run(self): - """Running method to block the main thread. - - This method runs the event loop until a Future is done. It is designed to be called in the main thread to keep it busy. - """ - self._validate_env() - self.event_loop.run_until_complete(self._async_run_forever()) - - def start(self, in_single_process=False): - self._validate_env() - if in_single_process: - # Resolve HPU segmentation fault and potential tokenizer issues by limiting to same process - self.run() - else: - self.process = multiprocessing.Process(target=self.run, daemon=False, name=self.name) - self.process.start() - - async def _async_teardown(self): - """Shutdown the server.""" - self._validate_env() - await self.server.terminate_server() - - def stop(self): - self._validate_env() - self.event_loop.run_until_complete(self._async_teardown()) - self.event_loop.stop() - self.event_loop.close() - self.server.logger.close() - if self.process.is_alive(): - self.process.terminate() - @property def endpoint_path(self): return f"{self.protocol}://{self.host}:{self.port}{self.endpoint}" - def add_route(self, endpoint, handler, methods=["POST"]): - self.app.router.add_api_route(endpoint, handler, methods=methods) - def register_microservice( name: str,