From 3c8aa325518258db40e496fda71ee51a6817e2c4 Mon Sep 17 00:00:00 2001 From: Artem Smotrakov Date: Tue, 23 Oct 2018 16:14:45 +0200 Subject: [PATCH 1/8] Don't force fuzzing auth headers for API endpoints --- .../data/parsers/doc/open_api/requests.py | 22 +++++++++++++++++-- .../open_api/tests/example_specifications.py | 7 ++++++ .../doc/open_api/tests/test_requests.py | 21 ++++++++++++++++-- 3 files changed, 46 insertions(+), 4 deletions(-) diff --git a/w3af/core/data/parsers/doc/open_api/requests.py b/w3af/core/data/parsers/doc/open_api/requests.py index 9bf4fa5a54..41d815555f 100644 --- a/w3af/core/data/parsers/doc/open_api/requests.py +++ b/w3af/core/data/parsers/doc/open_api/requests.py @@ -83,19 +83,37 @@ def get_fuzzable_request(self, discover_fuzzable_headers=False): def _get_parameter_headers(self): """ Looks for all parameters which are passed to the endpoint via headers. + The method filters out headers with auth info + which are defined 'securityDefinitions' section of the API spec. :return: A list of unique header names. """ parameter_headers = set() - for parameter_name in self.parameters: - parameter = self.parameters[parameter_name] + for parameter_name, parameter in self.parameters.iteritems(): if parameter.location == 'header': + + # Skip auth headers. + if self._is_auth_header(parameter.name): + continue + parameter_headers.add(parameter.name) om.out.debug('Found a parameter header for %s endpoint: %s' % (self.operation.path_name, parameter.name)) return list(parameter_headers) + def _is_auth_header(self, name): + """ + :param name: Header name. + :return: True if this is an auth header, False otherwise. + """ + for key, auth in self.spec.security_definitions.iteritems(): + if hasattr(auth, 'location') and hasattr(auth, 'name') \ + and auth.location == 'header' and auth.name == name: + return True + + return False + def _bravado_construct_request(self): """ An example request dict from swagger's pet store application looks like: diff --git a/w3af/core/data/parsers/doc/open_api/tests/example_specifications.py b/w3af/core/data/parsers/doc/open_api/tests/example_specifications.py index dc45cb00f8..cd67bfd0f4 100644 --- a/w3af/core/data/parsers/doc/open_api/tests/example_specifications.py +++ b/w3af/core/data/parsers/doc/open_api/tests/example_specifications.py @@ -79,6 +79,13 @@ def get_specification(self): return file('%s/data/multiple_paths_and_headers.json' % CURRENT_PATH).read() +class PetstoreModel(object): + + @staticmethod + def get_specification(): + return file('%s/data/swagger.json' % CURRENT_PATH).read() + + class IntParamPath(object): def get_specification(self): spec = APISpec( diff --git a/w3af/core/data/parsers/doc/open_api/tests/test_requests.py b/w3af/core/data/parsers/doc/open_api/tests/test_requests.py index a845a6191b..a9c9a782d7 100644 --- a/w3af/core/data/parsers/doc/open_api/tests/test_requests.py +++ b/w3af/core/data/parsers/doc/open_api/tests/test_requests.py @@ -37,11 +37,13 @@ ComplexDereferencedNestedModel, DereferencedPetStore, NestedModel, - ArrayModelItems) + ArrayModelItems, PetstoreModel) class TestRequests(unittest.TestCase): - def generate_response(self, specification_as_string): + + @staticmethod + def generate_response(specification_as_string): url = URL('http://www.w3af.com/swagger.json') headers = Headers([('content-type', 'application/json')]) return HTTPResponse(200, specification_as_string, headers, @@ -380,3 +382,18 @@ def test_dereferenced_pet_store(self): self.assertEqual(fuzzable_request.get_uri().url_string, e_url) self.assertEqual(fuzzable_request.get_headers(), e_headers) self.assertEqual(fuzzable_request.get_data(), e_data) + + def test_no_forcing_fuzzing_auth_headers(self): + specification_as_string = PetstoreModel().get_specification() + http_response = self.generate_response(specification_as_string) + handler = SpecificationHandler(http_response) + + data = [d for d in handler.get_api_information()] + + self.assertTrue(len(data) > 0) + for data_i in data: + factory = RequestFactory(*data_i) + fuzzable_request = factory.get_fuzzable_request(discover_fuzzable_headers=True) + fuzzing_headers = fuzzable_request.get_force_fuzzing_headers() + self.assertNotIn('api_key', fuzzing_headers) + self.assertNotIn('Authorization', fuzzing_headers) From 13a5372a5546df44e8b5f7d1b72225b498bb8a28 Mon Sep 17 00:00:00 2001 From: Artem Smotrakov Date: Thu, 25 Oct 2018 14:01:42 +0200 Subject: [PATCH 2/8] Set fuzzable headers in OpenAPI class --- w3af/core/data/parsers/doc/open_api/main.py | 48 ++++++++++++++++- .../data/parsers/doc/open_api/requests.py | 52 ++----------------- .../parsers/doc/open_api/tests/test_main.py | 17 +++++- .../doc/open_api/tests/test_requests.py | 16 +----- 4 files changed, 68 insertions(+), 65 deletions(-) diff --git a/w3af/core/data/parsers/doc/open_api/main.py b/w3af/core/data/parsers/doc/open_api/main.py index d8a0785fae..fd59c40f0f 100644 --- a/w3af/core/data/parsers/doc/open_api/main.py +++ b/w3af/core/data/parsers/doc/open_api/main.py @@ -149,7 +149,12 @@ def parse(self): for data in specification_handler.get_api_information(): try: request_factory = RequestFactory(*data) - fuzzable_request = request_factory.get_fuzzable_request(self.discover_fuzzable_headers) + fuzzable_request = request_factory.get_fuzzable_request() + + if self.discover_fuzzable_headers: + operation = data[4] + headers = self._get_parameter_headers(operation) + fuzzable_request.set_force_fuzzing_headers(headers) except Exception, e: # # This is a strange situation because parsing of the OpenAPI @@ -180,7 +185,46 @@ def parse(self): self.api_calls.append(fuzzable_request) - def _should_audit(self, fuzzable_request): + def _get_parameter_headers(self, operation): + """ + Looks for all parameters which are passed to the endpoint via headers. + The method filters out headers with auth info + which are defined in 'securityDefinitions' section of the API spec. + + :param operation: An instance of Operation class + which represents the API endpoint. + :return: A list of unique header names. + """ + parameter_headers = set() + for parameter_name, parameter in operation.params.iteritems(): + if parameter.location == 'header': + + # Skip auth headers. + if self._is_auth_header(parameter.name, operation.swagger_spec): + continue + + parameter_headers.add(parameter.name) + om.out.debug('Found a parameter header for %s endpoint: %s' + % (operation.path_name, parameter.name)) + + return list(parameter_headers) + + @staticmethod + def _is_auth_header(name, spec): + """ + :param name: Header name. + :param spec: API specification. + :return: True if this is an auth header, False otherwise. + """ + for key, auth in spec.security_definitions.iteritems(): + if hasattr(auth, 'location') and hasattr(auth, 'name') \ + and auth.location == 'header' and auth.name == name: + return True + + return False + + @staticmethod + def _should_audit(fuzzable_request): """ We want to make sure that w3af doesn't delete all the items from the REST API, so we ignore DELETE calls. diff --git a/w3af/core/data/parsers/doc/open_api/requests.py b/w3af/core/data/parsers/doc/open_api/requests.py index 41d815555f..237758c810 100644 --- a/w3af/core/data/parsers/doc/open_api/requests.py +++ b/w3af/core/data/parsers/doc/open_api/requests.py @@ -56,13 +56,10 @@ def __init__(self, spec, api_resource_name, resource, operation_name, self.operation = operation self.parameters = parameters - def get_fuzzable_request(self, discover_fuzzable_headers=False): + def get_fuzzable_request(self): """ Creates a fuzzable request by querying different parts of the spec parameters, operation, etc. - - :param discover_fuzzable_headers: If it's set to true, - then all fuzzable headers will be added to the fuzzable request. :return: A fuzzable request. """ method = self.get_method() @@ -70,49 +67,10 @@ def get_fuzzable_request(self, discover_fuzzable_headers=False): headers = self.get_headers() data_container = self.get_data_container(headers) - fuzzable_request = FuzzableRequest(uri, - headers=headers, - post_data=data_container, - method=method) - - if discover_fuzzable_headers: - fuzzable_request.set_force_fuzzing_headers(self._get_parameter_headers()) - - return fuzzable_request - - def _get_parameter_headers(self): - """ - Looks for all parameters which are passed to the endpoint via headers. - The method filters out headers with auth info - which are defined 'securityDefinitions' section of the API spec. - - :return: A list of unique header names. - """ - parameter_headers = set() - for parameter_name, parameter in self.parameters.iteritems(): - if parameter.location == 'header': - - # Skip auth headers. - if self._is_auth_header(parameter.name): - continue - - parameter_headers.add(parameter.name) - om.out.debug('Found a parameter header for %s endpoint: %s' - % (self.operation.path_name, parameter.name)) - - return list(parameter_headers) - - def _is_auth_header(self, name): - """ - :param name: Header name. - :return: True if this is an auth header, False otherwise. - """ - for key, auth in self.spec.security_definitions.iteritems(): - if hasattr(auth, 'location') and hasattr(auth, 'name') \ - and auth.location == 'header' and auth.name == name: - return True - - return False + return FuzzableRequest(uri, + headers=headers, + post_data=data_container, + method=method) def _bravado_construct_request(self): """ diff --git a/w3af/core/data/parsers/doc/open_api/tests/test_main.py b/w3af/core/data/parsers/doc/open_api/tests/test_main.py index 8be74587fb..399f3e5d16 100644 --- a/w3af/core/data/parsers/doc/open_api/tests/test_main.py +++ b/w3af/core/data/parsers/doc/open_api/tests/test_main.py @@ -30,6 +30,7 @@ from w3af.core.data.parsers.doc.open_api import OpenAPI from w3af.core.data.url.HTTPResponse import HTTPResponse +from w3af.core.data.parsers.doc.open_api.tests.example_specifications import PetstoreModel # Order them to be able to easily assert things def by_path(fra, frb): @@ -434,7 +435,21 @@ def test_is_valid_json_or_yaml_false(self): http_resp = self.generate_response('"', 'image/jpeg') self.assertFalse(OpenAPI.is_valid_json_or_yaml(http_resp)) - def generate_response(self, specification_as_string, content_type='application/json'): + def test_no_forcing_fuzzing_auth_headers(self): + specification_as_string = PetstoreModel().get_specification() + http_response = self.generate_response(specification_as_string) + parser = OpenAPI(http_response) + parser.parse() + api_calls = parser.get_api_calls() + + self.assertTrue(len(api_calls) > 0) + for fuzzable_request in api_calls: + fuzzing_headers = fuzzable_request.get_force_fuzzing_headers() + self.assertNotIn('api_key', fuzzing_headers) + self.assertNotIn('Authorization', fuzzing_headers) + + @staticmethod + def generate_response(specification_as_string, content_type='application/json'): url = URL('http://www.w3af.com/swagger.json') headers = Headers([('content-type', content_type)]) return HTTPResponse(200, specification_as_string, headers, diff --git a/w3af/core/data/parsers/doc/open_api/tests/test_requests.py b/w3af/core/data/parsers/doc/open_api/tests/test_requests.py index a9c9a782d7..1e999b610d 100644 --- a/w3af/core/data/parsers/doc/open_api/tests/test_requests.py +++ b/w3af/core/data/parsers/doc/open_api/tests/test_requests.py @@ -37,7 +37,7 @@ ComplexDereferencedNestedModel, DereferencedPetStore, NestedModel, - ArrayModelItems, PetstoreModel) + ArrayModelItems) class TestRequests(unittest.TestCase): @@ -383,17 +383,3 @@ def test_dereferenced_pet_store(self): self.assertEqual(fuzzable_request.get_headers(), e_headers) self.assertEqual(fuzzable_request.get_data(), e_data) - def test_no_forcing_fuzzing_auth_headers(self): - specification_as_string = PetstoreModel().get_specification() - http_response = self.generate_response(specification_as_string) - handler = SpecificationHandler(http_response) - - data = [d for d in handler.get_api_information()] - - self.assertTrue(len(data) > 0) - for data_i in data: - factory = RequestFactory(*data_i) - fuzzable_request = factory.get_fuzzable_request(discover_fuzzable_headers=True) - fuzzing_headers = fuzzable_request.get_force_fuzzing_headers() - self.assertNotIn('api_key', fuzzing_headers) - self.assertNotIn('Authorization', fuzzing_headers) From 8a7af18d72b3e0409f9923f11b480245db9992d5 Mon Sep 17 00:00:00 2001 From: Artem Smotrakov Date: Thu, 25 Oct 2018 14:55:49 +0200 Subject: [PATCH 3/8] Fuzz auth headers only once --- w3af/core/data/parsers/doc/open_api/main.py | 32 +++++++++++++------ .../parsers/doc/open_api/tests/test_main.py | 12 ++++++- 2 files changed, 34 insertions(+), 10 deletions(-) diff --git a/w3af/core/data/parsers/doc/open_api/main.py b/w3af/core/data/parsers/doc/open_api/main.py index fd59c40f0f..87d4b2900a 100644 --- a/w3af/core/data/parsers/doc/open_api/main.py +++ b/w3af/core/data/parsers/doc/open_api/main.py @@ -67,6 +67,7 @@ def __init__(self, http_response, no_validation=False, discover_fuzzable_headers self.api_calls = [] self.no_validation = no_validation self.discover_fuzzable_headers = discover_fuzzable_headers + self.fuzzed_auth_headers = [] @staticmethod def content_type_match(http_resp): @@ -146,15 +147,11 @@ def parse(self): specification_handler = SpecificationHandler(self.get_http_response(), self.no_validation) + self.fuzzed_auth_headers = [] for data in specification_handler.get_api_information(): try: request_factory = RequestFactory(*data) fuzzable_request = request_factory.get_fuzzable_request() - - if self.discover_fuzzable_headers: - operation = data[4] - headers = self._get_parameter_headers(operation) - fuzzable_request.set_force_fuzzing_headers(headers) except Exception, e: # # This is a strange situation because parsing of the OpenAPI @@ -183,6 +180,11 @@ def parse(self): if not self._should_audit(fuzzable_request): continue + if self.discover_fuzzable_headers: + operation = data[4] + headers = self._get_parameter_headers(operation) + fuzzable_request.set_force_fuzzing_headers(headers) + self.api_calls.append(fuzzable_request) def _get_parameter_headers(self, operation): @@ -199,8 +201,8 @@ def _get_parameter_headers(self, operation): for parameter_name, parameter in operation.params.iteritems(): if parameter.location == 'header': - # Skip auth headers. - if self._is_auth_header(parameter.name, operation.swagger_spec): + # Fuzz auth headers only once. + if self._is_already_fuzzed_auth_header(parameter.name, operation.swagger_spec): continue parameter_headers.add(parameter.name) @@ -209,6 +211,16 @@ def _get_parameter_headers(self, operation): return list(parameter_headers) + def _is_already_fuzzed_auth_header(self, name, spec): + if not self._is_auth_header(name, spec): + return False + + if name in self.fuzzed_auth_headers: + return True + + self.fuzzed_auth_headers.append(name) + return False + @staticmethod def _is_auth_header(name, spec): """ @@ -217,8 +229,10 @@ def _is_auth_header(name, spec): :return: True if this is an auth header, False otherwise. """ for key, auth in spec.security_definitions.iteritems(): - if hasattr(auth, 'location') and hasattr(auth, 'name') \ - and auth.location == 'header' and auth.name == name: + if not hasattr(auth, 'location') or not hasattr(auth, 'name'): + continue + + if auth.location == 'header' and auth.name == name: return True return False diff --git a/w3af/core/data/parsers/doc/open_api/tests/test_main.py b/w3af/core/data/parsers/doc/open_api/tests/test_main.py index 399f3e5d16..53d800e853 100644 --- a/w3af/core/data/parsers/doc/open_api/tests/test_main.py +++ b/w3af/core/data/parsers/doc/open_api/tests/test_main.py @@ -32,6 +32,7 @@ from w3af.core.data.parsers.doc.open_api.tests.example_specifications import PetstoreModel + # Order them to be able to easily assert things def by_path(fra, frb): return cmp(fra.get_url().url_string, frb.get_url().url_string) @@ -443,10 +444,19 @@ def test_no_forcing_fuzzing_auth_headers(self): api_calls = parser.get_api_calls() self.assertTrue(len(api_calls) > 0) + found_api_key = False for fuzzable_request in api_calls: fuzzing_headers = fuzzable_request.get_force_fuzzing_headers() - self.assertNotIn('api_key', fuzzing_headers) self.assertNotIn('Authorization', fuzzing_headers) + if found_api_key: + self.assertNotIn('api_key', fuzzing_headers) + else: + found_api_key = 'api_key' in fuzzing_headers + + # Make sure that we forced to fuzz 'api_key' header once. + self.assertTrue(found_api_key) + self.assertEquals(1, len(parser.fuzzed_auth_headers)) + self.assertIn('api_key', parser.fuzzed_auth_headers) @staticmethod def generate_response(specification_as_string, content_type='application/json'): From 458a161dd9589d139166c865750e5aab913f0cb3 Mon Sep 17 00:00:00 2001 From: Artem Smotrakov Date: Thu, 25 Oct 2018 15:06:12 +0200 Subject: [PATCH 4/8] Fixed TestOpenAPINestedModelSpec --- w3af/plugins/tests/crawl/test_open_api.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/w3af/plugins/tests/crawl/test_open_api.py b/w3af/plugins/tests/crawl/test_open_api.py index 8e1fcda040..83ed5097bb 100644 --- a/w3af/plugins/tests/crawl/test_open_api.py +++ b/w3af/plugins/tests/crawl/test_open_api.py @@ -18,8 +18,8 @@ along with w3af; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA """ +import json import re -import urllib from w3af.plugins.audit.sqli import sqli from w3af.plugins.tests.helper import PluginTest, PluginConfig, MockResponse @@ -126,13 +126,14 @@ class TestOpenAPINestedModelSpec(PluginTest): } class SQLIMockResponse(MockResponse): + def get_response(self, http_request, uri, response_headers): basic = http_request.headers.get('Basic', '') if basic != TestOpenAPINestedModelSpec.BEARER: return 401, response_headers, '' # The body is in json format, need to escape my double quotes - request_body = str(http_request.parsed_body) + request_body = json.dumps(http_request.parsed_body) payloads = [p.replace('"', '\\"') for p in sqli.SQLI_STRINGS] response_body = 'Sunny outside' From 01fbd1d5d405928494d17d52948b68f75c186d15 Mon Sep 17 00:00:00 2001 From: Artem Smotrakov Date: Thu, 25 Oct 2018 15:36:26 +0200 Subject: [PATCH 5/8] Added TestOpenAPIFuzzAuthHeaders --- w3af/plugins/tests/crawl/test_open_api.py | 76 ++++++++++++++++++++++- 1 file changed, 75 insertions(+), 1 deletion(-) diff --git a/w3af/plugins/tests/crawl/test_open_api.py b/w3af/plugins/tests/crawl/test_open_api.py index 83ed5097bb..aed1383906 100644 --- a/w3af/plugins/tests/crawl/test_open_api.py +++ b/w3af/plugins/tests/crawl/test_open_api.py @@ -25,7 +25,8 @@ from w3af.plugins.tests.helper import PluginTest, PluginConfig, MockResponse from w3af.core.data.dc.headers import Headers from w3af.core.data.parsers.doc.open_api.tests.example_specifications import (IntParamQueryString, - NestedModel) + NestedModel, + PetstoreModel) class TestOpenAPIFindAllEndpointsWithAuth(PluginTest): @@ -204,6 +205,79 @@ def by_path(fra, frb): self.assertEqual(len(vulns), 2) +class TestOpenAPIFuzzAuthHeaders(PluginTest): + + api_key = 'zzz' + target_url = 'http://petstore.swagger.io/' + + _run_configs = { + 'cfg': { + 'target': target_url, + 'plugins': {'crawl': (PluginConfig('open_api', + + ('header_auth', + 'api_key: %s' % api_key, + PluginConfig.HEADER), + + ),), + 'audit': (PluginConfig('sqli'),)} + } + } + + class SQLIMockResponse(MockResponse): + + def get_response(self, http_request, uri, response_headers): + api_key = http_request.headers.get('api_key', '') + + for payload in sqli.SQLI_STRINGS: + if payload in api_key: + return self.status, response_headers, 'PostgreSQL query failed:' + + if api_key != TestOpenAPIFuzzAuthHeaders.api_key: + return 401, response_headers, '' + + return self.status, response_headers, 'Sunny outside' + + MOCK_RESPONSES = [MockResponse('http://petstore.swagger.io/openapi.json', + PetstoreModel().get_specification(), + content_type='application/json'), + + SQLIMockResponse(re.compile('http://petstore.swagger.io/v2/.*'), + body=None, + method='GET', + status=200), + + SQLIMockResponse(re.compile('http://petstore.swagger.io/v2/.*'), + body=None, + method='POST', + status=200), + + SQLIMockResponse(re.compile('http://petstore.swagger.io/v2/.*'), + body=None, + method='PUT', + status=200) + ] + + def test_fuzz_auth_header_only_once(self): + cfg = self._run_configs['cfg'] + self._scan(cfg['target'], cfg['plugins']) + + # + # Since we configured authentication we should only get one of the Info + # + infos = self.kb.get('open_api', 'open_api') + self.assertEqual(len(infos), 1, infos) + + info_i = infos[0] + self.assertEqual(info_i.get_name(), 'Open API specification found') + + vulns = self.kb.get('sqli', 'sqli') + self.assertEqual(len(vulns), 1) + + vuln = vulns[0] + self.assertEquals('SQL injection', vuln.get_name()) + + class TestOpenAPIRaisesWarningIfNoAuth(PluginTest): target_url = 'http://w3af.org/' From 6f4489d629581ea165ebd6ba49fad7f2f2b0c528 Mon Sep 17 00:00:00 2001 From: Artem Smotrakov Date: Fri, 16 Nov 2018 14:05:04 +0100 Subject: [PATCH 6/8] Remove test_no_forcing_fuzzing_auth_headers() --- .../parsers/doc/open_api/tests/test_main.py | 22 ------------------- 1 file changed, 22 deletions(-) diff --git a/w3af/core/data/parsers/doc/open_api/tests/test_main.py b/w3af/core/data/parsers/doc/open_api/tests/test_main.py index 53d800e853..00f8bc4ca3 100644 --- a/w3af/core/data/parsers/doc/open_api/tests/test_main.py +++ b/w3af/core/data/parsers/doc/open_api/tests/test_main.py @@ -436,28 +436,6 @@ def test_is_valid_json_or_yaml_false(self): http_resp = self.generate_response('"', 'image/jpeg') self.assertFalse(OpenAPI.is_valid_json_or_yaml(http_resp)) - def test_no_forcing_fuzzing_auth_headers(self): - specification_as_string = PetstoreModel().get_specification() - http_response = self.generate_response(specification_as_string) - parser = OpenAPI(http_response) - parser.parse() - api_calls = parser.get_api_calls() - - self.assertTrue(len(api_calls) > 0) - found_api_key = False - for fuzzable_request in api_calls: - fuzzing_headers = fuzzable_request.get_force_fuzzing_headers() - self.assertNotIn('Authorization', fuzzing_headers) - if found_api_key: - self.assertNotIn('api_key', fuzzing_headers) - else: - found_api_key = 'api_key' in fuzzing_headers - - # Make sure that we forced to fuzz 'api_key' header once. - self.assertTrue(found_api_key) - self.assertEquals(1, len(parser.fuzzed_auth_headers)) - self.assertIn('api_key', parser.fuzzed_auth_headers) - @staticmethod def generate_response(specification_as_string, content_type='application/json'): url = URL('http://www.w3af.com/swagger.json') From 929fdbd437be23093b9d56fa24f80af269885100 Mon Sep 17 00:00:00 2001 From: Artem Smotrakov Date: Fri, 16 Nov 2018 14:06:29 +0100 Subject: [PATCH 7/8] Limit fuzzing of auth headers in HeadersMutant --- .../data/fuzzer/mutants/headers_mutant.py | 27 ++++++++++++ w3af/core/data/parsers/doc/open_api/main.py | 41 +++++++++---------- 2 files changed, 46 insertions(+), 22 deletions(-) diff --git a/w3af/core/data/fuzzer/mutants/headers_mutant.py b/w3af/core/data/fuzzer/mutants/headers_mutant.py index 5028db20c3..e9cfa8941d 100644 --- a/w3af/core/data/fuzzer/mutants/headers_mutant.py +++ b/w3af/core/data/fuzzer/mutants/headers_mutant.py @@ -20,9 +20,14 @@ """ from w3af.core.data.fuzzer.mutants.mutant import Mutant +import w3af.core.data.kb.knowledge_base as kb class HeadersMutant(Mutant): + + # TODO describe + fuzzed_auth_headers = set() + """ This class is a headers mutant. """ @@ -55,6 +60,8 @@ def create_mutants(cls, freq, mutant_str_list, fuzzable_param_list, """ fuzzable_headers = fuzzer_config['fuzzable_headers'] + freq.get_force_fuzzing_headers() + fuzzable_headers = HeadersMutant._exclude_fuzzed_auth_headers(fuzzable_headers) + if not fuzzable_headers: return [] @@ -64,3 +71,23 @@ def create_mutants(cls, freq, mutant_str_list, fuzzable_param_list, return cls._create_mutants_worker(freq, cls, mutant_str_list, fuzzable_param_list, append, fuzzer_config) + + @classmethod + def _exclude_fuzzed_auth_headers(cls, fuzzable_headers): + """ + TODO + :param fuzzable_headers: + :return: + """ + auth_headers = kb.kb.raw_read('http_data', 'auth_headers') + updated_fuzzable_headers = [] + for fuzzable_header in fuzzable_headers: + if fuzzable_header in auth_headers: + if fuzzable_header in cls.fuzzed_auth_headers: + continue + + cls.fuzzed_auth_headers.add(fuzzable_header) + + updated_fuzzable_headers.append(fuzzable_header) + + return updated_fuzzable_headers diff --git a/w3af/core/data/parsers/doc/open_api/main.py b/w3af/core/data/parsers/doc/open_api/main.py index 87d4b2900a..0b0d83c7b9 100644 --- a/w3af/core/data/parsers/doc/open_api/main.py +++ b/w3af/core/data/parsers/doc/open_api/main.py @@ -24,6 +24,8 @@ from yaml import load +import w3af.core.data.kb.knowledge_base as kb + try: from yaml import CLoader as Loader, CDumper as Dumper except ImportError: @@ -67,7 +69,6 @@ def __init__(self, http_response, no_validation=False, discover_fuzzable_headers self.api_calls = [] self.no_validation = no_validation self.discover_fuzzable_headers = discover_fuzzable_headers - self.fuzzed_auth_headers = [] @staticmethod def content_type_match(http_resp): @@ -147,7 +148,6 @@ def parse(self): specification_handler = SpecificationHandler(self.get_http_response(), self.no_validation) - self.fuzzed_auth_headers = [] for data in specification_handler.get_api_information(): try: request_factory = RequestFactory(*data) @@ -180,18 +180,29 @@ def parse(self): if not self._should_audit(fuzzable_request): continue + operation = data[4] + headers = self._get_parameter_headers(operation) if self.discover_fuzzable_headers: - operation = data[4] - headers = self._get_parameter_headers(operation) fuzzable_request.set_force_fuzzing_headers(headers) + # TODO move it to a separate method + # TODO should it expect multiple specs? + auth_headers = kb.kb.raw_read('http_data', 'auth_headers') + if not isinstance(auth_headers, set): + auth_headers = set() + + for header in headers: + if OpenAPI._is_auth_header(header, operation.swagger_spec): + auth_headers.add(header) + + kb.kb.raw_write('http_data', 'auth_headers', auth_headers) + self.api_calls.append(fuzzable_request) - def _get_parameter_headers(self, operation): + @staticmethod + def _get_parameter_headers(operation): """ Looks for all parameters which are passed to the endpoint via headers. - The method filters out headers with auth info - which are defined in 'securityDefinitions' section of the API spec. :param operation: An instance of Operation class which represents the API endpoint. @@ -200,30 +211,16 @@ def _get_parameter_headers(self, operation): parameter_headers = set() for parameter_name, parameter in operation.params.iteritems(): if parameter.location == 'header': - - # Fuzz auth headers only once. - if self._is_already_fuzzed_auth_header(parameter.name, operation.swagger_spec): - continue - parameter_headers.add(parameter.name) om.out.debug('Found a parameter header for %s endpoint: %s' % (operation.path_name, parameter.name)) return list(parameter_headers) - def _is_already_fuzzed_auth_header(self, name, spec): - if not self._is_auth_header(name, spec): - return False - - if name in self.fuzzed_auth_headers: - return True - - self.fuzzed_auth_headers.append(name) - return False - @staticmethod def _is_auth_header(name, spec): """ + TODO :param name: Header name. :param spec: API specification. :return: True if this is an auth header, False otherwise. From ab7cb7e9e75820e6634b33db0c1c87d980721fc3 Mon Sep 17 00:00:00 2001 From: Artem Smotrakov Date: Fri, 16 Nov 2018 14:07:09 +0100 Subject: [PATCH 8/8] Use common by_path() function in test_open_api.py --- w3af/plugins/tests/crawl/test_open_api.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/w3af/plugins/tests/crawl/test_open_api.py b/w3af/plugins/tests/crawl/test_open_api.py index 143edbeafc..d5e27419f8 100644 --- a/w3af/plugins/tests/crawl/test_open_api.py +++ b/w3af/plugins/tests/crawl/test_open_api.py @@ -30,6 +30,10 @@ PetstoreSimpleModel) +def by_path(fra, frb): + return cmp(fra.get_url().url_string, frb.get_url().url_string) + + class TestOpenAPIFindAllEndpointsWithAuth(PluginTest): target_url = 'http://w3af.org/' @@ -75,9 +79,6 @@ def test_find_all_endpoints_with_auth(self): fuzzable_requests = [f for f in fuzzable_requests if f.get_url().get_path() not in ('/swagger.json', '/')] # Order them to be able to easily assert things - def by_path(fra, frb): - return cmp(fra.get_url().url_string, frb.get_url().url_string) - fuzzable_requests.sort(by_path) # @@ -180,9 +181,6 @@ def test_find_all_endpoints_with_auth(self): fuzzable_requests = [f for f in fuzzable_requests if f.get_url().get_path() not in ('/openapi.json', '/')] # Order them to be able to easily assert things - def by_path(fra, frb): - return cmp(fra.get_url().url_string, frb.get_url().url_string) - fuzzable_requests.sort(by_path) self.assertEqual(len(fuzzable_requests), 1)