From 80f35c8b245964562879130158eefdc52708d6f0 Mon Sep 17 00:00:00 2001 From: "C. Weaver" Date: Wed, 31 Jan 2024 16:58:54 -0500 Subject: [PATCH 1/2] Document requests and responses --- scripts/archive_api.py | 438 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 410 insertions(+), 28 deletions(-) diff --git a/scripts/archive_api.py b/scripts/archive_api.py index f2389b3..5c2628f 100644 --- a/scripts/archive_api.py +++ b/scripts/archive_api.py @@ -49,9 +49,39 @@ async def lifespan(app: FastAPI): await httpClient.aclose() await archiveClient.close() -app = FastAPI(lifespan=lifespan) +app = FastAPI(lifespan=lifespan, + title="Archive API", + summary="REST API for the Hopskotch Archive") -@app.get("/health_check") +@app.get("/health_check", + description="Used to check whether the API and its connections to upstream systems are \ + functioning correctly.", + responses={ + 200: { + "description": "The API itself is functioning", + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "DatabaseOK": { + "type": "boolean", + "description": "Metadata database connection is functioning" + }, + "ObjectStoreOK": { + "type": "boolean", + "description": "Object store connection is functioning" + }, + "HopAuthOK": { + "type": "boolean", + "description": "Hopskotch API connection is functioning" + }, + } + } + } + } + } + }) async def health_check(): try: # we don't care if there is a message with this UUID, @@ -77,6 +107,57 @@ async def health_check(): "HopAuthOK": hop_auth_ok, } +messageRecordSchema = { + "type": "object", + "properties": { + "message": { + "type": "string", + "format": "binary", + "description": "The raw mesage body" + }, + "metadata": { + "type": "object", + "description": "Kafka metadata for the message", + "properties": { + "timestamp": { + "type": "number", + "description": "The time at which the message was sent to the Kafka broker" + }, + "headers": { + "type": "object", + "description": "Kafka message headers", + "patternProperties": { + ".*": { + "type": "string", + "format": "binary", + } + } + }, + "key": { + "type": "string", + "format": "binary", + "description": "Kafka message key" + }, + } + }, + "annotations": { + "type": "object", + "description": "Metadata added by the archive", + "properties": { + "con_message_crc32": { + "type": "number", + "description": "The CRC32 of the message data" + }, + "con_text_uuid": { + "type": "string", + "format": "uuid", + "description": "The message UUID as a string" + }, + } + }, + } + } + def effective_topic_name_for_access(topic_name: str): """ Compute the topic name which should be used for querying access control. @@ -102,19 +183,77 @@ def effective_topic_name_for_access(topic_name: str): effective_name = topic_name return effective_name -def default_not_authorized(): +def authentication_required(): return JSONResponse(status_code=401, headers={"WWW-Authenticate": "SCRAM-SHA-512 realm=\"default@dev.hop.scimma.org\""}, - content={"message":"Not Authorized"}) + content={"message":"Authentication required"}) async def stream_s3_response(result): async for chunk in result['Body']: yield chunk -@app.get("/msg/{msg_id}") -async def fetch_message(msg_id: Annotated[str, Path(title="The ID of message item to get")], - authorization: Annotated[Union[str, None], Header()] = None, +@app.get("/msg/{msg_id}", + description="Retrieve a single stored message from the archive, by UUID. " + "Authentication via SCRAM is required to fetch non-public messages. ", + response_class=Response, + responses={ + 200: { + "description": "The requested message. Note that the format is BSON.", + "content": { + "application/bson": { + "schema": messageRecordSchema + } + } + }, + 400: { + "description": "Bad request. This may be caused by an ill-formed message ID.", + "content": { + "text/plain": { + "schema": { + "type": "string", + "examples": ["Invalid Message ID"] + } + } + } + }, + 401: { + "description": "Authentication required.", + "content": { + "text/plain": { + "schema": { + "type": "string", + "examples": ["Authentication required"] + } + } + } + }, + 403: { + "description": "Not authorized. The authenticated user does not have access to the requested message.", + "content": { + "text/plain": { + "schema": { + "type": "string", + "examples": ["Operation not permitted"] + } + } + } + }, + 404: { + "description": "Message not found. There is no archived message with the specified ID.", + "content": { + "text/plain": { + "schema": { + "type": "string", + "examples": ["Message not found"] + } + } + } + }, + }) +async def fetch_message(msg_id: Annotated[str, Path(description="The ID of message item to get", + json_schema_extra={"type": "string", "format": "uuid", "examples": ["457E145B-9F72-416F-87CA-73F0E188183D"]})], + authorization: Annotated[Union[str, None], Header(description="RFC 7804 SCRAM authentication")] = None, ): resp_headers={} @@ -141,7 +280,7 @@ async def fetch_message(msg_id: Annotated[str, Path(title="The ID of message ite # Query hopauth to find out if the user is allowed to read this message. # This requires authenticating the user to hopauth. if authorization is None: - return default_not_authorized(); + return authentication_required(); auth_query_url = f"{config['hop_auth_api_root']}/v1/current_credential/permissions/topic/" \ f"{effective_topic_name_for_access(metadata.topic)}" @@ -157,7 +296,7 @@ async def fetch_message(msg_id: Annotated[str, Path(title="The ID of message ite # as the client may be expecting the authentication-info! allowed_ops = resp.json()["allowed_operations"] - if not isinstance(allowed_ops, collections.Sequence): + if not isinstance(allowed_ops, collections.abc.Sequence): return Response(status_code=500, content="Internal Error", headers=resp_headers) if not "Read" in allowed_ops: return Response(status_code=403, content="Operation not permitted", headers=resp_headers) @@ -385,10 +524,67 @@ class BSONType(enum.Enum): return -@app.get("/msg/{msg_id}/raw_file/{file_name}") +@app.get("/msg/{msg_id}/raw_file/{file_name}", + description="Retrieve the raw body of a single stored message from the archive, by UUID. " + "The data will be treated as if it were a file with the specified name. " + "Kafka headers and other metadata are not available via this mechanism. " + "Authentication via SCRAM is required to fetch non-public messages.", + response_class=Response, + responses={ + 200: { + "description": "The requested message body.", + "content": { + "application/octet-stream": {} + } + }, + 400: { + "description": "Bad request. This may be caused by an ill-formed message ID.", + "content": { + "text/plain": { + "schema": { + "type": "string", + "examples": ["Invalid Message ID"], + } + } + } + }, + 401: { + "description": "Authentication required.", + "content": { + "text/plain": { + "schema": { + "type": "string", + "examples": ["Authentication required"], + } + } + } + }, + 403: { + "description": "Not authorized. The authenticated user does not have access to the requested message.", + "content": { + "text/plain": { + "schema": { + "type": "string", + "examples": ["Operation not permitted"], + } + } + } + }, + 404: { + "description": "Message not found. There is no archived message with the specified ID.", + "content": { + "text/plain": { + "schema": { + "type": "string", + "examples": ["Message not found"], + } + } + } + }, + }) async def fetch_raw_message(msg_id: Annotated[str, Path(title="The ID of message item to get")], file_name: Annotated[str, Path(title="Name to treat the payload as having")], - authorization: Annotated[Union[str, None], Header()] = None, + authorization: Annotated[Union[str, None], Header(description="RFC 7804 SCRAM authentication")] = None, ): resp_headers={} @@ -416,7 +612,7 @@ async def fetch_raw_message(msg_id: Annotated[str, Path(title="The ID of message # Query hopauth to find out if the user is allowed to read this message. # This requires authenticating the user to hopauth. if authorization is None: - return default_not_authorized(); + return authentication_required(); auth_query_url = f"{config['hop_auth_api_root']}/v1/current_credential/permissions/topic/" \ f"{effective_topic_name_for_access(metadata.topic)}" @@ -432,7 +628,7 @@ async def fetch_raw_message(msg_id: Annotated[str, Path(title="The ID of message # as the client may be expecting the authentication-info! allowed_ops = resp.json()["allowed_operations"] - if not isinstance(allowed_ops, collections.Sequence): + if not isinstance(allowed_ops, collections.abc.Sequence): return Response(status_code=500, content="Internal Error", headers=resp_headers) if not "Read" in allowed_ops: return Response(status_code=403, content="Operation not permitted", headers=resp_headers) @@ -512,14 +708,87 @@ async def stream_message_list(archiveClient, db_records, next_offset): yield b"\x00\x00" # NUL terminators for the array and the overall document -@app.get("/topic/{topic_name}") +@app.get("/topic/{topic_name}", + description="Retrieve messages which were published on a given topic during a specified time range. " + "Time ranges are specified as Kafka times (milliseconds since the unix epoch). " #TODO: is this always UTC? + "Results are returned in chunks, so users should expect to repeat requests to " + "obtain all messages in a time range. " + "A response with an empty list of messages indicates a lack of further messages " + "(i.e. after the requested offset) in the specified range. " + "Responses will contain messages in time-order. " + "Authentication via SCRAM is required to fetch non-public messages.", + response_class=Response, + responses={ + 200: { + "description": "A block of messages which were published during the requested period. " + "Note that the format is BSON.", + "content": { + "application/bson": { + "schema": { + "type": "object", + "properties": { + "next_offset": { + "type": "number", + "format": "integer", + "description": "This is the value which should be specified as " + "the offset in a following request to fetch the " + "next block of messages." + }, + "messages": { + "type": "array", + "items": messageRecordSchema, + } + } + } + } + } + }, + 400: { + "description": "Bad request. This may be caused by an an invalid time range or " + "requesting a non-existent topic.", + "content": { + "text/plain": { + "schema": { + "type": "string", + "examples": ["Invalid time range", + "Invalid message count limit", + "Invalid message offset"], + } + } + } + }, + 401: { + "description": "Authentication required.", + "content": { + "text/plain": { + "schema": { + "type": "string", + "examples": ["Authentication required"], + } + } + } + }, + 403: { + "description": "Not authorized. The authenticated user does not have access to the " + "requested topic.", + "content": { + "text/plain": { + "schema": { + "type": "string", + "examples": ["Operation not permitted"], + } + } + } + }, + }) async def fetch_time_range(topic_name: Annotated[str, - Path(title="The name of the topic from which to read")], - start: Annotated[int, Query(title="Timestamp for start of time range")], - end: Annotated[int, Query(title="Timestamp for end of time range")], - limit: Annotated[int, Query(title="Maximum number of messages to return at a tim")] = 10, - offset: Annotated[int, Query(title="Offset of first message to return")] = 0, - authorization: Annotated[Union[str, None], Header()] = None, + Path(description="The name of the topic from which to read")], + start: Annotated[int, Query(description="Timestamp for start of time range")], + end: Annotated[int, Query(description="Timestamp for end of time range")], + limit: Annotated[int, Query(description="Maximum number of messages to return at a time. " + "If too large a value is specified, the server may replace it with its own limit.")] = 10, + offset: Annotated[int, Query(description="Offset of first message to return")] = 0, + authorization: Annotated[Union[str, None], Header(description="RFC 7804 SCRAM authentication")] = None, ): resp_headers={} if end 16: limit = 16 + if offset < 0: + return Response(status_code=400, content="Invalid message offset") + # Query hopauth to find out if the user is allowed to read this topic. # This requires authenticating the user to hopauth. if authorization is None: - return default_not_authorized(); + return authentication_required(); auth_query_url = f"{config['hop_auth_api_root']}/v1/current_credential/permissions/topic/" \ f"{effective_topic_name_for_access(topic_name)}" @@ -543,6 +815,10 @@ async def fetch_time_range(topic_name: Annotated[str, return Response(status_code=401, content=resp.content, headers={"www-authenticate": resp.headers["www-authenticate"]}) if resp.status_code != 200: + if resp.status_code >= 200 and resp.status_code <=499: + # It would be nice to be more informative here, but Django makes it awkward for + # the hopauth code to send things other than error 400 when anything goes wrong. + return Response(status_code=400, content="Bad Request") return Response(status_code=500, content="Internal Error") if "authentication-info" in resp.headers: resp_headers["authentication-info"] = resp.headers["authentication-info"] @@ -550,7 +826,7 @@ async def fetch_time_range(topic_name: Annotated[str, # as the client may be expecting the authentication-info! allowed_ops = resp.json()["allowed_operations"] - if not isinstance(allowed_ops, collections.Sequence): + if not isinstance(allowed_ops, collections.abc.Sequence): return Response(status_code=500, content="Internal Error", headers=resp_headers) if not "Read" in allowed_ops: return Response(status_code=403, content="Operation not permitted", headers=resp_headers) @@ -572,10 +848,116 @@ def _is_bytes_like(obj): except: return False -@app.post("/topic/{topic_name}") +@app.post("/topic/{topic_name}", + description="Publish a message directly to the archive. " + "Messages should generally be sent directly via the Kafka brokers, but large " + "messages and 'attachement' data can be uploaded directly via this mechanism, " + "and then referenced in smaller messages sent directly. " + "Authentication is required to publish messages, and authorization is required " + "for writing to the target topic.", + response_class=Response, + openapi_extra={ + "requestBody": { + "content": { + "application/bson": { + "schema": { + "type": "object", + "properties": { + "message": { + "type": "string", + "format": "binary", + "description": "The body of the message." + }, + "headers": { + "type": "array", + "description": "The Kafka headers attached to the message. " + "This must be either an array of 2-tuples mapping " + "strings to binary blobs, or an equivalent dictionary/object.", + "items": { + "type": "array", + "minItems": 2, + "maxItems": 2, + "items": { + "type": "string", + "format": "binary", + } + } + }, + "key": { + "type": "string", + "format": "binary", + "description": "The Kafka key for the message." + } + }, + "required": ["message"], + } + } + } + } + }, + status_code=201, + responses={ + 201: { + "description": "Message stored successfully.", + "content": { + "text/plain": { + "schema": { + "type": "string" + } + } + } + }, + 400: { + "description": "Bad request. This may be caused by an ill-formed request body.", + "content": { + "text/plain": { + "schema": { + "type": "string", + "examples": [ + "Unsupported key {key} in request body", + "Missing message key in request body", + "Header with key {key} is not binary data", + "Message key is not binary or a string", + ] + } + } + } + }, + 401: { + "description": "Authentication required.", + "content": { + "text/plain": { + "schema": { + "type": "string" + } + } + } + }, + 403: { + "description": "Not authorized. The authenticated user does not have access to write to the target topic.", + "content": { + "text/plain": { + "schema": { + "type": "string", + "examples": ["Operation not permitted"], + } + } + } + }, + 422: { + "description": "Message could not be stored", + "content": { + "text/plain": { + "schema": { + "type": "string" + } + } + } + }, + }) async def write_message(request: Request, - topic_name: Annotated[str, Path(title="The name of the topic to which to write")], - authorization: Annotated[Union[str, None], Header()] = None): + topic_name: Annotated[str, Path(description="The name of the topic to which to write")], + authorization: Annotated[Union[str, None], Header(description="RFC 7804 SCRAM authentication")] = None): if config["read_only"]: return Response(status_code=501, content="Server is configured as read-only; " @@ -584,7 +966,7 @@ async def write_message(request: Request, # Query hopauth to find out if the user is allowed to write to this topic. # This requires authenticating the user to hopauth. if authorization is None: - return default_not_authorized(); + return authentication_required(); topic_name = unquote(topic_name) auth_query_url = f"{config['hop_auth_api_root']}/v1/current_credential/permissions/topic/" \ @@ -602,7 +984,7 @@ async def write_message(request: Request, # as the client may be expecting the authentication-info! allowed_ops = resp.json()["allowed_operations"] - if not isinstance(allowed_ops, collections.Sequence): + if not isinstance(allowed_ops, collections.abc.Sequence): return Response(status_code=500, content="Internal Error", headers=resp_headers) if not "Write" in allowed_ops: return Response(status_code=403, content="Operation not permitted", headers=resp_headers) @@ -630,7 +1012,7 @@ async def write_message(request: Request, payload = data["message"] if "headers" in data: headers = data["headers"] - if isinstance(headers, collections.Mapping): + if isinstance(headers, collections.abc.Mapping): headers = list(headers.items()) for key, value in headers: if not _is_bytes_like(value): From a777fa2d98819e177fea9a219cd9f98bc7da2a49 Mon Sep 17 00:00:00 2001 From: "C. Weaver" Date: Mon, 12 Feb 2024 18:33:13 -0500 Subject: [PATCH 2/2] Do not mark uploads public if the taregt topic is not. Sanity-check the structure of hop_auth responses more carefully. --- scripts/archive_api.py | 56 +++++++++++++++++++++++++++++++----------- 1 file changed, 42 insertions(+), 14 deletions(-) diff --git a/scripts/archive_api.py b/scripts/archive_api.py index 5c2628f..d90fe93 100644 --- a/scripts/archive_api.py +++ b/scripts/archive_api.py @@ -13,7 +13,7 @@ import uuid from contextlib import asynccontextmanager from io import BytesIO -from urllib.parse import unquote +from urllib.parse import unquote, urlparse import httpx import bson @@ -969,25 +969,55 @@ async def write_message(request: Request, return authentication_required(); topic_name = unquote(topic_name) - auth_query_url = f"{config['hop_auth_api_root']}/v1/current_credential/permissions/topic/" \ - f"{effective_topic_name_for_access(topic_name)}" + path_root=urlparse(config['hop_auth_api_root']).path + print("Sending request to hopauth with authorization header:", authorization) + resp = await httpClient.post(config['hop_auth_api_root']+"/v1/multi", + json={ + "ops":{ + "method":"get", + "path":f"{path_root}/v1/current_credential/permissions/topic/{topic_name}", + "headers":{"Authorization": "Inherit"}, + }, + "topic":{ + "method":"get", + "path":f"{path_root}/v1/topics/{topic_name}", + "headers":{"Authorization": "Inherit"}, + }, + }, + headers={"Authorization": authorization} + ) - resp = await httpClient.get(auth_query_url, headers={"Authorization": authorization}) if resp.status_code == 401 and "www-authenticate" in resp.headers: return Response(status_code=401, content=resp.content, headers={"www-authenticate": resp.headers["www-authenticate"]}) if resp.status_code != 200: - return Response(status_code=500, content="Internal Error") + return Response(status_code=500, content="Internal Error: hop_auth API request failed") if "authentication-info" in resp.headers: resp_headers["authentication-info"] = resp.headers["authentication-info"] # After this point it is important to always return a response with resp_headers # as the client may be expecting the authentication-info! - - allowed_ops = resp.json()["allowed_operations"] - if not isinstance(allowed_ops, collections.abc.Sequence): - return Response(status_code=500, content="Internal Error", headers=resp_headers) - if not "Write" in allowed_ops: + + hop_json = resp.json() + if not isinstance(hop_json, collections.abc.Mapping) \ + or "ops" not in hop_json or not isinstance(hop_json["ops"], collections.abc.Mapping) \ + or "status" not in hop_json["ops"] \ + or "topic" not in hop_json or not isinstance(hop_json["topic"], collections.abc.Mapping) \ + or "status" not in hop_json["topic"]: + return Response(status_code=500, content="Internal Error: Malformed response from hop_auth API") + if hop_json["ops"]["status"]!=200 or hop_json["topic"]["status"]!=200: + return Response(status_code=500, content="Internal Error: hop_auth API sub-request failed") + + if "body" not in hop_json["ops"] or not isinstance(hop_json["ops"]["body"], collections.abc.Mapping) \ + or "allowed_operations" not in hop_json["ops"]["body"] \ + or not isinstance(hop_json["ops"]["body"]["allowed_operations"], collections.abc.Sequence) \ + or "body" not in hop_json["topic"] or not isinstance(hop_json["topic"]["body"], collections.abc.Mapping) \ + or "publicly_readable" not in hop_json["topic"]["body"]: + return Response(status_code=500, content="Internal Error: Malformed response from hop_auth API") + + if "Write" not in hop_json["ops"]["body"]["allowed_operations"]: return Response(status_code=403, content="Operation not permitted", headers=resp_headers) + + message_is_public = hop_json["topic"]["body"]["publicly_readable"] # at this point we know the user is allowed to write, so we process the data that was sent @@ -1032,10 +1062,8 @@ async def write_message(request: Request, headers=resp_headers) key = data["key"] metadata = hop.io.Metadata(topic_name, 0, 0, timestamp, key, headers, None) - # TODO: This marks all direct uploads as public; - # in general we should set this based on whether the target topic is public. - stored, reason = await archiveClient.store_message(payload, metadata, - public=True, direct_upload=True) + stored, reason = await archiveClient.store_message(payload, metadata, public=message_is_public, + direct_upload=True) if stored: return Response(status_code=201, headers=resp_headers)