diff --git a/falcon/testing/client.py b/falcon/testing/client.py index c5dab80c8..29f28858b 100644 --- a/falcon/testing/client.py +++ b/falcon/testing/client.py @@ -20,9 +20,9 @@ import asyncio import datetime as dt +import hashlib import inspect import json as json_module -import os import time from typing import Dict from typing import Optional @@ -34,8 +34,8 @@ from falcon.asgi_spec import ScopeType from falcon.constants import COMBINED_METHODS -from falcon.constants import MEDIA_JSON, MEDIA_URLENCODED -from falcon.errors import CompatibilityError, HTTPBadRequest +from falcon.constants import MEDIA_JSON, MEDIA_MULTIPART, MEDIA_URLENCODED +from falcon.errors import CompatibilityError from falcon.testing import helpers from falcon.testing.srmock import StartResponseMock from falcon.util import async_to_sync @@ -439,8 +439,7 @@ def simulate_request( content_type=None, body=None, json=None, - files=None, - data=None, + form=None, file_wrapper=None, wsgierrors=None, params=None, @@ -532,30 +531,6 @@ def simulate_request( overrides `body` and sets the Content-Type header to ``'application/json'``, overriding any value specified by either the `content_type` or `headers` arguments. - - Note: - Can only be used if data and files are null, otherwise an exception - is thrown. - - files(dict): same as the files parameter in requests, - dictionary of ``'name': file-like-objects`` (or ``{'name': file-tuple}``) - for multipart encoding upload. - ``file-tuple``: can be a 2-tuple ``('filename', fileobj)`` or a - 3-tuple ``('filename', fileobj, 'content_type')`` - where ``'content-type'`` is a string defining the content - type of the given file. - - Note: - If both data and json are present, an exception is thrown. - To pass additional form-data with files, use data. - - data : list of tuples, dict or (b)string, with additional data - to be passed with files (or alone if files is null), to be treated - as urlencoded form data. - - Note: - If both data and json are present, an exception is thrown. - file_wrapper (callable): Callable that returns an iterable, to be used as the value for *wsgi.file_wrapper* in the WSGI environ (default: ``None``). This can be used to test @@ -603,8 +578,7 @@ def simulate_request( content_type=content_type, body=body, json=json, - files=files, - data=data, + form=form, params=params, params_csv=params_csv, protocol=protocol, @@ -628,8 +602,7 @@ def simulate_request( headers, body, json, - files, - data, + form, extras, ) @@ -683,8 +656,7 @@ async def _simulate_request_asgi( content_type=None, body=None, json=None, - files=None, - data=None, + form=None, params=None, params_csv=True, protocol='http', @@ -770,29 +742,9 @@ async def _simulate_request_asgi( overrides `body` and sets the Content-Type header to ``'application/json'``, overriding any value specified by either the `content_type` or `headers` arguments. - - Note: - Can only be used if data and files are null, otherwise an exception - is thrown. - - files(dict): same as the files parameter in requests, - dictionary of ``'name': file-like-objects`` (or ``{'name': file-tuple}``) - for multipart encoding upload. - ``file-tuple``: can be a 2-tuple ``('filename', fileobj)`` or a - 3-tuple ``('filename', fileobj, 'content_type')``, - where ``'content-type'`` is a string defining the content - type of the given file. - - Mote: - If both files and json are present, an exception is thrown. To pass - additional form-data with files, use data. - - data : list of tuples, dict or (b)string with additional data to be passed with - files (or alone if files is null), to be treated as urlencoded form data. - - Note: - If both data and json are present, an exception is thrown. - + form (dict): A form to submit as the request's body + (default: ``None``). If present, overrides `body`, and sets the + Content-Type header. host(str): A string to use for the hostname part of the fully qualified request URL (default: 'falconframework.org') remote_addr (str): A string to use as the remote IP address for the @@ -831,8 +783,7 @@ async def _simulate_request_asgi( headers, body, json, - files, - data, + form, extras, ) @@ -2195,153 +2146,57 @@ async def __aexit__(self, exc_type, exc, tb): await self._task_req -def _prepare_data_fields(data, boundary=None, urlenc=False): - """Prepare data fields for request body. - - Args: - data: dict or list of tuples with json data from the request - - Returns: list of 2-tuples (field-name(str), value(bytes)) - - """ - urlresult = [] - body_part = b'' - if isinstance(data, (str, bytes)) or hasattr(data, 'read'): - try: - fields = list(json_module.loads(data).items()) - except ValueError: - # if it's not a json, then treat as body - return data - elif isinstance(data, dict): - fields = list(data.items()) - else: - fields = list(data) - - # Append data to the other multipart parts - for field, val in fields: - if isinstance(val, str) or not hasattr(val, '__iter__'): - val = [val] - # if no files are passed, make urlencoded form - if urlenc: - for v in val: - if v: - urlresult.append( - ( - field.encode('utf-8') if isinstance(field, str) else field, - v.encode('utf-8') if isinstance(v, str) else v, - ) - ) - # if files and data are passed, concat data to files body like in requests - else: - for v in val: - body_part += ( - f'Content-Disposition: form-data; name={field}; ' - f'\r\n\r\n'.encode() - ) - if v: - if not isinstance(v, bytes): - v = str(v) - body_part += v.encode('utf-8') if isinstance(v, str) else v - body_part += b'\r\n--' + boundary.encode() + b'\r\n' - else: - body_part += b'\r\n--' + boundary.encode() + b'\r\n' - - return body_part if not urlenc else urlencode(urlresult, doseq=True) - - -def _prepare_files(k, v): - """Prepare file attributes for body of request form. - - Args: - k: (str), file-name - v: fileobj or tuple (filename, data, content_type?) +def _encode_form(form: dict) -> tuple: + """Build the body for a URL-encoded or multipart form. - Returns: file_name, file_data, file_content_type + This utility method accepts two types of forms: a simple dict mapping + string keys to values will get URL-encoded, whereas if any value is a list + of two or three items, these will be treated as (filename, content) or + (filename, content, content_type), and encoded as a multipart form. + Returns: (encoded body bytes, Content-Type header) """ - file_content_type = None - if not v: - raise ValueError(f'No file provided for {k}') - if isinstance(v, (tuple, list)): - if len(v) == 2: - file_name, file_data = v - else: - file_name, file_data, file_content_type = v - if ( - len(v) == 3 - and file_content_type - and file_content_type.startswith('multipart/mixed') - ): - file_data, new_header = _encode_files(json_module.loads(file_data.decode())) - file_content_type = 'multipart/mixed; ' + ( - new_header['Content-Type'].split('; ')[1] - ) - else: - # if v is not a tuple or iterable it has to be a filelike obj - name = getattr(v, 'name', None) - if name and isinstance(name, str) and name[0] != '<' and name[-1] != '>': - file_name = os.path.basename(name) - else: - file_name = k - file_data = v - if hasattr(file_data, 'read'): - file_data = file_data.read() - return file_name, file_data, file_content_type - + form_items = form.items() if isinstance(form, dict) else form -def _make_boundary(): - """ - Create random boundary to be used in multipart/form-data with files. - """ - boundary = os.urandom(16).hex() - return boundary + if not any(isinstance(value, (list, tuple)) for _, value in form_items): + # URL-encoded form + return urlencode(form, doseq=True).encode(), MEDIA_URLENCODED + # Encode multipart form + body = [b''] -def _encode_files(files, data=None): - """Build the body for a multipart/form-data request. + for name, value in form_items: + data = value + filename = None + content_type = 'text/plain' - Will successfully encode files when passed as a dict or a list of - tuples. ``data`` fields are added first. - The tuples may be 2-tuples (filename, fileobj) or - 3-tuples (filename, fileobj, contentype). - Allows for content_type = ``multipart/mixed`` for submission of nested files + if isinstance(value, (list, tuple)): + try: + filename, data = value + content_type = 'application/octet-stream' + except ValueError: + filename, data, content_type = value + if isinstance(data, str): + data = data.encode() + elif not isinstance(data, bytes): + # Assume a file-like object + data = data.read() - Returns: (encoded body string, headers dict) - """ - boundary = _make_boundary() - body_string = b'--' + boundary.encode() + b'\r\n' - header = {'Content-Type': 'multipart/form-data; boundary=' + boundary} - - # Deal with the files tuples - if not isinstance(files, (dict, list)): - raise ValueError('cannot encode objects that are not 2-tuples') - elif isinstance(files, dict): - files = list(files.items()) - - for (k, v) in files: - file_name, file_data, file_content_type = _prepare_files(k, v) - if not file_data: - continue - - body_string += f'Content-Disposition: form-data; name={k}; '.encode() - body_string += ( - f'filename={file_name}\r\n'.encode() if file_name else '\r\n'.encode() - ) - body_string += ( - f'Content-Type: {file_content_type or "text/plain"}\r\n\r\n'.encode() - ) - body_string += ( - file_data.encode('utf-8') if isinstance(file_data, str) else file_data - ) - body_string += b'\r\n--' + boundary.encode() + b'\r\n' + headers = f'Content-Disposition: form-data; name="{name}"' + if filename: + headers += f'; filename="{filename}"' + headers += f'\r\nContent-Type: {content_type}\r\n\r\n' - # Handle whatever json data gets passed along with files - if data: - body_string += _prepare_data_fields(data, boundary) + body.append(headers.encode() + data + b'\r\n') - body_string = body_string[:-2] + b'--\r\n' + checksum = hashlib.sha256() + for chunk in body: + checksum.update(chunk) + boundary = checksum.hexdigest() - return body_string, header + encoded = f'--{boundary}\r\n'.encode().join(body) + encoded += f'--{boundary}--\r\n'.encode() + return encoded, f'{MEDIA_MULTIPART}; boundary={boundary}' def _prepare_sim_args( @@ -2353,8 +2208,7 @@ def _prepare_sim_args( headers, body, json, - files, - data, + form, extras, ): if not path.startswith('/'): @@ -2384,24 +2238,16 @@ def _prepare_sim_args( headers = headers or {} headers['Content-Type'] = content_type - if files or data: - if json: - raise HTTPBadRequest( - description='Cannot process both json and (files or data) args' - ) - elif files: - body, headers = _encode_files(files, data) - else: - body = _prepare_data_fields(data, None, True) - headers = headers or {} - if not headers: - headers['Content-Type'] = MEDIA_URLENCODED - elif json is not None: body = json_module.dumps(json, ensure_ascii=False) headers = headers or {} headers['Content-Type'] = MEDIA_JSON + elif form is not None: + body, content_type = _encode_form(form) + headers = headers or {} + headers['Content-Type'] = content_type + return path, query_string, headers, body, extras diff --git a/tests/test_media_multipart.py b/tests/test_media_multipart.py index 7edf51f4c..1a0f8636b 100644 --- a/tests/test_media_multipart.py +++ b/tests/test_media_multipart.py @@ -850,3 +850,42 @@ async def deserialize_async(self, stream, content_type, content_length): assert resp.status_code == 200 assert resp.json == ['', '0x48'] + + +def test_simulate_form(client): + resp = client.simulate_post( + '/submit', + form={ + 'checked': 'true', + 'file': ('test.txt', b'Hello, World!\n', 'text/plain'), + 'another': ('test.dat', io.BytesIO(b'1\n2\n3\n')), + }, + ) + + assert resp.status_code == 200 + assert resp.json == [ + { + 'content_type': 'text/plain', + 'data': 'true', + 'filename': None, + 'name': 'checked', + 'secure_filename': None, + 'text': 'true', + }, + { + 'content_type': 'text/plain', + 'data': 'Hello, World!\n', + 'filename': 'test.txt', + 'name': 'file', + 'secure_filename': 'test.txt', + 'text': 'Hello, World!\n', + }, + { + 'content_type': 'application/octet-stream', + 'data': '1\n2\n3\n', + 'filename': 'test.dat', + 'name': 'another', + 'secure_filename': 'test.dat', + 'text': None, + }, + ] diff --git a/tests/test_media_urlencoded.py b/tests/test_media_urlencoded.py index be7458773..950dbdf0d 100644 --- a/tests/test_media_urlencoded.py +++ b/tests/test_media_urlencoded.py @@ -83,3 +83,12 @@ def test_urlencoded_form(client, body, expected): headers={'Content-Type': 'application/x-www-form-urlencoded'}, ) assert resp.json == expected + + +@pytest.mark.parametrize( + 'form', [{}, {'a': '1', 'b': '2'}, (('a', '1'), ('b', '2'), ('c', '3'))] +) +def test_simulate_form(client, form): + resp = client.simulate_post('/media', form=form) + assert resp.status_code == 200 + assert resp.json == dict(form)