Skip to content

Commit

Permalink
add downsampling for ops metrics, add databases metrics in meta stats…
Browse files Browse the repository at this point in the history
…, optimize service mount api, close connections on adapt_async query functions, fix must_create in db session.save()
  • Loading branch information
voidZXL committed Nov 14, 2024
1 parent 3c57ace commit f747565
Show file tree
Hide file tree
Showing 21 changed files with 503 additions and 156 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ __pycache__/
*.sqlite3
db
db_ops
blog_ops
db-journal
/tests/server/db
/tests/server/db_ops
Expand Down
34 changes: 34 additions & 0 deletions examples/mini_blog/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Generated by UtilMeta 2.6.0 on 2024-11-13 05:54
# generator spec: openapi 3.1.0
# generator class: utilmeta.core.cli.specs.openapi.OpenAPIClientGenerator
from utilmeta.core import api, cli, response, request
import utype


class schemas:
class UserSchema(utype.Schema):
__options__ = utype.Options(addition=True)
username: str = utype.Field(required=False, max_length=20)
articles_num: int = utype.Field(ge=0, required=False)

class ArticleSchema(utype.Schema):
__options__ = utype.Options(addition=True)
id: int = utype.Field(required=False, ge=1, le=2147483647)
author: "schemas.UserSchema" = utype.Field(required=False)
content: str = utype.Field(required=False)


class responses:
class article_get_response(response.Response):
content_type = "application/json"
result: schemas.ArticleSchema


class APIClient(cli.Client):
@api.get("/article", tags=["article"], description="")
def article_get(
self, id: int = request.QueryParam(required=True)
) -> responses.article_get_response[200]: pass


client = APIClient(base_url="http://127.0.0.1:8002")
7 changes: 7 additions & 0 deletions examples/mini_blog/config.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
from utilmeta import UtilMeta
from utilmeta.core.server.backends.django import DjangoSettings
from utilmeta.core.orm import DatabaseConnections, Database
from utilmeta.ops import Operations


def configure(service: UtilMeta):
service.use(Operations(
route='ops',
database=Database(
name='blog_ops'
),
))
service.use(DjangoSettings(
apps=['blog'],
secret_key='YOUR_SECRET_KEY'
Expand Down
2 changes: 1 addition & 1 deletion utilmeta/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
__website__ = 'https://utilmeta.com'
__homepage__ = 'https://utilmeta.com/py'
__author__ = 'Xulin Zhou (@voidZXL)'
__version__ = '2.6.0'
__version__ = '2.6.1'


def version_info() -> str:
Expand Down
4 changes: 4 additions & 0 deletions utilmeta/core/auth/session/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,17 @@ def db_save(self, must_create=False, force: bool = True):
if self.session_key is None:
return self.create()
obj = self.load_object(must_create)
if not obj.pk:
must_create = True
obj.save(force_insert=must_create, force_update=not must_create and force)

# @awaitable(db_save)
async def adb_save(self, must_create=False, force: bool = True):
if self.session_key is None:
return await self.acreate()
obj = await self.aload_object(must_create)
if not obj.pk:
must_create = True
await obj.asave(force_insert=must_create, force_update=not must_create and force)

def save(self, must_create: bool = False):
Expand Down
6 changes: 3 additions & 3 deletions utilmeta/core/orm/backends/django/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def get_values(self):
self.clear_pks()
return self.values

@awaitable(get_values, bind_service=True)
@awaitable(get_values, bind_service=True, close_conn=True)
async def get_values(self):
if self.queryset.query.is_empty():
return []
Expand Down Expand Up @@ -659,7 +659,7 @@ def commit_data(self, data: dict):
raise self.get_integrity_error(e) from e
return self.queryset

@awaitable(commit_data, bind_service=True)
@awaitable(commit_data, bind_service=True, close_conn=True)
async def commit_data(self, data: dict):
data, _, _ = self.process_data(data, with_relations=False)
for p in {PK, ID, *self.parser.pk_names}:
Expand Down Expand Up @@ -759,7 +759,7 @@ def save_data(self,
)
return pk

@awaitable(save_data, bind_service=True)
@awaitable(save_data, bind_service=True, close_conn=True)
async def save_data(self,
data,
must_create: bool = False,
Expand Down
6 changes: 6 additions & 0 deletions utilmeta/core/orm/databases/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ def local(self):
return True
return localhost(self.host)

@property
def location(self):
if self.is_sqlite:
return self.name
return f'{self.host}:{self.port}'

@property
def is_sqlite(self):
return 'sqlite' in self.engine
Expand Down
69 changes: 57 additions & 12 deletions utilmeta/core/server/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ def __init__(
self.meta_path = None
self.project_dir = Path(os.getcwd())
self.meta_config = {}
self.root_api = api
self.root_url = str(route or '').strip('/')

if self.root_url:
Expand Down Expand Up @@ -95,6 +94,10 @@ def __init__(
self._application = None
self._auto_created = False
self._ready = False
self._unmounted_apis = {}
self._root_api = None
self._root_api_ref = None
self.root_api = api

import utilmeta
try:
Expand All @@ -117,6 +120,29 @@ def __init__(
self.load_meta()
self._pool = None

@property
def root_api(self):
try:
return self.resolve()
except ValueError:
return None

@root_api.setter
def root_api(self, api):
if inspect.isclass(api) and issubclass(api, API):
for route, sub_api in self._unmounted_apis.items():
try:
api.__mount__(sub_api, route=route)
except ValueError as e:
warnings.warn(f'utilmeta.service: mount {sub_api} to service failed with error: {e}')
self._unmounted_apis = {}
self._root_api = api
elif isinstance(api, str):
self._root_api_ref = api
elif api:
raise TypeError(f'Invalid root API for UtilMeta service: {api}, should be a API class'
f' inheriting utilmeta.core.api.API or a string reference to that class')

def load_meta(self):
self.meta_path = search_file('utilmeta.ini') or search_file('meta.ini')

Expand Down Expand Up @@ -359,30 +385,49 @@ def deco(_api):
self.adaptor.mount(api, route=route)
return

if self.root_api:
if self._root_api:
if getattr(self.root_api, '__ref__', str(self.root_api)) != getattr(api, '__ref__', str(api)):
raise ValueError(f'UtilMeta: root api conflicted: {api}, {self.root_api}, '
f'you can only mount a service once')
return
self.root_api = api
self.root_url = str(route).strip('/')

def mount_to_api(self, api, route: str):
if not inspect.isclass(api) and issubclass(api, API):
raise TypeError(f'Invalid API: {api}')
route = str(route).strip('/')
try:
root_api = self.resolve()
except ValueError:
# if API is not loaded, we lazy-mount
pass
else:
try:
root_api.__mount__(api, route=route)
except ValueError:
# router already exists
pass
return
self._unmounted_apis[route] = api

# def mount_ws(self, ws: Union[str, Callable], route: str = ''):
# pass

def resolve(self):
if callable(self.root_api):
return self.root_api
if isinstance(self.root_api, str):
if '.' not in self.root_api:
def resolve(self) -> Type[API]:
if self._root_api:
return self._root_api
if self._root_api_ref:
ref = self._root_api_ref
if '.' not in ref:
# in current module
root_api = getattr(self.module, self.root_api)
root_api = getattr(self.module, ref)
else:
root_api = import_obj(self.root_api)
root_api = import_obj(ref)
self.root_api = root_api
if not callable(self.root_api):
raise ValueError(f'utilMeta: api not mount')
return self.root_api
if not self._root_api:
raise ValueError('utilmeta.service: RootAPI not mounted')
return self._root_api

def print_info(self):
from utilmeta import __version__
Expand Down
6 changes: 3 additions & 3 deletions utilmeta/ops/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def __init__(self, *args, **kwargs):
logger.make_events_only(True)

# @orm.Atomic(config.db_alias)
@adapt_async
@adapt_async(close_conn=config.db_alias)
def post(self, data: SupervisorData = request.Body):
save_supervisor(data)
return dict(
Expand All @@ -79,7 +79,7 @@ def get(self):
timestamp=int(self.request.time.timestamp() * 1000),
)

@adapt_async
@adapt_async(close_conn=config.db_alias)
@opsRequire('service.config')
def patch(self, data: SupervisorPatch = request.Body):
supervisor: SupervisorObject = supervisor_var.getter(self.request)
Expand All @@ -106,7 +106,7 @@ def patch(self, data: SupervisorPatch = request.Body):
**self.get()
)

@adapt_async
@adapt_async(close_conn=config.db_alias)
@opsRequire('service.delete')
def delete(self):
supervisor: SupervisorObject = supervisor_var.getter(self.request)
Expand Down
9 changes: 6 additions & 3 deletions utilmeta/ops/api/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ def get_tables(self) -> List[TableSchema]:
# scope: data.view:[TABLE_IDENT]
@api.post('query')
@opsRequire('data.query')
@adapt_async
@adapt_async(close_conn=True)
# close all connections
def query_data(self, query: QuerySchema = request.Body):
try:
unsliced_qs = self.adaptor.get_queryset(**query.query)
Expand All @@ -123,7 +124,8 @@ def query_data(self, query: QuerySchema = request.Body):

@api.post('create')
@opsRequire('data.create')
@adapt_async
@adapt_async(close_conn=True)
# close all connections
def create_data(self, data: CreateDataSchema = request.Body):
objs = []
for val in data.data:
Expand All @@ -134,7 +136,8 @@ def create_data(self, data: CreateDataSchema = request.Body):

@api.post('update')
@opsRequire('data.update')
@adapt_async
@adapt_async(close_conn=True)
# close all connections
def update_data(self, data: UpdateDataSchema = request.Body):
objs = []
fields = set()
Expand Down
10 changes: 5 additions & 5 deletions utilmeta/ops/api/log.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .utils import SupervisorObject, supervisor_var, WrappedResponse, opsRequire
from .utils import SupervisorObject, supervisor_var, WrappedResponse, opsRequire, config
import utype

from utilmeta.core import api, orm
Expand Down Expand Up @@ -65,7 +65,7 @@ class LogQuery(orm.Query[ServiceLog]):

@opsRequire('log.view')
@api.get
@adapt_async
@adapt_async(close_conn=config.db_alias)
def service(self, query: LogQuery):
base_qs = ServiceLog.objects.filter(
service=self.supervisor.service,
Expand All @@ -81,7 +81,7 @@ def service(self, query: LogQuery):

@opsRequire('log.view')
@api.get('service/values')
@adapt_async
@adapt_async(close_conn=config.db_alias)
def service_log_values(self, query: LogQuery):
base_qs = ServiceLog.objects.filter(
service=self.supervisor.service,
Expand All @@ -101,7 +101,7 @@ def service_log_values(self, query: LogQuery):
return result

@opsRequire('log.delete')
@adapt_async
@adapt_async(close_conn=config.db_alias)
def delete(self, query: LogQuery):
qs = query.get_queryset(
ServiceLog.objects.filter(
Expand All @@ -113,7 +113,7 @@ def delete(self, query: LogQuery):

@opsRequire('log.view')
@api.get
@adapt_async
@adapt_async(close_conn=config.db_alias)
def realtime(
self,
within: int = utype.Param(3600, ge=1, le=7200),
Expand Down
Loading

0 comments on commit f747565

Please sign in to comment.