diff --git a/superset/security/analytics_db_safety.py b/superset/security/analytics_db_safety.py index bfa93613e1f07..29583b5255201 100644 --- a/superset/security/analytics_db_safety.py +++ b/superset/security/analytics_db_safety.py @@ -14,6 +14,8 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import re + from flask_babel import lazy_gettext as _ from sqlalchemy.engine.url import URL from sqlalchemy.exc import NoSuchModuleError @@ -24,18 +26,20 @@ # list of unsafe SQLAlchemy dialects BLOCKLIST = { # sqlite creates a local DB, which allows mapping server's filesystem - "sqlite", + re.compile(r"sqlite(?:\+[^\s]*)?$"), # shillelagh allows opening local files (eg, 'SELECT * FROM "csv:///etc/passwd"') - "shillelagh", - "shillelagh+apsw", + re.compile(r"shillelagh$"), + re.compile(r"shillelagh\+apsw$"), } def check_sqlalchemy_uri(uri: URL) -> None: - if uri.drivername in BLOCKLIST: + for blocklist_regex in BLOCKLIST: + if not re.match(blocklist_regex, uri.drivername): + continue try: dialect = uri.get_dialect().__name__ - except NoSuchModuleError: + except (NoSuchModuleError, ValueError): dialect = uri.drivername raise SupersetSecurityException( diff --git a/tests/integration_tests/security/analytics_db_safety_tests.py b/tests/integration_tests/security/analytics_db_safety_tests.py index f6518fe93564d..7e36268e30286 100644 --- a/tests/integration_tests/security/analytics_db_safety_tests.py +++ b/tests/integration_tests/security/analytics_db_safety_tests.py @@ -14,30 +14,78 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from typing import Optional import pytest from sqlalchemy.engine.url import make_url from superset.exceptions import SupersetSecurityException from superset.security.analytics_db_safety import check_sqlalchemy_uri -from tests.integration_tests.base_tests import SupersetTestCase -class TestDBConnections(SupersetTestCase): - def test_check_sqlalchemy_uri_ok(self): - check_sqlalchemy_uri(make_url("postgres://user:password@test.com")) - - def test_check_sqlalchemy_url_sqlite(self): - with pytest.raises(SupersetSecurityException) as excinfo: - check_sqlalchemy_uri(make_url("sqlite:///home/superset/bad.db")) - assert ( - str(excinfo.value) - == "SQLiteDialect_pysqlite cannot be used as a data source for security reasons." - ) - +@pytest.mark.parametrize( + "sqlalchemy_uri, error, error_message", + [ + ("postgres://user:password@test.com", False, None), + ( + "sqlite:///home/superset/bad.db", + True, + "SQLiteDialect_pysqlite cannot be used as a data source for security reasons.", + ), + ( + "sqlite+pysqlite:///home/superset/bad.db", + True, + "SQLiteDialect_pysqlite cannot be used as a data source for security reasons.", + ), + ( + "sqlite+aiosqlite:///home/superset/bad.db", + True, + "SQLiteDialect_pysqlite cannot be used as a data source for security reasons.", + ), + ( + "sqlite+pysqlcipher:///home/superset/bad.db", + True, + "SQLiteDialect_pysqlite cannot be used as a data source for security reasons.", + ), + ( + "sqlite+:///home/superset/bad.db", + True, + "SQLiteDialect_pysqlite cannot be used as a data source for security reasons.", + ), + ( + "sqlite+new+driver:///home/superset/bad.db", + True, + "SQLiteDialect_pysqlite cannot be used as a data source for security reasons.", + ), + ( + "sqlite+new+:///home/superset/bad.db", + True, + "SQLiteDialect_pysqlite cannot be used as a data source for security reasons.", + ), + ( + "shillelagh:///home/superset/bad.db", + True, + "shillelagh cannot be used as a data source for security reasons.", + ), + ( + "shillelagh+apsw:///home/superset/bad.db", + True, + "shillelagh cannot be used as a data source for security reasons.", + ), + ("shillelagh+:///home/superset/bad.db", False, None), + ( + "shillelagh+something:///home/superset/bad.db", + False, + None, + ), + ], +) +def test_check_sqlalchemy_uri( + sqlalchemy_uri: str, error: bool, error_message: Optional[str] +): + if error: with pytest.raises(SupersetSecurityException) as excinfo: - check_sqlalchemy_uri(make_url("shillelagh:///home/superset/bad.db")) - assert ( - str(excinfo.value) - == "shillelagh cannot be used as a data source for security reasons." - ) + check_sqlalchemy_uri(make_url(sqlalchemy_uri)) + assert str(excinfo.value) == error_message + else: + check_sqlalchemy_uri(make_url(sqlalchemy_uri))