diff --git a/w3af/conftest.py b/w3af/conftest.py
new file mode 100644
index 0000000000..d00b6fe14e
--- /dev/null
+++ b/w3af/conftest.py
@@ -0,0 +1,30 @@
+import pytest
+
+from w3af.core.data.dc.headers import Headers
+from w3af.core.data.parsers.doc.url import URL
+from w3af.core.data.url.HTTPRequest import HTTPRequest
+from w3af.core.data.url.HTTPResponse import HTTPResponse
+
+
+@pytest.fixture
+def http_response():
+ url = URL('http://example.com/')
+ headers = Headers([('content-type', 'text/html')])
+ return HTTPResponse(
+ 200,
+        '',
+ headers,
+ url,
+ url,
+ )
+
+
+@pytest.fixture
+def http_request():
+ url = URL('http://example.com/')
+ headers = Headers([('content-type', 'text/html')])
+ return HTTPRequest(
+ url,
+        headers=headers,
+ method='GET',
+ )
diff --git a/w3af/core/controllers/chrome/crawler/tests/frameworks/test_angularjs_basics.py b/w3af/core/controllers/chrome/crawler/tests/frameworks/test_angularjs_basics.py
index e87ae5734d..12bd36ac85 100644
--- a/w3af/core/controllers/chrome/crawler/tests/frameworks/test_angularjs_basics.py
+++ b/w3af/core/controllers/chrome/crawler/tests/frameworks/test_angularjs_basics.py
@@ -24,6 +24,7 @@
from w3af.core.controllers.chrome.tests.helpers import ExtendedHttpRequestHandler
+@pytest.mark.skip('uses internet')
class AngularBasicTest(BaseChromeCrawlerTest):
def test_angular_click(self):
self._unittest_setup(AngularButtonClickRequestHandler)
diff --git a/w3af/core/data/parsers/doc/tests/test_wsdl.py b/w3af/core/data/parsers/doc/tests/test_wsdl.py
new file mode 100644
index 0000000000..6a808c04e0
--- /dev/null
+++ b/w3af/core/data/parsers/doc/tests/test_wsdl.py
@@ -0,0 +1,111 @@
+import pytest
+from mock import MagicMock, patch
+
+from w3af.core.data.dc.headers import Headers
+from w3af.core.data.parsers.doc.url import URL
+from w3af.core.data.parsers.doc.wsdl import ZeepTransport, WSDLParser
+from w3af.core.data.url.HTTPResponse import HTTPResponse
+from w3af.core.data.url.extended_urllib import ExtendedUrllib
+from w3af.plugins.tests.plugin_testing_tools import NetworkPatcher
+
+
+@pytest.fixture
+def mocked_http_client():
+ return MagicMock()
+
+
+@pytest.fixture
+def zeep_transport(mocked_http_client):
+ transport = ZeepTransport()
+ transport.uri_opener = mocked_http_client
+ return transport
+
+
+@pytest.fixture
+def zeep_transport_from_class(zeep_transport):
+ return lambda *args, **kwargs: zeep_transport
+
+
+@pytest.fixture
+def http_response():
+ return HTTPResponse(
+ 200,
+ '',
+ Headers(),
+ URL('https://example.com/'),
+ URL('https://example.com/'),
+ )
+
+
+class TestZeepTransport:
+ def test_it_implements_all_needed_methods(self):
+ zeep_transport = ZeepTransport()
+ required_methods = [
+ 'get',
+ 'load',
+ 'post',
+ 'post_xml',
+ ]
+ for method in required_methods:
+ assert hasattr(zeep_transport, method)
+
+ def test_it_calls_http_client_on_get_method(self, zeep_transport, mocked_http_client):
+ zeep_transport.get('https://example.com/', '', {})
+ assert mocked_http_client.GET.called
+
+ def test_it_calls_http_client_on_post_method(self, zeep_transport, mocked_http_client):
+ zeep_transport.post('https://example.com/', 'some data', {})
+ assert mocked_http_client.POST.called
+
+ def test_it_calls_http_client_on_post_xml_method(self, zeep_transport, mocked_http_client):
+        from lxml import etree  # lxml is already a dependency of Zeep
+ zeep_transport.post_xml('https://example.com/', etree.Element('test'), {})
+ assert mocked_http_client.POST.called
+
+ def test_it_loads_the_response_content(self, zeep_transport, mocked_http_client):
+ mocked_response = MagicMock(name='mocked_response')
+ mocked_response.body = 'test'
+ mocked_http_client.GET = MagicMock(return_value=mocked_response)
+
+ result = zeep_transport.load('http://example.com/')
+ assert result == 'test'
+
+
+class TestZeepTransportIntegration:
+ def test_it_can_perform_get_request(self):
+ url = 'http://example.com/'
+ with NetworkPatcher() as network_patcher:
+ zeep_transport = ZeepTransport()
+ zeep_transport.get(url, {}, {})
+ assert url in network_patcher.mocked_server.urls_requested
+
+ def test_it_can_perform_post_request(self):
+ url = 'http://example.com/'
+ with NetworkPatcher() as network_patcher:
+ zeep_transport = ZeepTransport()
+ zeep_transport.post(url, 'some data', {})
+ assert url in network_patcher.mocked_server.urls_requested
+
+ def test_it_can_load_url(self):
+ url = 'http://example.com/'
+ with NetworkPatcher() as network_patcher:
+ zeep_transport = ZeepTransport()
+            zeep_transport.load(url)
+ assert url in network_patcher.mocked_server.urls_requested
+
+
+class TestWSDLParserIntegration:
+ def test_wsdl_zeep_transport_uses_extended_urllib(self):
+ zeep_transport = ZeepTransport()
+ assert isinstance(zeep_transport.uri_opener, ExtendedUrllib)
+
+ def test_it_uses_extended_urllib_for_performing_requests(
+ self,
+ mocked_http_client,
+ zeep_transport_from_class,
+ http_response,
+ ):
+ mocked_http_client.GET = MagicMock(return_value=http_response)
+ with patch('w3af.core.data.parsers.doc.wsdl.ZeepTransport', zeep_transport_from_class):
+ WSDLParser(http_response=http_response)
+ assert mocked_http_client.GET.called
diff --git a/w3af/core/data/parsers/doc/wsdl.py b/w3af/core/data/parsers/doc/wsdl.py
index 3c2cd9bfb9..adbb01ce31 100644
--- a/w3af/core/data/parsers/doc/wsdl.py
+++ b/w3af/core/data/parsers/doc/wsdl.py
@@ -21,23 +21,62 @@
"""
import contextlib
import sys
-import xml.parsers.expat as expat
from cStringIO import StringIO
-import SOAPpy
import zeep
from requests import HTTPError
from zeep.exceptions import XMLSyntaxError
-import w3af.core.controllers.output_manager as om
import w3af.core.data.kb.knowledge_base as kb
-from w3af.core.controllers.exceptions import BaseFrameworkException
from w3af.core.data.kb.info import Info
from w3af.core.data.parsers.doc.baseparser import BaseParser
from w3af.core.data.parsers.doc.url import URL
from w3af.core.controllers import output_manager
+class ZeepTransport(zeep.Transport):
+ """
+ Custom Zeep Transport class which overrides it's methods to use w3af's HTTP client.
+ We don't call super() on any overwritten method as we want to force Zeep to use
+ our client, not their.
+
+ Tradeoff:
+ As WSDLParser has to be tight coupled to Zeep by design we have to also
+ make tight coupling between WSDLParser and ExtendedUrllib. And that's because
+ parser by design is not intended to perform any requests by itself. Although
+ Zeep is constructed in this specific way that it performs request when it's
+ instantiated.
+ As parsers are not intended to make requests there's also no obvious way to
+ pass uri_opener into parser.
+ """
+ def __init__(self):
+ super(ZeepTransport, self).__init__()
+ from w3af.core.data.url.extended_urllib import ExtendedUrllib
+ self.uri_opener = ExtendedUrllib()
+ self.uri_opener.setup(disable_cache=True)
+
+ def get(self, address, params, headers):
+ return self.uri_opener.GET(address, params, headers=headers)
+
+ def post(self, address, message, headers):
+ return self.uri_opener.POST(address, data=message, headers=headers)
+
+ def post_xml(self, address, envelope, headers):
+ from zeep.wsdl.utils import etree_to_string
+ message = etree_to_string(envelope)
+ return self.uri_opener.POST(address, data=message, headers=headers)
+
+ def load(self, url):
+ response = self.uri_opener.GET(url)
+ return response.body
+
+
+class ZeepClientAdapter(zeep.Client):
+ def __init__(self, url, transport=None, *args, **kwargs):
+ transport = transport or ZeepTransport()
+ super(ZeepClientAdapter, self).__init__(url, transport=transport, *args, **kwargs)
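+
+# Illustrative usage (a sketch; the URL below is hypothetical): the adapter is a
+# drop-in replacement for zeep.Client that routes all HTTP traffic through
+# ExtendedUrllib via ZeepTransport:
+#
+#   client = ZeepClientAdapter('http://example.com/service?wsdl')
+#   for service in client.wsdl.services.values():
+#       print(service.name)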
+
+
class WSDLParser(BaseParser):
"""
This class parses WSDL documents.
@@ -48,7 +87,8 @@ class WSDLParser(BaseParser):
def __init__(self, http_response):
self._proxy = None
super(WSDLParser, self).__init__(http_response)
- self._wsdl_client = zeep.Client(str(http_response.get_uri()))
+ wsdl_url = str(http_response.get_uri())
+ self._wsdl_client = ZeepClientAdapter(wsdl_url)
self._discovered_urls = set()
def __getstate__(self):
@@ -58,13 +98,13 @@ def __getstate__(self):
def __setstate__(self, state):
self.__dict__.update(state)
- self._wsdl_client = zeep.Client(str(self._http_response.get_uri()))
+ self._wsdl_client = ZeepClientAdapter(str(self._http_response.get_uri()))
@staticmethod
def can_parse(http_resp):
url = http_resp.get_uri()
try:
- wsdl_client = zeep.Client(str(url))
+ wsdl_client = ZeepClientAdapter(str(url))
except (XMLSyntaxError, HTTPError):
exception_description = (
"The result of url: {} seems not to be valid XML.".format(
diff --git a/w3af/core/data/parsers/tests/test_document_parser.py b/w3af/core/data/parsers/tests/test_document_parser.py
index c1fe9f2aab..9a3a248da9 100644
--- a/w3af/core/data/parsers/tests/test_document_parser.py
+++ b/w3af/core/data/parsers/tests/test_document_parser.py
@@ -21,7 +21,6 @@
"""
import unittest
-import time
import os
from w3af import ROOT_PATH
@@ -33,6 +32,7 @@
from w3af.core.data.parsers.doc.url import URL
from w3af.core.data.parsers.document_parser import (document_parser_factory,
DocumentParser)
+from w3af.plugins.tests.plugin_testing_tools import patch_network
def _build_http_response(body_content, content_type):
@@ -44,6 +44,7 @@ def _build_http_response(body_content, content_type):
return HTTPResponse(200, body_content, headers, url, url, charset='utf-8')
+@patch_network
class TestDocumentParserFactory(unittest.TestCase):
PDF_FILE = os.path.join(ROOT_PATH, 'core', 'data', 'parsers', 'doc',
diff --git a/w3af/core/data/parsers/tests/test_mp_document_parser.py b/w3af/core/data/parsers/tests/test_mp_document_parser.py
index ee7582238f..ee8ce26ff7 100644
--- a/w3af/core/data/parsers/tests/test_mp_document_parser.py
+++ b/w3af/core/data/parsers/tests/test_mp_document_parser.py
@@ -41,7 +41,7 @@
from w3af.core.data.dc.headers import Headers
from w3af.core.data.parsers.doc.html import HTMLParser
from w3af.core.data.parsers.tests.test_document_parser import _build_http_response
-from w3af.plugins.tests.plugin_testing_tools import NetworkPatcher
+from w3af.plugins.tests.plugin_testing_tools import NetworkPatcher, patch_network
@pytest.fixture
@@ -388,6 +388,7 @@ def test_dictproxy_pickle_8748(self):
parser = self.mpdoc.get_document_parser_for(resp)
assert isinstance(parser._parser, HTMLParser)
+ @patch_network
def test_get_tags_by_filter(self):
body = 'foobar'
url = URL('http://www.w3af.com/')
@@ -399,6 +400,7 @@ def test_get_tags_by_filter(self):
assert [Tag('a', {'href': '/abc'}, 'foo'), Tag('b', {}, 'bar')] == tags
+ @patch_network
def test_get_tags_by_filter_empty_tag(self):
body = ''
url = URL('http://www.w3af.com/')
@@ -413,7 +415,7 @@ def test_get_tags_by_filter_empty_tag(self):
def test_it_doesnt_silence_type_error_from_document_parser(self, html_response):
self.mpdoc._document_parser_class = MockedDamagedDocumentParser
- with pytest.raises(TypeError):
+ with pytest.raises(TypeError), NetworkPatcher():
self.mpdoc.get_document_parser_for(html_response)
diff --git a/w3af/core/data/url/extended_urllib.py b/w3af/core/data/url/extended_urllib.py
index 20c77797f3..bc0ebcf9df 100644
--- a/w3af/core/data/url/extended_urllib.py
+++ b/w3af/core/data/url/extended_urllib.py
@@ -528,10 +528,10 @@ def end(self):
def restart(self):
self.end()
- def setup(self):
+ def setup(self, disable_cache=False):
if self.settings.need_update or self._opener is None:
self.settings.need_update = False
- self.settings.build_openers()
+ self.settings.build_openers(disable_cache=disable_cache)
self._opener = self.settings.get_custom_opener()
self.clear_timeout()
@@ -674,12 +674,25 @@ def send_mutant(self, mutant, callback=None, grep=True, cache=True,
return res
- def GET(self, uri, data=None, headers=None, cache=False,
- grep=True, cookies=True, session=None,
- respect_size_limit=True, new_connection=False,
- error_handling=True, timeout=None, follow_redirects=False,
- use_basic_auth=True, use_proxy=True, debugging_id=None,
- binary_response=False):
+ def GET(
+ self,
+ uri,
+ data=None,
+ headers=None,
+ cache=False,
+ grep=True,
+ cookies=True,
+ session=None,
+ respect_size_limit=True,
+ new_connection=False,
+ error_handling=True,
+ timeout=None,
+ follow_redirects=False,
+ use_basic_auth=True,
+ use_proxy=True,
+ debugging_id=None,
+ binary_response=False,
+ ):
"""
HTTP GET a URI using a proxy, user agent, and other settings
that where previously set in opener_settings.py .
@@ -702,15 +715,7 @@ def GET(self, uri, data=None, headers=None, cache=False,
:return: An HTTPResponse object.
"""
- headers = headers or Headers()
-
- if not isinstance(uri, URL):
- raise TypeError('The uri parameter of ExtendedUrllib.GET() must be'
- ' of url.URL type.')
-
- if not isinstance(headers, Headers):
- raise TypeError('The header parameter of ExtendedUrllib.GET() must'
- ' be of Headers type.')
+ uri, headers = self._parse_uri_and_headers(uri, headers, method_name='GET')
# Validate what I'm sending, init the library (if needed)
self.setup()
@@ -738,12 +743,25 @@ def GET(self, uri, data=None, headers=None, cache=False,
with raise_size_limit(respect_size_limit):
return self.send(req, grep=grep)
- def POST(self, uri, data='', headers=None, grep=True, cache=False,
- cookies=True, session=None, error_handling=True, timeout=None,
- follow_redirects=None, use_basic_auth=True, use_proxy=True,
- debugging_id=None, new_connection=False,
- respect_size_limit=None,
- binary_response=False):
+ def POST(
+ self,
+ uri,
+ data='',
+ headers=None,
+ grep=True,
+ cache=False,
+ cookies=True,
+ session=None,
+ error_handling=True,
+ timeout=None,
+ follow_redirects=None,
+ use_basic_auth=True,
+ use_proxy=True,
+ debugging_id=None,
+ new_connection=False,
+ respect_size_limit=None,
+ binary_response=False,
+ ):
"""
POST's data to a uri using a proxy, user agents, and other settings
that where set previously.
@@ -755,15 +773,7 @@ def POST(self, uri, data='', headers=None, grep=True, cache=False,
:see: The GET() for documentation on the other parameters
:return: An HTTPResponse object.
"""
- headers = headers or Headers()
-
- if not isinstance(uri, URL):
- raise TypeError('The uri parameter of ExtendedUrllib.POST() must'
- ' be of url.URL type. Got %s instead.' % type(uri))
-
- if not isinstance(headers, Headers):
- raise TypeError('The header parameter of ExtendedUrllib.POST() must'
- ' be of Headers type.')
+ uri, headers = self._parse_uri_and_headers(uri, headers, method_name='POST')
# Validate what I'm sending, init the library (if needed)
self.setup()
@@ -792,6 +802,38 @@ def POST(self, uri, data='', headers=None, grep=True, cache=False,
return self.send(req, grep=grep)
+ def _parse_uri_and_headers(self, uri, headers, method_name):
+ """
+ If uri or headers comes in primitive format then make sure they're
+ instantiated to proper ones.
+ """
+ if isinstance(headers, dict):
+ new_headers = []
+ for key, value in headers.items():
+ new_headers.append((key, value))
+ headers = Headers(new_headers)
+ headers = headers or Headers()
+
+ if not isinstance(headers, Headers):
+ error_message = (
+                'The header parameter of ExtendedUrllib.{}() must be of dict or Headers type.'
+ )
+ raise TypeError(
+ error_message.format(method_name)
+ )
+
+ if isinstance(uri, str):
+ uri = URL(uri)
+ if not isinstance(uri, URL):
+ error_message = (
+ 'The uri parameter of ExtendedUrllib.{}() must be of str or url.URL type.'
+ )
+ raise TypeError(
+ error_message.format(method_name)
+ )
+
+ return uri, headers
+
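+    # Worked example (a sketch; the URL and header name are illustrative only):
+    # with _parse_uri_and_headers() in place, both of these calls are equivalent:
+    #
+    #   uri_opener.GET('http://example.com/', headers={'X-Example': '1'})
+    #   uri_opener.GET(URL('http://example.com/'),
+    #                  headers=Headers([('X-Example', '1')]))
+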
def get_remote_file_size(self, req, cache=True):
"""
This method was previously used in the framework to perform a HEAD
diff --git a/w3af/core/data/url/handlers/cache.py b/w3af/core/data/url/handlers/cache.py
index c348419adc..26cb42305f 100644
--- a/w3af/core/data/url/handlers/cache.py
+++ b/w3af/core/data/url/handlers/cache.py
@@ -28,6 +28,8 @@
# TODO: Why not POST? Why don't we perform real caching and respect
# the cache headers/meta tags?
# @see: https://bitbucket.org/jaraco/jaraco.net/src/65af6e442d21/jaraco/net/http/caching.py
+from w3af.core.data.url.handlers.cache_backend.no_cache import NoCachedResponse
+
CACHE_METHODS = ('GET', 'HEAD')
@@ -42,14 +44,17 @@ class CacheHandler(urllib2.BaseHandler):
:author: Version 0.2 by Andres Riancho
:author: Version 0.3 by Javier Andalia
"""
- def __init__(self):
- CacheClass.init()
+ def __init__(self, disable_cache=False):
+ self._cache_class = DefaultCacheClass
+ if disable_cache:
+ self._cache_class = NoCachedResponse
+ self._cache_class.init()
def clear(self):
"""
Clear the cache (remove all files and directories associated with it).
"""
- return CacheClass.clear()
+ return self._cache_class.clear()
def default_open(self, request):
"""
@@ -64,11 +69,11 @@ def default_open(self, request):
if not request.get_from_cache:
return None
- if not CacheClass.exists_in_cache(request):
+ if not self._cache_class.exists_in_cache(request):
return None
try:
- cache_response_obj = CacheClass(request)
+ cache_response_obj = self._cache_class(request)
except Exception:
# Sometimes the cache gets corrupted, or the initial HTTP
# request that's saved to disk doesn't completely respect the
@@ -105,11 +110,11 @@ def http_response(self, request, response):
# above) to decide if the response should be returned from the
# cache
#
- CacheClass.store_in_cache(request, response)
+ self._cache_class.store_in_cache(request, response)
return response
https_response = http_response
# This is the default implementation
-CacheClass = SQLCachedResponse
+DefaultCacheClass = SQLCachedResponse
diff --git a/w3af/core/data/url/handlers/cache_backend/no_cache.py b/w3af/core/data/url/handlers/cache_backend/no_cache.py
new file mode 100644
index 0000000000..910327524a
--- /dev/null
+++ b/w3af/core/data/url/handlers/cache_backend/no_cache.py
@@ -0,0 +1,19 @@
+from w3af.core.data.url.handlers.cache_backend.cached_response import CachedResponse
+
+
+class NoCachedResponse(CachedResponse):
+ @staticmethod
+ def init():
+ pass
+
+ @staticmethod
+ def exists_in_cache(request):
+ return False
+
+ @staticmethod
+ def clear():
+ pass
+
+ @staticmethod
+ def store_in_cache(request, response):
+ pass
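+
+
+# Usage sketch: CacheHandler(disable_cache=True) selects this class instead of the
+# default SQLCachedResponse backend, so cache lookups always miss and nothing is
+# persisted. For example (illustrative only):
+#
+#   handler = CacheHandler(disable_cache=True)
+#   handler.default_open(request)  # -> None, the request is sent over the network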
diff --git a/w3af/core/data/url/handlers/tests/test_cache.py b/w3af/core/data/url/handlers/tests/test_cache.py
index aed1d39b1b..c91938af9e 100644
--- a/w3af/core/data/url/handlers/tests/test_cache.py
+++ b/w3af/core/data/url/handlers/tests/test_cache.py
@@ -24,7 +24,7 @@
import unittest
import pytest
-from mock import patch, Mock, _Call
+from mock import patch, Mock, _Call, MagicMock
from w3af.core.data.url.HTTPRequest import HTTPRequest
from w3af.core.data.url.handlers.cache import CacheHandler
@@ -33,64 +33,85 @@
from w3af.core.data.dc.headers import Headers
-class TestCacheHandler(unittest.TestCase):
-
- def tearDown(self):
+class TestCacheHandler:
+ def setup_method(self):
+ self.url = URL('http://www.w3af.org')
+ self.request = HTTPRequest(self.url, cache=True)
+ self.response = FakeHttplibHTTPResponse(
+ 200, 'OK', 'spameggs', Headers(), self.url.url_string
+ )
+
+ def teardown_method(self):
CacheHandler().clear()
-
+
def test_basic(self):
- url = URL('http://www.w3af.org')
- request = HTTPRequest(url, cache=True)
-
+
cache = CacheHandler()
- self.assertEqual(cache.default_open(request), None)
-
- response = FakeHttplibHTTPResponse(200, 'OK', 'spameggs', Headers(),
- url.url_string)
+ assert cache.default_open(self.request) is None
- with patch('w3af.core.data.url.handlers.cache.CacheClass') as cc_mock:
- store_in_cache = Mock()
- cc_mock.attach_mock(store_in_cache, 'store_in_cache')
+ cc_mock = MagicMock()
+ cache._cache_class = cc_mock
+ store_in_cache = Mock()
+ cc_mock.attach_mock(store_in_cache, 'store_in_cache')
- # This stores the response
- cache.http_response(request, response)
+ # This stores the response
+ cache.http_response(self.request, self.response)
- # Make sure the right call was made
- _call = _Call(('store_in_cache', (request, response)))
- self.assertEqual(cc_mock.mock_calls, [_call])
- cc_mock.reset_mock()
+ # Make sure the right call was made
+ _call = _Call(('store_in_cache', (self.request, self.response)))
+ assert cc_mock.mock_calls == [_call]
+ cc_mock.reset_mock()
- exists_in_cache = Mock()
- cc_mock.return_value = response
- cc_mock.attach_mock(exists_in_cache, 'exists_in_cache')
+ exists_in_cache = Mock()
+ cc_mock.return_value = self.response
+ cc_mock.attach_mock(exists_in_cache, 'exists_in_cache')
- # This retrieves the response from the "cache"
- cached_response = cache.default_open(request)
+ # This retrieves the response from the "cache"
+ cached_response = cache.default_open(self.request)
- # Make sure the right call was made
- _exists_call = _Call(('exists_in_cache', (request,)))
- _retrieve_call = _Call(((request,), {}))
- self.assertEqual(cc_mock.mock_calls, [_exists_call, _retrieve_call])
+ # Make sure the right call was made
+ _exists_call = _Call(('exists_in_cache', (self.request,)))
+ _retrieve_call = _Call(((self.request,), {}))
+ assert cc_mock.mock_calls == [_exists_call, _retrieve_call]
- self.assertIsNotNone(cached_response)
+ assert cached_response is not None
- self.assertEqual(cached_response.code, response.code)
- self.assertEqual(cached_response.msg, response.msg)
- self.assertEqual(cached_response.read(), response.read())
- self.assertEqual(Headers(cached_response.info().items()), response.info())
- self.assertEqual(cached_response.geturl(), response.geturl())
+ assert cached_response.code == self.response.code
+ assert cached_response.msg == self.response.msg
+ assert cached_response.read() == self.response.read()
+ assert Headers(cached_response.info().items()) == self.response.info()
+ assert cached_response.geturl() == self.response.geturl()
+
+ def test_cache_handler_with_enabled_cache(self):
+ default_cache = MagicMock()
+ with patch(
+ 'w3af.core.data.url.handlers.cache.DefaultCacheClass', default_cache
+ ):
+ cache_handler = CacheHandler(disable_cache=False)
+ assert cache_handler.default_open(self.request)
+
+ def test_cache_handler_with_disabled_cache(self):
+ with patch(
+ 'w3af.core.data.url.handlers.cache.DefaultCacheClass', MagicMock()
+ ):
+ cache_handler = CacheHandler(disable_cache=True)
+ assert not cache_handler.default_open(self.request)
def test_no_cache(self):
url = URL('http://www.w3af.org')
request = HTTPRequest(url, cache=False)
cache = CacheHandler()
- self.assertEqual(cache.default_open(request), None)
+ assert cache.default_open(request) is None
response = FakeHttplibHTTPResponse(200, 'OK', 'spameggs', Headers(),
url.url_string)
cache.http_response(request, response)
- self.assertEqual(cache.default_open(request), None)
+ assert cache.default_open(request) is None
class CacheIntegrationTest(unittest.TestCase):
@@ -103,7 +124,7 @@ def test_cache_http_errors(self):
url = URL('http://w3af.org/foo-bar-not-exists.htm')
request = HTTPRequest(url, cache=False)
- with patch('w3af.core.data.url.handlers.cache.CacheClass') as cc_mock:
+ with patch('w3af.core.data.url.handlers.cache.DefaultCacheClass') as cc_mock:
store_in_cache = Mock()
cc_mock.attach_mock(store_in_cache, 'store_in_cache')
diff --git a/w3af/core/data/url/handlers/tests/test_no_cache.py b/w3af/core/data/url/handlers/tests/test_no_cache.py
new file mode 100644
index 0000000000..5c8a99294e
--- /dev/null
+++ b/w3af/core/data/url/handlers/tests/test_no_cache.py
@@ -0,0 +1,15 @@
+from mock import MagicMock
+
+from w3af.core.data.url.handlers.cache_backend.no_cache import NoCachedResponse
+
+
+def test_it_implements_all_static_methods_required():
+ NoCachedResponse.init()
+ NoCachedResponse.clear()
+ NoCachedResponse.exists_in_cache(MagicMock())
+ NoCachedResponse.store_in_cache(MagicMock(), MagicMock())
+
+
+def test_response_wont_exist_in_cache(http_request, http_response):
+ NoCachedResponse.store_in_cache(http_request, http_response)
+ assert not NoCachedResponse.exists_in_cache(http_request)
diff --git a/w3af/core/data/url/opener_settings.py b/w3af/core/data/url/opener_settings.py
index 16ef552237..d4fc7ee2ef 100644
--- a/w3af/core/data/url/opener_settings.py
+++ b/w3af/core/data/url/opener_settings.py
@@ -370,11 +370,11 @@ def get_keep_alive_handlers(self):
self._ka_https
}
- def build_openers(self):
+ def build_openers(self, disable_cache=False):
# Instantiate the handlers passing the proxy as parameter
self._ka_http = HTTPHandler()
self._ka_https = HTTPSHandler(self.get_proxy())
- self._cache_handler = CacheHandler()
+ self._cache_handler = CacheHandler(disable_cache=disable_cache)
# Prepare the list of handlers
handlers = []
diff --git a/w3af/core/data/url/tests/test_xurllib.py b/w3af/core/data/url/tests/test_xurllib.py
index 54dd0c9015..0dd6bb6dc7 100644
--- a/w3af/core/data/url/tests/test_xurllib.py
+++ b/w3af/core/data/url/tests/test_xurllib.py
@@ -31,7 +31,7 @@
import httpretty
from nose.plugins.attrib import attr
-from mock import patch
+from mock import patch, MagicMock
from w3af import ROOT_PATH
from w3af.core.data.url.extended_urllib import ExtendedUrllib
@@ -52,7 +52,10 @@
@attr('moth')
@attr('smoke')
-class TestXUrllib(unittest.TestCase):
+class TestXUrllibUnittest(unittest.TestCase):
+ """
+ Pytest style is preferred for newer tests
+ """
MOTH_MESSAGE = 'moth: vulnerable web application'
MOCK_URL = 'http://www.w3af.org/'
@@ -64,7 +67,7 @@ def tearDown(self):
self.uri_opener.end()
httpretty.reset()
- @pytest.mark.deprecated
+ @pytest.mark.skip('uses internet')
def test_basic(self):
url = URL(get_moth_http())
http_response = self.uri_opener.GET(url, cache=False)
@@ -86,7 +89,7 @@ def test_redir_content_length_zero(self):
http_response = self.uri_opener.GET(url, cache=False)
self.assertEqual(http_response.get_code(), 301)
- @pytest.mark.deprecated
+ @pytest.mark.skip('uses internet')
def test_basic_ssl(self):
url = URL(get_moth_https())
http_response = self.uri_opener.GET(url, cache=False)
@@ -96,6 +99,7 @@ def test_basic_ssl(self):
self.assertGreaterEqual(http_response.id, 1)
self.assertNotEqual(http_response.id, None)
+ @pytest.mark.skip('uses internet')
def test_github_ssl(self):
url = URL('https://raw.githubusercontent.com/RetireJS/retire.js/master/repository/jsrepository.json')
@@ -106,7 +110,7 @@ def test_github_ssl(self):
self.assertGreaterEqual(http_response.id, 1)
self.assertNotEqual(http_response.id, None)
- @pytest.mark.deprecated
+ @pytest.mark.skip('uses internet')
def test_cache(self):
url = URL(get_moth_http())
http_response = self.uri_opener.GET(url)
@@ -116,7 +120,7 @@ def test_cache(self):
http_response = self.uri_opener.GET(url)
self.assertIn(self.MOTH_MESSAGE, http_response.body)
- @pytest.mark.deprecated
+ @pytest.mark.skip('uses internet')
def test_qs_params(self):
url = URL(get_moth_http('/audit/xss/simple_xss.py?text=123456abc'))
http_response = self.uri_opener.GET(url, cache=False)
@@ -173,7 +177,7 @@ def test_GET_with_post_data_and_qs(self):
self.assertEqual(httpretty.last_request().body, data)
self.assertEqual(httpretty.last_request().path, '/' + qs)
- @pytest.mark.deprecated
+ @pytest.mark.skip('uses internet')
def test_post(self):
url = URL(get_moth_http('/audit/xss/simple_xss_form.py'))
@@ -183,7 +187,7 @@ def test_post(self):
http_response = self.uri_opener.POST(url, data, cache=False)
self.assertIn('123456abc', http_response.body)
- @pytest.mark.deprecated
+ @pytest.mark.skip('uses internet')
def test_post_special_chars(self):
url = URL(get_moth_http('/audit/xss/simple_xss_form.py'))
test_data = u'abc"-á-'
@@ -194,7 +198,6 @@ def test_post_special_chars(self):
http_response = self.uri_opener.POST(url, data, cache=False)
self.assertIn(test_data, http_response.body)
- @pytest.mark.deprecated
def test_unknown_domain(self):
url = URL('http://longsitethatdoesnotexistfoo.com/')
self.assertRaises(HTTPRequestException, self.uri_opener.GET, url)
@@ -203,13 +206,12 @@ def test_file_proto(self):
url = URL('file://foo/bar.txt')
self.assertRaises(HTTPRequestException, self.uri_opener.GET, url)
- @pytest.mark.deprecated
def test_url_port_closed(self):
# TODO: Change 2312 by an always closed/non-http port
url = URL('http://127.0.0.1:2312/')
self.assertRaises(HTTPRequestException, self.uri_opener.GET, url)
- @pytest.mark.deprecated
+ @pytest.mark.skip('uses internet')
def test_url_port_not_http(self):
upper_daemon = UpperDaemon(EmptyTCPHandler)
upper_daemon.start()
@@ -226,7 +228,6 @@ def test_url_port_not_http(self):
else:
self.assertTrue(False, 'Expected HTTPRequestException.')
- @pytest.mark.deprecated
def test_url_port_not_http_many(self):
upper_daemon = UpperDaemon(EmptyTCPHandler)
upper_daemon.start()
@@ -255,7 +256,7 @@ def test_url_port_not_http_many(self):
self.assertEqual(scan_must_stop_e, 1)
self.assertEqual(http_request_e, 9)
- @pytest.mark.deprecated
+ @pytest.mark.skip('uses internet')
def test_get_wait_time(self):
"""
Asserts that all the responses coming out of the extended urllib have a
@@ -323,7 +324,6 @@ def test_ssl_sni(self):
resp = self.uri_opener.GET(url)
self.assertIn('Great!', resp.get_body())
- @pytest.mark.deprecated
def test_ssl_fail_when_requesting_http(self):
http_daemon = UpperDaemon(Ok200Handler)
http_daemon.start()
@@ -337,7 +337,6 @@ def test_ssl_fail_when_requesting_http(self):
self.assertRaises(HTTPRequestException, self.uri_opener.GET, url)
- @pytest.mark.deprecated
def test_ssl_fail_when_requesting_moth_http(self):
"""
https://github.com/andresriancho/w3af/issues/7989
@@ -415,8 +414,8 @@ def send(uri_opener, output):
self.assertEqual(http_response.get_code(), 200)
self.assertIn(self.MOTH_MESSAGE, http_response.body)
-
- @pytest.mark.deprecated
+
+ @pytest.mark.skip('uses internet')
def test_removes_cache(self):
url = URL(get_moth_http())
self.uri_opener.GET(url, cache=False)
@@ -434,8 +433,8 @@ def test_removes_cache(self):
test_trace_path = os.path.join(temp_dir, trace_fmt % i)
self.assertFalse(os.path.exists(test_db_path), test_db_path)
self.assertFalse(os.path.exists(test_trace_path), test_trace_path)
-
- @pytest.mark.deprecated
+
+ @pytest.mark.skip('uses internet')
def test_special_char_header(self):
url = URL(get_moth_http('/core/headers/echo-headers.py'))
header_content = u'name=ábc'
@@ -443,7 +442,6 @@ def test_special_char_header(self):
http_response = self.uri_opener.GET(url, cache=False, headers=headers)
self.assertIn(header_content, http_response.body)
- @pytest.mark.deprecated
def test_bad_file_descriptor_8125_local(self):
"""
8125 is basically an issue with the way HTTP SSL connections handle the
@@ -505,6 +503,48 @@ def rate_limit_generic(self, max_requests_per_second, _min, _max):
self.assertLessEqual(elapsed_time, _max)
+@pytest.fixture
+def blind_extended_urllib():
+ """
+    It's blind: it doesn't send real requests and it returns a MagicMock instead of
+    an HTTPResponse instance, which keeps the mock implementation simple.
+ """
+ extended_urllib = ExtendedUrllib()
+ extended_urllib.setup()
+ extended_urllib._opener = MagicMock()
+ with patch('w3af.core.data.url.extended_urllib.HTTPResponse', MagicMock()):
+ yield extended_urllib
+
+
+class TestXUrllib:
+ def test_get_method_can_be_called_with_url_as_string(self, blind_extended_urllib):
+ blind_extended_urllib.GET('http://example.com/') # no error
+
+ def test_get_method_can_be_called_with_headers_as_dict(self, blind_extended_urllib):
+ headers = {
+ 'origin': 'example.com',
+ 'authorization': 'some token',
+ }
+ # no error
+ blind_extended_urllib.GET('http://example.com/', headers=headers)
+
+ def test_post_method_can_be_called_with_url_as_string(self, blind_extended_urllib):
+ # no error
+ blind_extended_urllib.POST('http://example.com/', data='some data')
+
+ def test_post_method_can_be_called_with_headers_as_dict(self, blind_extended_urllib):
+ headers = {
+ 'origin': 'example.com',
+ 'authorization': 'some token',
+ }
+ # no error
+ blind_extended_urllib.POST(
+ 'http://example.com/',
+ data='some data',
+ headers=headers,
+ )
+
+
class EmptyTCPHandler(SocketServer.BaseRequestHandler):
def handle(self):
self.data = self.request.recv(1024).strip()
diff --git a/w3af/plugins/tests/crawl/test_web_spider.py b/w3af/plugins/tests/crawl/test_web_spider.py
index 9a0a2473a2..6ca9d0ccca 100644
--- a/w3af/plugins/tests/crawl/test_web_spider.py
+++ b/w3af/plugins/tests/crawl/test_web_spider.py
@@ -313,7 +313,7 @@ class TestDeadLock(PluginTest):
MOCK_RESPONSES = [MockResponse('http://mock/', INDEX_HTML),
MockResponse('http://mock/', 'Thanks.', method='POST')]
- @pytest.mark.deprecated
+ @pytest.mark.slow
def test_no_lock(self):
cfg = self._run_configs['cfg']
self._scan(cfg['target'], cfg['plugins'])
diff --git a/w3af/plugins/tests/plugin_testing_tools.py b/w3af/plugins/tests/plugin_testing_tools.py
index 4871758788..f3ca20c034 100644
--- a/w3af/plugins/tests/plugin_testing_tools.py
+++ b/w3af/plugins/tests/plugin_testing_tools.py
@@ -166,10 +166,21 @@ def mock_GET(self, url, *args, **kwargs):
"""
Mock for all places where w3af uses extended urllib.
- :param URL url: w3af.core.data.parsers.doc.url.URL instance
+ :param URL or str url: w3af.core.data.parsers.doc.url.URL instance or str
:return: w3af.core.data.url.HTTPResponse.HTTPResponse instance
"""
- return self._mocked_resp(url, self.match_response(str(url)))
+ url = str(url)
+ return self._mocked_resp(URL(url), self.match_response(url))
+
+ def mock_POST(self, url, *args, **kwargs):
+ """
+ Mock for all places where w3af uses extended urllib.
+
+ :param URL or str url: w3af.core.data.parsers.doc.url.URL instance or str
+ :return: w3af.core.data.url.HTTPResponse.HTTPResponse instance
+ """
+ url = str(url)
+ return self._mocked_resp(URL(url), self.match_response(url))
def mock_chrome_load_url(self, *args, **kwargs):
def real_mock(self_, url, *args, **kwargs):
@@ -270,13 +281,13 @@ def __enter__(self):
chrome_patcher.start()
self.patchers.append(chrome_patcher)
- # for soap plugin
- soap_patcher = patch(
- 'w3af.core.data.parsers.doc.wsdl.zeep.transports.Transport._load_remote_data',
- self.mocked_server.mock_response,
+ post_patcher = patch(
+ 'w3af.core.data.url.extended_urllib.ExtendedUrllib.POST',
+ self.mocked_server.mock_POST,
)
- soap_patcher.start()
- self.patchers.append(soap_patcher)
+ self.patchers.append(post_patcher)
+ post_patcher.start()
+
from w3af.plugins.crawl.web_spider import web_spider
if self.plugin_instance and isinstance(self.plugin_instance, web_spider):
self._handle_web_spider_plugin()
diff --git a/w3af/plugins/tests/test_plugin_testing_tools.py b/w3af/plugins/tests/test_plugin_testing_tools.py
index fda06d08f0..9136c78972 100644
--- a/w3af/plugins/tests/test_plugin_testing_tools.py
+++ b/w3af/plugins/tests/test_plugin_testing_tools.py
@@ -25,6 +25,13 @@ def test_it_works_and_hits_mocked_server(self):
self.url_opener.GET(MagicMock())
assert call.mock_GET in mocked_server.method_calls
+ def test_it_works_for_post_requests(self):
+ mocked_server = MagicMock()
+ network_patcher = NetworkPatcher(mocked_server=mocked_server)
+ with network_patcher:
+ self.url_opener.POST('http://example.com/', 'data')
+ assert mocked_server.mock_POST.called
+
def test_it_stops_all_patchers(self, network_patcher):
with network_patcher:
pass