Skip to content

Commit

Permalink
Add option to validate if a token will expire in the near future
Browse files Browse the repository at this point in the history
  • Loading branch information
KrKOo committed May 29, 2024
1 parent b894fe2 commit 5f97815
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 3 deletions.
2 changes: 1 addition & 1 deletion snakemake_executor_plugin_auth_tes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def tes_access_token(self):
if not self.do_oidc_auth:
return self.workflow.executor_settings.token

if self.auth_client.is_token_expired(self._access_token):
if self.auth_client.is_token_expired(self._access_token, 300):
refresh_result = self.auth_client.refresh_access_token(self._refresh_token)

self._access_token = refresh_result["access_token"]
Expand Down
8 changes: 6 additions & 2 deletions snakemake_executor_plugin_auth_tes/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@ def __init__(self, client_id, client_secret, oidc_url):
self.client_id, self.client_secret
)

def is_token_expired(self, token):
def is_token_expired(self, token, time_offset=0):
jwks_client = jwt.PyJWKClient(self.jwks_url)
header = jwt.get_unverified_header(token)
key = jwks_client.get_signing_key(header["kid"]).key

try:
jwt.decode(token, key, [header["alg"]], options={"verify_aud": False})
data = jwt.decode(
token, key, [header["alg"]], options={"verify_aud": False}
)
if data["exp"] - time_offset:
return True
except jwt.ExpiredSignatureError:
return True

Expand Down

0 comments on commit 5f97815

Please sign in to comment.