diff --git a/.coveragerc b/.coveragerc index 3e44610..069dbc7 100644 --- a/.coveragerc +++ b/.coveragerc @@ -2,3 +2,6 @@ relative_files = True source = fed_mng/ branch = True + +[report] +exclude_lines = if __name__ == .__main__.: diff --git a/fed_mng/auth.py b/fed_mng/auth.py new file mode 100644 index 0000000..993182a --- /dev/null +++ b/fed_mng/auth.py @@ -0,0 +1,97 @@ +"""Authentication and authorization rules.""" +from fastapi.security import HTTPBearer +from flaat.config import AccessLevel +from flaat.fastapi import Flaat +from flaat.requirements import AllOf, HasSubIss, IsTrue +from flaat.user_infos import UserInfos +from sqlmodel import Session, select + +from fed_mng.config import get_settings +from fed_mng.main import engine +from fed_mng.models import ( + Admin, + SiteAdmin, + SiteTester, + SLAModerator, + User, + UserGroupManager, +) + +security = HTTPBearer() +lazy_security = HTTPBearer(auto_error=False) + + +def is_user(user_infos: UserInfos) -> bool: + """Target user has write access on Federation-Registry.""" + with Session(engine) as session: + email = user_infos.user_info.get("email") + user = session.exec(select(User).filter(User.email == email)).first() + return user is not None + + +def is_admin(user_infos: UserInfos) -> bool: + """Target user has write access on Federation-Registry.""" + with Session(engine) as session: + email = user_infos.user_info.get("email") + user = session.exec( + select(Admin).join(User).filter(User.email == email) + ).first() + return user is not None + + +def is_site_admin(user_infos: UserInfos) -> bool: + """Target user has write access on Federation-Registry.""" + with Session(engine) as session: + email = user_infos.user_info.get("email") + user = session.exec( + select(SiteAdmin).join(User).filter(User.email == email) + ).first() + return user is not None + + +def is_user_group_manager(user_infos: UserInfos) -> bool: + """Target user has write access on Federation-Registry.""" + with Session(engine) as session: + email = user_infos.user_info.get("email") + user = session.exec( + select(UserGroupManager).join(User).filter(User.email == email) + ).first() + return user is not None + + +def is_site_tester(user_infos: UserInfos) -> bool: + """Target user has write access on Federation-Registry.""" + with Session(engine) as session: + email = user_infos.user_info.get("email") + user = session.exec( + select(SiteTester).join(User).filter(User.email == email) + ).first() + return user is not None + + +def is_sla_moderator(user_infos: UserInfos) -> bool: + """Target user has write access on Federation-Registry.""" + with Session(engine) as session: + email = user_infos.user_info.get("email") + user = session.exec( + select(SLAModerator).join(User).filter(User.email == email) + ).first() + return user is not None + + +flaat = Flaat() +user_requirements = [HasSubIss(), IsTrue(is_user)] +flaat.set_access_levels( + [ + AccessLevel("user", AllOf(*user_requirements)), + AccessLevel("admin", AllOf(*user_requirements, IsTrue(is_admin))), + AccessLevel("site_admin", AllOf(*user_requirements, IsTrue(is_site_admin))), + AccessLevel("site_tester", AllOf(*user_requirements, IsTrue(is_site_tester))), + AccessLevel("sla_mod", AllOf(*user_requirements, IsTrue(is_sla_moderator))), + AccessLevel( + "group_mgr", AllOf(*user_requirements, IsTrue(is_user_group_manager)) + ), + ], +) +flaat.set_trusted_OP_list(get_settings().TRUSTED_IDP_LIST) +flaat.set_request_timeout(30) diff --git a/fed_mng/config.py b/fed_mng/config.py index dd14ef8..1d5258c 100644 --- a/fed_mng/config.py +++ b/fed_mng/config.py @@ -27,6 +27,8 @@ def start_with_single_slash(cls, v: str) -> str: MAINTAINER_URL: AnyHttpUrl | None = None MAINTAINER_EMAIL: EmailStr | None = None + TRUSTED_IDP_LIST: list[AnyHttpUrl] = [] + DOC_V1_URL: AnyHttpUrl | None = None @validator("DOC_V1_URL", pre=True) diff --git a/fed_mng/main.py b/fed_mng/main.py index 11348ba..5080570 100644 --- a/fed_mng/main.py +++ b/fed_mng/main.py @@ -1,6 +1,10 @@ """Entry point for the Federation-Manager web app.""" +from contextlib import asynccontextmanager +from typing import Any, Generator + import uvicorn from fastapi import FastAPI +from sqlmodel import Session, SQLModel, create_engine from fed_mng.config import get_settings from fed_mng.router import router_v1 @@ -51,5 +55,19 @@ app.mount(settings.API_V1_STR, sub_app_v1) +connect_args = {"check_same_thread": False} +engine = create_engine("sqlite://", echo=True, connect_args=connect_args) + + +@asynccontextmanager +async def lifespan() -> None: + SQLModel.metadata.create_all(engine) + + +def get_session() -> Generator[Session, Any, None]: + with Session(engine) as session: + yield session + + if __name__ == "__main__": - uvicorn.run(app, host="0.0.0.0") + uvicorn.run(app, host="0.0.0.0", lifespan=lifespan) diff --git a/tests/conftest.py b/tests/conftest.py index a4b5de7..5377ff0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,15 +2,18 @@ import pytest from sqlalchemy import Engine, event -from sqlmodel import Session, SQLModel, create_engine +from sqlmodel import Session, SQLModel +from fed_mng.main import engine from fed_mng.models import ( SLA, + Admin, IdentityProvider, Provider, Region, ResourceUsage, SiteAdmin, + SiteTester, SLAModerator, SLANegotiation, User, @@ -36,7 +39,8 @@ def set_sqlite_pragma(dbapi_connection, connection_record) -> None: @pytest.fixture(scope="session") def db_engine() -> Generator[Engine, Any, None]: """Define the database engine and create all tables.""" - engine = create_engine("sqlite:///:memory:") + engine.url = "sqlite://" + engine.echo = False SQLModel.metadata.create_all(engine) yield engine SQLModel.metadata.drop_all(engine) @@ -62,6 +66,15 @@ def db_user(db_session: Session) -> User: return user +@pytest.fixture(scope="function") +def db_admin(db_session: Session, db_user: User) -> Admin: + admin = Admin(id=db_user.id) + db_session.add(admin) + db_session.commit() + db_session.refresh(admin) + return admin + + @pytest.fixture(scope="function") def db_site_admin(db_session: Session, db_user: User) -> SiteAdmin: site_admin = SiteAdmin(id=db_user.id) @@ -71,6 +84,15 @@ def db_site_admin(db_session: Session, db_user: User) -> SiteAdmin: return site_admin +@pytest.fixture(scope="function") +def db_site_tester(db_session: Session, db_user: User) -> SiteTester: + site_tester = SiteTester(id=db_user.id) + db_session.add(site_tester) + db_session.commit() + db_session.refresh(site_tester) + return site_tester + + @pytest.fixture(scope="function") def db_sla_moderator(db_session: Session, db_user: User) -> SLAModerator: sla_moderator = SLAModerator(id=db_user.id) diff --git a/tests/test_auth.py b/tests/test_auth.py new file mode 100644 index 0000000..5bac1f8 --- /dev/null +++ b/tests/test_auth.py @@ -0,0 +1,124 @@ +from flaat import UserInfos +from sqlmodel import Session, select + +from fed_mng.auth import ( + is_admin, + is_site_admin, + is_site_tester, + is_sla_moderator, + is_user_group_manager, +) +from fed_mng.models import ( + Admin, + SiteAdmin, + SiteTester, + SLAModerator, + User, + UserGroupManager, +) +from tests.utils import random_email + + +def test_is_admin(db_session: Session, db_admin: Admin) -> None: + user: User = db_session.exec(select(User).filter(User.id == db_admin.id)).first() + user_info = UserInfos( + access_token_info=None, + user_info={"email": user.email}, + introspection_info=None, + ) + assert is_admin(user_info) + + +def test_is_not_admin() -> None: + user_info = UserInfos( + access_token_info=None, + user_info={"email": random_email()}, + introspection_info=None, + ) + assert not is_admin(user_info) + + +def test_is_site_admin(db_session: Session, db_site_admin: SiteAdmin) -> None: + user: User = db_session.exec( + select(User).filter(User.id == db_site_admin.id) + ).first() + user_info = UserInfos( + access_token_info=None, + user_info={"email": user.email}, + introspection_info=None, + ) + assert is_site_admin(user_info) + + +def test_is_not_site_admin() -> None: + user_info = UserInfos( + access_token_info=None, + user_info={"email": random_email()}, + introspection_info=None, + ) + assert not is_site_admin(user_info) + + +def test_is_site_tester(db_session: Session, db_site_tester: SiteTester) -> None: + user: User = db_session.exec( + select(User).filter(User.id == db_site_tester.id) + ).first() + user_info = UserInfos( + access_token_info=None, + user_info={"email": user.email}, + introspection_info=None, + ) + assert is_site_tester(user_info) + + +def test_is_not_site_tester() -> None: + user_info = UserInfos( + access_token_info=None, + user_info={"email": random_email()}, + introspection_info=None, + ) + assert not is_site_tester(user_info) + + +def test_is_sla_moderator(db_session: Session, db_sla_moderator: SLAModerator) -> None: + user: User = db_session.exec( + select(User).filter(User.id == db_sla_moderator.id) + ).first() + user_info = UserInfos( + access_token_info=None, + user_info={"email": user.email}, + introspection_info=None, + ) + assert is_sla_moderator(user_info) + + +def test_is_not_sla_moderator() -> None: + user_info = UserInfos( + access_token_info=None, + user_info={"email": random_email()}, + introspection_info=None, + ) + assert not is_sla_moderator(user_info) + + +def test_is_user_group_manager( + db_session: Session, db_user_group_manager: UserGroupManager +) -> None: + user: User = db_session.exec( + select(User).filter(User.id == db_user_group_manager.id) + ).first() + user_info = UserInfos( + access_token_info=None, + user_info={"email": user.email}, + introspection_info=None, + ) + assert is_user_group_manager(user_info) + + +def test_is_not_user_group_manager() -> None: + user_info = UserInfos( + access_token_info=None, + user_info={"email": random_email()}, + introspection_info=None, + ) + assert not is_user_group_manager(user_info)