diff --git a/dashboard/src2/components/settings/DeveloperSettings.vue b/dashboard/src2/components/settings/DeveloperSettings.vue
index e514fc9d6e..1eec5b06ff 100644
--- a/dashboard/src2/components/settings/DeveloperSettings.vue
+++ b/dashboard/src2/components/settings/DeveloperSettings.vue
@@ -78,6 +78,7 @@
@@ -114,7 +115,7 @@
diff --git a/dashboard/src2/components/settings/RoleConfigureDialog.vue b/dashboard/src2/components/settings/RoleConfigureDialog.vue
index 1dbf16ce4a..62cb58da7e 100644
--- a/dashboard/src2/components/settings/RoleConfigureDialog.vue
+++ b/dashboard/src2/components/settings/RoleConfigureDialog.vue
@@ -105,6 +105,11 @@
label="Allow Server Creation"
:disabled="adminAccess"
/>
+
@@ -252,6 +257,19 @@ export default {
{ onSuccess: this.$session.roles.reload }
);
}
+ },
+ allowWebhookConfiguration: {
+ get() {
+ return !!this.role?.allow_webhook_configuration;
+ },
+ set(value) {
+ this.$resources.role.setValue.submit(
+ {
+ allow_webhook_configuration: value
+ },
+ { onSuccess: this.$session.roles.reload }
+ );
+ }
}
},
methods: {
diff --git a/dashboard/src2/data/session.js b/dashboard/src2/data/session.js
index e357a5820b..a8e3b553e7 100644
--- a/dashboard/src2/data/session.js
+++ b/dashboard/src2/data/session.js
@@ -46,6 +46,11 @@ export let session = reactive({
? session.roles.data.some(role => role.allow_billing)
: true
),
+ hasWebhookConfigurationAccess: computed(() =>
+ session.roles.data.length
+ ? session.roles.data.some(role => role.allow_webhook_configuration)
+ : true
+ ),
hasAppsAccess: computed(() =>
session.roles.data.length
? session.roles.data.some(role => role.allow_apps)
diff --git a/press/api/account.py b/press/api/account.py
index e6e88e032b..5b6d53fe16 100644
--- a/press/api/account.py
+++ b/press/api/account.py
@@ -1,9 +1,9 @@
-# -*- coding: utf-8 -*-
# Copyright (c) 2019, Frappe and contributors
# For license information, please see license.txt
+from __future__ import annotations
import json
-from typing import Union
+from typing import TYPE_CHECKING
import frappe
import pyotp
@@ -21,7 +21,6 @@
from pypika.terms import ValueWrapper
from press.api.site import protected
-from press.press.doctype.account_request.account_request import AccountRequest
from press.press.doctype.team.team import (
Team,
get_child_team_members,
@@ -32,6 +31,9 @@
from press.utils import get_country_info, get_current_team, is_user_part_of_team
from press.utils.telemetry import capture
+if TYPE_CHECKING:
+ from press.press.doctype.account_request.account_request import AccountRequest
+
@frappe.whitelist(allow_guest=True)
def signup(email, product=None, referrer=None):
@@ -41,9 +43,7 @@ def signup(email, product=None, referrer=None):
frappe.set_user("Administrator")
email = email.strip().lower()
- exists, enabled = frappe.db.get_value(
- "Team", {"user": email}, ["name", "enabled"]
- ) or [0, 0]
+ exists, enabled = frappe.db.get_value("Team", {"user": email}, ["name", "enabled"]) or [0, 0]
account_request = None
if exists and not enabled:
@@ -66,15 +66,14 @@ def signup(email, product=None, referrer=None):
frappe.set_user(current_user)
if account_request:
return account_request.name
+ return None
@frappe.whitelist(allow_guest=True)
def verify_otp(account_request: str, otp: str):
account_request: "AccountRequest" = frappe.get_doc("Account Request", account_request)
# ensure no team has been created with this email
- if not account_request.product_trial and frappe.db.exists(
- "Team", {"user": account_request.email}
- ):
+ if not account_request.product_trial and frappe.db.exists("Team", {"user": account_request.email}):
frappe.throw("Invalid OTP. Please try again.")
if account_request.otp != otp:
frappe.throw("Invalid OTP. Please try again.")
@@ -86,16 +85,14 @@ def verify_otp(account_request: str, otp: str):
def resend_otp(account_request: str):
account_request: "AccountRequest" = frappe.get_doc("Account Request", account_request)
# ensure no team has been created with this email
- if not account_request.product_trial and frappe.db.exists(
- "Team", {"user": account_request.email}
- ):
+ if not account_request.product_trial and frappe.db.exists("Team", {"user": account_request.email}):
frappe.throw("Invalid Email")
account_request.reset_otp()
account_request.send_verification_email()
@frappe.whitelist(allow_guest=True)
-def setup_account(
+def setup_account( # noqa: C901
key,
first_name=None,
last_name=None,
@@ -180,9 +177,7 @@ def send_login_link(email):
key = frappe.generate_hash("Login Link", 20)
minutes = 10
- frappe.cache().set_value(
- f"one_time_login_key:{key}", email, expires_in_sec=minutes * 60
- )
+ frappe.cache().set_value(f"one_time_login_key:{key}", email, expires_in_sec=minutes * 60)
link = get_url(f"/api/method/press.api.account.login_using_key?key={key}")
@@ -228,7 +223,7 @@ def active_servers():
@frappe.whitelist()
-def disable_account(totp_code: str = None):
+def disable_account(totp_code: str | None):
user = frappe.session.user
team = get_current_team(get_doc=True)
@@ -246,6 +241,7 @@ def disable_account(totp_code: str = None):
return "Active Servers"
team.disable_account()
+ return None
@frappe.whitelist()
@@ -275,8 +271,7 @@ def delete_team(team):
"confirmed": [
(
"Confirmed",
- f"The process for deletion of your team {team} has been initiated."
- " Sorry to see you go :(",
+ f"The process for deletion of your team {team} has been initiated." " Sorry to see you go :(",
),
{"indicator_color": "green"},
],
@@ -322,9 +317,7 @@ def validate_request_key(key, timezone=None):
{"name": account_request.product_trial},
pluck="name",
)
- product_trial_doc = (
- frappe.get_doc("Product Trial", product_trial) if product_trial else None
- )
+ product_trial_doc = frappe.get_doc("Product Trial", product_trial) if product_trial else None
if not (account_request.is_saas_signup() or account_request.invited_by_parent_team):
capture("clicked_verify_link", "fc_signup", account_request.email)
return {
@@ -352,6 +345,7 @@ def validate_request_key(key, timezone=None):
if product_trial_doc
else None,
}
+ return None
@frappe.whitelist(allow_guest=True)
@@ -384,10 +378,11 @@ def get_account_request_from_key(key):
ar = frappe.get_doc("Account Request", {"request_key": key})
if ar.creation > frappe.utils.add_to_date(None, hours=-hours):
return ar
- elif ar.subdomain and ar.saas_app:
+ if ar.subdomain and ar.saas_app:
domain = frappe.db.get_value("Saas Settings", ar.saas_app, "domain")
if frappe.db.get_value("Site", ar.subdomain + "." + domain, "status") == "Active":
return ar
+ return None
@frappe.whitelist()
@@ -395,12 +390,9 @@ def get():
cached = frappe.cache.get_value("cached-account.get", user=frappe.session.user)
if cached:
return cached
- else:
- value = _get()
- frappe.cache.set_value(
- "cached-account.get", value, user=frappe.session.user, expires_in_sec=60
- )
- return value
+ value = _get()
+ frappe.cache.set_value("cached-account.get", value, user=frappe.session.user, expires_in_sec=60)
+ return value
def _get():
@@ -410,9 +402,7 @@ def _get():
team_doc = get_current_team(get_doc=True)
- parent_teams = [
- d.parent for d in frappe.db.get_all("Team Member", {"user": user}, ["parent"])
- ]
+ parent_teams = [d.parent for d in frappe.db.get_all("Team Member", {"user": user}, ["parent"])]
teams = []
if parent_teams:
@@ -431,9 +421,7 @@ def _get():
{"erpnext_partner": 1, "partner_email": team_doc.partner_email},
"billing_name",
)
- number_of_sites = frappe.db.count(
- "Site", {"team": team_doc.name, "status": ("!=", "Archived")}
- )
+ number_of_sites = frappe.db.count("Site", {"team": team_doc.name, "status": ("!=", "Archived")})
return {
"user": frappe.get_doc("User", user),
@@ -473,8 +461,7 @@ def current_team():
def get_permissions():
user = frappe.session.user
groups = tuple(
- frappe.get_all("Press Permission Group User", {"user": user}, pluck="parent")
- + ["1", "2"]
+ [*frappe.get_all("Press Permission Group User", {"user": user}, pluck="parent"), "1", "2"]
) # [1, 2] is for avoiding singleton tuples
docperms = frappe.db.sql(
f"""
@@ -485,9 +472,7 @@ def get_permissions():
""",
as_dict=True,
)
- return {
- perm.document_name: perm.actions.split(",") for perm in docperms if perm.actions
- }
+ return {perm.document_name: perm.actions.split(",") for perm in docperms if perm.actions}
@frappe.whitelist()
@@ -525,9 +510,7 @@ def signup_settings(product=None, fetch_countries=False, timezone=None):
if fetch_countries:
data["countries"] = frappe.db.get_all("Country", pluck="name")
- data["country"] = get_country_info().get("country") or get_country_from_timezone(
- timezone
- )
+ data["country"] = get_country_info().get("country") or get_country_from_timezone(timezone)
return data
@@ -535,9 +518,7 @@ def signup_settings(product=None, fetch_countries=False, timezone=None):
@frappe.whitelist(allow_guest=True)
def guest_feature_flags():
return {
- "enable_google_oauth": frappe.db.get_single_value(
- "Press Settings", "enable_google_oauth"
- ),
+ "enable_google_oauth": frappe.db.get_single_value("Press Settings", "enable_google_oauth"),
}
@@ -547,8 +528,7 @@ def create_child_team(title):
current_team = get_current_team(True)
if title in [
- d.team_title
- for d in frappe.get_all("Team", {"parent_team": current_team.name}, ["team_title"])
+ d.team_title for d in frappe.get_all("Team", {"parent_team": current_team.name}, ["team_title"])
]:
frappe.throw(f"Child Team {title} already exists.")
elif title == "Parent Team":
@@ -686,7 +666,7 @@ def send_reset_password_email(email: str):
now=True,
)
else:
- frappe.throw("User {0} does not exist".format(valid_email))
+ frappe.throw(f"User {valid_email} does not exist")
@frappe.whitelist(allow_guest=True)
@@ -712,9 +692,7 @@ def remove_team_member(user_email):
@frappe.whitelist()
def remove_child_team(child_team):
team = frappe.get_doc("Team", child_team)
- sites = frappe.get_all(
- "Site", {"status": ("!=", "Archived"), "team": team.name}, pluck="name"
- )
+ sites = frappe.get_all("Site", {"status": ("!=", "Archived"), "team": team.name}, pluck="name")
if sites:
frappe.throw("Child team has Active Sites")
@@ -736,9 +714,7 @@ def can_switch_to_team(team):
@frappe.whitelist()
def switch_team(team):
- user_is_part_of_team = frappe.db.exists(
- "Team Member", {"parent": team, "user": frappe.session.user}
- )
+ user_is_part_of_team = frappe.db.exists("Team Member", {"parent": team, "user": frappe.session.user})
user_is_system_user = frappe.session.data.user_type == "System User"
if user_is_part_of_team or user_is_system_user:
frappe.db.set_value("Team", {"user": frappe.session.user}, "last_used_team", team)
@@ -747,6 +723,7 @@ def switch_team(team):
"team": frappe.get_doc("Team", team),
"team_members": get_team_members(team),
}
+ return None
@frappe.whitelist()
@@ -807,14 +784,14 @@ def get_site_count(team):
@frappe.whitelist()
def user_prompts():
if frappe.local.dev_server:
- return
+ return None
team = get_current_team(True)
doc = frappe.get_doc("Team", team.name)
onboarding = doc.get_onboarding()
if not onboarding["complete"]:
- return
+ return None
if not doc.billing_address:
return [
@@ -822,14 +799,13 @@ def user_prompts():
"Update your billing details so that we can show it in your monthly invoice.",
]
- gstin, country = frappe.db.get_value(
- "Address", doc.billing_address, ["gstin", "country"]
- )
+ gstin, country = frappe.db.get_value("Address", doc.billing_address, ["gstin", "country"])
if country == "India" and not gstin:
return [
"UpdateBillingDetails",
"If you have a registered GSTIN number, you are required to update it, so that we can generate a GST Invoice.",
]
+ return None
@frappe.whitelist()
@@ -894,7 +870,7 @@ def redirect_to(location):
)
-def get_frappe_io_auth_url() -> Union[str, None]:
+def get_frappe_io_auth_url() -> str | None:
"""Get auth url for oauth login with frappe.io."""
try:
@@ -902,7 +878,7 @@ def get_frappe_io_auth_url() -> Union[str, None]:
"Social Login Key", filters={"enable_social_login": 1, "provider_name": "Frappe"}
)
except DoesNotExistError:
- return
+ return None
if (
provider.base_url
@@ -911,16 +887,13 @@ def get_frappe_io_auth_url() -> Union[str, None]:
and provider.get_password("client_secret")
):
return get_oauth2_authorize_url(provider.name, redirect_to="")
+ return None
@frappe.whitelist()
def get_emails():
team = get_current_team()
- data = frappe.get_all(
- "Communication Email", filters={"parent": team}, fields=["type", "value"]
- )
-
- return data
+ return frappe.get_all("Communication Email", filters={"parent": team}, fields=["type", "value"])
@frappe.whitelist()
@@ -928,7 +901,7 @@ def update_emails(data):
from frappe.utils import validate_email_address
data = {x["type"]: x["value"] for x in json.loads(data)}
- for key, value in data.items():
+ for _key, value in data.items():
validate_email_address(value, throw=True)
team_doc = get_current_team(get_doc=True)
@@ -940,9 +913,7 @@ def update_emails(data):
@frappe.whitelist()
def add_key(key):
- frappe.get_doc(
- {"doctype": "User SSH Key", "user": frappe.session.user, "ssh_public_key": key}
- ).insert()
+ frappe.get_doc({"doctype": "User SSH Key", "user": frappe.session.user, "ssh_public_key": key}).insert()
@frappe.whitelist()
@@ -1011,9 +982,7 @@ def get_permission_options(name, ptype):
available_actions,
)
- doctypes = frappe.get_all(
- "Press Method Permission", pluck="document_type", distinct=True
- )
+ doctypes = frappe.get_all("Press Method Permission", pluck="document_type", distinct=True)
options = []
for doctype in doctypes:
@@ -1096,9 +1065,7 @@ def update_permissions(user, ptype, updated):
@frappe.whitelist()
def groups():
- return frappe.get_all(
- "Press Permission Group", {"team": get_current_team()}, ["name", "title"]
- )
+ return frappe.get_all("Press Permission Group", {"team": get_current_team()}, ["name", "title"])
@frappe.whitelist()
@@ -1159,12 +1126,10 @@ def get_permission_roles():
PressRole.allow_site_creation,
PressRole.allow_bench_creation,
PressRole.allow_server_creation,
+ PressRole.allow_webhook_configuration,
)
.join(PressRoleUser)
- .on(
- (PressRole.name == PressRoleUser.parent)
- & (PressRoleUser.user == frappe.session.user)
- )
+ .on((PressRole.name == PressRoleUser.parent) & (PressRoleUser.user == frappe.session.user))
.where(PressRole.team == get_current_team())
.run(as_dict=True)
)
@@ -1194,8 +1159,8 @@ def verify_2fa(user, totp_code):
if verified:
return verified
- else:
- frappe.throw("Invalid 2FA code", frappe.AuthenticationError)
+ frappe.throw("Invalid 2FA code", frappe.AuthenticationError)
+ return None
@frappe.whitelist()
@@ -1203,9 +1168,7 @@ def get_2fa_qr_code_url():
"""Get the QR code URL for 2FA provisioning"""
if frappe.db.exists("User 2FA", frappe.session.user):
- user_totp_secret = get_decrypted_password(
- "User 2FA", frappe.session.user, "totp_secret"
- )
+ user_totp_secret = get_decrypted_password("User 2FA", frappe.session.user, "totp_secret")
else:
user_totp_secret = pyotp.random_base32()
frappe.get_doc(
@@ -1226,9 +1189,7 @@ def enable_2fa(totp_code):
"""Enable 2FA for the user after verifying the TOTP code"""
if frappe.db.exists("User 2FA", frappe.session.user):
- user_totp_secret = get_decrypted_password(
- "User 2FA", frappe.session.user, "totp_secret"
- )
+ user_totp_secret = get_decrypted_password("User 2FA", frappe.session.user, "totp_secret")
else:
frappe.throw(f"2FA is not enabled for {frappe.session.user}")
@@ -1243,9 +1204,7 @@ def disable_2fa(totp_code):
"""Disable 2FA for the user after verifying the TOTP code"""
if frappe.db.exists("User 2FA", frappe.session.user):
- user_totp_secret = get_decrypted_password(
- "User 2FA", frappe.session.user, "totp_secret"
- )
+ user_totp_secret = get_decrypted_password("User 2FA", frappe.session.user, "totp_secret")
else:
frappe.throw(f"2FA is not enabled for {frappe.session.user}")
diff --git a/press/api/client.py b/press/api/client.py
index e8c483149e..908a3427aa 100644
--- a/press/api/client.py
+++ b/press/api/client.py
@@ -72,7 +72,6 @@
"Dashboard Banner",
"App Release Approval Request",
"Press Webhook",
- "Press Webhook Log",
]
ALLOWED_DOCTYPES_FOR_SUPPORT = [
diff --git a/press/api/webhook.py b/press/api/webhook.py
index 69606b02f1..92d6338511 100644
--- a/press/api/webhook.py
+++ b/press/api/webhook.py
@@ -7,6 +7,8 @@
import frappe
+from press.press.doctype.press_role.press_role import check_role_permissions
+
@frappe.whitelist(allow_guest=True)
def available_events():
@@ -20,6 +22,7 @@ def available_events():
@frappe.whitelist()
def add(endpoint: str, secret: str, events: list[str]):
+ check_role_permissions("Press Webhook")
doc = frappe.new_doc("Press Webhook")
doc.endpoint = endpoint
doc.secret = secret
@@ -31,6 +34,7 @@ def add(endpoint: str, secret: str, events: list[str]):
@frappe.whitelist()
def update(name: str, endpoint: str, secret: str, events: list[str]):
+ check_role_permissions("Press Webhook")
doc = frappe.get_doc("Press Webhook", name)
doc.endpoint = endpoint
if secret:
@@ -45,6 +49,7 @@ def update(name: str, endpoint: str, secret: str, events: list[str]):
@frappe.whitelist()
def attempts(webhook: str):
+ check_role_permissions("Press Webhook Log")
doc = frappe.get_doc("Press Webhook", webhook)
doc.has_permission("read")
@@ -70,6 +75,7 @@ def attempts(webhook: str):
@frappe.whitelist()
def attempt(name: str):
+ check_role_permissions("Press Webhook Attempt")
doc = frappe.get_doc("Press Webhook Attempt", name)
doc.has_permission("read")
data = doc.as_dict()
diff --git a/press/press/doctype/press_role/press_role.json b/press/press/doctype/press_role/press_role.json
index 57120f320f..4566f0c4bc 100644
--- a/press/press/doctype/press_role/press_role.json
+++ b/press/press/doctype/press_role/press_role.json
@@ -18,6 +18,7 @@
"allow_site_creation",
"allow_bench_creation",
"allow_server_creation",
+ "allow_webhook_configuration",
"section_break_zdiv",
"users"
],
@@ -100,11 +101,17 @@
"fieldname": "admin_access",
"fieldtype": "Check",
"label": "Admin Access"
+ },
+ {
+ "default": "0",
+ "fieldname": "allow_webhook_configuration",
+ "fieldtype": "Check",
+ "label": "Allow Webhook Configuration"
}
],
"index_web_pages_for_search": 1,
"links": [],
- "modified": "2024-09-04 10:13:47.505718",
+ "modified": "2024-09-26 15:51:19.122128",
"modified_by": "Administrator",
"module": "Press",
"name": "Press Role",
diff --git a/press/press/doctype/press_role/press_role.py b/press/press/doctype/press_role/press_role.py
index 6d35658f1b..b137eedf8d 100644
--- a/press/press/doctype/press_role/press_role.py
+++ b/press/press/doctype/press_role/press_role.py
@@ -1,5 +1,6 @@
# Copyright (c) 2024, Frappe and contributors
# For license information, please see license.txt
+from __future__ import annotations
import frappe
from frappe.model.document import Document
@@ -15,6 +16,7 @@ class PressRole(Document):
if TYPE_CHECKING:
from frappe.types import DF
+
from press.press.doctype.press_role_user.press_role_user import PressRoleUser
admin_access: DF.Check
@@ -24,12 +26,13 @@ class PressRole(Document):
allow_partner: DF.Check
allow_server_creation: DF.Check
allow_site_creation: DF.Check
+ allow_webhook_configuration: DF.Check
team: DF.Link
title: DF.Data
users: DF.Table[PressRoleUser]
# end: auto-generated types
- dashboard_fields = [
+ dashboard_fields = (
"title",
"users",
"admin_access",
@@ -39,14 +42,13 @@ class PressRole(Document):
"allow_site_creation",
"allow_bench_creation",
"allow_server_creation",
+ "allow_webhook_configuration",
"team",
- ]
+ )
def before_insert(self):
if frappe.db.exists("Press Role", {"title": self.title, "team": self.team}):
- frappe.throw(
- "Role with title {0} already exists".format(self.title), frappe.DuplicateEntryError
- )
+ frappe.throw(f"Role with title {self.title} already exists", frappe.DuplicateEntryError)
if not frappe.local.system_user() and frappe.session.user != frappe.db.get_value(
"Team", self.team, "user"
@@ -68,6 +70,7 @@ def validate(self):
self.allow_site_creation = 1
self.allow_bench_creation = 1
self.allow_server_creation = 1
+ self.allow_webhook_configuration = 1
@dashboard_whitelist()
def add_user(self, user):
@@ -123,7 +126,15 @@ def check_role_permissions(doctype: str, name: str | None = None) -> list[str] |
"""
from press.utils import has_role
- if doctype not in ["Site", "Release Group", "Server", "Marketplace App"]:
+ if doctype not in [
+ "Site",
+ "Release Group",
+ "Server",
+ "Marketplace App",
+ "Press Webhook",
+ "Press Webhook Log",
+ "Press Webhook Attempt",
+ ]:
return []
if frappe.local.system_user() or has_role("Press Support Agent"):
@@ -140,11 +151,21 @@ def check_role_permissions(doctype: str, name: str | None = None) -> list[str] |
.where(PressRole.team == frappe.local.team().name)
)
- if doctype == "Marketplace App":
- if roles := query.select(PressRole.allow_apps).run(as_dict=1):
- # throw error if any of the roles don't have permission for apps
- if not any(perm.allow_apps for perm in roles):
- frappe.throw("Not permitted", frappe.PermissionError)
+ if (
+ doctype == "Marketplace App"
+ and (roles := query.select(PressRole.allow_apps).run(as_dict=1))
+ and not any(perm.allow_apps for perm in roles)
+ ):
+ # throw error if any of the roles don't have permission for apps
+ frappe.throw("Not permitted", frappe.PermissionError)
+
+ elif (
+ doctype in ["Press Webhook", "Press Webhook Log", "Press Webhook Attempt"]
+ and (roles := query.select(PressRole.allow_webhook_configuration).run(as_dict=1))
+ and not any(perm.allow_webhook_configuration for perm in roles)
+ ):
+ # throw error if any of the roles don't have permission for webhooks
+ frappe.throw("Not permitted", frappe.PermissionError)
elif doctype in ["Site", "Release Group", "Server"]:
field = doctype.lower().replace(" ", "_")