Skip to content

Commit

Permalink
refine service code.
Browse files Browse the repository at this point in the history
  • Loading branch information
lkk12014402 committed Dec 10, 2024
1 parent 84ba13c commit ade2d92
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 86 deletions.
43 changes: 40 additions & 3 deletions comps/cores/mega/http_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from fastapi import FastAPI
from prometheus_fastapi_instrumentator import Instrumentator
from uvicorn import Config, Server
import asyncio
import multiprocessing

from .base_service import BaseService
from .base_statistics import collect_all_statistics
Expand Down Expand Up @@ -83,6 +85,11 @@ async def _get_statistics():

return app

def add_startup_event(self, func):
@self.app.on_event("startup")
async def startup_event():
asyncio.create_task(func)

async def initialize_server(self):
"""Initialize and return HTTP server."""
self.logger.info("Setting up HTTP server")
Expand Down Expand Up @@ -110,11 +117,9 @@ async def start_server(self, **kwargs):
"""
await self.main_loop()

app = self.app

self.server = UviServer(
config=Config(
app=app,
app=self.app,
host=self.host_address,
port=self.primary_port,
log_level="info",
Expand All @@ -137,6 +142,35 @@ async def terminate_server(self):
await self.server.shutdown()
self.logger.info("Server termination completed")

def _async_setup(self):
self.event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.event_loop)
self.event_loop.run_until_complete(self.initialize_server())

def run(self):
"""Running method to block the main thread.
This method runs the event loop until a Future is done. It is designed to be called in the main thread to keep it busy.
"""
self.event_loop.run_until_complete(self.execute_server())

def start(self, in_single_process=False):
print(in_single_process)
if in_single_process:
# Resolve HPU segmentation fault and potential tokenizer issues by limiting to same process
self.run()
else:
self.process = multiprocessing.Process(target=self.run, daemon=False, name=self.name)
self.process.start()

def stop(self):
self.event_loop.run_until_complete(self.terminate_server())
self.event_loop.stop()
self.event_loop.close()
self.server.logger.close()
if self.process.is_alive():
self.process.terminate()

@staticmethod
def check_server_readiness(ctrl_address: str, timeout: float = 1.0, logger=None, **kwargs) -> bool:
"""Check if server status is ready.
Expand Down Expand Up @@ -170,3 +204,6 @@ async def async_check_server_readiness(ctrl_address: str, timeout: float = 1.0,
:return: True if status is ready else False.
"""
return HTTPService.check_server_readiness(ctrl_address, timeout, logger=logger)

def add_route(self, endpoint, handler, methods=["POST"]):
self.app.router.add_api_route(endpoint, handler, methods=methods)
102 changes: 19 additions & 83 deletions comps/cores/mega/micro_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# SPDX-License-Identifier: Apache-2.0

import asyncio
import multiprocessing
import os
from collections import defaultdict, deque
from enum import Enum
Expand All @@ -12,14 +11,15 @@
from .constants import ServiceRoleType, ServiceType
from .logger import CustomLogger
from .utils import check_ports_availability
from .http_service import HTTPService

opea_microservices = {}

logger = CustomLogger("micro_service")
logflag = os.getenv("LOGFLAG", False)


class MicroService:
class MicroService(HTTPService):
"""MicroService class to create a microservice."""

def __init__(
Expand Down Expand Up @@ -67,24 +67,32 @@ def __init__(
self.uvicorn_kwargs["ssl_certfile"] = ssl_certfile

if not use_remote_service:

if self.protocol.lower() == "http":
if not (check_ports_availability(self.host, self.port)):
raise RuntimeError(f"port:{self.port}")

self.provider = provider
self.provider_endpoint = provider_endpoint
self.endpoints = []

self.server = self._get_server()
self.app = self.server.app
runtime_args = {
"protocol": self.protocol,
"host": self.host,
"port": self.port,
"title": self.name,
"description": "OPEA Microservice Infrastructure",
}

super().__init__(uvicorn_kwargs=self.uvicorn_kwargs, runtime_args=runtime_args)

# create a batch request processor loop if using dynamic batching
if self.dynamic_batching:
self.buffer_lock = asyncio.Lock()
self.request_buffer = defaultdict(deque)
self.add_startup_event(self._dynamic_batch_processor())

@self.app.on_event("startup")
async def startup_event():
asyncio.create_task(self._dynamic_batch_processor())

self.event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.event_loop)
self.event_loop.run_until_complete(self._async_setup())
self._async_setup()

async def _dynamic_batch_processor(self):
if logflag:
Expand Down Expand Up @@ -125,82 +133,10 @@ def _validate_env(self):
"set use_remote_service to False if you want to use a local micro service!"
)

def _get_server(self):
"""Get the server instance based on the protocol.
This method currently only supports HTTP services. It creates an instance of the HTTPService class with the
necessary arguments.
In the future, it will also support gRPC services.
"""
self._validate_env()
from .http_service import HTTPService

runtime_args = {
"protocol": self.protocol,
"host": self.host,
"port": self.port,
"title": self.name,
"description": "OPEA Microservice Infrastructure",
}

return HTTPService(uvicorn_kwargs=self.uvicorn_kwargs, runtime_args=runtime_args)

async def _async_setup(self):
"""The async method setup the runtime.
This method is responsible for setting up the server. It first checks if the port is available, then it gets
the server instance and initializes it.
"""
self._validate_env()
if self.protocol.lower() == "http":
if not (check_ports_availability(self.host, self.port)):
raise RuntimeError(f"port:{self.port}")

await self.server.initialize_server()

async def _async_run_forever(self):
"""Running method of the server."""
self._validate_env()
await self.server.execute_server()

def run(self):
"""Running method to block the main thread.
This method runs the event loop until a Future is done. It is designed to be called in the main thread to keep it busy.
"""
self._validate_env()
self.event_loop.run_until_complete(self._async_run_forever())

def start(self, in_single_process=False):
self._validate_env()
if in_single_process:
# Resolve HPU segmentation fault and potential tokenizer issues by limiting to same process
self.run()
else:
self.process = multiprocessing.Process(target=self.run, daemon=False, name=self.name)
self.process.start()

async def _async_teardown(self):
"""Shutdown the server."""
self._validate_env()
await self.server.terminate_server()

def stop(self):
self._validate_env()
self.event_loop.run_until_complete(self._async_teardown())
self.event_loop.stop()
self.event_loop.close()
self.server.logger.close()
if self.process.is_alive():
self.process.terminate()

@property
def endpoint_path(self):
return f"{self.protocol}://{self.host}:{self.port}{self.endpoint}"

def add_route(self, endpoint, handler, methods=["POST"]):
self.app.router.add_api_route(endpoint, handler, methods=methods)


def register_microservice(
name: str,
Expand Down

0 comments on commit ade2d92

Please sign in to comment.