From 07be714c8fb9cd7d6c27dfa5eae2a9c1256e88d7 Mon Sep 17 00:00:00 2001 From: jsjiang Date: Mon, 23 Oct 2023 14:52:51 -0700 Subject: [PATCH] test mint, update, delete id --- ezidapp/management/commands/check-ezid.py | 71 +++++++++++++++-- impl/client_util.py | 94 +++++++++++++++++++++++ 2 files changed, 158 insertions(+), 7 deletions(-) create mode 100644 impl/client_util.py diff --git a/ezidapp/management/commands/check-ezid.py b/ezidapp/management/commands/check-ezid.py index 199bb873..a3df67a8 100644 --- a/ezidapp/management/commands/check-ezid.py +++ b/ezidapp/management/commands/check-ezid.py @@ -7,8 +7,6 @@ import argparse import logging -import pathlib -import re import django.conf import django.contrib.auth.models @@ -19,9 +17,19 @@ import ezidapp.models.datacenter import impl.nog_sql.ezid_minter import impl.nog_sql.util +import impl.client_util log = logging.getLogger(__name__) +record_1 = { + '_profile': 'datacite', + '_target': "https://google.com", + 'datacite.date': '2023', + 'datacite.type': 'Dataset', + 'datacite.title': 'test record on shoulder - ark:/99999/fk88', + 'datacite.publisher': 'test publisher', + 'datacite.creator': 'unknown', +} class Command(django.core.management.BaseCommand): help = __doc__ @@ -30,11 +38,6 @@ def __init__(self): super(Command, self).__init__() self.opt = None - def create_parser(self, *args, **kwargs): - parser = super(Command, self).create_parser(*args, **kwargs) - parser.formatter_class = argparse.RawTextHelpFormatter - return parser - def add_arguments(self, parser): # Misc parser.add_argument( @@ -47,6 +50,10 @@ def handle(self, *_, **opt): self.opt = opt = argparse.Namespace(**opt) impl.nog_sql.util.log_setup(__name__, opt.debug) + base_url = django.conf.settings.EZID_BASE_URL + admin_username = django.conf.settings.ADMIN_USERNAME + admin_password = django.conf.settings.ADMIN_PASSWORD + log.info('Checking EZID on Dev or Stg ...') on_prd = input('Are you on EZID-Dev or Stg? Yes/No\n') if on_prd.upper() != 'YES': @@ -112,6 +119,40 @@ def handle(self, *_, **opt): log.info('#### Create shoulder completed successfully') + username = 'apitest' + password = 'apitest' + prefix = 'ark:/99999/fk88' + self.test_shoulder_mint_cmd(prefix) + self.grant_user_access_to_shoulder(username, prefix) + shoulder, id_created, text = impl.client_util.mint_identifers(base_url, username, password, prefix, record_1) + if id_created is not None: + log.info(f'#### OK mint_id on {shoulder}, ID created: {id_created}') + print(f'#### OK mint_id on {shoulder}, ID created: {id_created}') + update_data = { + '_status': 'reserved', + } + + impl.client_util.update_identifier(base_url, admin_username, admin_password, id_created, update_data) + + impl.client_util.delete_identifier(base_url, username, password, id_created) + + else: + log.error(f'#### ERROR mint_id on {shoulder}') + print(f'#### ERROR mint_id on {shoulder}') + + + + + prefix = 'doi:10.5072/FK7' + self.test_shoulder_mint_cmd(prefix) + self.grant_user_access_to_shoulder(username, prefix) + + prefix = 'doi:10.31223/FK3' + self.test_shoulder_mint_cmd(prefix) + self.grant_user_access_to_shoulder(username, prefix) + + + def check_create_sholuder_and_minter(self, shoulder_type, prefix, org_name, datacenter_symbol=None, is_crossref=False, is_super_shoulder=False): if shoulder_type == 'ark': @@ -151,6 +192,12 @@ def check_create_sholuder_and_minter(self, shoulder_type, prefix, org_name, data return shoulder, minter + def test_shoulder_mint_cmd(self, prefix): + cmd = 'shoulder-mint' + cmd_args = [prefix, '--count', '2', '--update'] + django.core.management.call_command(cmd, *cmd_args) + + def delete_shoulder_minter(self, prefix): shoulder = ezidapp.models.shoulder.Shoulder.objects.filter(prefix=prefix) if shoulder.exists(): @@ -160,6 +207,16 @@ def delete_shoulder_minter(self, prefix): if minter.exists(): minter.delete() + def grant_user_access_to_shoulder(self, username, prefix): + try: + user = ezidapp.models.user.User.objects.get(username=username) + shoulder = ezidapp.models.shoulder.Shoulder.objects.get(prefix=prefix) + user.shoulders.add(shoulder) + except Exception as ex: + raise django.core.management.CommandError(f"Grant access to shoulder {prefix} for user '{username}' failed: {ex}") + + + diff --git a/impl/client_util.py b/impl/client_util.py new file mode 100644 index 00000000..7b6e0125 --- /dev/null +++ b/impl/client_util.py @@ -0,0 +1,94 @@ +import requests +import base64 +import re + +import impl.anvl + +def post_data(url, user_name, password, data): + data = impl.anvl.format(data) + # convert data to byte string + try: + data = data.encode('UTF-8') + except: + pass + + success = False + status_code = -1 + text = "" + err_msg = "" + + headers = { + "Content-Type": "text/plain; charset=UTF-8", + "Authorization": "Basic " + base64.b64encode(f"{user_name}:{password}".encode('utf-8')).decode('utf-8'), + } + try: + r = requests.post(url=url, headers=headers, data=data) + status_code = r.status_code + text = r.text + success = True + except requests.exceptions.RequestException as e: + if hasattr(e, 'response') and e.response is not None: + status_code = e.response.status_code + err_msg = "HTTPError: " + str(e)[:200] + return success, status_code, text, err_msg + + +def mint_identifers(base_url, user_name, password, shoulder, data): + print(data) + url = f'{base_url}/shoulder/{shoulder}' + http_success, status_code, text, err_msg = post_data(url, user_name, password, data) + + id_created = None + if http_success: + # should return text as: + # success: doi:10.15697/FK27S78 | ark:/c5697/fk27s78 + # success: ark:/99999/fk4m631r0h + # error: bad request - no such shoulder created + if text.strip().startswith("success"): + list_1 = text.split(':', 1) + if len(list_1) > 1: + ids = list_1[1] + list_2 = ids.split("|") + if len(list_2) > 0: + id_created = list_2[0].strip() + + status = (shoulder, id_created, text) + + return status + +def update_identifier(base_url, user_name, password, id, data): + url = f"{base_url}/id/{id}" + http_success, status_code, text, err_msg = post_data(url, user_name, password, data) + if http_success and status_code == 200: + print(f"ok update identifier - {id} updated with new data: {data}") + else: + print(f"ERROR update identifier - update {id} failed - status_code: {status_code}: {text}: {err_msg}") + + +def delete_identifier(base_url, user_name, password, id): + url = f'{base_url}/id/{id}' + success = False + status_code = -1 + text = "" + err_msg = "" + + headers = { + "Content-Type": "text/plain; charset=UTF-8", + "Authorization": "Basic " + base64.b64encode(f"{user_name}:{password}".encode('utf-8')).decode('utf-8'), + } + try: + r = requests.delete(url=url, headers=headers) + status_code = r.status_code + text = r.text + success = True + except requests.exceptions.RequestException as e: + if hasattr(e, 'response') and e.response is not None: + status_code = e.response.status_code + err_msg = "HTTPError: " + str(e)[:200] + + if success and status_code == 200: + print(f"ok delete identifier - {id} ") + else: + print(f"ERROR delete identifier - update {id} failed - status_code: {status_code}: {text}: {err_msg}") + +