Skip to content

Commit

Permalink
Refs #253, enable 2FA for users in sites and related user groups.
Browse files Browse the repository at this point in the history
  • Loading branch information
doumdi committed Oct 1, 2024
1 parent 863b861 commit 4a6bbc6
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 27 deletions.
64 changes: 59 additions & 5 deletions teraserver/python/modules/DatabaseModule/DBManager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import event, inspect
from sqlalchemy.orm import joinedload
from sqlalchemy import event, inspect, update
from sqlalchemy.engine import Engine
from sqlalchemy.engine.reflection import Inspector
from sqlite3 import Connection as SQLite3Connection
Expand Down Expand Up @@ -92,6 +93,58 @@ def start_cleanup_task(self) -> task:
return task.deferLater(reactor, seconds_to_midnight, self.cleanup_database)
# return task.deferLater(reactor, 5, self.cleanup_database)

def setup_events_for_2fa_sites(self):
"""
We need to validate that 2FA is enabled for all users in the site when the flag is set.
This can occur on multiple occasion, when the site is created, updated and also when user
groups are modified.
"""
@event.listens_for(TeraSite, 'after_update')
@event.listens_for(TeraSite, 'after_insert')
def site_updated_or_inserted(mapper, connection, target: TeraSite):
# Check if 2FA is enabled for this site
if target.site_2fa_required:
# Efficiently load all related users with joinedload
service_roles = TeraServiceRole.query.options(
joinedload(TeraServiceRole.service_role_user_groups).joinedload(
TeraUserGroup.user_group_users
)
).filter(TeraServiceRole.id_site == target.id_site).all()

# Get all users
user_ids = set()
for role in service_roles:
for group in role.service_role_user_groups:
for user in group.user_group_users:
user_ids.add(user.id_user)

# Perform a bulk update for all users at once
if user_ids:
connection.execute(
update(TeraUser)
.where(TeraUser.id_user.in_(user_ids))
.values(user_2fa_enabled=True)
)
@event.listens_for(TeraUserGroup, 'after_update')
@event.listens_for(TeraUserGroup, 'after_insert')
def user_group_updated_or_inserted(mapper, connection, target: TeraUserGroup):
# Check if 2FA is enabled for a related site
for role in target.user_group_services_roles:
if role.id_site and role.service_role_site.site_2fa_required:
# Efficiently load all related users with joinedload
user_ids = set()
for group in target.user_group_users:
user_ids.add(group.id_user)

# Perform a bulk update for all users at once
if user_ids:
connection.execute(
update(TeraUser)
.where(TeraUser.id_user.in_(user_ids))
.values(user_2fa_enabled=True)
)


def setup_events_for_class(self, cls, event_name):
import json

Expand Down Expand Up @@ -282,6 +335,9 @@ def setup_events(self):
for name in EventNameClassMap:
self.setup_events_for_class(EventNameClassMap[name], name)

# Setup events for 2FA sites
self.setup_events_for_2fa_sites()

def open(self, echo=False):

self.db_uri = 'postgresql://%(username)s:%(password)s@%(url)s:%(port)s/%(name)s' % self.config.db_config
Expand Down Expand Up @@ -522,9 +578,7 @@ def _set_sqlite_pragma(dbapi_connection, connection_record):
print(manager)
manager.open_local(dict(), echo=True, ram=True)
manager.create_defaults(config, test=True)
user = TeraUser()
user.query.all()
user_instance = TeraUser()
user_instance.query.all()
test = TeraUser.query.all()
print(test)


22 changes: 0 additions & 22 deletions teraserver/python/modules/FlaskModule/API/user/UserQuerySites.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,28 +161,6 @@ def post(self):
# TODO: Publish update to everyone who is subscribed to sites update...
update_site = TeraSite.get_site_by_id(json_site['id_site'])

try:
# Manage 2FA for site
if update_site.site_2fa_required:
# Enable 2FA for all users in this site
# Query service roles for this site
service_roles = TeraServiceRole.query.filter(TeraServiceRole.id_site == update_site.id_site).all()
for role in service_roles:
# Get the user group for this role
for group in role.service_role_user_groups:
# Get all users in this group
for user in group.user_group_users:
# Enable 2FA for this user
TeraUser.update(user.id_user, {'user_2fa_enabled': True})

except exc.SQLAlchemyError as e:
import sys
print(sys.exc_info())
self.module.logger.log_error(self.module.module_name,
UserQuerySites.__name__,
'post', 500, 'Database error', str(e))
return gettext('Database error'), 500

return jsonify([update_site.to_json()])

@api.doc(description='Delete a specific site',
Expand Down

0 comments on commit 4a6bbc6

Please sign in to comment.