Skip to content

Commit

Permalink
Remove duplicated code
Browse files Browse the repository at this point in the history
  • Loading branch information
C. Weaver committed Feb 10, 2024
1 parent d88c1e2 commit db27c41
Showing 1 changed file with 1 addition and 132 deletions.
133 changes: 1 addition & 132 deletions scimma_admin/hopskotch_auth/api_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,137 +109,6 @@ def do_scram_first(client_first: str):
ex.save()
return (ex,s)

def do_scram_final(client_final: str, sid: Optional[str]=None):
"""
Return: If successful, the (completed) SCRAMExchange and the SCRAM server object
"""
if sid:
print("Client supplied sid:",sid)
ex = SCRAMExchange.objects.get(sid=sid)
else:
# a bit ugly: To find the previously started exchange session, if any, we need to extract
# the nonce from the request. We can either reimplement the parsing logic, or underhandedly
# reach inside of scramp to use its parse function. We do the latter.
try:
parsed = scramp.core._parse_message(client_final, "client final", "crp")
except:
return Response(status=status.HTTP_400_BAD_REQUEST)
ex = SCRAMExchange.objects.get(j_nonce=parsed['r'])
# recreate the SCRAM server state from our stored exchange record
s = scramp.ScramMechanism("SCRAM-SHA-512").make_server(scram_user_lookup,
s_nonce=ex.s_nonce())
s.set_client_first(ex.client_first)
s.get_server_first() # waste of time, but scramp requires this to be called
# if we reach this point, we are ready to process the second half of the exchange
s.set_client_final(client_final)
# if scramp hasn't objected, the authentication has now succeeded
return (ex,s)

def parse_list_header(header: str):
return [v[1:-1] if v[0] == v[-1] == '"' else v for v in parse_http_list(header)]

def parse_dict_header(header: str):
def unquote(v: str):
return v[1:-1] if v[0] == v[-1] == '"' else v
d = dict()
for item in parse_list_header(header):
if '=' in item:
k, v = item.split('=', 1)
d[k] = unquote(v)
else:
d[k] = None
return d

class ScramState(object):
def __init__(self, mech, sid, s):
self.mech = mech
self.sid = sid
self.s = s

class ScramAuthentication(BaseAuthentication):
def authenticate(self, request):
auth_header = get_authorization_header(request)
if not auth_header or len(auth_header)==0:
return None
try:
auth_header=auth_header.decode("utf-8")
except:
raise AuthenticationFailed("Malformed authentication header")

if not auth_header.upper().startswith("SCRAM-"):
return None
m = re.fullmatch("(SCRAM-[A-Z0-9-]+) *([^ ].*)", auth_header, flags=re.IGNORECASE)
if not m:
raise AuthenticationFailed("Malformed SCRAM authentication header")
scram_mech=m.group(1).upper()
auth_data = parse_dict_header(m.group(2))
if "data" in auth_data and "sid" in auth_data:
# If we have both of these we are in the final phase of the SCRAM handshake
sid = auth_data.get("sid")
data = auth_data.get("data")
if not sid or not data:
raise AuthenticationFailed("Malformed SCRAM authentication header")
client_final=base64.b64decode(data).decode("utf-8")
ex,s = do_scram_final(client_final, sid)
request.META["scram_state"]=ScramState(scram_mech, sid, s)
return (ex.cred.owner, ex.cred)
# Otherwise, SCRAM has not yet succeeded
return None

def authenticate_header(self, request):
auth_header = get_authorization_header(request)
if not auth_header or len(auth_header)==0:
return "SCRAM-SHA-512"
try:
auth_header=auth_header.decode("utf-8")
except:
raise AuthenticationFailed("Malformed SCRAM authentication header")
if auth_header.upper().startswith("SCRAM-"):
m = re.fullmatch("(SCRAM-[A-Z0-9-]+) *([^ ].*)", auth_header, flags=re.IGNORECASE)
if not m:
return "SCRAM-SHA-512"
scram_mech=m.group(1).upper()
auth_data = parse_dict_header(m.group(2))
if not auth_data.get("data", None):
return "SCRAM-SHA-512"
client_first=base64.b64decode(auth_data.get("data")).decode("utf-8")
try:
# This function will only be called during the SCRAM first phase, so we do that
ex, s = do_scram_first(client_first)
sfirst=base64.b64encode(s.get_server_first().encode("utf-8")).decode('utf-8')
return f"{scram_mech} sid={ex.sid}, data={sfirst}"
except (scramp.ScramException):
raise AuthenticationFailed("SCRAM authentication failed")

def set_scram_auth_info_header(get_response):
def middleware(request):
response = get_response(request)
scram_state = request.META.get("scram_state", None)
if scram_state:
sfinal=base64.b64encode(scram_state.s.get_server_final().encode("utf-8")).decode('utf-8')
response["Authentication-Info"]=f"{scram_state.mech} sid={scram_state.sid}, data={sfinal}"
return response
return middleware

def do_scram_first(client_first: str):
"""
Return: a tuple (sid, server first data)
"""
# all credentials we issue are SHA-512
s = scramp.ScramMechanism("SCRAM-SHA-512").make_server(scram_user_lookup)
s.set_client_first(client_first)

# If scramp did not complain, the exchange can proceed.
# First, we record the state so that it can be picked up later.
ex = SCRAMExchange()
ex.cred = SCRAMCredentials.objects.get(username=s.user)
ex.j_nonce = s.nonce
ex.s_nonce_len = len(s.s_nonce)
ex.client_first = client_first
ex.began = datetime.datetime.now(datetime.timezone.utc)
ex.save()
return (ex,s)

def do_scram_final(client_final: str, sid: Optional[str]=None):
"""
Return: If successful, the (completed) SCRAMExchange and the SCRAM server
Expand Down Expand Up @@ -476,7 +345,7 @@ def header_transform(name):
if "body" in rdata:
# Icky Hack: DRF wants to decode JSON for us, so we must re-encode the
# sub-request's body to be decoded. . . again.
# If there's a way to tell DRF to do no parsing on this request (beacuse it
# If there's a way to tell DRF to do no parsing on this request (because it
# already did it), that could be much more efficient
try:
raw_body = json.dumps(rdata["body"]).encode("utf-8")
Expand Down

0 comments on commit db27c41

Please sign in to comment.