Skip to content

Commit

Permalink
Merge pull request #4 from infn-datacloud/3-use-socketio
Browse files Browse the repository at this point in the history
3 use socketio
  • Loading branch information
giosava94 authored Jun 20, 2024
2 parents 5b75e62 + 619af80 commit a4359c7
Show file tree
Hide file tree
Showing 18 changed files with 667 additions and 163 deletions.
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,17 @@ Once OPA is up and running we can interrogate its endpoints to evaluate the inpu
```bash
curl localhost:8181/v1/data/fedmgr/user_roles -d @v1-data-input.json -H 'Content-Type: application/json'
```

### Testing SocketIO with python client

In the `examples` folder we provide a python script with a Socket.IO client. It tries to connect to the `/site_admin` namespace and emit a message.

Since endpoints requires a valid token, this example expects the access token as an input to the `--token` (or `-t`) argument. Alternatively you can set the `$SOCKETIO_CLIENT_TOKEN` env var.

```bash
python examples/client.py -t <TOKEN>
```

> For the example client to work you need a running instance of the application.
You can copy and edit that file to make more complexes examples or tests.
44 changes: 44 additions & 0 deletions examples/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import argparse
import os

import socketio
from socketio import ClientNamespace

parser = argparse.ArgumentParser(description="Pass token.")
parser.add_argument(
"--token",
"-t",
type=str,
default=os.environ.get("SOCKETIO_CLIENT_TOKEN", None),
help="Access token to use. If not set use the value in the SOCKETIO_CLIENT_TOKEN",
)

args = parser.parse_args()


class SiteAdminNameSpace(ClientNamespace):
def on_connect(self):
print(f"Connection to namespace {self.namespace} established")
self.emit("list_provider_federation_requests", {"username": ""})

def on_connect_error(self, data):
print(f"Failed to connect to namespace {self.namespace}", data)

def on_list_provider_federation_requests(self, data):
print(f"Message received with {data}")
self.disconnect()

def on_disconnect(self):
print(f"Disconnected from namespace {self.namespace}")


sio = socketio.Client()
sio.register_namespace(SiteAdminNameSpace("/site_admin"))


sio.connect(
"http://localhost:8000",
transports=["websocket", "polling"],
auth={"token": args.token},
)
sio.wait()
120 changes: 25 additions & 95 deletions fed_mng/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,101 +3,12 @@

import requests
from fastapi import status
from fastapi.security import HTTPBearer
from flaat.config import AccessLevel
from flaat.fastapi import Flaat
from flaat.requirements import AllOf, HasSubIss, IsTrue
from flaat.user_infos import UserInfos
from requests.exceptions import ConnectionError, Timeout
from sqlmodel import Session, select

from fed_mng.config import get_settings
from fed_mng.db import engine
from fed_mng.models import (
Admin,
SiteAdmin,
SiteTester,
SLAModerator,
User,
UserGroupManager,
)

security = HTTPBearer()
lazy_security = HTTPBearer(auto_error=False)


def is_user(user_infos: UserInfos) -> bool:
"""Target user has write access on Federation-Registry."""
with Session(engine) as session:
email = user_infos.user_info.get("email")
user = session.exec(select(User).filter(User.email == email)).first()
return user is not None


def is_admin(user_infos: UserInfos) -> bool:
"""Target user has write access on Federation-Registry."""
with Session(engine) as session:
email = user_infos.user_info.get("email")
user = session.exec(
select(Admin).join(User).filter(User.email == email)
).first()
return user is not None


def is_site_admin(user_infos: UserInfos) -> bool:
"""Target user has write access on Federation-Registry."""
with Session(engine) as session:
email = user_infos.user_info.get("email")
user = session.exec(
select(SiteAdmin).join(User).filter(User.email == email)
).first()
return user is not None


def is_user_group_manager(user_infos: UserInfos) -> bool:
"""Target user has write access on Federation-Registry."""
with Session(engine) as session:
email = user_infos.user_info.get("email")
user = session.exec(
select(UserGroupManager).join(User).filter(User.email == email)
).first()
return user is not None


def is_site_tester(user_infos: UserInfos) -> bool:
"""Target user has write access on Federation-Registry."""
with Session(engine) as session:
email = user_infos.user_info.get("email")
user = session.exec(
select(SiteTester).join(User).filter(User.email == email)
).first()
return user is not None


def is_sla_moderator(user_infos: UserInfos) -> bool:
"""Target user has write access on Federation-Registry."""
with Session(engine) as session:
email = user_infos.user_info.get("email")
user = session.exec(
select(SLAModerator).join(User).filter(User.email == email)
).first()
return user is not None


flaat = Flaat()
user_requirements = [HasSubIss(), IsTrue(is_user)]
flaat.set_access_levels(
[
AccessLevel("user", AllOf(*user_requirements)),
AccessLevel("admin", AllOf(*user_requirements, IsTrue(is_admin))),
AccessLevel("site_admin", AllOf(*user_requirements, IsTrue(is_site_admin))),
AccessLevel("site_tester", AllOf(*user_requirements, IsTrue(is_site_tester))),
AccessLevel("sla_mod", AllOf(*user_requirements, IsTrue(is_sla_moderator))),
AccessLevel(
"group_mgr", AllOf(*user_requirements, IsTrue(is_user_group_manager))
),
],
)
flaat.set_trusted_OP_list(get_settings().TRUSTED_IDP_LIST)
flaat.set_request_timeout(30)

