Skip to content

Commit

Permalink
feat(testing): add a new parameter to simulate form
Browse files Browse the repository at this point in the history
  • Loading branch information
vytas7 committed Dec 29, 2023
1 parent 5872d3d commit 6845e5a
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 211 deletions.
268 changes: 57 additions & 211 deletions falcon/testing/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

import asyncio
import datetime as dt
import hashlib
import inspect
import json as json_module
import os
import time
from typing import Dict
from typing import Optional
Expand All @@ -34,8 +34,8 @@

from falcon.asgi_spec import ScopeType
from falcon.constants import COMBINED_METHODS
from falcon.constants import MEDIA_JSON, MEDIA_URLENCODED
from falcon.errors import CompatibilityError, HTTPBadRequest
from falcon.constants import MEDIA_JSON, MEDIA_MULTIPART, MEDIA_URLENCODED
from falcon.errors import CompatibilityError
from falcon.testing import helpers
from falcon.testing.srmock import StartResponseMock
from falcon.util import async_to_sync
Expand Down Expand Up @@ -439,8 +439,7 @@ def simulate_request(
content_type=None,
body=None,
json=None,
files=None,
data=None,
form=None,
file_wrapper=None,
wsgierrors=None,
params=None,
Expand Down Expand Up @@ -532,30 +531,6 @@ def simulate_request(
overrides `body` and sets the Content-Type header to
``'application/json'``, overriding any value specified by either
the `content_type` or `headers` arguments.
Note:
Can only be used if data and files are null, otherwise an exception
is thrown.
files(dict): same as the files parameter in requests,
dictionary of ``'name': file-like-objects`` (or ``{'name': file-tuple}``)
for multipart encoding upload.
``file-tuple``: can be a 2-tuple ``('filename', fileobj)`` or a
3-tuple ``('filename', fileobj, 'content_type')``
where ``'content-type'`` is a string defining the content
type of the given file.
Note:
If both data and json are present, an exception is thrown.
To pass additional form-data with files, use data.
data : list of tuples, dict or (b)string, with additional data
to be passed with files (or alone if files is null), to be treated
as urlencoded form data.
Note:
If both data and json are present, an exception is thrown.
file_wrapper (callable): Callable that returns an iterable,
to be used as the value for *wsgi.file_wrapper* in the
WSGI environ (default: ``None``). This can be used to test
Expand Down Expand Up @@ -603,8 +578,7 @@ def simulate_request(
content_type=content_type,
body=body,
json=json,
files=files,
data=data,
form=form,
params=params,
params_csv=params_csv,
protocol=protocol,
Expand All @@ -628,8 +602,7 @@ def simulate_request(
headers,
body,
json,
files,
data,
form,
extras,
)

Expand Down Expand Up @@ -683,8 +656,7 @@ async def _simulate_request_asgi(
content_type=None,
body=None,
json=None,
files=None,
data=None,
form=None,
params=None,
params_csv=True,
protocol='http',
Expand Down Expand Up @@ -770,29 +742,9 @@ async def _simulate_request_asgi(
overrides `body` and sets the Content-Type header to
``'application/json'``, overriding any value specified by either
the `content_type` or `headers` arguments.
Note:
Can only be used if data and files are null, otherwise an exception
is thrown.
files(dict): same as the files parameter in requests,
dictionary of ``'name': file-like-objects`` (or ``{'name': file-tuple}``)
for multipart encoding upload.
``file-tuple``: can be a 2-tuple ``('filename', fileobj)`` or a
3-tuple ``('filename', fileobj, 'content_type')``,
where ``'content-type'`` is a string defining the content
type of the given file.
Mote:
If both files and json are present, an exception is thrown. To pass
additional form-data with files, use data.
data : list of tuples, dict or (b)string with additional data to be passed with
files (or alone if files is null), to be treated as urlencoded form data.
Note:
If both data and json are present, an exception is thrown.
form (dict): A form to submit as the request's body
(default: ``None``). If present, overrides `body`, and sets the
Content-Type header.
host(str): A string to use for the hostname part of the fully
qualified request URL (default: 'falconframework.org')
remote_addr (str): A string to use as the remote IP address for the
Expand Down Expand Up @@ -831,8 +783,7 @@ async def _simulate_request_asgi(
headers,
body,
json,
files,
data,
form,
extras,
)

Expand Down Expand Up @@ -2195,153 +2146,57 @@ async def __aexit__(self, exc_type, exc, tb):
await self._task_req


def _prepare_data_fields(data, boundary=None, urlenc=False):
"""Prepare data fields for request body.
Args:
data: dict or list of tuples with json data from the request
Returns: list of 2-tuples (field-name(str), value(bytes))
"""
urlresult = []
body_part = b''
if isinstance(data, (str, bytes)) or hasattr(data, 'read'):
try:
fields = list(json_module.loads(data).items())
except ValueError:
# if it's not a json, then treat as body
return data
elif isinstance(data, dict):
fields = list(data.items())
else:
fields = list(data)

# Append data to the other multipart parts
for field, val in fields:
if isinstance(val, str) or not hasattr(val, '__iter__'):
val = [val]
# if no files are passed, make urlencoded form
if urlenc:
for v in val:
if v:
urlresult.append(
(
field.encode('utf-8') if isinstance(field, str) else field,
v.encode('utf-8') if isinstance(v, str) else v,
)
)
# if files and data are passed, concat data to files body like in requests
else:
for v in val:
body_part += (
f'Content-Disposition: form-data; name={field}; '
f'\r\n\r\n'.encode()
)
if v:
if not isinstance(v, bytes):
v = str(v)
body_part += v.encode('utf-8') if isinstance(v, str) else v
body_part += b'\r\n--' + boundary.encode() + b'\r\n'
else:
body_part += b'\r\n--' + boundary.encode() + b'\r\n'

return body_part if not urlenc else urlencode(urlresult, doseq=True)


def _prepare_files(k, v):
"""Prepare file attributes for body of request form.
Args:
k: (str), file-name
v: fileobj or tuple (filename, data, content_type?)
def _encode_form(form: dict) -> tuple:
"""Build the body for a URL-encoded or multipart form.
Returns: file_name, file_data, file_content_type
This utility method accepts two types of forms: a simple dict mapping
string keys to values will get URL-encoded, whereas if any value is a list
of two or three items, these will be treated as (filename, content) or
(filename, content, content_type), and encoded as a multipart form.
Returns: (encoded body bytes, Content-Type header)
"""
file_content_type = None
if not v:
raise ValueError(f'No file provided for {k}')
if isinstance(v, (tuple, list)):
if len(v) == 2:
file_name, file_data = v
else:
file_name, file_data, file_content_type = v
if (
len(v) == 3
and file_content_type
and file_content_type.startswith('multipart/mixed')
):
file_data, new_header = _encode_files(json_module.loads(file_data.decode()))
file_content_type = 'multipart/mixed; ' + (
new_header['Content-Type'].split('; ')[1]
)
else:
# if v is not a tuple or iterable it has to be a filelike obj
name = getattr(v, 'name', None)
if name and isinstance(name, str) and name[0] != '<' and name[-1] != '>':
file_name = os.path.basename(name)
else:
file_name = k
file_data = v
if hasattr(file_data, 'read'):
file_data = file_data.read()
return file_name, file_data, file_content_type

form_items = form.items() if isinstance(form, dict) else form

Check warning on line 2159 in falcon/testing/client.py

View check run for this annotation

Codecov / codecov/patch

falcon/testing/client.py#L2159

Added line #L2159 was not covered by tests

def _make_boundary():
"""
Create random boundary to be used in multipart/form-data with files.
"""
boundary = os.urandom(16).hex()
return boundary
if not any(isinstance(value, (list, tuple)) for _, value in form_items):
# URL-encoded form
return urlencode(form, doseq=True).encode(), MEDIA_URLENCODED

Check warning on line 2163 in falcon/testing/client.py

View check run for this annotation

Codecov / codecov/patch

falcon/testing/client.py#L2163

Added line #L2163 was not covered by tests

# Encode multipart form
body = [b'']

Check warning on line 2166 in falcon/testing/client.py

View check run for this annotation

Codecov / codecov/patch

falcon/testing/client.py#L2166

Added line #L2166 was not covered by tests

def _encode_files(files, data=None):
"""Build the body for a multipart/form-data request.
for name, value in form_items:
data = value
filename = None
content_type = 'text/plain'

Check warning on line 2171 in falcon/testing/client.py

View check run for this annotation

Codecov / codecov/patch

falcon/testing/client.py#L2169-L2171

Added lines #L2169 - L2171 were not covered by tests

Will successfully encode files when passed as a dict or a list of
tuples. ``data`` fields are added first.
The tuples may be 2-tuples (filename, fileobj) or
3-tuples (filename, fileobj, contentype).
Allows for content_type = ``multipart/mixed`` for submission of nested files
if isinstance(value, (list, tuple)):
try:
filename, data = value
content_type = 'application/octet-stream'
except ValueError:
filename, data, content_type = value

Check warning on line 2178 in falcon/testing/client.py

View check run for this annotation

Codecov / codecov/patch

falcon/testing/client.py#L2174-L2178

Added lines #L2174 - L2178 were not covered by tests
if isinstance(data, str):
data = data.encode()

Check warning on line 2180 in falcon/testing/client.py

View check run for this annotation

Codecov / codecov/patch

falcon/testing/client.py#L2180

Added line #L2180 was not covered by tests
elif not isinstance(data, bytes):
# Assume a file-like object
data = data.read()

Check warning on line 2183 in falcon/testing/client.py

View check run for this annotation

Codecov / codecov/patch

falcon/testing/client.py#L2183

Added line #L2183 was not covered by tests

Returns: (encoded body string, headers dict)
"""
boundary = _make_boundary()
body_string = b'--' + boundary.encode() + b'\r\n'
header = {'Content-Type': 'multipart/form-data; boundary=' + boundary}

# Deal with the files tuples
if not isinstance(files, (dict, list)):
raise ValueError('cannot encode objects that are not 2-tuples')
elif isinstance(files, dict):
files = list(files.items())

for (k, v) in files:
file_name, file_data, file_content_type = _prepare_files(k, v)
if not file_data:
continue

body_string += f'Content-Disposition: form-data; name={k}; '.encode()
body_string += (
f'filename={file_name}\r\n'.encode() if file_name else '\r\n'.encode()
)
body_string += (
f'Content-Type: {file_content_type or "text/plain"}\r\n\r\n'.encode()
)
body_string += (
file_data.encode('utf-8') if isinstance(file_data, str) else file_data
)
body_string += b'\r\n--' + boundary.encode() + b'\r\n'
headers = f'Content-Disposition: form-data; name="{name}"'

Check warning on line 2185 in falcon/testing/client.py

View check run for this annotation

Codecov / codecov/patch

falcon/testing/client.py#L2185

Added line #L2185 was not covered by tests
if filename:
headers += f'; filename="{filename}"'
headers += f'\r\nContent-Type: {content_type}\r\n\r\n'

Check warning on line 2188 in falcon/testing/client.py

View check run for this annotation

Codecov / codecov/patch

falcon/testing/client.py#L2187-L2188

Added lines #L2187 - L2188 were not covered by tests

# Handle whatever json data gets passed along with files
if data:
body_string += _prepare_data_fields(data, boundary)
body.append(headers.encode() + data + b'\r\n')

Check warning on line 2190 in falcon/testing/client.py

View check run for this annotation

Codecov / codecov/patch

falcon/testing/client.py#L2190

Added line #L2190 was not covered by tests

body_string = body_string[:-2] + b'--\r\n'
checksum = hashlib.sha256()

Check warning on line 2192 in falcon/testing/client.py

View check run for this annotation

Codecov / codecov/patch

falcon/testing/client.py#L2192

Added line #L2192 was not covered by tests
for chunk in body:
checksum.update(chunk)
boundary = checksum.hexdigest()

Check warning on line 2195 in falcon/testing/client.py

View check run for this annotation

Codecov / codecov/patch

falcon/testing/client.py#L2194-L2195

Added lines #L2194 - L2195 were not covered by tests

return body_string, header
encoded = f'--{boundary}\r\n'.encode().join(body)
encoded += f'--{boundary}--\r\n'.encode()
return encoded, f'{MEDIA_MULTIPART}; boundary={boundary}'

Check warning on line 2199 in falcon/testing/client.py

View check run for this annotation

Codecov / codecov/patch

falcon/testing/client.py#L2197-L2199

Added lines #L2197 - L2199 were not covered by tests


def _prepare_sim_args(
Expand All @@ -2353,8 +2208,7 @@ def _prepare_sim_args(
headers,
body,
json,
files,
data,
form,
extras,
):
if not path.startswith('/'):
Expand Down Expand Up @@ -2384,24 +2238,16 @@ def _prepare_sim_args(
headers = headers or {}
headers['Content-Type'] = content_type

if files or data:
if json:
raise HTTPBadRequest(
description='Cannot process both json and (files or data) args'
)
elif files:
body, headers = _encode_files(files, data)
else:
body = _prepare_data_fields(data, None, True)
headers = headers or {}
if not headers:
headers['Content-Type'] = MEDIA_URLENCODED

elif json is not None:
body = json_module.dumps(json, ensure_ascii=False)
headers = headers or {}
headers['Content-Type'] = MEDIA_JSON

elif form is not None:
body, content_type = _encode_form(form)
headers = headers or {}
headers['Content-Type'] = content_type

Check warning on line 2249 in falcon/testing/client.py

View check run for this annotation

Codecov / codecov/patch

falcon/testing/client.py#L2247-L2249

Added lines #L2247 - L2249 were not covered by tests

return path, query_string, headers, body, extras


Expand Down
Loading

0 comments on commit 6845e5a

Please sign in to comment.