From c7c8e83244f90216150215ff73e1a429ed95d545 Mon Sep 17 00:00:00 2001 From: clavedeluna Date: Wed, 1 Nov 2023 07:57:31 -0300 Subject: [PATCH 1/8] initial pass add line at the end --- .../secure_flask_session_config.py | 96 +++++++ .../test_secure_flask_session_config.py | 267 ++++++++++++++++++ 2 files changed, 363 insertions(+) create mode 100644 src/core_codemods/secure_flask_session_config.py create mode 100644 tests/codemods/test_secure_flask_session_config.py diff --git a/src/core_codemods/secure_flask_session_config.py b/src/core_codemods/secure_flask_session_config.py new file mode 100644 index 00000000..7a561b52 --- /dev/null +++ b/src/core_codemods/secure_flask_session_config.py @@ -0,0 +1,96 @@ +import libcst as cst +from libcst.codemod import Codemod, CodemodContext, ContextAwareVisitor +from libcst.metadata import ParentNodeProvider, ScopeProvider + +from libcst import matchers +from codemodder.codemods.base_codemod import ReviewGuidance +from codemodder.codemods.api import BaseCodemod +from codemodder.codemods.utils_mixin import NameResolutionMixin + + +class SecureFlaskSessionConfig(BaseCodemod): + METADATA_DEPENDENCIES = BaseCodemod.METADATA_DEPENDENCIES + ( + ParentNodeProvider, + ScopeProvider, + ) + NAME = "secure-flask-session-configuration" + SUMMARY = "UTODO" + REVIEW_GUIDANCE = ReviewGuidance.MERGE_AFTER_REVIEW + DESCRIPTION = "TODO" + REFERENCES = [ + { + "url": "todo", + "description": "", + } + ] + + def transform_module_impl(self, tree: cst.Module) -> cst.Module: + flask_visitor = FindConfigCalls(self.context) + tree.visit(flask_visitor) + if not flask_visitor.flask_app_name: + return tree + + # TODO: iterate through flask_visitor.config_access + # TODO: use a list of secure config names, values to only write the needed ones out + result = self.insert_config_line_endof_mod(tree, flask_visitor.flask_app_name) + return result + + def insert_config_line_endof_mod( + self, original_node: cst.Module, app_name: str + ) -> cst.Module: + # TODO: record change + # line_number is the end of the module where we will insert the new flag. + # pos_to_match = self.node_position(original_node) + # line_number = pos_to_match.end.line + # self.changes_in_file.append( + # Change(line_number, DjangoSessionCookieSecureOff.CHANGE_DESCRIPTION) + # ) + # self.file_context.codemod_changes.append( + # Change(line_number, self.CHANGE_DESCRIPTION) + # ) + secure_configs = """SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax'""" + final_line = cst.parse_statement(f"{app_name}.config.update({secure_configs})") + new_body = original_node.body + (final_line,) + return original_node.with_changes(body=new_body) + + +class FindConfigCalls(ContextAwareVisitor, NameResolutionMixin): + """ + Visitor to find calls to flask.Flask and related `.config` accesses. + """ + + METADATA_DEPENDENCIES = (ParentNodeProvider,) + + def __init__(self, context: CodemodContext) -> None: + self.config_access: list = [] + self.flask_app_name = "" + super().__init__(context) + + def _find_config_accesses(self, flask_app_attr: cst.AnnAssign | cst.Assign): + assignments = self.find_assignments(flask_app_attr) + for assignment in assignments: + if assignment.references: + # Flask app instance is accessed + references_to_app = [x.node for x in assignment.references] + for node in references_to_app: + parent = self.get_metadata(ParentNodeProvider, node) + match parent: + case cst.Attribute(): + config = cst.Name(value="config") + if matchers.matches( + parent, matchers.Attribute(value=node, attr=config) + ): + gparent = self.get_metadata(ParentNodeProvider, parent) + self.config_access.append(gparent) + + def leave_Call(self, original_node: cst.Call) -> None: + true_name = self.find_base_name(original_node.func) + if true_name == "flask.Flask": + flask_app_parent = self.get_metadata(ParentNodeProvider, original_node) + match flask_app_parent: + case cst.AnnAssign() | cst.Assign(): + flask_app_attr = flask_app_parent.targets[0].target + self.flask_app_name = flask_app_attr.value + self._find_config_accesses(flask_app_attr) + + return original_node diff --git a/tests/codemods/test_secure_flask_session_config.py b/tests/codemods/test_secure_flask_session_config.py new file mode 100644 index 00000000..e1f23bd2 --- /dev/null +++ b/tests/codemods/test_secure_flask_session_config.py @@ -0,0 +1,267 @@ +import pytest +from core_codemods.secure_flask_session_config import SecureFlaskSessionConfig +from tests.codemods.base_codemod_test import BaseCodemodTest +from textwrap import dedent + + +class TestSecureFlaskSessionConfig(BaseCodemodTest): + codemod = SecureFlaskSessionConfig + + def test_name(self): + assert self.codemod.name() == "secure-flask-session-configuration" + + def test_no_flask_app(self, tmpdir): + input_code = """\ + import flask + + response = flask.Response() + var = "hello" + response.set_cookie("name", "value") + """ + self.run_and_assert(tmpdir, dedent(input_code), dedent(input_code)) + + def test_app_not_accessed(self, tmpdir): + input_code = """\ + import flask + + app = flask.Flask(__name__) + # more code + print(1) + """ + expexted_output = """\ + import flask + + app = flask.Flask(__name__) + # more code + print(1) + app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax') + """ + self.run_and_assert(tmpdir, dedent(input_code), dedent(expexted_output)) + # assert len(self.file_context.codemod_changes) == 1 + + def test_app_defined_separate_module(self, tmpdir): + # TODO: test this as an integration test with two real modules + input_code = """\ + from my_app_module import app + + app.config["SESSION_COOKIE_SECURE"] = False + """ + self.run_and_assert(tmpdir, dedent(input_code), dedent(input_code)) + assert len(self.file_context.codemod_changes) == 0 + + def test_app_not_assigned(self, tmpdir): + input_code = """\ + import flask + + flask.Flask(__name__) + print(1) + """ + self.run_and_assert(tmpdir, dedent(input_code), dedent(input_code)) + + def test_app_accessed_config_not_called(self, tmpdir): + input_code = """\ + import flask + + app = flask.Flask(__name__) + app.secret_key = "dev" + # more code + """ + expexted_output = """\ + import flask + + app = flask.Flask(__name__) + app.secret_key = "dev" + app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax') + # more code + """ + self.run_and_assert(tmpdir, dedent(input_code), dedent(expexted_output)) + # assert len(self.file_context.codemod_changes) == 1 + + def test_from_import(self, tmpdir): + input_code = """\ + from flask import Flask + + app = flask.Flask(__name__) + app.secret_key = "dev" + # more code + """ + expexted_output = """\ + from flask import Flask + + app = flask.Flask(__name__) + app.secret_key = "dev" + app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax') + # more code + """ + self.run_and_assert(tmpdir, dedent(input_code), dedent(expexted_output)) + # assert len(self.file_context.codemod_changes) == 1 + + def test_import_alias(self, tmpdir): + input_code = f"""\ + from flask import Flask as flask_app + app = flask_app(__name__) + app.secret_key = "dev" + # more code + """ + expexted_output = f"""\ + from flask import Flask as flask_app + app = flask_app(__name__) + app.secret_key = "dev" + app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax') + # more code + """ + self.run_and_assert(tmpdir, dedent(input_code), dedent(expexted_output)) + # assert len(self.file_context.codemod_changes) == 1 + + @pytest.mark.parametrize( + "input_code,expected_output", + [ + ( + """\ + import flask + + app = flask.Flask(__name__) + app.secret_key = "dev" + app.config + """, + """\ + import flask + + app = flask.Flask(__name__) + app.secret_key = "dev" + app.config + app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax') + """, + ), + ( + """\ + import flask + + app = flask.Flask(__name__) + app.secret_key = "dev" + app.config["TESTING"] = True + """, + """\ + import flask + + app = flask.Flask(__name__) + app.secret_key = "dev" + app.config["TESTING"] = True + app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax') + """, + ), + ( + """\ + import flask + + app = flask.Flask(__name__) + app.secret_key = "dev" + app.config.testing = True + """, + """\ + import flask + + app = flask.Flask(__name__) + app.secret_key = "dev" + app.config.testing = True + app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax') + """, + ), + ( + """\ + import flask + + app = flask.Flask(__name__) + app.secret_key = "dev" + app.config["SESSION_COOKIE_SECURE"] = False + """, + """\ + import flask + + app = flask.Flask(__name__) + app.secret_key = "dev" + app.config["SESSION_COOKIE_SECURE"] = False + app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax') + """, + ), + ( + """\ + import flask + + app = flask.Flask(__name__) + app.secret_key = "dev" + app.config["SESSION_COOKIE_HTTPONLY"] = False + """, + """\ + import flask + + app = flask.Flask(__name__) + app.secret_key = "dev" + app.config["SESSION_COOKIE_HTTPONLY"] = False + app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax') + """, + ), + ( + """\ + import flask + + app = flask.Flask(__name__) + app.secret_key = "dev" + app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Strict') + """, + """\ + import flask + + app = flask.Flask(__name__) + app.secret_key = "dev" + app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Strict') + app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax') + """, + ), + ( + """\ + import flask + + app = flask.Flask(__name__) + app.secret_key = "dev" + app.config["SESSION_COOKIE_SECURE"] = False + app.config["SESSION_COOKIE_SECURE"] = True + """, + """\ + import flask + + app = flask.Flask(__name__) + app.secret_key = "dev" + app.config["SESSION_COOKIE_SECURE"] = False + app.config["SESSION_COOKIE_SECURE"] = True + app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax') + """, + ), + ], + ) + def test_config_accessed(self, tmpdir, input_code, expected_output): + self.run_and_assert(tmpdir, input_code, expected_output) + + @pytest.mark.skip() + def test_func_scope(self, tmpdir): + input_code = """\ + from flask import Flask + app = Flask(__name__) + + @app.route('/') + def hello_world(): + return 'Hello World!' + + def configure(): + app.config['TESTING'] = True + + if __name__ == '__main__': + configure() + app.run() + """ + expexted_output = """\ + # TODO correct fix could be multiple options, + # either within configure() call or after it's called + """ + self.run_and_assert(tmpdir, dedent(input_code), dedent(expexted_output)) + # assert len(self.file_context.codemod_changes) == 1 From 993a4a9974d2defc47d1e30335085e317b5506aa Mon Sep 17 00:00:00 2001 From: clavedeluna Date: Mon, 6 Nov 2023 09:39:50 -0300 Subject: [PATCH 2/8] handle single config line cases --- .../secure_flask_session_config.py | 208 +++++++++++++++++- .../test_secure_flask_session_config.py | 195 +++++++--------- 2 files changed, 282 insertions(+), 121 deletions(-) diff --git a/src/core_codemods/secure_flask_session_config.py b/src/core_codemods/secure_flask_session_config.py index 7a561b52..0371854a 100644 --- a/src/core_codemods/secure_flask_session_config.py +++ b/src/core_codemods/secure_flask_session_config.py @@ -24,20 +24,172 @@ class SecureFlaskSessionConfig(BaseCodemod): } ] + SECURE_SESSION_CONFIGS = dict( + # None value indicates unassigned, using default is safe + # values in order of precedence + SESSION_COOKIE_HTTPONLY=[None, True], + SESSION_COOKIE_SECURE=[True], + SESSION_COOKIE_SAMESITE=["Lax", "Strict"], + ) + def transform_module_impl(self, tree: cst.Module) -> cst.Module: flask_visitor = FindConfigCalls(self.context) tree.visit(flask_visitor) if not flask_visitor.flask_app_name: return tree - # TODO: iterate through flask_visitor.config_access - # TODO: use a list of secure config names, values to only write the needed ones out - result = self.insert_config_line_endof_mod(tree, flask_visitor.flask_app_name) + if len(flask_visitor.config_access) == 0: + return self.insert_config_line_endof_mod( + tree, flask_visitor.flask_app_name, self.SECURE_SESSION_CONFIGS + ) + + # Handle single config.update line, reuse it + if len(flask_visitor.config_access) == 1 and isinstance( + single_config := flask_visitor.config_access[0], cst.Call + ): + defined_configs = self._get_configs(single_config) + + configs_to_write = {**self.SECURE_SESSION_CONFIGS.copy(), **defined_configs} + + for key, val in configs_to_write.items(): + if ( + key in self.SECURE_SESSION_CONFIGS + and val in self.SECURE_SESSION_CONFIGS[key] + ): + del configs_to_write[key] + + return self.reuse_config_line( + tree, flask_visitor.flask_app_name, configs_to_write + ) + + # Handle single config['access'] line, add update line excluding configs already set + if len(flask_visitor.config_access) == 1 and isinstance( + single_config := flask_visitor.config_access[0], cst.Assign + ): + defined_config = self._get_config_from_slice(single_config) + defined_key = list(defined_config.keys())[0] + configs_to_write = self.SECURE_SESSION_CONFIGS.copy() + + for key, val in configs_to_write.items(): + if key in defined_config and val in self.SECURE_SESSION_CONFIGS[key]: + del configs_to_write[key] + + # any of the secure sesh in defined config, we want to reuse the line + # but flip it if necessary + if defined_key in self.SECURE_SESSION_CONFIGS: + del configs_to_write[defined_key] + return self.reuse_config_subscript_line( + tree, flask_visitor.flask_app_name, configs_to_write, defined_key + ) + return self.insert_config_line_endof_mod( + tree, flask_visitor.flask_app_name, configs_to_write + ) + + + defined_configs = self.get_defined_configs(flask_visitor.config_access) + breakpoint() + configs_to_write = self.SECURE_SESSION_CONFIGS.copy() + for key, vals in defined_configs.items(): + defined_val = vals[-1] + if key in self.SECURE_SESSION_CONFIGS and defined_val in self.SECURE_SESSION_CONFIGS[key]: + del configs_to_write[key] + + result = self.insert_config_line_endof_mod( + tree, flask_visitor.flask_app_name, configs_to_write + ) return result + def get_defined_configs(self, config_access): + all_defined_configs = {} + for config_line in config_access: + match config_line: + case cst.Call(): + # app.config.update(...) + defined_configs = self._get_configs(config_line) + all_defined_configs.update(defined_configs) + case cst.Assign(): + # app.config['...'] + defined_configs = self._get_config_from_slice(config_line) + all_defined_configs.update(defined_configs) + + return all_defined_configs + def _get_configs(self, config_line: cst.Call): + defined_configs = {} + for arg in config_line.args: + defined_configs[arg.keyword.value] = [true_value(arg.value)] + return defined_configs + + def _get_config_from_slice(self, config_line: cst.Assign): + defined_configs = {} + key = true_value(config_line.targets[0].target.slice[0].slice.value) + defined_configs[key] = [true_value(config_line.value)] + return defined_configs + + def reuse_config_line( + self, original_node: cst.Module, app_name: str, configs: dict + ) -> cst.Module: + if not configs: + return original_node + # TODO: record change + # line_number is the end of the module where we will insert the new flag. + # pos_to_match = self.node_position(original_node) + # line_number = pos_to_match.end.line + # self.changes_in_file.append( + # Change(line_number, DjangoSessionCookieSecureOff.CHANGE_DESCRIPTION) + # ) + # self.file_context.codemod_changes.append( + # Change(line_number, self.CHANGE_DESCRIPTION) + # ) + config_string = ", ".join( + f"{key}='{value[0]}'" if isinstance(value[0], str) else f"{key}={value[0]}" + for key, value in configs.items() + if value and value[0] is not None + ) + # secure_configs = """SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax'""" + final_line = cst.parse_statement(f"{app_name}.config.update({config_string})") + new_body = original_node.body[:-1] + (final_line,) + return original_node.with_changes(body=new_body) + + def reuse_config_subscript_line( + self, original_node: cst.Module, app_name: str, configs: dict, defined_key: str + ) -> cst.Module: + if not configs: + return original_node + # TODO: record change + # line_number is the end of the module where we will insert the new flag. + # pos_to_match = self.node_position(original_node) + # line_number = pos_to_match.end.line + # self.changes_in_file.append( + # Change(line_number, DjangoSessionCookieSecureOff.CHANGE_DESCRIPTION) + # ) + # self.file_context.codemod_changes.append( + # Change(line_number, self.CHANGE_DESCRIPTION) + # ) + config_string = ", ".join( + f"{key}='{value[0]}'" if isinstance(value[0], str) else f"{key}={value[0]}" + for key, value in configs.items() + if value and value[0] is not None + ) + + final_line = cst.parse_statement(f"{app_name}.config.update({config_string})") + secure_val = ( + self.SECURE_SESSION_CONFIGS[defined_key][0] + or self.SECURE_SESSION_CONFIGS[defined_key][1] + ) + final_config_subscript_line = cst.parse_statement( + f"{app_name}.config['{defined_key}'] = {secure_val}" + ) + new_body = original_node.body[:-1] + ( + final_config_subscript_line, + final_line, + ) + return original_node.with_changes(body=new_body) + def insert_config_line_endof_mod( - self, original_node: cst.Module, app_name: str + self, original_node: cst.Module, app_name: str, configs: dict ) -> cst.Module: + if not configs: + return original_node # TODO: record change # line_number is the end of the module where we will insert the new flag. # pos_to_match = self.node_position(original_node) @@ -48,8 +200,14 @@ def insert_config_line_endof_mod( # self.file_context.codemod_changes.append( # Change(line_number, self.CHANGE_DESCRIPTION) # ) - secure_configs = """SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax'""" - final_line = cst.parse_statement(f"{app_name}.config.update({secure_configs})") + config_string = ", ".join( + f"{key}='{value[0]}'" if isinstance(value[0], str) else f"{key}={value[0]}" + for key, value in configs.items() + if value and value[0] is not None + ) + if not config_string: + return original_node + final_line = cst.parse_statement(f"{app_name}.config.update({config_string})") new_body = original_node.body + (final_line,) return original_node.with_changes(body=new_body) @@ -81,7 +239,16 @@ def _find_config_accesses(self, flask_app_attr: cst.AnnAssign | cst.Assign): parent, matchers.Attribute(value=node, attr=config) ): gparent = self.get_metadata(ParentNodeProvider, parent) - self.config_access.append(gparent) + ggparent = self.get_metadata( + ParentNodeProvider, gparent + ) + if matchers.matches(gparent, matchers.Subscript()): + gggparent = self.get_metadata( + ParentNodeProvider, ggparent + ) + self.config_access.append(gggparent) + else: + self.config_access.append(ggparent) def leave_Call(self, original_node: cst.Call) -> None: true_name = self.find_base_name(original_node.func) @@ -94,3 +261,30 @@ def leave_Call(self, original_node: cst.Call) -> None: self._find_config_accesses(flask_app_attr) return original_node + + +def true_value(node: cst.Name | cst.SimpleString): + # todo: move to a more general util + from codemodder.project_analysis.file_parsers.utils import clean_simplestring + + # convert 'True' to True, etc + # '123' to 123 + # leave strs as they are + # Try to convert the string to a boolean, integer, or float + match node: + case cst.SimpleString(): + return clean_simplestring(node) + case cst.Name(): + val = node.value + if val.lower() == "true": + return True + elif val.lower() == "false": + return False + try: + return int(val) + except ValueError: + try: + return float(val) + except ValueError: + # If no conversion worked, return the original string + return val diff --git a/tests/codemods/test_secure_flask_session_config.py b/tests/codemods/test_secure_flask_session_config.py index e1f23bd2..f33cf9b8 100644 --- a/tests/codemods/test_secure_flask_session_config.py +++ b/tests/codemods/test_secure_flask_session_config.py @@ -34,7 +34,7 @@ def test_app_not_accessed(self, tmpdir): app = flask.Flask(__name__) # more code print(1) - app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax') + app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax') """ self.run_and_assert(tmpdir, dedent(input_code), dedent(expexted_output)) # assert len(self.file_context.codemod_changes) == 1 @@ -71,7 +71,7 @@ def test_app_accessed_config_not_called(self, tmpdir): app = flask.Flask(__name__) app.secret_key = "dev" - app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax') + app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax') # more code """ self.run_and_assert(tmpdir, dedent(input_code), dedent(expexted_output)) @@ -90,7 +90,7 @@ def test_from_import(self, tmpdir): app = flask.Flask(__name__) app.secret_key = "dev" - app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax') + app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax') # more code """ self.run_and_assert(tmpdir, dedent(input_code), dedent(expexted_output)) @@ -107,140 +107,107 @@ def test_import_alias(self, tmpdir): from flask import Flask as flask_app app = flask_app(__name__) app.secret_key = "dev" - app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax') + app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax') # more code """ self.run_and_assert(tmpdir, dedent(input_code), dedent(expexted_output)) # assert len(self.file_context.codemod_changes) == 1 @pytest.mark.parametrize( - "input_code,expected_output", + "config_lines,expected_config_lines", [ ( - """\ - import flask - - app = flask.Flask(__name__) - app.secret_key = "dev" - app.config - """, - """\ - import flask - - app = flask.Flask(__name__) - app.secret_key = "dev" - app.config - app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax') - """, + """app.config""", + """app.config +app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax')""", ), ( - """\ - import flask - - app = flask.Flask(__name__) - app.secret_key = "dev" - app.config["TESTING"] = True - """, - """\ - import flask - - app = flask.Flask(__name__) - app.secret_key = "dev" - app.config["TESTING"] = True - app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax') - """, + """app.config["TESTING"] = True""", + """app.config["TESTING"] = True +app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax')""", ), ( - """\ - import flask - - app = flask.Flask(__name__) - app.secret_key = "dev" - app.config.testing = True - """, - """\ - import flask - - app = flask.Flask(__name__) - app.secret_key = "dev" - app.config.testing = True - app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax') - """, + """app.config.testing = True""", + """app.config.testing = True +app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax')""", ), ( - """\ - import flask - - app = flask.Flask(__name__) - app.secret_key = "dev" - app.config["SESSION_COOKIE_SECURE"] = False - """, - """\ - import flask - - app = flask.Flask(__name__) - app.secret_key = "dev" - app.config["SESSION_COOKIE_SECURE"] = False - app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax') - """, + """app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax')""", + """app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax')""", ), ( - """\ - import flask - - app = flask.Flask(__name__) - app.secret_key = "dev" - app.config["SESSION_COOKIE_HTTPONLY"] = False - """, - """\ - import flask - - app = flask.Flask(__name__) - app.secret_key = "dev" - app.config["SESSION_COOKIE_HTTPONLY"] = False - app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax') - """, + """app.config.update(SECRET_KEY='123SOMEKEY')""", + """app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax', SECRET_KEY='123SOMEKEY')""", ), ( - """\ - import flask - - app = flask.Flask(__name__) - app.secret_key = "dev" - app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Strict') - """, - """\ - import flask - - app = flask.Flask(__name__) - app.secret_key = "dev" - app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Strict') - app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax') - """, + """app.config.update(SESSION_COOKIE_SECURE=True)""", + """app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax')""", ), ( - """\ - import flask - - app = flask.Flask(__name__) - app.secret_key = "dev" - app.config["SESSION_COOKIE_SECURE"] = False - app.config["SESSION_COOKIE_SECURE"] = True - """, - """\ - import flask - - app = flask.Flask(__name__) - app.secret_key = "dev" - app.config["SESSION_COOKIE_SECURE"] = False - app.config["SESSION_COOKIE_SECURE"] = True - app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax') - """, + """app.config.update(SESSION_COOKIE_HTTPONLY=True)""", + """app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax')""", + ), + ( + """app.config['SESSION_COOKIE_SECURE'] = False""", + """app.config['SESSION_COOKIE_SECURE'] = True +app.config.update(SESSION_COOKIE_SAMESITE='Lax')""", + ), + ( + """app.config['SESSION_COOKIE_HTTPONLY'] = False""", + """app.config['SESSION_COOKIE_HTTPONLY'] = True +app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax')""", + ), + ( + '''app.config["SESSION_COOKIE_SECURE"] = True +app.config["SESSION_COOKIE_SAMESITE"] = "Lax" +''', + '''app.config["SESSION_COOKIE_SECURE"] = True +app.config["SESSION_COOKIE_SAMESITE"] = "Lax" +''', + ), + ( + '''app.config["SESSION_COOKIE_SECURE"] = False +app.config["SESSION_COOKIE_SAMESITE"] = None +''', + '''app.config["SESSION_COOKIE_SECURE"] = True +app.config["SESSION_COOKIE_SAMESITE"] = "Lax" +''', ), +# ( +# '''app.config["SESSION_COOKIE_SECURE"] = False +# app.config["SESSION_COOKIE_HTTPONLY"] = False +# app.config["SESSION_COOKIE_SAMESITE"] = "Strict" +# ''', +# '''app.config["SESSION_COOKIE_SECURE"] = True +# app.config["SESSION_COOKIE_HTTPONLY"] = True +# app.config["SESSION_COOKIE_SAMESITE"] = "Strict" +# ''', +# ), + # ( + # """app.config["SESSION_COOKIE_SECURE"] = False + # app.config["SESSION_COOKIE_SECURE"] = True + # """, + # """app.config["SESSION_COOKIE_SECURE"] = False + # app.config["SESSION_COOKIE_SECURE"] = True + # app.config.update(SESSION_COOKIE_SAMESITE='Lax') + # """, + # ), ], ) - def test_config_accessed(self, tmpdir, input_code, expected_output): - self.run_and_assert(tmpdir, input_code, expected_output) + def test_config_accessed_variations( + self, tmpdir, config_lines, expected_config_lines + ): + input_code = f"""import flask +app = flask.Flask(__name__) +app.secret_key = "dev" +{config_lines} +""" + expected_output = f"""import flask +app = flask.Flask(__name__) +app.secret_key = "dev" +{expected_config_lines} +""" + self.run_and_assert(tmpdir, dedent(input_code), dedent(expected_output)) @pytest.mark.skip() def test_func_scope(self, tmpdir): From 7c364f3d39a32164b5b927d1319aef23c1d03ff4 Mon Sep 17 00:00:00 2001 From: clavedeluna Date: Mon, 6 Nov 2023 14:38:13 -0300 Subject: [PATCH 3/8] attempt get all configs and after change with_changes --- .../secure_flask_session_config.py | 108 ++++++++---------- 1 file changed, 48 insertions(+), 60 deletions(-) diff --git a/src/core_codemods/secure_flask_session_config.py b/src/core_codemods/secure_flask_session_config.py index 0371854a..7944b8b7 100644 --- a/src/core_codemods/secure_flask_session_config.py +++ b/src/core_codemods/secure_flask_session_config.py @@ -38,66 +38,47 @@ def transform_module_impl(self, tree: cst.Module) -> cst.Module: if not flask_visitor.flask_app_name: return tree - if len(flask_visitor.config_access) == 0: + if len(config_access := flask_visitor.config_access) == 0: return self.insert_config_line_endof_mod( tree, flask_visitor.flask_app_name, self.SECURE_SESSION_CONFIGS ) - # Handle single config.update line, reuse it - if len(flask_visitor.config_access) == 1 and isinstance( - single_config := flask_visitor.config_access[0], cst.Call - ): - defined_configs = self._get_configs(single_config) - - configs_to_write = {**self.SECURE_SESSION_CONFIGS.copy(), **defined_configs} - - for key, val in configs_to_write.items(): - if ( - key in self.SECURE_SESSION_CONFIGS - and val in self.SECURE_SESSION_CONFIGS[key] - ): - del configs_to_write[key] - - return self.reuse_config_line( - tree, flask_visitor.flask_app_name, configs_to_write - ) - - # Handle single config['access'] line, add update line excluding configs already set - if len(flask_visitor.config_access) == 1 and isinstance( - single_config := flask_visitor.config_access[0], cst.Assign - ): - defined_config = self._get_config_from_slice(single_config) - defined_key = list(defined_config.keys())[0] - configs_to_write = self.SECURE_SESSION_CONFIGS.copy() + configs_to_write = self.SECURE_SESSION_CONFIGS.copy() - for key, val in configs_to_write.items(): - if key in defined_config and val in self.SECURE_SESSION_CONFIGS[key]: - del configs_to_write[key] + for config_line in config_access: + match config_line: + case cst.Call(): + # app.config.update(...) + # defined_configs = self._get_configs(config_line) + for arg in config_line.args: + key = arg.keyword.value + val = [true_value(arg.value)] + if key in self.SECURE_SESSION_CONFIGS and val not in self.SECURE_SESSION_CONFIGS[key]: + del configs_to_write[key] + secure_val = self.SECURE_SESSION_CONFIGS[key][0] + arg.with_changes(value=cst.parse_expression(f"{secure_val}")) - # any of the secure sesh in defined config, we want to reuse the line - # but flip it if necessary - if defined_key in self.SECURE_SESSION_CONFIGS: - del configs_to_write[defined_key] - return self.reuse_config_subscript_line( - tree, flask_visitor.flask_app_name, configs_to_write, defined_key - ) - return self.insert_config_line_endof_mod( - tree, flask_visitor.flask_app_name, configs_to_write - ) + case cst.Assign(): + # app.config['...'] + defined_config = self._get_config_from_slice(config_line) + (key, vals), = defined_config.items() - defined_configs = self.get_defined_configs(flask_visitor.config_access) - breakpoint() - configs_to_write = self.SECURE_SESSION_CONFIGS.copy() - for key, vals in defined_configs.items(): - defined_val = vals[-1] - if key in self.SECURE_SESSION_CONFIGS and defined_val in self.SECURE_SESSION_CONFIGS[key]: - del configs_to_write[key] + defined_val = true_value(vals[-1]) + if key in self.SECURE_SESSION_CONFIGS and defined_val not in self.SECURE_SESSION_CONFIGS[key]: + del configs_to_write[key] + secure_val = self.SECURE_SESSION_CONFIGS[key][0] + config_line.with_changes(value=cst.parse_expression(f"{secure_val}")) - result = self.insert_config_line_endof_mod( + if isinstance(config_update := config_access[-1], cst.Call): + # If the last config access is of form `app.config.update...` + # reuse that line. + self.reuse_config_line(config_update, configs_to_write) + return tree + # todo: if there is an .update line, add values directly there + return self.insert_config_line_endof_mod( tree, flask_visitor.flask_app_name, configs_to_write ) - return result def get_defined_configs(self, config_access): all_defined_configs = {} @@ -126,10 +107,10 @@ def _get_config_from_slice(self, config_line: cst.Assign): return defined_configs def reuse_config_line( - self, original_node: cst.Module, app_name: str, configs: dict - ) -> cst.Module: + self, config_line: cst.Call, configs: dict + ) -> None: if not configs: - return original_node + return # TODO: record change # line_number is the end of the module where we will insert the new flag. # pos_to_match = self.node_position(original_node) @@ -140,15 +121,22 @@ def reuse_config_line( # self.file_context.codemod_changes.append( # Change(line_number, self.CHANGE_DESCRIPTION) # ) - config_string = ", ".join( - f"{key}='{value[0]}'" if isinstance(value[0], str) else f"{key}={value[0]}" - for key, value in configs.items() - if value and value[0] is not None + # config_string = ", ".join( + # f"{key}='{value[0]}'" if isinstance(value[0], str) else f"{key}={value[0]}" + # for key, value in configs.items() + # if value and value[0] is not None + # ) + # final_line = cst.parse_statement(f"{app_name}.config.update({config_string})") + # new_body = original_node.body[:-1] + (final_line,) + from codemodder.codemods.api.helpers import NewArg + + to_add = [NewArg(name=key, value=str(vals[0]), add_if_missing=True) for key, vals in configs.items() if vals[0] is not None] + + new_args = self.replace_args( + config_line, to_add ) - # secure_configs = """SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax'""" - final_line = cst.parse_statement(f"{app_name}.config.update({config_string})") - new_body = original_node.body[:-1] + (final_line,) - return original_node.with_changes(body=new_body) + # self.update_arg_target(config_line, new_args) + config_line.with_changes(args=new_args) def reuse_config_subscript_line( self, original_node: cst.Module, app_name: str, configs: dict, defined_key: str From a717f733d15f7f21bf874469360e2c0f2c33b3e6 Mon Sep 17 00:00:00 2001 From: clavedeluna Date: Wed, 8 Nov 2023 10:26:00 -0300 Subject: [PATCH 4/8] full tests --- .../secure_flask_session_config.py | 366 +++++++----------- .../test_secure_flask_session_config.py | 87 +++-- 2 files changed, 201 insertions(+), 252 deletions(-) diff --git a/src/core_codemods/secure_flask_session_config.py b/src/core_codemods/secure_flask_session_config.py index 7944b8b7..a67d65bf 100644 --- a/src/core_codemods/secure_flask_session_config.py +++ b/src/core_codemods/secure_flask_session_config.py @@ -1,18 +1,23 @@ import libcst as cst -from libcst.codemod import Codemod, CodemodContext, ContextAwareVisitor -from libcst.metadata import ParentNodeProvider, ScopeProvider +from libcst.codemod import Codemod, CodemodContext, ContextAwareTransformer +from libcst.metadata import ParentNodeProvider, ScopeProvider, PositionProvider from libcst import matchers from codemodder.codemods.base_codemod import ReviewGuidance from codemodder.codemods.api import BaseCodemod from codemodder.codemods.utils_mixin import NameResolutionMixin - - -class SecureFlaskSessionConfig(BaseCodemod): - METADATA_DEPENDENCIES = BaseCodemod.METADATA_DEPENDENCIES + ( - ParentNodeProvider, - ScopeProvider, - ) +from codemodder.codemods.base_visitor import UtilsMixin +from codemodder.codemods.base_visitor import BaseTransformer +from codemodder.change import Change +from codemodder.file_context import FileContext +from typing import Union + + +class SecureFlaskSessionConfig(BaseCodemod, Codemod): + # METADATA_DEPENDENCIES = BaseCodemod.METADATA_DEPENDENCIES + ( + # ParentNodeProvider, + # ScopeProvider, + # ) NAME = "secure-flask-session-configuration" SUMMARY = "UTODO" REVIEW_GUIDANCE = ReviewGuidance.MERGE_AFTER_REVIEW @@ -24,234 +29,167 @@ class SecureFlaskSessionConfig(BaseCodemod): } ] - SECURE_SESSION_CONFIGS = dict( - # None value indicates unassigned, using default is safe - # values in order of precedence - SESSION_COOKIE_HTTPONLY=[None, True], - SESSION_COOKIE_SECURE=[True], - SESSION_COOKIE_SAMESITE=["Lax", "Strict"], - ) + METADATA_DEPENDENCIES = (PositionProvider,) def transform_module_impl(self, tree: cst.Module) -> cst.Module: - flask_visitor = FindConfigCalls(self.context) - tree.visit(flask_visitor) - if not flask_visitor.flask_app_name: - return tree + flask_codemod = FixFlaskConfig(self.context, self.file_context) + result_tree = flask_codemod.transform_module(tree) - if len(config_access := flask_visitor.config_access) == 0: - return self.insert_config_line_endof_mod( - tree, flask_visitor.flask_app_name, self.SECURE_SESSION_CONFIGS - ) - - configs_to_write = self.SECURE_SESSION_CONFIGS.copy() - - for config_line in config_access: - match config_line: - case cst.Call(): - # app.config.update(...) - # defined_configs = self._get_configs(config_line) - for arg in config_line.args: - key = arg.keyword.value - val = [true_value(arg.value)] - if key in self.SECURE_SESSION_CONFIGS and val not in self.SECURE_SESSION_CONFIGS[key]: - del configs_to_write[key] - secure_val = self.SECURE_SESSION_CONFIGS[key][0] - arg.with_changes(value=cst.parse_expression(f"{secure_val}")) - - case cst.Assign(): - # app.config['...'] - defined_config = self._get_config_from_slice(config_line) - - (key, vals), = defined_config.items() - - defined_val = true_value(vals[-1]) - if key in self.SECURE_SESSION_CONFIGS and defined_val not in self.SECURE_SESSION_CONFIGS[key]: - del configs_to_write[key] - secure_val = self.SECURE_SESSION_CONFIGS[key][0] - config_line.with_changes(value=cst.parse_expression(f"{secure_val}")) - - if isinstance(config_update := config_access[-1], cst.Call): - # If the last config access is of form `app.config.update...` - # reuse that line. - self.reuse_config_line(config_update, configs_to_write) + if not flask_codemod.flask_app_name: return tree - # todo: if there is an .update line, add values directly there - return self.insert_config_line_endof_mod( - tree, flask_visitor.flask_app_name, configs_to_write - ) - - def get_defined_configs(self, config_access): - all_defined_configs = {} - for config_line in config_access: - match config_line: - case cst.Call(): - # app.config.update(...) - defined_configs = self._get_configs(config_line) - all_defined_configs.update(defined_configs) - case cst.Assign(): - # app.config['...'] - defined_configs = self._get_config_from_slice(config_line) - all_defined_configs.update(defined_configs) - - return all_defined_configs - def _get_configs(self, config_line: cst.Call): - defined_configs = {} - for arg in config_line.args: - defined_configs[arg.keyword.value] = [true_value(arg.value)] - return defined_configs - - def _get_config_from_slice(self, config_line: cst.Assign): - defined_configs = {} - key = true_value(config_line.targets[0].target.slice[0].slice.value) - defined_configs[key] = [true_value(config_line.value)] - return defined_configs - - def reuse_config_line( - self, config_line: cst.Call, configs: dict - ) -> None: - if not configs: - return - # TODO: record change - # line_number is the end of the module where we will insert the new flag. - # pos_to_match = self.node_position(original_node) - # line_number = pos_to_match.end.line - # self.changes_in_file.append( - # Change(line_number, DjangoSessionCookieSecureOff.CHANGE_DESCRIPTION) - # ) - # self.file_context.codemod_changes.append( - # Change(line_number, self.CHANGE_DESCRIPTION) - # ) - # config_string = ", ".join( - # f"{key}='{value[0]}'" if isinstance(value[0], str) else f"{key}={value[0]}" - # for key, value in configs.items() - # if value and value[0] is not None - # ) - # final_line = cst.parse_statement(f"{app_name}.config.update({config_string})") - # new_body = original_node.body[:-1] + (final_line,) - from codemodder.codemods.api.helpers import NewArg - - to_add = [NewArg(name=key, value=str(vals[0]), add_if_missing=True) for key, vals in configs.items() if vals[0] is not None] - - new_args = self.replace_args( - config_line, to_add - ) - # self.update_arg_target(config_line, new_args) - config_line.with_changes(args=new_args) - def reuse_config_subscript_line( - self, original_node: cst.Module, app_name: str, configs: dict, defined_key: str + if flask_codemod.configs_to_write: + return self.insert_secure_configs( + tree, + result_tree, + flask_codemod.flask_app_name, + flask_codemod.configs_to_write, + ) + return result_tree + + def insert_secure_configs( + self, + original_node: cst.Module, + updated_node: cst.Module, + app_name: str, + configs: dict, ) -> cst.Module: if not configs: - return original_node - # TODO: record change - # line_number is the end of the module where we will insert the new flag. - # pos_to_match = self.node_position(original_node) - # line_number = pos_to_match.end.line - # self.changes_in_file.append( - # Change(line_number, DjangoSessionCookieSecureOff.CHANGE_DESCRIPTION) - # ) - # self.file_context.codemod_changes.append( - # Change(line_number, self.CHANGE_DESCRIPTION) - # ) - config_string = ", ".join( - f"{key}='{value[0]}'" if isinstance(value[0], str) else f"{key}={value[0]}" - for key, value in configs.items() - if value and value[0] is not None - ) - - final_line = cst.parse_statement(f"{app_name}.config.update({config_string})") - secure_val = ( - self.SECURE_SESSION_CONFIGS[defined_key][0] - or self.SECURE_SESSION_CONFIGS[defined_key][1] - ) - final_config_subscript_line = cst.parse_statement( - f"{app_name}.config['{defined_key}'] = {secure_val}" - ) - new_body = original_node.body[:-1] + ( - final_config_subscript_line, - final_line, - ) - return original_node.with_changes(body=new_body) + return updated_node - def insert_config_line_endof_mod( - self, original_node: cst.Module, app_name: str, configs: dict - ) -> cst.Module: - if not configs: - return original_node - # TODO: record change - # line_number is the end of the module where we will insert the new flag. - # pos_to_match = self.node_position(original_node) - # line_number = pos_to_match.end.line - # self.changes_in_file.append( - # Change(line_number, DjangoSessionCookieSecureOff.CHANGE_DESCRIPTION) - # ) - # self.file_context.codemod_changes.append( - # Change(line_number, self.CHANGE_DESCRIPTION) - # ) config_string = ", ".join( f"{key}='{value[0]}'" if isinstance(value[0], str) else f"{key}={value[0]}" for key, value in configs.items() if value and value[0] is not None ) if not config_string: - return original_node + return updated_node + + self.report_change_endof_module(original_node) final_line = cst.parse_statement(f"{app_name}.config.update({config_string})") - new_body = original_node.body + (final_line,) - return original_node.with_changes(body=new_body) + new_body = updated_node.body + (final_line,) + return updated_node.with_changes(body=new_body) + + def report_change_endof_module(self, original_node: cst.Module) -> None: + # line_number is the end of the module where we will insert the new line. + pos_to_match = self.node_position(original_node) + line_number = pos_to_match.end.line + self.file_context.codemod_changes.append( + Change(line_number, self.CHANGE_DESCRIPTION) + ) -class FindConfigCalls(ContextAwareVisitor, NameResolutionMixin): +class FixFlaskConfig(BaseTransformer, NameResolutionMixin): """ Visitor to find calls to flask.Flask and related `.config` accesses. """ - METADATA_DEPENDENCIES = (ParentNodeProvider,) + METADATA_DEPENDENCIES = (PositionProvider, ParentNodeProvider) + SECURE_SESSION_CONFIGS = dict( + # None value indicates unassigned, using default is safe + # values in order of precedence + SESSION_COOKIE_HTTPONLY=[None, True], + SESSION_COOKIE_SECURE=[True], + SESSION_COOKIE_SAMESITE=["Lax", "Strict"], + ) - def __init__(self, context: CodemodContext) -> None: - self.config_access: list = [] + def __init__(self, codemod_context: CodemodContext, file_context: FileContext): + super().__init__(codemod_context, []) self.flask_app_name = "" - super().__init__(context) - - def _find_config_accesses(self, flask_app_attr: cst.AnnAssign | cst.Assign): - assignments = self.find_assignments(flask_app_attr) - for assignment in assignments: - if assignment.references: - # Flask app instance is accessed - references_to_app = [x.node for x in assignment.references] - for node in references_to_app: - parent = self.get_metadata(ParentNodeProvider, node) - match parent: - case cst.Attribute(): - config = cst.Name(value="config") - if matchers.matches( - parent, matchers.Attribute(value=node, attr=config) - ): - gparent = self.get_metadata(ParentNodeProvider, parent) - ggparent = self.get_metadata( - ParentNodeProvider, gparent - ) - if matchers.matches(gparent, matchers.Subscript()): - gggparent = self.get_metadata( - ParentNodeProvider, ggparent - ) - self.config_access.append(gggparent) - else: - self.config_access.append(ggparent) - - def leave_Call(self, original_node: cst.Call) -> None: + self.configs_to_write = self.SECURE_SESSION_CONFIGS.copy() + self.file_context = file_context + + def _store_flask_app(self, original_node) -> None: + flask_app_parent = self.get_metadata(ParentNodeProvider, original_node) + match flask_app_parent: + case cst.AnnAssign() | cst.Assign(): + flask_app_attr = flask_app_parent.targets[0].target + self.flask_app_name = flask_app_attr.value + + def _remove_config(self, key): + try: + del self.configs_to_write[key] + except KeyError: + pass + + def _get_secure_config_val(self, key): + val = self.SECURE_SESSION_CONFIGS[key][0] or self.SECURE_SESSION_CONFIGS[key][1] + return cst.parse_expression(f'"{val}"' if isinstance(val, str) else f"{val}") + + @property + def flask_app_is_assigned(self): + return bool(self.flask_app_name) + + def leave_Call(self, original_node: cst.Call, updated_node: cst.Call): true_name = self.find_base_name(original_node.func) if true_name == "flask.Flask": - flask_app_parent = self.get_metadata(ParentNodeProvider, original_node) - match flask_app_parent: - case cst.AnnAssign() | cst.Assign(): - flask_app_attr = flask_app_parent.targets[0].target - self.flask_app_name = flask_app_attr.value - self._find_config_accesses(flask_app_attr) + self._store_flask_app(original_node) + + if self.flask_app_is_assigned and self._is_config_update_call(original_node): + return self.call_node_with_secure_configs(original_node, updated_node) + return updated_node + + def call_node_with_secure_configs( + self, original_node: cst.Call, updated_node: cst.Call + ) -> cst.Call: + new_args = [] + for arg in updated_node.args: + if (key := arg.keyword.value) in self.SECURE_SESSION_CONFIGS: + self._remove_config(key) + if true_value(arg.value) not in self.SECURE_SESSION_CONFIGS[key]: + safe_value = self._get_secure_config_val(key) + arg = arg.with_changes(value=safe_value) + new_args.append(arg) + + if updated_node.args != new_args: + self.report_change(original_node) + return updated_node.with_changes(args=new_args) + + def leave_Assign(self, original_node: cst.Assign, updated_node: cst.Assign): + if self.flask_app_is_assigned and self._is_config_subscript(original_node): + return self.assign_node_with_secure_config(original_node, updated_node) + return updated_node + + def assign_node_with_secure_config( + self, original_node: cst.Assign, updated_node: cst.Assign + ) -> cst.Assign: + key = true_value(updated_node.targets[0].target.slice[0].slice.value) + if key in self.SECURE_SESSION_CONFIGS: + self._remove_config(key) + if true_value(updated_node.value) not in self.SECURE_SESSION_CONFIGS[key]: + safe_value = self._get_secure_config_val(key) + self.report_change(original_node) + return updated_node.with_changes(value=safe_value) + return updated_node + + def _is_config_update_call(self, original_node: cst.Call): + config = cst.Name(value="config") + app_name = cst.Name(value=self.flask_app_name) + app_config_node = cst.Attribute(value=app_name, attr=config) + update = cst.Name(value="update") + return matchers.matches( + original_node.func, matchers.Attribute(value=app_config_node, attr=update) + ) + + def _is_config_subscript(self, original_node: cst.Assign): + config = cst.Name(value="config") + app_name = cst.Name(value=self.flask_app_name) + app_config_node = cst.Attribute(value=app_name, attr=config) + return matchers.matches( + original_node.targets[0].target, matchers.Subscript(value=app_config_node) + ) - return original_node + def report_change(self, original_node): + # TODO: GET POS TO WORK + + # line_number = self.lineno_for_node(original_node) + line_number = self.lineno_for_node(original_node) + self.file_context.codemod_changes.append( + Change(line_number, SecureFlaskSessionConfig.CHANGE_DESCRIPTION) + ) -def true_value(node: cst.Name | cst.SimpleString): +def true_value(node: cst.Name | cst.SimpleString) -> str | int | bool: # todo: move to a more general util from codemodder.project_analysis.file_parsers.utils import clean_simplestring @@ -268,11 +206,5 @@ def true_value(node: cst.Name | cst.SimpleString): return True elif val.lower() == "false": return False - try: - return int(val) - except ValueError: - try: - return float(val) - except ValueError: - # If no conversion worked, return the original string - return val + return val + return "" diff --git a/tests/codemods/test_secure_flask_session_config.py b/tests/codemods/test_secure_flask_session_config.py index f33cf9b8..6c7fd2a8 100644 --- a/tests/codemods/test_secure_flask_session_config.py +++ b/tests/codemods/test_secure_flask_session_config.py @@ -19,6 +19,7 @@ def test_no_flask_app(self, tmpdir): response.set_cookie("name", "value") """ self.run_and_assert(tmpdir, dedent(input_code), dedent(input_code)) + assert len(self.file_context.codemod_changes) == 0 def test_app_not_accessed(self, tmpdir): input_code = """\ @@ -37,7 +38,7 @@ def test_app_not_accessed(self, tmpdir): app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax') """ self.run_and_assert(tmpdir, dedent(input_code), dedent(expexted_output)) - # assert len(self.file_context.codemod_changes) == 1 + assert len(self.file_context.codemod_changes) == 1 def test_app_defined_separate_module(self, tmpdir): # TODO: test this as an integration test with two real modules @@ -57,6 +58,7 @@ def test_app_not_assigned(self, tmpdir): print(1) """ self.run_and_assert(tmpdir, dedent(input_code), dedent(input_code)) + assert len(self.file_context.codemod_changes) == 0 def test_app_accessed_config_not_called(self, tmpdir): input_code = """\ @@ -75,7 +77,7 @@ def test_app_accessed_config_not_called(self, tmpdir): # more code """ self.run_and_assert(tmpdir, dedent(input_code), dedent(expexted_output)) - # assert len(self.file_context.codemod_changes) == 1 + assert len(self.file_context.codemod_changes) == 1 def test_from_import(self, tmpdir): input_code = """\ @@ -94,7 +96,7 @@ def test_from_import(self, tmpdir): # more code """ self.run_and_assert(tmpdir, dedent(input_code), dedent(expexted_output)) - # assert len(self.file_context.codemod_changes) == 1 + assert len(self.file_context.codemod_changes) == 1 def test_import_alias(self, tmpdir): input_code = f"""\ @@ -111,7 +113,7 @@ def test_import_alias(self, tmpdir): # more code """ self.run_and_assert(tmpdir, dedent(input_code), dedent(expexted_output)) - # assert len(self.file_context.codemod_changes) == 1 + assert len(self.file_context.codemod_changes) == 1 @pytest.mark.parametrize( "config_lines,expected_config_lines", @@ -135,17 +137,32 @@ def test_import_alias(self, tmpdir): """app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax')""", """app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax')""", ), + ( + """app.config.update(SECRET_KEY='123SOMEKEY') +var = 1""", + """app.config.update(SECRET_KEY='123SOMEKEY') +var = 1 +app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax')""", + ), ( """app.config.update(SECRET_KEY='123SOMEKEY')""", - """app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax', SECRET_KEY='123SOMEKEY')""", + """app.config.update(SECRET_KEY='123SOMEKEY') +app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax')""", ), ( """app.config.update(SESSION_COOKIE_SECURE=True)""", - """app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax')""", + """app.config.update(SESSION_COOKIE_SECURE=True) +app.config.update(SESSION_COOKIE_SAMESITE='Lax')""", ), ( """app.config.update(SESSION_COOKIE_HTTPONLY=True)""", - """app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax')""", + """app.config.update(SESSION_COOKIE_HTTPONLY=True) +app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax')""", + ), + ( + """app.config.update(SESSION_COOKIE_HTTPONLY=False)""", + """app.config.update(SESSION_COOKIE_HTTPONLY=True) +app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax')""", ), ( """app.config['SESSION_COOKIE_SECURE'] = False""", @@ -158,40 +175,40 @@ def test_import_alias(self, tmpdir): app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax')""", ), ( - '''app.config["SESSION_COOKIE_SECURE"] = True + """app.config["SESSION_COOKIE_SECURE"] = True app.config["SESSION_COOKIE_SAMESITE"] = "Lax" -''', - '''app.config["SESSION_COOKIE_SECURE"] = True +""", + """app.config["SESSION_COOKIE_SECURE"] = True app.config["SESSION_COOKIE_SAMESITE"] = "Lax" -''', +""", ), ( - '''app.config["SESSION_COOKIE_SECURE"] = False + """app.config["SESSION_COOKIE_SECURE"] = False app.config["SESSION_COOKIE_SAMESITE"] = None -''', - '''app.config["SESSION_COOKIE_SECURE"] = True +""", + """app.config["SESSION_COOKIE_SECURE"] = True app.config["SESSION_COOKIE_SAMESITE"] = "Lax" -''', +""", + ), + ( + """app.config["SESSION_COOKIE_SECURE"] = False +app.config["SESSION_COOKIE_HTTPONLY"] = False +app.config["SESSION_COOKIE_SAMESITE"] = "Strict" +""", + """app.config["SESSION_COOKIE_SECURE"] = True +app.config["SESSION_COOKIE_HTTPONLY"] = True +app.config["SESSION_COOKIE_SAMESITE"] = "Strict" +""", + ), + ( + """app.config["SESSION_COOKIE_SECURE"] = False +app.config["SESSION_COOKIE_SECURE"] = True +""", + """app.config["SESSION_COOKIE_SECURE"] = True +app.config["SESSION_COOKIE_SECURE"] = True +app.config.update(SESSION_COOKIE_SAMESITE='Lax') +""", ), -# ( -# '''app.config["SESSION_COOKIE_SECURE"] = False -# app.config["SESSION_COOKIE_HTTPONLY"] = False -# app.config["SESSION_COOKIE_SAMESITE"] = "Strict" -# ''', -# '''app.config["SESSION_COOKIE_SECURE"] = True -# app.config["SESSION_COOKIE_HTTPONLY"] = True -# app.config["SESSION_COOKIE_SAMESITE"] = "Strict" -# ''', -# ), - # ( - # """app.config["SESSION_COOKIE_SECURE"] = False - # app.config["SESSION_COOKIE_SECURE"] = True - # """, - # """app.config["SESSION_COOKIE_SECURE"] = False - # app.config["SESSION_COOKIE_SECURE"] = True - # app.config.update(SESSION_COOKIE_SAMESITE='Lax') - # """, - # ), ], ) def test_config_accessed_variations( @@ -231,4 +248,4 @@ def configure(): # either within configure() call or after it's called """ self.run_and_assert(tmpdir, dedent(input_code), dedent(expexted_output)) - # assert len(self.file_context.codemod_changes) == 1 + assert len(self.file_context.codemod_changes) == 1 From 7e6af55f5290f818c3ce4c264e5347f1535c2d3f Mon Sep 17 00:00:00 2001 From: clavedeluna Date: Mon, 20 Nov 2023 13:43:37 -0300 Subject: [PATCH 5/8] do not add default configs --- .../file_parsers/setup_py_file_parser.py | 2 +- .../project_analysis/file_parsers/utils.py | 7 - src/codemodder/utils/utils.py | 21 +++ .../secure_flask_session_config.py | 156 ++++++++---------- .../test_secure_flask_session_config.py | 80 ++------- 5 files changed, 104 insertions(+), 162 deletions(-) create mode 100644 src/codemodder/utils/utils.py diff --git a/src/codemodder/project_analysis/file_parsers/setup_py_file_parser.py b/src/codemodder/project_analysis/file_parsers/setup_py_file_parser.py index 2fbd78d7..371d0a59 100644 --- a/src/codemodder/project_analysis/file_parsers/setup_py_file_parser.py +++ b/src/codemodder/project_analysis/file_parsers/setup_py_file_parser.py @@ -1,11 +1,11 @@ from codemodder.project_analysis.file_parsers.package_store import PackageStore +from codemodder.utils.utils import clean_simplestring from pathlib import Path import libcst as cst from libcst import matchers from packaging.requirements import Requirement from .base_parser import BaseParser -from .utils import clean_simplestring class SetupPyParser(BaseParser): diff --git a/src/codemodder/project_analysis/file_parsers/utils.py b/src/codemodder/project_analysis/file_parsers/utils.py index 4513523e..e69de29b 100644 --- a/src/codemodder/project_analysis/file_parsers/utils.py +++ b/src/codemodder/project_analysis/file_parsers/utils.py @@ -1,7 +0,0 @@ -import libcst as cst - - -def clean_simplestring(node: cst.SimpleString | str) -> str: - if isinstance(node, str): - return node.strip('"') - return node.raw_value diff --git a/src/codemodder/utils/utils.py b/src/codemodder/utils/utils.py new file mode 100644 index 00000000..6ec7f6e6 --- /dev/null +++ b/src/codemodder/utils/utils.py @@ -0,0 +1,21 @@ +import libcst as cst + + +def clean_simplestring(node: cst.SimpleString | str) -> str: + if isinstance(node, str): + return node.strip('"') + return node.raw_value + + +def true_value(node: cst.Name | cst.SimpleString) -> str | int | bool: + match node: + case cst.SimpleString(): + return clean_simplestring(node) + case cst.Name(): + val = node.value + if val.lower() == "true": + return True + elif val.lower() == "false": + return False + return val + return "" diff --git a/src/core_codemods/secure_flask_session_config.py b/src/core_codemods/secure_flask_session_config.py index a67d65bf..72f6e391 100644 --- a/src/core_codemods/secure_flask_session_config.py +++ b/src/core_codemods/secure_flask_session_config.py @@ -1,36 +1,33 @@ import libcst as cst -from libcst.codemod import Codemod, CodemodContext, ContextAwareTransformer -from libcst.metadata import ParentNodeProvider, ScopeProvider, PositionProvider +from libcst.codemod import Codemod, CodemodContext +from libcst.metadata import ParentNodeProvider, PositionProvider from libcst import matchers from codemodder.codemods.base_codemod import ReviewGuidance from codemodder.codemods.api import BaseCodemod from codemodder.codemods.utils_mixin import NameResolutionMixin -from codemodder.codemods.base_visitor import UtilsMixin +from codemodder.utils.utils import true_value from codemodder.codemods.base_visitor import BaseTransformer from codemodder.change import Change from codemodder.file_context import FileContext -from typing import Union class SecureFlaskSessionConfig(BaseCodemod, Codemod): - # METADATA_DEPENDENCIES = BaseCodemod.METADATA_DEPENDENCIES + ( - # ParentNodeProvider, - # ScopeProvider, - # ) NAME = "secure-flask-session-configuration" - SUMMARY = "UTODO" + SUMMARY = "Flip Insecure `Flask` Session Configurations" REVIEW_GUIDANCE = ReviewGuidance.MERGE_AFTER_REVIEW - DESCRIPTION = "TODO" + DESCRIPTION = "Flip Flask session configuration if defined as insecure." REFERENCES = [ { - "url": "todo", + "url": "https://owasp.org/www-community/controls/SecureCookieAttribute", "description": "", - } + }, + { + "url": "https://cheatsheetseries.owasp.org/cheatsheets/Session_Management_Cheat_Sheet.html", + "description": "", + }, ] - METADATA_DEPENDENCIES = (PositionProvider,) - def transform_module_impl(self, tree: cst.Module) -> cst.Module: flask_codemod = FixFlaskConfig(self.context, self.file_context) result_tree = flask_codemod.transform_module(tree) @@ -38,45 +35,47 @@ def transform_module_impl(self, tree: cst.Module) -> cst.Module: if not flask_codemod.flask_app_name: return tree - if flask_codemod.configs_to_write: - return self.insert_secure_configs( - tree, - result_tree, - flask_codemod.flask_app_name, - flask_codemod.configs_to_write, - ) + # Later: if we want to write at the end of the module any + # default insecure configs. + # if flask_codemod.configs_to_write: + # return self.insert_secure_configs( + # tree, + # result_tree, + # flask_codemod.flask_app_name, + # flask_codemod.configs_to_write, + # ) return result_tree - def insert_secure_configs( - self, - original_node: cst.Module, - updated_node: cst.Module, - app_name: str, - configs: dict, - ) -> cst.Module: - if not configs: - return updated_node - - config_string = ", ".join( - f"{key}='{value[0]}'" if isinstance(value[0], str) else f"{key}={value[0]}" - for key, value in configs.items() - if value and value[0] is not None - ) - if not config_string: - return updated_node - - self.report_change_endof_module(original_node) - final_line = cst.parse_statement(f"{app_name}.config.update({config_string})") - new_body = updated_node.body + (final_line,) - return updated_node.with_changes(body=new_body) - - def report_change_endof_module(self, original_node: cst.Module) -> None: - # line_number is the end of the module where we will insert the new line. - pos_to_match = self.node_position(original_node) - line_number = pos_to_match.end.line - self.file_context.codemod_changes.append( - Change(line_number, self.CHANGE_DESCRIPTION) - ) + # def insert_secure_configs( + # self, + # original_node: cst.Module, + # updated_node: cst.Module, + # app_name: str, + # configs: dict, + # ) -> cst.Module: + # if not configs: + # return updated_node + # + # config_string = ", ".join( + # f"{key}='{value[0]}'" if isinstance(value[0], str) else f"{key}={value[0]}" + # for key, value in configs.items() + # if value and value[0] is not None + # ) + # if not config_string: + # return updated_node + # + # self.report_change_endof_module(original_node) + # final_line = cst.parse_statement(f"{app_name}.config.update({config_string})") + # new_body = updated_node.body + (final_line,) + # return updated_node.with_changes(body=new_body) + # + # def report_change_endof_module(self, original_node: cst.Module) -> None: + # # line_number is the end of the module where we will insert the new line. + # pos_to_match = self.node_position(original_node) + # line_number = pos_to_match.end.line + # self.file_context.codemod_changes.append( + # Change(line_number, self.CHANGE_DESCRIPTION) + # ) class FixFlaskConfig(BaseTransformer, NameResolutionMixin): @@ -85,18 +84,19 @@ class FixFlaskConfig(BaseTransformer, NameResolutionMixin): """ METADATA_DEPENDENCIES = (PositionProvider, ParentNodeProvider) - SECURE_SESSION_CONFIGS = dict( + SECURE_SESSION_CONFIGS = { # None value indicates unassigned, using default is safe # values in order of precedence - SESSION_COOKIE_HTTPONLY=[None, True], - SESSION_COOKIE_SECURE=[True], - SESSION_COOKIE_SAMESITE=["Lax", "Strict"], - ) + "SESSION_COOKIE_HTTPONLY": [None, True], + "SESSION_COOKIE_SECURE": [True], + "SESSION_COOKIE_SAMESITE": ["Lax", "Strict"], + } def __init__(self, codemod_context: CodemodContext, file_context: FileContext): super().__init__(codemod_context, []) self.flask_app_name = "" - self.configs_to_write = self.SECURE_SESSION_CONFIGS.copy() + # Later: if we want to store configs to write later + # self.configs_to_write = self.SECURE_SESSION_CONFIGS.copy() self.file_context = file_context def _store_flask_app(self, original_node) -> None: @@ -106,11 +106,11 @@ def _store_flask_app(self, original_node) -> None: flask_app_attr = flask_app_parent.targets[0].target self.flask_app_name = flask_app_attr.value - def _remove_config(self, key): - try: - del self.configs_to_write[key] - except KeyError: - pass + # def _remove_config(self, key): + # try: + # del self.configs_to_write[key] + # except KeyError: + # pass def _get_secure_config_val(self, key): val = self.SECURE_SESSION_CONFIGS[key][0] or self.SECURE_SESSION_CONFIGS[key][1] @@ -135,8 +135,8 @@ def call_node_with_secure_configs( new_args = [] for arg in updated_node.args: if (key := arg.keyword.value) in self.SECURE_SESSION_CONFIGS: - self._remove_config(key) - if true_value(arg.value) not in self.SECURE_SESSION_CONFIGS[key]: + # self._remove_config(key) + if true_value(arg.value) not in self.SECURE_SESSION_CONFIGS[key]: # type: ignore safe_value = self._get_secure_config_val(key) arg = arg.with_changes(value=safe_value) new_args.append(arg) @@ -155,8 +155,8 @@ def assign_node_with_secure_config( ) -> cst.Assign: key = true_value(updated_node.targets[0].target.slice[0].slice.value) if key in self.SECURE_SESSION_CONFIGS: - self._remove_config(key) - if true_value(updated_node.value) not in self.SECURE_SESSION_CONFIGS[key]: + # self._remove_config(key) + if true_value(updated_node.value) not in self.SECURE_SESSION_CONFIGS[key]: # type: ignore safe_value = self._get_secure_config_val(key) self.report_change(original_node) return updated_node.with_changes(value=safe_value) @@ -180,31 +180,7 @@ def _is_config_subscript(self, original_node: cst.Assign): ) def report_change(self, original_node): - # TODO: GET POS TO WORK - - # line_number = self.lineno_for_node(original_node) line_number = self.lineno_for_node(original_node) self.file_context.codemod_changes.append( Change(line_number, SecureFlaskSessionConfig.CHANGE_DESCRIPTION) ) - - -def true_value(node: cst.Name | cst.SimpleString) -> str | int | bool: - # todo: move to a more general util - from codemodder.project_analysis.file_parsers.utils import clean_simplestring - - # convert 'True' to True, etc - # '123' to 123 - # leave strs as they are - # Try to convert the string to a boolean, integer, or float - match node: - case cst.SimpleString(): - return clean_simplestring(node) - case cst.Name(): - val = node.value - if val.lower() == "true": - return True - elif val.lower() == "false": - return False - return val - return "" diff --git a/tests/codemods/test_secure_flask_session_config.py b/tests/codemods/test_secure_flask_session_config.py index 6c7fd2a8..f38b7501 100644 --- a/tests/codemods/test_secure_flask_session_config.py +++ b/tests/codemods/test_secure_flask_session_config.py @@ -21,25 +21,6 @@ def test_no_flask_app(self, tmpdir): self.run_and_assert(tmpdir, dedent(input_code), dedent(input_code)) assert len(self.file_context.codemod_changes) == 0 - def test_app_not_accessed(self, tmpdir): - input_code = """\ - import flask - - app = flask.Flask(__name__) - # more code - print(1) - """ - expexted_output = """\ - import flask - - app = flask.Flask(__name__) - # more code - print(1) - app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax') - """ - self.run_and_assert(tmpdir, dedent(input_code), dedent(expexted_output)) - assert len(self.file_context.codemod_changes) == 1 - def test_app_defined_separate_module(self, tmpdir): # TODO: test this as an integration test with two real modules input_code = """\ @@ -68,16 +49,8 @@ def test_app_accessed_config_not_called(self, tmpdir): app.secret_key = "dev" # more code """ - expexted_output = """\ - import flask - - app = flask.Flask(__name__) - app.secret_key = "dev" - app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax') - # more code - """ - self.run_and_assert(tmpdir, dedent(input_code), dedent(expexted_output)) - assert len(self.file_context.codemod_changes) == 1 + self.run_and_assert(tmpdir, dedent(input_code), dedent(input_code)) + assert len(self.file_context.codemod_changes) == 0 def test_from_import(self, tmpdir): input_code = """\ @@ -85,32 +58,32 @@ def test_from_import(self, tmpdir): app = flask.Flask(__name__) app.secret_key = "dev" - # more code + app.config.update(SESSION_COOKIE_SECURE=False) """ expexted_output = """\ from flask import Flask app = flask.Flask(__name__) app.secret_key = "dev" - app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax') - # more code + app.config.update(SESSION_COOKIE_SECURE=True) """ self.run_and_assert(tmpdir, dedent(input_code), dedent(expexted_output)) assert len(self.file_context.codemod_changes) == 1 def test_import_alias(self, tmpdir): - input_code = f"""\ + input_code = """\ from flask import Flask as flask_app app = flask_app(__name__) app.secret_key = "dev" # more code + app.config.update(SESSION_COOKIE_SECURE=False) """ - expexted_output = f"""\ + expexted_output = """\ from flask import Flask as flask_app app = flask_app(__name__) app.secret_key = "dev" - app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax') # more code + app.config.update(SESSION_COOKIE_SECURE=True) """ self.run_and_assert(tmpdir, dedent(input_code), dedent(expexted_output)) assert len(self.file_context.codemod_changes) == 1 @@ -120,59 +93,39 @@ def test_import_alias(self, tmpdir): [ ( """app.config""", - """app.config -app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax')""", + """app.config""", ), ( """app.config["TESTING"] = True""", - """app.config["TESTING"] = True -app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax')""", + """app.config["TESTING"] = True""", ), ( """app.config.testing = True""", - """app.config.testing = True -app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax')""", + """app.config.testing = True""", ), ( """app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax')""", """app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax')""", ), ( - """app.config.update(SECRET_KEY='123SOMEKEY') -var = 1""", - """app.config.update(SECRET_KEY='123SOMEKEY') -var = 1 -app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax')""", - ), - ( - """app.config.update(SECRET_KEY='123SOMEKEY')""", - """app.config.update(SECRET_KEY='123SOMEKEY') -app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax')""", - ), - ( """app.config.update(SESSION_COOKIE_SECURE=True)""", - """app.config.update(SESSION_COOKIE_SECURE=True) -app.config.update(SESSION_COOKIE_SAMESITE='Lax')""", + """app.config.update(SESSION_COOKIE_SECURE=True)""", ), ( """app.config.update(SESSION_COOKIE_HTTPONLY=True)""", - """app.config.update(SESSION_COOKIE_HTTPONLY=True) -app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax')""", + """app.config.update(SESSION_COOKIE_HTTPONLY=True)""", ), ( """app.config.update(SESSION_COOKIE_HTTPONLY=False)""", - """app.config.update(SESSION_COOKIE_HTTPONLY=True) -app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax')""", + """app.config.update(SESSION_COOKIE_HTTPONLY=True)""", ), ( """app.config['SESSION_COOKIE_SECURE'] = False""", - """app.config['SESSION_COOKIE_SECURE'] = True -app.config.update(SESSION_COOKIE_SAMESITE='Lax')""", + """app.config['SESSION_COOKIE_SECURE'] = True""", ), ( """app.config['SESSION_COOKIE_HTTPONLY'] = False""", - """app.config['SESSION_COOKIE_HTTPONLY'] = True -app.config.update(SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE='Lax')""", + """app.config['SESSION_COOKIE_HTTPONLY'] = True""", ), ( """app.config["SESSION_COOKIE_SECURE"] = True @@ -206,7 +159,6 @@ def test_import_alias(self, tmpdir): """, """app.config["SESSION_COOKIE_SECURE"] = True app.config["SESSION_COOKIE_SECURE"] = True -app.config.update(SESSION_COOKIE_SAMESITE='Lax') """, ), ], From 04517ad7ed63168fe9f5d889e1e1a4fc3d6a7f43 Mon Sep 17 00:00:00 2001 From: clavedeluna Date: Mon, 20 Nov 2023 13:53:39 -0300 Subject: [PATCH 6/8] add integration test --- .../test_secure_flask_session_config.py | 16 ++++++++++++++++ src/core_codemods/__init__.py | 2 ++ ..._python_secure-flask-session-configuration.md | 13 +++++++++++++ tests/samples/flask_app.py | 6 ++++++ 4 files changed, 37 insertions(+) create mode 100644 integration_tests/test_secure_flask_session_config.py create mode 100644 src/core_codemods/docs/pixee_python_secure-flask-session-configuration.md create mode 100644 tests/samples/flask_app.py diff --git a/integration_tests/test_secure_flask_session_config.py b/integration_tests/test_secure_flask_session_config.py new file mode 100644 index 00000000..f138ba18 --- /dev/null +++ b/integration_tests/test_secure_flask_session_config.py @@ -0,0 +1,16 @@ +from core_codemods.secure_flask_session_config import SecureFlaskSessionConfig +from integration_tests.base_test import ( + BaseIntegrationTest, + original_and_expected_from_code_path, +) + + +class TestSecureFlaskSessionConfig(BaseIntegrationTest): + codemod = SecureFlaskSessionConfig + code_path = "tests/samples/flask_app.py" + original_code, expected_new_code = original_and_expected_from_code_path( + code_path, [(2, "app.config['SESSION_COOKIE_HTTPONLY'] = True\n")] + ) + expected_diff = "--- \n+++ \n@@ -1,6 +1,6 @@\n from flask import Flask\n app = Flask(__name__)\n-app.config['SESSION_COOKIE_HTTPONLY'] = False\n+app.config['SESSION_COOKIE_HTTPONLY'] = True\n @app.route('/')\n def hello_world():\n return 'Hello World!'\n" + expected_line_change = "3" + change_description = SecureFlaskSessionConfig.CHANGE_DESCRIPTION diff --git a/src/core_codemods/__init__.py b/src/core_codemods/__init__.py index ee5dcd18..120915ce 100644 --- a/src/core_codemods/__init__.py +++ b/src/core_codemods/__init__.py @@ -27,6 +27,7 @@ from .use_generator import UseGenerator from .use_walrus_if import UseWalrusIf from .with_threading_lock import WithThreadingLock +from .secure_flask_session_config import SecureFlaskSessionConfig registry = CodemodCollection( origin="pixee", @@ -60,5 +61,6 @@ UseWalrusIf, WithThreadingLock, SQLQueryParameterization, + SecureFlaskSessionConfig, ], ) diff --git a/src/core_codemods/docs/pixee_python_secure-flask-session-configuration.md b/src/core_codemods/docs/pixee_python_secure-flask-session-configuration.md new file mode 100644 index 00000000..3c17e42e --- /dev/null +++ b/src/core_codemods/docs/pixee_python_secure-flask-session-configuration.md @@ -0,0 +1,13 @@ +Flask applications can configure sessions behavior at the application level. +This codemod looks for Flask application configuration that set `SESSION_COOKIE_HTTPONLY`, `SESSION_COOKIE_SECURE`, or `SESSION_COOKIE_SAMESITE` to an insecure value and changes it to a secure one. + +The changes from this codemod look like this: + +```diff + from flask import Flask + app = Flask(__name__) +- app.config['SESSION_COOKIE_HTTPONLY'] = False +- app.config.update(SESSION_COOKIE_SECURE=False) ++ app.config['SESSION_COOKIE_HTTPONLY'] = True ++ app.config.update(SESSION_COOKIE_SECURE=True) +``` diff --git a/tests/samples/flask_app.py b/tests/samples/flask_app.py new file mode 100644 index 00000000..879f6d37 --- /dev/null +++ b/tests/samples/flask_app.py @@ -0,0 +1,6 @@ +from flask import Flask +app = Flask(__name__) +app.config['SESSION_COOKIE_HTTPONLY'] = False +@app.route('/') +def hello_world(): + return 'Hello World!' From ec2e0cc3611d2c4587847b7a7172a728f0841659 Mon Sep 17 00:00:00 2001 From: clavedeluna Date: Tue, 21 Nov 2023 08:21:37 -0300 Subject: [PATCH 7/8] add docs --- src/codemodder/scripts/generate_docs.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/codemodder/scripts/generate_docs.py b/src/codemodder/scripts/generate_docs.py index 4f39cfb6..bf253c4f 100644 --- a/src/codemodder/scripts/generate_docs.py +++ b/src/codemodder/scripts/generate_docs.py @@ -134,6 +134,10 @@ class DocMetadata: importance="Low", guidance_explained="We believe this replacement is safe and leads to better performance.", ), + "secure-flask-session-configuration": DocMetadata( + importance="Medium", + guidance_explained="Our change fixes explicitly insecure session configuration for a Flask application. However, there may be valid cases to use these insecure configurations, such as for testing or backward compatibility.", + ), } From 3bbbfeec04948c33dd9a376be6278a633b7a0ca2 Mon Sep 17 00:00:00 2001 From: clavedeluna Date: Tue, 21 Nov 2023 13:50:07 -0300 Subject: [PATCH 8/8] better error handling --- src/codemodder/utils/utils.py | 17 +++++++++ .../secure_flask_session_config.py | 22 +++++++----- .../test_secure_flask_session_config.py | 36 +++++++++++++++++-- 3 files changed, 64 insertions(+), 11 deletions(-) diff --git a/src/codemodder/utils/utils.py b/src/codemodder/utils/utils.py index 6ec7f6e6..4f223778 100644 --- a/src/codemodder/utils/utils.py +++ b/src/codemodder/utils/utils.py @@ -19,3 +19,20 @@ def true_value(node: cst.Name | cst.SimpleString) -> str | int | bool: return False return val return "" + + +def extract_targets_of_assignment( + assignment: cst.AnnAssign | cst.Assign | cst.WithItem | cst.NamedExpr, +) -> list[cst.BaseExpression]: + match assignment: + case cst.AnnAssign(): + if assignment.target: + return [assignment.target] + case cst.Assign(): + return [t.target for t in assignment.targets] + case cst.NamedExpr(): + return [assignment.target] + case cst.WithItem(): + if assignment.asname: + return [assignment.asname.name] + return [] diff --git a/src/core_codemods/secure_flask_session_config.py b/src/core_codemods/secure_flask_session_config.py index 72f6e391..0bc4c7ee 100644 --- a/src/core_codemods/secure_flask_session_config.py +++ b/src/core_codemods/secure_flask_session_config.py @@ -6,7 +6,7 @@ from codemodder.codemods.base_codemod import ReviewGuidance from codemodder.codemods.api import BaseCodemod from codemodder.codemods.utils_mixin import NameResolutionMixin -from codemodder.utils.utils import true_value +from codemodder.utils.utils import extract_targets_of_assignment, true_value from codemodder.codemods.base_visitor import BaseTransformer from codemodder.change import Change from codemodder.file_context import FileContext @@ -103,8 +103,12 @@ def _store_flask_app(self, original_node) -> None: flask_app_parent = self.get_metadata(ParentNodeProvider, original_node) match flask_app_parent: case cst.AnnAssign() | cst.Assign(): - flask_app_attr = flask_app_parent.targets[0].target - self.flask_app_name = flask_app_attr.value + targets = extract_targets_of_assignment(flask_app_parent) + # TODO: handle other assignments ex. l[0] = Flask(...) , a.b = Flask(...) + if targets and matchers.matches( + first_target := targets[0], matchers.Name() + ): + self.flask_app_name = first_target.value # def _remove_config(self, key): # try: @@ -163,18 +167,18 @@ def assign_node_with_secure_config( return updated_node def _is_config_update_call(self, original_node: cst.Call): - config = cst.Name(value="config") - app_name = cst.Name(value=self.flask_app_name) - app_config_node = cst.Attribute(value=app_name, attr=config) + config = matchers.Name(value="config") + app_name = matchers.Name(value=self.flask_app_name) + app_config_node = matchers.Attribute(value=app_name, attr=config) update = cst.Name(value="update") return matchers.matches( original_node.func, matchers.Attribute(value=app_config_node, attr=update) ) def _is_config_subscript(self, original_node: cst.Assign): - config = cst.Name(value="config") - app_name = cst.Name(value=self.flask_app_name) - app_config_node = cst.Attribute(value=app_name, attr=config) + config = matchers.Name(value="config") + app_name = matchers.Name(value=self.flask_app_name) + app_config_node = matchers.Attribute(value=app_name, attr=config) return matchers.matches( original_node.targets[0].target, matchers.Subscript(value=app_config_node) ) diff --git a/tests/codemods/test_secure_flask_session_config.py b/tests/codemods/test_secure_flask_session_config.py index f38b7501..0700e868 100644 --- a/tests/codemods/test_secure_flask_session_config.py +++ b/tests/codemods/test_secure_flask_session_config.py @@ -56,14 +56,14 @@ def test_from_import(self, tmpdir): input_code = """\ from flask import Flask - app = flask.Flask(__name__) + app = Flask(__name__) app.secret_key = "dev" app.config.update(SESSION_COOKIE_SECURE=False) """ expexted_output = """\ from flask import Flask - app = flask.Flask(__name__) + app = Flask(__name__) app.secret_key = "dev" app.config.update(SESSION_COOKIE_SECURE=True) """ @@ -88,6 +88,38 @@ def test_import_alias(self, tmpdir): self.run_and_assert(tmpdir, dedent(input_code), dedent(expexted_output)) assert len(self.file_context.codemod_changes) == 1 + def test_annotated_assign(self, tmpdir): + input_code = """\ + import flask + app: flask.Flask = flask.Flask(__name__) + app.secret_key = "dev" + # more code + app.config.update(SESSION_COOKIE_SECURE=False) + """ + expexted_output = """\ + import flask + app: flask.Flask = flask.Flask(__name__) + app.secret_key = "dev" + # more code + app.config.update(SESSION_COOKIE_SECURE=True) + """ + self.run_and_assert(tmpdir, dedent(input_code), dedent(expexted_output)) + assert len(self.file_context.codemod_changes) == 1 + + def test_other_assignment_type(self, tmpdir): + input_code = """\ + import flask + class AppStore: + pass + store = AppStore() + store.app = flask.Flask(__name__) + store.app.secret_key = "dev" + # more code + store.app.config.update(SESSION_COOKIE_SECURE=False) + """ + self.run_and_assert(tmpdir, dedent(input_code), dedent(input_code)) + assert len(self.file_context.codemod_changes) == 0 + @pytest.mark.parametrize( "config_lines,expected_config_lines", [