Expand All @@ -123,10 +34,29 @@ def get_user_roles(token: str) -> list[str]:
if resp.status_code == status.HTTP_200_OK:
return resp.json().get("result", [])
elif resp.status_code == status.HTTP_400_BAD_REQUEST:
print("Bad request sent to OPA server.")
raise ConnectionRefusedError(
"Authentication failed: Bad request sent to OPA server."
)
elif resp.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR:
print("OPA server internal error.")
return []
except (Timeout, ConnectionError):
print("OPA server is not reachable.")
return []
raise ConnectionRefusedError(
"Authentication failed: OPA server internal error."
)
else:
raise ConnectionRefusedError(
f"Authentication failed: OPA unexpected response code \
'{resp.status_code}'."
)
except (Timeout, ConnectionError) as e:
raise ConnectionRefusedError(
"Authentication failed: OPA server is not reachable."
) from e


def has_role(token: str, role: str) -> bool:
"""Validate received token and verify needed rights.
Contact OPA to verify if the target user has the requested role.
"""
flaat.get_user_infos_from_access_token(token)
user_roles = get_user_roles(token)
return user_roles.get(f"is_{role}", False)
74 changes: 51 additions & 23 deletions fed_mng/main.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,21 @@
"""Entry point for the Federation-Manager web app."""

import uvicorn
from fastapi import FastAPI

# from fastapi.middleware.cors import CORSMiddleware
from fed_mng.config import get_settings
from fed_mng.db import lifespan
from fed_mng.router import router_v1

# from fed_mng.db import lifespan
from fed_mng.socketio.admin import AdminNamespace
from fed_mng.socketio.site_admin import SiteAdminNamespace
from fed_mng.socketio.site_tester import SiteTesterNamespace
from fed_mng.socketio.sla_mod import SLAModeratorNamespace
from fed_mng.socketio.socket_manager import SocketManager
from fed_mng.socketio.user_group_mgr import UserGroupManagerNamespace

# from fed_mng.router import router_v1


summary = "Federation-Manager of the DataCloud project"
description = """
Expand All @@ -21,36 +32,53 @@
"url": settings.MAINTAINER_URL,
"email": settings.MAINTAINER_EMAIL,
}
tags_metadata = [
{
"name": settings.API_V1_STR,
"description": "API version 1, see link on the right",
"externalDocs": {
"description": "API version 1 documentation",
"url": f"{settings.DOC_V1_URL}",
},
},
]
# tags_metadata = [
# {
# "name": settings.API_V1_STR,
# "description": "API version 1, see link on the right",
# "externalDocs": {
# "description": "API version 1 documentation",
# "url": f"{settings.DOC_V1_URL}",
# },
# },
# ]

app = FastAPI(
contact=contact,
description=description,
openapi_tags=tags_metadata,
# openapi_tags=tags_metadata,
summary=summary,
title=settings.PROJECT_NAME,
version=version,
lifespan=lifespan,
# lifespan=lifespan,
)

sub_app_v1 = FastAPI(
contact=contact,
description=description,
summary=summary,
title=settings.PROJECT_NAME,
version=version,
)
sub_app_v1.include_router(router_v1)
app.mount(settings.API_V1_STR, sub_app_v1)
# sub_app_v1 = FastAPI(
# contact=contact,
# description=description,
# summary=summary,
# title=settings.PROJECT_NAME,
# version=version,
# )
# sub_app_v1.include_router(router_v1)
# app.mount(settings.API_V1_STR, sub_app_v1)

# Adding the CORS middleware will overwrite SocketManager's CORS settings
# Make sure to add the CORS middleware before SocketManager
# app.add_middleware(
# CORSMiddleware,
# allow_origins=["*"],
# allow_credentials=True,
# allow_methods=["*"],
# allow_headers=["*"],
# )
sio = SocketManager(app=app)

sio.register_namespace(SiteAdminNamespace("/site_admin"))
sio.register_namespace(SiteTesterNamespace("/site_tester"))
sio.register_namespace(UserGroupManagerNamespace("/user_group_mgr"))
sio.register_namespace(SLAModeratorNamespace("/sla_mod"))
sio.register_namespace(AdminNamespace("/admin"))


if __name__ == "__main__":
Expand Down
Empty file added fed_mng/socketio/__init__.py
Empty file.
26 changes: 26 additions & 0 deletions fed_mng/socketio/admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from typing import Any, Literal

from socketio import AsyncNamespace

from fed_mng.socketio.utils import validate_auth_on_connect


class AdminNamespace(AsyncNamespace):
async def on_connect(
self, sid: str, environ: dict[str, Any], auth: dict[Literal["token"], str]
):
"""When connecting evaluate user authentication."""
print(f"Connecting to namespace: {self.namespace}")
print(f"SID: {sid}")
print(f"Environment variables: {environ}")
print(f"Auth data: {auth}")
validate_auth_on_connect(auth=auth, target_role=self.namespace[1:])
print(f"Connected to namespace '{self.namespace}' with sid '{sid}'")

async def on_disconnect(self, sid):
"""Close connection
Args:
sid (_type_): _description_
"""
print("disconnect from namespace:", self.namespace, sid)
Loading

0 comments on commit a4359c7

Please sign in to comment.