-
Notifications
You must be signed in to change notification settings - Fork 26
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[FDS-2373] Update jwt verification + synpy update v4.4.0->v4.4.1 (#1493)
* Update jwt verification + synpy update v4.4.0->v4.4.1
- Loading branch information
1 parent
a7ece1d
commit c6db5bb
Showing
9 changed files
with
1,070 additions
and
888 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
import logging | ||
from typing import Dict, Union | ||
|
||
from jwt import PyJWKClient, decode | ||
from jwt.exceptions import PyJWTError | ||
from synapseclient import Synapse | ||
|
||
from schematic.configuration.configuration import CONFIG | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
syn = Synapse( | ||
configPath=CONFIG.synapse_configuration_path, | ||
cache_client=False, | ||
) | ||
jwks_client = PyJWKClient( | ||
uri=syn.authEndpoint + "/oauth2/jwks", headers=syn._generate_headers() | ||
) | ||
|
||
|
||
def info_from_bearer_auth(token: str) -> Dict[str, Union[str, int]]: | ||
""" | ||
Authenticate user using bearer token. The token claims are decoded and returned. | ||
Example from: | ||
<https://pyjwt.readthedocs.io/en/stable/usage.html#retrieve-rsa-signing-keys-from-a-jwks-endpoint> | ||
Args: | ||
token (str): Bearer token. | ||
Returns: | ||
dict: Decoded token information. | ||
""" | ||
try: | ||
signing_key = jwks_client.get_signing_key_from_jwt(token) | ||
data = decode( | ||
jwt=token, | ||
key=signing_key.key, | ||
algorithms=[signing_key.algorithm_name], | ||
options={"verify_aud": False}, | ||
) | ||
|
||
return data | ||
except PyJWTError: | ||
logger.exception("Error decoding authentication token") | ||
# When the return type is None the web framework will return a 401 OAuthResponseProblem exception | ||
return None |
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
import jwt | ||
from cryptography.hazmat.backends import default_backend | ||
from cryptography.hazmat.primitives.asymmetric import rsa | ||
from pytest import LogCaptureFixture | ||
|
||
from schematic_api.api.security_controller import info_from_bearer_auth | ||
|
||
|
||
class TestSecurityController: | ||
def test_valid_synapse_token(self, syn_token: str) -> None: | ||
# GIVEN a valid synapse token | ||
assert syn_token is not None | ||
|
||
# WHEN the token is decoded | ||
decoded_token = info_from_bearer_auth(syn_token) | ||
|
||
# THEN the decoded claims are a dictionary | ||
assert isinstance(decoded_token, dict) | ||
assert "sub" in decoded_token | ||
assert decoded_token["sub"] is not None | ||
assert "token_type" in decoded_token | ||
assert decoded_token["token_type"] is not None | ||
|
||
def test_invalid_synapse_signing_key(self, caplog: LogCaptureFixture) -> None: | ||
# GIVEN an invalid synapse token | ||
private_key = rsa.generate_private_key( | ||
public_exponent=65537, key_size=2048, backend=default_backend() | ||
) | ||
|
||
random_token = jwt.encode( | ||
payload={"sub": "random"}, key=private_key, algorithm="RS256" | ||
) | ||
|
||
# WHEN the token is decoded | ||
decoded_token = info_from_bearer_auth(random_token) | ||
|
||
# THEN nothing is returned | ||
assert decoded_token is None | ||
|
||
# AND an error is logged | ||
assert ( | ||
"jwt.exceptions.PyJWKClientError: Unable to find a signing key that matches:" | ||
in caplog.text | ||
) | ||
|
||
def test_invalid_synapse_token_not_enough_parts( | ||
self, caplog: LogCaptureFixture | ||
) -> None: | ||
# GIVEN an invalid synapse token | ||
random_token = "invalid token" | ||
|
||
# WHEN the token is decoded | ||
decoded_token = info_from_bearer_auth(random_token) | ||
|
||
# THEN nothing is returned | ||
assert decoded_token is None | ||
|
||
# AND an error is logged | ||
assert "jwt.exceptions.DecodeError: Not enough segments" in caplog.text |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters