From 2c70d6bf6fd41b2370bf7ea86ee7379e0c1e1327 Mon Sep 17 00:00:00 2001 From: David Butenhof Date: Fri, 3 May 2024 22:36:37 -0400 Subject: [PATCH] Stage 2: allow UPDATE/DELETE without index map Mostly unit test changes! --- .../resources/query_apis/datasets/__init__.py | 40 ++-- .../resources/query_apis/datasets/datasets.py | 23 +-- .../server/database/models/index_map.py | 6 +- .../test/unit/server/query_apis/conftest.py | 2 +- .../unit/server/query_apis/test_datasets.py | 186 +++++++++--------- 5 files changed, 127 insertions(+), 130 deletions(-) diff --git a/lib/pbench/server/api/resources/query_apis/datasets/__init__.py b/lib/pbench/server/api/resources/query_apis/datasets/__init__.py index 1595d41362..97122ad418 100644 --- a/lib/pbench/server/api/resources/query_apis/datasets/__init__.py +++ b/lib/pbench/server/api/resources/query_apis/datasets/__init__.py @@ -11,7 +11,7 @@ ParamType, ) from pbench.server.api.resources.query_apis import ElasticBase -from pbench.server.database.models.datasets import Dataset, Metadata +from pbench.server.database.models.datasets import Dataset from pbench.server.database.models.index_map import IndexMap from pbench.server.database.models.templates import Template @@ -95,40 +95,36 @@ def get_index( ) -> str: """Retrieve ES indices based on a given root_index_name. - Datasets marked "archiveonly" aren't indexed, and can't be referenced - in most APIs that rely on Elasticsearch. Instead, we'll raise a - CONFLICT error. + Datasets without an index can't be referenced in most APIs that rely on + Elasticsearch. Instead, we'll raise a NOT_FOUND error. However, the + /api/v1/datasets API will specify ok_no_index as they need to operate + on the dataset regardless of whether indexing is enabled. + + All indices are returned if root_index_name is omitted. 
Args: dataset: dataset object root_index_name: A root index name like "run-data" - ok_no_index: Don't fail on an archiveonly dataset + ok_no_index: Don't fail if dataset has no indices Raises: - APIAbort(CONFLICT) if indexing was disabled on the target dataset. - APIAbort(NOT_FOUND) if the dataset has no matching index data + APIAbort(NOT_FOUND) if index is required and the dataset has none Returns: A string that joins all selected indices with ",", suitable for use in an Elasticsearch query URI. """ - archive_only = Metadata.getvalue(dataset, Metadata.SERVER_ARCHIVE) - if archive_only: - if ok_no_index: - return "" - raise APIAbort(HTTPStatus.CONFLICT, "Dataset indexing was disabled") - - index_keys = list(IndexMap.indices(dataset, root_index_name)) + index_keys = IndexMap.indices(dataset, root_index_name) + if index_keys: + return ",".join(index_keys) + if ok_no_index: + return "" - if not index_keys: - raise APIAbort( - HTTPStatus.NOT_FOUND, - f"Dataset has no {root_index_name if root_index_name else 'indexed'!r} data", - ) - - indices = ",".join(index_keys) - return indices + raise APIAbort( + HTTPStatus.NOT_FOUND, + f"Dataset has no {root_index_name if root_index_name else 'indexed'!r} data", + ) def get_aggregatable_fields( self, mappings: JSON, prefix: AnyStr = "", result: Union[List, None] = None diff --git a/lib/pbench/server/api/resources/query_apis/datasets/datasets.py b/lib/pbench/server/api/resources/query_apis/datasets/datasets.py index 32c75de566..706dafbbcf 100644 --- a/lib/pbench/server/api/resources/query_apis/datasets/datasets.py +++ b/lib/pbench/server/api/resources/query_apis/datasets/datasets.py @@ -187,7 +187,7 @@ def assemble(self, params: ApiParams, context: ApiContext) -> JSONOBJECT: # # It's important that all context fields required for postprocessing # of unindexed datasets have been set before this! 
- indices = self.get_index(dataset, ok_no_index=(action != "get")) + indices = self.get_index(dataset, ok_no_index=True) context["indices"] = indices if not indices: return {} @@ -234,7 +234,7 @@ def postprocess(self, es_json: JSONOBJECT, context: ApiContext) -> Response: and some other data. We mostly want to determine whether it was 100% successful (before updating or deleting the dataset), but we also summarize the results for the client. - * For get, we directly return the "hit list". + * For get, we return a count of documents for each index name. Args: es_json: the Elasticsearch response document @@ -249,22 +249,17 @@ def postprocess(self, es_json: JSONOBJECT, context: ApiContext) -> Response: current_app.logger.info("POSTPROCESS {}: {}", dataset.name, es_json) failures = 0 if action == "get": - count = None hits = [] - try: - count = es_json["hits"]["total"]["value"] - hits = es_json["hits"]["hits"] - if int(count) == 0: + if es_json: + try: + hits = es_json["hits"]["hits"] + except KeyError as e: raise APIInternalError( - f"Elasticsearch returned no matches for {dataset.name}" + f"Can't find search service match data for {dataset.name} ({e}) in {es_json!r}", ) - except KeyError as e: - raise APIInternalError( - f"Can't find Elasticsearch match data for {dataset.name} ({e}) in {es_json!r}", - ) - except ValueError as e: + if not isinstance(hits, list): raise APIInternalError( - f"Elasticsearch bad hit count {count!r} for {dataset.name}: {e}", + f"search service did not return hits list ({type(hits).__name__})" ) results = defaultdict(int) for hit in hits: diff --git a/lib/pbench/server/database/models/index_map.py b/lib/pbench/server/database/models/index_map.py index b9b188db49..f6c396daeb 100644 --- a/lib/pbench/server/database/models/index_map.py +++ b/lib/pbench/server/database/models/index_map.py @@ -1,4 +1,4 @@ -from typing import Iterator, NewType, Optional +from typing import NewType, Optional from sqlalchemy import Column, ForeignKey, Integer, 
String from sqlalchemy.exc import SQLAlchemyError @@ -186,7 +186,7 @@ def merge(cls, dataset: Dataset, merge_map: IndexMapType): cls.commit(dataset, "merge") @staticmethod - def indices(dataset: Dataset, root: Optional[str] = None) -> Iterator[str]: + def indices(dataset: Dataset, root: Optional[str] = None) -> list[str]: """Return the indices matching the specified root index name. Args: @@ -207,7 +207,7 @@ def indices(dataset: Dataset, root: Optional[str] = None) -> Iterator[str]: except SQLAlchemyError as e: raise IndexMapSqlError(e, operation="indices", dataset=dataset, name=root) - return (i.index for i in map) + return [str(i.index) for i in map] @staticmethod def exists(dataset: Dataset) -> bool: diff --git a/lib/pbench/test/unit/server/query_apis/conftest.py b/lib/pbench/test/unit/server/query_apis/conftest.py index 2081f6ab6a..2b5b8a457d 100644 --- a/lib/pbench/test/unit/server/query_apis/conftest.py +++ b/lib/pbench/test/unit/server/query_apis/conftest.py @@ -33,7 +33,7 @@ def query_api( expected_index: str, expected_status: str, headers: Optional[dict] = None, - request_method=ApiMethod.POST, + request_method: ApiMethod = ApiMethod.POST, query_params: Optional[JSONOBJECT] = None, expect_call: Optional[bool] = None, **kwargs, diff --git a/lib/pbench/test/unit/server/query_apis/test_datasets.py b/lib/pbench/test/unit/server/query_apis/test_datasets.py index 1f32e9c748..68b80374ec 100644 --- a/lib/pbench/test/unit/server/query_apis/test_datasets.py +++ b/lib/pbench/test/unit/server/query_apis/test_datasets.py @@ -61,11 +61,11 @@ def _setup(self, client): "user,ao,idx,expected_status", [ ("drb", False, True, HTTPStatus.OK), - ("drb", False, False, HTTPStatus.NOT_FOUND), + ("drb", False, False, HTTPStatus.OK), ("drb", True, True, HTTPStatus.OK), ("drb", True, False, HTTPStatus.OK), ("test_admin", False, True, HTTPStatus.OK), - ("test_admin", False, False, HTTPStatus.NOT_FOUND), + ("test_admin", False, False, HTTPStatus.OK), ("test", False, False, 
HTTPStatus.FORBIDDEN), (None, False, False, HTTPStatus.UNAUTHORIZED), ], @@ -105,16 +105,17 @@ def test_empty_delete( else: IndexMap.delete(drb) + expect_a_call = expected_status == HTTPStatus.OK and idx response = query_api( - self.pbench_endpoint, - self.elastic_endpoint, + pbench_uri=self.pbench_endpoint, + es_uri=self.elastic_endpoint, payload=None, expected_index=index, expected_status=expected_status, + headers=headers, request_method=self.api_method, + expect_call=expect_a_call, json=EMPTY_DELDATE_RESPONSE, - headers=headers, - expect_call=(expected_status == HTTPStatus.OK and not ao), ) if response.status_code == HTTPStatus.OK: expected = { @@ -157,31 +158,28 @@ def test_empty_delete( Dataset.query(name="drb") else: # On failure, the dataset should still exist - drb = Dataset.query(name="drb") - ops = Metadata.getvalue(drb, "dataset.operations") - if expected_status == HTTPStatus.NOT_FOUND: - assert "FAILED" == ops["DELETE"]["state"] - else: - assert response.json["message"].endswith( - "is not authorized to DELETE a resource owned by drb with private access" - ) + Dataset.query(name="drb") + assert response.json["message"].endswith( + "is not authorized to DELETE a resource owned by drb with private access" + ) - # permission errors should be caught before auditing - assert len(Audit.query()) == 0 + # permission errors should be caught before auditing + assert len(Audit.query()) == 0 @pytest.mark.parametrize( - "user,ao,owner,access,expected_status", + "user,ao,idx,owner,access,expected_status", [ - ("drb", False, None, "public", HTTPStatus.OK), - ("drb", True, None, "public", HTTPStatus.OK), - ("test_admin", False, "test", None, HTTPStatus.OK), - ("test", False, None, "public", HTTPStatus.FORBIDDEN), - (None, False, None, "public", HTTPStatus.UNAUTHORIZED), - ("drb", False, "test", "public", HTTPStatus.FORBIDDEN), - ("drb", True, "test", None, HTTPStatus.FORBIDDEN), - ("test_admin", False, None, None, HTTPStatus.BAD_REQUEST), - ("test", False, "test", 
None, HTTPStatus.FORBIDDEN), - (None, False, "drb", None, HTTPStatus.UNAUTHORIZED), + ("drb", False, True, None, "public", HTTPStatus.OK), + ("drb", False, False, None, "public", HTTPStatus.OK), + ("drb", True, True, None, "public", HTTPStatus.OK), + ("test_admin", False, True, "test", None, HTTPStatus.OK), + ("test", False, True, None, "public", HTTPStatus.FORBIDDEN), + (None, False, True, None, "public", HTTPStatus.UNAUTHORIZED), + ("drb", False, True, "test", "public", HTTPStatus.FORBIDDEN), + ("drb", True, True, "test", None, HTTPStatus.FORBIDDEN), + ("test_admin", False, True, None, None, HTTPStatus.BAD_REQUEST), + ("test", False, True, "test", None, HTTPStatus.FORBIDDEN), + (None, False, True, "drb", None, HTTPStatus.UNAUTHORIZED), ], ) def test_update( @@ -191,6 +189,7 @@ def test_update( get_token_func, user, ao, + idx, owner, access, expected_status, @@ -208,12 +207,17 @@ def test_update( user_id = None drb = Dataset.query(name="drb") + index = None if ao: # Set archiveonly flag to disable index-map logic Metadata.setvalue(drb, Metadata.SERVER_ARCHIVE, True) - index = None - else: + + # If we want an index, build the expected path; otherwise make sure + # the dataset doesn't have one. 
+ if idx: index = self.build_index_from_metadata() + else: + IndexMap.delete(drb) expected_owner = drb.owner_id original_owner = drb.owner_id @@ -238,16 +242,17 @@ def test_update( else: query = "" + expect_a_call = expected_status == HTTPStatus.OK and idx response = query_api( - f"/datasets/random_md5_string1{query}", - "/_update_by_query?ignore_unavailable=true&refresh=true", + pbench_uri=f"/datasets/random_md5_string1{query}", + es_uri="/_update_by_query?ignore_unavailable=true&refresh=true", payload=None, expected_index=index, expected_status=expected_status, + headers=headers, request_method=ApiMethod.POST, + expect_call=expect_a_call, json=EMPTY_DELDATE_RESPONSE, - headers=headers, - expect_call=(expected_status == HTTPStatus.OK and not ao), ) # Look up the post-update dataset @@ -342,15 +347,15 @@ def test_update_partial_failure(self, client, query_api, get_token_func): "failures": ["bad"], } response = query_api( - "/datasets/random_md5_string1?access=public", - "/_update_by_query?ignore_unavailable=true&refresh=true", + pbench_uri="/datasets/random_md5_string1?access=public", + es_uri="/_update_by_query?ignore_unavailable=true&refresh=true", payload=None, expected_index=index, expected_status=HTTPStatus.OK, - request_method=ApiMethod.POST, - json=es_json, headers=headers, + request_method=ApiMethod.POST, expect_call=True, + json=es_json, ) expected = { "total": 2, @@ -386,20 +391,22 @@ def test_update_total_failure(self, client, query_api, get_token_func): "failures": ["bad", "me too"], } query_api( - "/datasets/random_md5_string1?access=public", - "/_update_by_query?ignore_unavailable=true&refresh=true", + pbench_uri="/datasets/random_md5_string1?access=public", + es_uri="/_update_by_query?ignore_unavailable=true&refresh=true", payload=None, expected_index=index, expected_status=HTTPStatus.INTERNAL_SERVER_ERROR, - request_method=ApiMethod.POST, - json=es_json, headers=headers, + request_method=ApiMethod.POST, expect_call=True, + json=es_json, ) - 
@pytest.mark.parametrize("ao", (True, False)) + @pytest.mark.parametrize( + "ao,idx", ((True, True), (True, False), (False, True), (False, False)) + ) def test_get( - self, monkeypatch, more_datasets, client, query_api, build_auth_header, ao + self, monkeypatch, more_datasets, client, query_api, build_auth_header, ao, idx ): """Check on the GET summary behavior @@ -417,46 +424,45 @@ def test_get( ds = Dataset.query(name="drb") json = copy.deepcopy(self.empty_es_response_payload) + index = None if ao: # Set archiveonly flag to disable index-map logic Metadata.setvalue(ds, Metadata.SERVER_ARCHIVE, True) - index = None - if expected_status == HTTPStatus.OK: - expected_status = HTTPStatus.CONFLICT - monkeypatch.setattr(IndexMap, "indices", lambda _d: []) - expected = {} - else: - indices = list(IndexMap.indices(ds)) + + expected = {} + if idx: + indices = IndexMap.indices(ds) index = "/" + ",".join(indices) hits = [] - expected = {} for i, n in enumerate(indices): hits.append({"_index": n, "_id": i, "_source": {"data": f"{n}_{i}"}}) expected[n] = 1 - json["hits"]["total"]["value"] = str(len(hits)) + json["hits"]["total"]["value"] = len(hits) json["hits"]["hits"] = hits + else: + IndexMap.delete(ds) + expect_a_call = expected_status == HTTPStatus.OK and idx response = query_api( - self.pbench_endpoint, - "/_search?ignore_unavailable=true", - self.payload, - index, - expected_status, - request_method=ApiMethod.GET, + pbench_uri=self.pbench_endpoint, + es_uri="/_search?ignore_unavailable=true", + payload=self.payload, + expected_index=index, + expected_status=expected_status, headers=build_auth_header["header"], + request_method=ApiMethod.GET, + expect_call=expect_a_call, json=json, ) if expected_status == HTTPStatus.OK: assert expected == response.json - elif expected_status == HTTPStatus.CONFLICT: - assert {"message": "Dataset indexing was disabled"} == response.json else: assert { "message": "Unauthenticated client is not authorized to READ a resource owned by drb with 
private access" } == response.json - @pytest.mark.parametrize("value", (None, "not-integer", 0)) - def test_bad_get(self, client, query_api, get_token_func, value): + @pytest.mark.parametrize("hits", (None, 0, "string", {})) + def test_bad_get(self, client, query_api, get_token_func, hits): """Check a GET with bad Elasticsearch hit counts""" token = get_token_func("drb") @@ -474,20 +480,20 @@ def test_bad_get(self, client, query_api, get_token_func, value): "hits": [], }, } - if value is None: - del json["hits"]["total"]["value"] + if hits is None: + del json["hits"]["hits"] else: - json["hits"]["total"]["value"] = value + json["hits"]["hits"] = hits query_api( - self.pbench_endpoint, - "/_search?ignore_unavailable=true", - self.payload, - index, - HTTPStatus.INTERNAL_SERVER_ERROR, - request_method=ApiMethod.GET, + pbench_uri=self.pbench_endpoint, + es_uri="/_search?ignore_unavailable=true", + payload=self.payload, + expected_index=index, + expected_status=HTTPStatus.INTERNAL_SERVER_ERROR, headers=headers, - json=json, + request_method=ApiMethod.GET, expect_call=True, + json=json, ) def test_update_unstable(self, monkeypatch, client, query_api, get_token_func): @@ -507,14 +513,14 @@ def test_update_unstable(self, monkeypatch, client, query_api, get_token_func): ) response = query_api( - "/datasets/random_md5_string1?access=public", - "/_update_by_query?ignore_unavailable=true&refresh=true", + pbench_uri="/datasets/random_md5_string1?access=public", + es_uri="/_update_by_query?ignore_unavailable=true&refresh=true", payload=None, expected_index=index, expected_status=HTTPStatus.CONFLICT, + headers=headers, request_method=ApiMethod.POST, json=EMPTY_DELDATE_RESPONSE, - headers=headers, ) assert {"message": "Dataset is working on INDEX"} == response.json @@ -535,14 +541,14 @@ def fails(_self, _dataset, _state): monkeypatch.setattr(Sync, "update", fails) query_api( - "/datasets/random_md5_string1?access=public", - "/_update_by_query?ignore_unavailable=true&refresh=true", 
+ pbench_uri="/datasets/random_md5_string1?access=public", + es_uri="/_update_by_query?ignore_unavailable=true&refresh=true", payload=None, expected_index=index, expected_status=HTTPStatus.INTERNAL_SERVER_ERROR, + headers=headers, request_method=ApiMethod.POST, json=EMPTY_DELDATE_RESPONSE, - headers=headers, ) def test_update_bad_final_sync( @@ -564,15 +570,15 @@ def fails( monkeypatch.setattr(Sync, "update", fails) query_api( - "/datasets/random_md5_string1?access=public", - "/_update_by_query?ignore_unavailable=true&refresh=true", + pbench_uri="/datasets/random_md5_string1?access=public", + es_uri="/_update_by_query?ignore_unavailable=true&refresh=true", payload=None, expected_index=index, expected_status=HTTPStatus.INTERNAL_SERVER_ERROR, - request_method=ApiMethod.POST, - json=EMPTY_DELDATE_RESPONSE, headers=headers, + request_method=ApiMethod.POST, expect_call=True, + json=EMPTY_DELDATE_RESPONSE, ) def test_update_bad_update(self, monkeypatch, client, query_api, get_token_func): @@ -589,15 +595,15 @@ def fails(_s): monkeypatch.setattr(Dataset, "update", fails) query_api( - "/datasets/random_md5_string1?access=public", - "/_update_by_query?ignore_unavailable=true&refresh=true", + pbench_uri="/datasets/random_md5_string1?access=public", + es_uri="/_update_by_query?ignore_unavailable=true&refresh=true", payload=None, expected_index=index, expected_status=HTTPStatus.INTERNAL_SERVER_ERROR, - request_method=ApiMethod.POST, - json=EMPTY_DELDATE_RESPONSE, headers=headers, + request_method=ApiMethod.POST, expect_call=True, + json=EMPTY_DELDATE_RESPONSE, ) def test_update_bad_delete(self, monkeypatch, client, query_api, get_token_func): @@ -614,13 +620,13 @@ def fails(_s): monkeypatch.setattr(Dataset, "delete", fails) query_api( - "/datasets/random_md5_string1", - "/_delete_by_query?ignore_unavailable=true&refresh=true", + pbench_uri="/datasets/random_md5_string1", + es_uri="/_delete_by_query?ignore_unavailable=true&refresh=true", payload=None, expected_index=index, 
expected_status=HTTPStatus.INTERNAL_SERVER_ERROR, - request_method=ApiMethod.DELETE, - json=EMPTY_DELDATE_RESPONSE, headers=headers, + request_method=ApiMethod.DELETE, expect_call=True, + json=EMPTY_DELDATE_RESPONSE, )