diff --git a/gitlab_api/gitlab_api.py b/gitlab_api/gitlab_api.py index 9701b9f..3bb8bca 100644 --- a/gitlab_api/gitlab_api.py +++ b/gitlab_api/gitlab_api.py @@ -2048,13 +2048,13 @@ def protect_branch(self, **kwargs): if protected_branch.data: response = self._session.post(url=f'{self.url}/projects/{protected_branch.project_id}' - f'/protected_branches{protected_branch.branch_filter}', + f'/protected_branches{protected_branch.api_parameters}', headers=self.headers, data=json.dumps(protected_branch.data, indent=2), verify=self.verify) else: response = self._session.post(url=f'{self.url}/projects/{protected_branch.project_id}' - f'/protected_branches{protected_branch.branch_filter}', + f'/protected_branches{protected_branch.api_parameters}', headers=self.headers, verify=self.verify) return response @@ -2530,7 +2530,7 @@ def get_project_runners(self, **kwargs): raise MissingParameterError try: response = self._session.get(url=f'{self.url}/projects/{runner.project_id}' - f'/runners{runner.runner_filter}', + f'/runners{runner.api_parameters}', headers=self.headers, verify=self.verify) except ValidationError as e: diff --git a/gitlab_api/gitlab_models.py b/gitlab_api/gitlab_models.py index 5be2f2d..56f5051 100644 --- a/gitlab_api/gitlab_models.py +++ b/gitlab_api/gitlab_models.py @@ -199,7 +199,7 @@ def validate_state(cls, v): Raises: - ValueError: If 'state' is provided and not a valid state. """ - if v is not None and v not in ['pending', 'running', 'success', 'failed', 'canceled']: + if v is not None and v.lower() not in ['pending', 'running', 'success', 'failed', 'canceled']: raise ValueError("Invalid states") return v @@ -217,7 +217,7 @@ def validate_line_type(cls, v): Raises: - ValueError: If 'line_type' is provided and not a valid line type. """ - if v is not None and v not in ['new', 'old']: + if v is not None and v.lower() not in ['new', 'old']: raise ValueError("Invalid line_type") return v @@ -235,7 +235,7 @@ def validate_report_type(cls, v): Raises: - ValueError: If 'report_type' is provided and not a valid report type. """ - if v is not None and v not in ['license_scanning', 'code_coverage']: + if v is not None and v.lower() not in ['license_scanning', 'code_coverage']: raise ValueError("Invalid report_type") return v @@ -253,7 +253,7 @@ def validate_rule_type(cls, v): Raises: - ValueError: If 'rule_type' is provided and not a valid rule type. """ - if v is not None and v not in ['any_approver', 'regular']: + if v is not None and v.lower() not in ['any_approver', 'regular']: raise ValueError("Invalid rule_type") return v @@ -289,38 +289,57 @@ def construct_data_dict(cls, values): Raises: - ValueError: If no key is present in the data dictionary. """ - data = { - "branch": values.get("branch"), - "commit_message": values.get("commit_message"), - "start_branch": values.get("start_branch"), - "start_sha": values.get("start_sha"), - "start_project": values.get("start_project"), - "actions": values.get("actions"), - "author_email": values.get("author_email"), - "author_name": values.get("author_name"), - "stats": values.get("stats"), - "force": values.get("force"), - "note": values.get("note"), - "path": values.get("path"), - "line": values.get("line"), - "line_type": values.get("line_type"), - "state": values.get("state"), - "ref": values.get("ref"), - "name": values.get("name"), - "context": values.get("context"), - "target_url": values.get("target_url"), - "description": values.get("description"), - "coverage": values.get("coverage"), - "pipeline_id": values.get("pipeline_id"), - } + data = {} - # Remove None values - data = {k: v for k, v in data.items() if v is not None} + if 'branch' in values: + data['branch'] = values.get("branch") + if 'commit_message' in values: + data['commit_message'] = values.get("commit_message") + if 'start_branch' in values: + data['start_branch'] = values.get("start_branch") + if 'start_sha' in values: + data['start_sha'] = values.get("start_sha") + if 'start_project' in values: + data['start_project'] = values.get("start_project") + if 'actions' in values: + data['actions'] = values.get("actions") + if 'author_email' in values: + data['author_email'] = values.get("author_email") + if 'author_name' in values: + data['author_name'] = values.get("author_name") + if 'stats' in values: + data['stats'] = values.get("stats") + if 'force' in values: + data['force'] = values.get("force") + if 'note' in values: + data['note'] = values.get("note") + if 'path' in values: + data['path'] = values.get("path") + if 'line' in values: + data['line'] = values.get("line") + if 'line_type' in values: + data['line_type'] = values.get("line_type") + if 'state' in values: + data['state'] = values.get("state") + if 'reference' in values: + data['ref'] = values.get("reference") + if 'name' in values: + data['name'] = values.get("name") + if 'context' in values: + data['context'] = values.get("context") + if 'target_url' in values: + data['target_url'] = values.get("target_url") + if 'description' in values: + data['description'] = values.get("description") + if 'coverage' in values: + data['coverage'] = values.get("coverage") + if 'pipeline_id' in values: + data['pipeline_id'] = values.get("pipeline_id") - if not data: - raise ValueError("At least one key is required in the data dictionary.") + data = {k: v for k, v in data.items() if v is not None} - return data + values['data'] = data + return values class DeployTokenModel(BaseModel): @@ -340,12 +359,12 @@ class DeployTokenModel(BaseModel): The class includes field_validator functions for specific attribute validations. """ project_id: Union[int, str] = None - group_id: Union[int, str] = None - token: str = None - name: str = None - expires_at: str = None - username: str = None - scopes: str = None + group_id: Optional[Union[int, str]] = None + token: Optional[str] = None + name: Optional[str] = None + expires_at: Optional[str] = None + username: Optional[str] = None + scopes: Optional[str] = None @field_validator('expires_at') def validate_expires_at(cls, v): @@ -380,8 +399,8 @@ def validate_optional_parameters(cls, v, values): Raises: - MissingParameterError: If the parameter is provided and 'project_id' and 'group_id' are None. """ - if ('project_id' in values or 'group_id' in values) and v is not None: - return v + if ('project_id' in values.lower() or 'group_id' in values.lower()) and v is not None: + return v.lower() else: raise MissingParameterError @@ -419,7 +438,7 @@ def validate_scopes(cls, v): """ valid_scopes = ['read_repository', 'read_registry', 'write_registry', 'read_package_registry', 'write_package_registry'] - if v is not None and v not in valid_scopes: + if v is not None and v.lower() not in valid_scopes: raise ParameterError return v @@ -439,9 +458,9 @@ class GroupModel(BaseModel): The class includes field_validator functions for specific attribute validations. """ group_id: Union[int, str] = None - per_page: int = 100 - page: int = 1 - argument: str = 'state=opened' + per_page: Optional[int] = 100 + page: Optional[int] = 1 + argument: Optional[str] = 'state=opened' api_parameters: Optional[str] = "" @field_validator('per_page', 'page') @@ -513,11 +532,9 @@ def build_api_parameters(cls, values): Constructs API parameters based on provided values. """ filters = [] - - if values.get("page") is not None: + if 'page' in values: filters.append(f'page={values["page"]}') - - if values.get("per_page") is not None: + if 'per_page' in values: filters.append(f'per_page={values["per_page"]}') if filters: @@ -545,11 +562,11 @@ class JobModel(BaseModel): """ project_id: Union[int, str] = None job_id: Union[int, str] = None - scope: List[str] = None - per_page: int = 100 - page: int = 1 - include_retried: bool = None - job_variable_attributes: Dict = None + scope: Optional[List[str]] = None + per_page: Optional[int] = 100 + page: Optional[int] = 1 + include_retried: Optional[bool] = None + job_variable_attributes: Optional[Dict] = None api_parameters: Optional[str] = "" @field_validator('per_page', 'page') @@ -602,10 +619,10 @@ def validate_scope(cls, v): Raises: - ParameterError: If 'scope' contains invalid values. """ - if v not in ['created', 'pending', 'running', 'failed', 'success', 'canceled', 'skipped', - 'waiting_for_resource', 'manual']: + if v.lower() not in ['created', 'pending', 'running', 'failed', 'success', 'canceled', 'skipped', + 'waiting_for_resource', 'manual']: raise ParameterError - return v + return v.lower() @field_validator('job_variable_attributes') def validate_job_variable_attributes(cls, v): @@ -640,14 +657,11 @@ def build_api_parameters(cls, values): Constructs API parameters based on provided values. """ filters = [] - - if values.get("page") is not None: + if 'page' in values: filters.append(f'page={values["page"]}') - - if values.get("per_page") is not None: + if 'per_page' in values: filters.append(f'per_page={values["per_page"]}') - - if values.get("scope") is not None: + if 'scope' in values: filters.append(f'scope[]={values["scope"]}') if filters: @@ -670,10 +684,10 @@ class MembersModel(BaseModel): Note: The class includes field_validator functions for specific attribute validations. """ - group_id: Union[int, str] = None - project_id: Union[int, str] = None - per_page: int = 100 - page: int = 1 + group_id: Optional[Union[int, str]] = None + project_id: Optional[Union[int, str]] = None + per_page: Optional[int] = 100 + page: Optional[int] = 1 api_parameters: Optional[str] = "" @field_validator('per_page', 'page') @@ -709,11 +723,9 @@ def build_api_parameters(cls, values): Constructs API parameters based on provided values. """ filters = [] - - if values.get("page") is not None: + if 'page' in values: filters.append(f'page={values["page"]}') - - if values.get("per_page") is not None: + if 'per_page' in values: filters.append(f'per_page={values["per_page"]}') if filters: @@ -902,6 +914,41 @@ def build_api_parameters(cls, values): if filters: api_parameters = "?" + "&".join(filters) values['api_parameters'] = api_parameters + + data = {} + + if 'source_branch' in values: + data['source_branch'] = values.get("source_branch") + if 'target_branch' in values: + data['target_branch'] = values.get("target_branch") + if 'title' in values: + data['title'] = values.get("title") + if 'allow_collaboration' in values: + data['allow_collaboration'] = values.get("allow_collaboration") + if 'allow_maintainer_to_push' in values: + data['allow_maintainer_to_push'] = values.get("allow_maintainer_to_push") + if 'approvals_before_merge' in values: + data['approvals_before_merge'] = values.get("approvals_before_merge") + if 'assignee_id' in values: + data['assignee_id'] = values.get("assignee_id") + if 'description' in values: + data['description'] = values.get("description") + if 'labels' in values: + data['labels'] = values.get("labels") + if 'milestone_id' in values: + data['milestone_id'] = values.get("milestone_id") + if 'remove_source_branch' in values: + data['remove_source_branch'] = values.get("remove_source_branch") + if 'reviewer_ids' in values: + data['reviewer_ids'] = values.get("reviewer_ids") + if 'squash' in values: + data['squash'] = values.get("squash") + if 'target_project_id' in values: + data['target_project_id'] = values.get("target_project_id") + + data = {k: v for k, v in data.items() if v is not None} + + values['data'] = data return values @field_validator("scope") @@ -919,9 +966,9 @@ def validate_scope(cls, value): - ValueError: If 'value' is not a valid scope. """ valid_scopes = ['created_by_me', 'assigned_to_me', 'all'] - if value and not all(scope in valid_scopes for scope in value): + if value and not all(scope in valid_scopes for scope in value.lower()): raise ValueError("Invalid scope values") - return value + return value.lower() @field_validator("search_in") def validate_search_in(cls, value): @@ -938,9 +985,9 @@ def validate_search_in(cls, value): - ValueError: If 'value' is not a valid search_in value. """ valid_search_in = ['title', 'description', 'title,description'] - if value and value not in valid_search_in: + if value and value.lower() not in valid_search_in: raise ValueError("Invalid search_in value") - return value + return value.lower() @field_validator("search_exclude") def validate_search_exclude(cls, value): @@ -958,9 +1005,9 @@ def validate_search_exclude(cls, value): """ valid_search_exclude = ['labels', 'milestone', 'author_id', 'assignee_id', 'author_username', 'reviewer_id', 'reviewer_username', 'my_reaction_emoji'] - if value and value not in valid_search_exclude: + if value and value.lower() not in valid_search_exclude: raise ValueError("Invalid search_exclude value") - return value + return value.lower() @field_validator("state") def validate_state(cls, value): @@ -978,9 +1025,9 @@ def validate_state(cls, value): """ valid_states = ['opened', 'closed', 'locked', 'merged'] - if value and value not in valid_states: + if value and value.lower() not in valid_states: raise ValueError("Invalid state value") - return value + return value.lower() @field_validator("sort") def validate_sort(cls, value): @@ -997,9 +1044,9 @@ def validate_sort(cls, value): - ValueError: If 'value' is not a valid sort value. """ valid_sorts = ['asc', 'desc'] - if value and value not in valid_sorts: + if value and value.lower() not in valid_sorts: raise ValueError("Invalid sort value") - return value + return value.lower() @field_validator("wip") def validate_wip(cls, value): @@ -1016,9 +1063,9 @@ def validate_wip(cls, value): - ValueError: If 'value' is not a valid wip value. """ valid_wip_values = ['yes', 'no'] - if value and value not in valid_wip_values: + if value and value.lower() not in valid_wip_values: raise ValueError("Invalid wip value") - return value + return value.lower() @field_validator('source_branch', 'target_branch', 'title') def validate_string(cls, v): @@ -1092,45 +1139,6 @@ def validate_list_of_integers(cls, v): raise ParameterError return v - @field_validator("data") - def construct_data_dict(cls, values): - """ - Construct a data dictionary. - - Args: - - values: Dictionary of values. - - Returns: - - The constructed data dictionary. - - Raises: - - ValueError: If the data dictionary is empty. - """ - data = { - "source_branch": values.get("source_branch"), - "target_branch": values.get("target_branch"), - "title": values.get("title"), - "allow_collaboration": values.get("allow_collaboration"), - "allow_maintainer_to_push": values.get("allow_maintainer_to_push"), - "approvals_before_merge": values.get("approvals_before_merge"), - "assignee_id": values.get("assignee_id"), - "description": values.get("description"), - "labels": values.get("labels"), - "milestone_id": values.get("milestone_id"), - "remove_source_branch": values.get("remove_source_branch"), - "reviewer_ids": values.get("reviewer_ids"), - "squash": values.get("squash"), - "target_project_id": values.get("target_project_id"), - } - - # Remove None values - data = {k: v for k, v in data.items() if v is not None} - - if not data: - raise ValueError("At least one key is required in the data dictionary.") - - return data - class MergeRequestRuleModel(BaseModel): """ @@ -1230,7 +1238,7 @@ def validate_rule_type(cls, value): raise ValueError("Invalid rule_type") return value - @field_validator("data") + @model_validator(mode="before") def construct_data_dict(cls, values): """ Construct a data dictionary. @@ -1244,24 +1252,30 @@ def construct_data_dict(cls, values): Raises: - ValueError: If the data dictionary is empty. """ - data = { - "approvals_required": values.get("approvals_required"), - "name": values.get("name"), - "applies_to_all_protected_branches": values.get("applies_to_all_protected_branches"), - "group_ids": values.get("group_ids"), - "protected_branch_ids": values.get("protected_branch_ids"), - "report_type": values.get("report_type"), - "rule_type": values.get("rule_type"), - "user_ids": values.get("user_ids"), - } + data = {} + + if 'approvals_required' in values: + data['approvals_required'] = values.get("approvals_required") + if 'name' in values: + data['name'] = values.get("name") + if 'applies_to_all_protected_branches' in values: + data['applies_to_all_protected_branches'] = values.get("applies_to_all_protected_branches") + if 'group_ids' in values: + data['group_ids'] = values.get("group_ids") + if 'protected_branch_ids' in values: + data['protected_branch_ids'] = values.get("protected_branch_ids") + if 'report_type' in values: + data['report_type'] = values.get("report_type") + if 'rule_type' in values: + data['rule_type'] = values.get("rule_type") + if 'user_ids' in values: + data['user_ids'] = values.get("user_ids") # Remove None values data = {k: v for k, v in data.items() if v is not None} - if not data: - raise ValueError("At least one key is required in the data dictionary.") - - return data + values['data'] = data + return values class PackageModel(BaseModel): @@ -1290,11 +1304,11 @@ class PackageModel(BaseModel): - Example 2: Another example of usage. """ project_id: Union[int, str] = None - package_name: str = None - package_version: str = None - file_name: str = None - status: str = None - select: str = None + package_name: Optional[str] = None + package_version: Optional[str] = None + file_name: Optional[str] = None + status: Optional[str] = None + select: Optional[str] = None api_parameters: Optional[str] = "" @model_validator(mode="before") @@ -1312,9 +1326,12 @@ def build_api_parameters(cls, values): - None. """ filters = [] - - if values.get("status") is not None: + if 'status' in values: filters.append(f'status={values["status"]}') + if 'per_page' in values: + filters.append(f'per_page={values["per_page"]}') + if 'reference' in values: + filters.append(f'ref={values["reference"]}') if filters: api_parameters = "?" + "&".join(filters) @@ -1573,6 +1590,176 @@ def build_api_parameters(cls, values): if filters: api_parameters = "?" + "&".join(filters) values['api_parameters'] = api_parameters + + data = {} + + if 'allow_merge_on_skipped_pipeline' in values: + data['allow_merge_on_skipped_pipeline'] = values.get("allow_merge_on_skipped_pipeline") + if 'allow_pipeline_trigger_approve_deployment' in values: + data['allow_pipeline_trigger_approve_deployment'] = values.get("allow_pipeline_trigger_approve_deployment") + if 'only_allow_merge_if_all_status_checks_passed' in values: + data['only_allow_merge_if_all_status_checks_passed'] = ( + values.get("only_allow_merge_if_all_status_checks_passed")) + if 'analytics_access_level' in values: + data['analytics_access_level'] = values.get("analytics_access_level") + if 'approvals_before_merge' in values: + data['approvals_before_merge'] = values.get("approvals_before_merge") + if 'auto_cancel_pending_pipelines' in values: + data['auto_cancel_pending_pipelines'] = values.get("auto_cancel_pending_pipelines") + if 'auto_devops_deploy_strategy' in values: + data['auto_devops_deploy_strategy'] = values.get("auto_devops_deploy_strategy") + if 'auto_devops_enabled' in values: + data['auto_devops_enabled'] = values.get("auto_devops_enabled") + if 'autoclose_referenced_issues' in values: + data['autoclose_referenced_issues'] = values.get("autoclose_referenced_issues") + if 'avatar' in values: + data['avatar'] = values.get("avatar") + if 'build_git_strategy' in values: + data['build_git_strategy'] = values.get("build_git_strategy") + if 'build_timeout' in values: + data['build_timeout'] = values.get("build_timeout") + if 'builds_access_level' in values: + data['builds_access_level'] = values.get("builds_access_level") + if 'ci_config_path' in values: + data['ci_config_path'] = values.get("ci_config_path") + if 'container_registry_access_level' in values: + data['container_registry_access_level'] = values.get("container_registry_access_level") + if 'container_registry_enabled' in values: + data['container_registry_enabled'] = values.get("container_registry_enabled") + if 'default_branch' in values: + data['default_branch'] = values.get("default_branch") + if 'description' in values: + data['description'] = values.get("description") + if 'emails_disabled' in values: + data['emails_disabled'] = values.get("emails_disabled") + if 'emails_enabled' in values: + data['emails_enabled'] = values.get("emails_enabled") + if 'enforce_auth_checks_on_uploads' in values: + data['enforce_auth_checks_on_uploads'] = values.get("enforce_auth_checks_on_uploads") + if 'environments_access_level' in values: + data['environments_access_level'] = values.get("environments_access_level") + if 'external_authorization_classification_label' in values: + data['external_authorization_classification_label'] = ( + values.get("external_authorization_classification_label")) + if 'feature_flags_access_level' in values: + data['feature_flags_access_level'] = values.get("feature_flags_access_level") + if 'forking_access_level' in values: + data['forking_access_level'] = values.get("forking_access_level") + if 'group_runners_enabled' in values: + data['group_runners_enabled'] = values.get("group_runners_enabled") + if 'group_with_project_templates_id' in values: + data['group_with_project_templates_id'] = values.get("group_with_project_templates_id") + if 'import_url' in values: + data['import_url'] = values.get("import_url") + if 'infrastructure_access_level' in values: + data['infrastructure_access_level'] = values.get("infrastructure_access_level") + if 'initialize_with_readme' in values: + data['initialize_with_readme'] = values.get("initialize_with_readme") + if 'issue_branch_template' in values: + data['issue_branch_template'] = values.get("issue_branch_template") + if 'issues_access_level' in values: + data['issues_access_level'] = values.get("issues_access_level") + if 'issues_enabled' in values: + data['issues_enabled'] = values.get("issues_enabled") + if 'jobs_enabled' in values: + data['jobs_enabled'] = values.get("jobs_enabled") + if 'lfs_enabled' in values: + data['lfs_enabled'] = values.get("lfs_enabled") + if 'merge_commit_template' in values: + data['merge_commit_template'] = values.get("merge_commit_template") + if 'merge_method' in values: + data['merge_method'] = values.get("merge_method") + if 'merge_requests_access_level' in values: + data['merge_requests_access_level'] = values.get("merge_requests_access_level") + if 'merge_requests_enabled' in values: + data['merge_requests_enabled'] = values.get("merge_requests_enabled") + if 'mirror_trigger_builds' in values: + data['mirror_trigger_builds'] = values.get("mirror_trigger_builds") + if 'mirror' in values: + data['mirror'] = values.get("mirror") + if 'model_experiments_access_level' in values: + data['model_experiments_access_level'] = values.get("model_experiments_access_level") + if 'model_registry_access_level' in values: + data['model_registry_access_level'] = values.get("model_registry_access_level") + if 'monitor_access_level' in values: + data['monitor_access_level'] = values.get("monitor_access_level") + if 'namespace_id' in values: + data['namespace_id'] = values.get("namespace_id") + if 'only_allow_merge_if_all_discussions_are_resolved' in values: + data['only_allow_merge_if_all_discussions_are_resolved'] = ( + values.get("only_allow_merge_if_all_discussions_are_resolved")) + if 'only_allow_merge_if_all_status_checks_passed' in values: + data['only_allow_merge_if_all_status_checks_passed'] = ( + values.get("only_allow_merge_if_all_status_checks_passed")) + if 'only_allow_merge_if_pipeline_succeeds' in values: + data['only_allow_merge_if_pipeline_succeeds'] = values.get("only_allow_merge_if_pipeline_succeeds") + if 'packages_enabled' in values: + data['packages_enabled'] = values.get("packages_enabled") + if 'pages_access_level' in values: + data['pages_access_level'] = values.get("pages_access_level") + if 'path' in values: + data['path'] = values.get("path") + if 'printing_merge_request_link_enabled' in values: + data['printing_merge_request_link_enabled'] = values.get("printing_merge_request_link_enabled") + if 'public_builds' in values: + data['public_builds'] = values.get("public_builds") + if 'public_jobs' in values: + data['public_jobs'] = values.get("public_jobs") + if 'releases_access_level' in values: + data['releases_access_level'] = values.get("releases_access_level") + if 'repository_object_format' in values: + data['repository_object_format'] = values.get("repository_object_format") + if 'remove_source_branch_after_merge' in values: + data['remove_source_branch_after_merge'] = values.get("remove_source_branch_after_merge") + if 'repository_access_level' in values: + data['repository_access_level'] = values.get("repository_access_level") + if 'repository_storage' in values: + data['repository_storage'] = values.get("repository_storage") + if 'request_access_enabled' in values: + data['request_access_enabled'] = values.get("request_access_enabled") + if 'requirements_access_level' in values: + data['requirements_access_level'] = values.get("requirements_access_level") + if 'resolve_outdated_diff_discussions' in values: + data['resolve_outdated_diff_discussions'] = values.get("resolve_outdated_diff_discussions") + if 'security_and_compliance_access_level' in values: + data['security_and_compliance_access_level'] = values.get("security_and_compliance_access_level") + if 'shared_runners_enabled' in values: + data['shared_runners_enabled'] = values.get("shared_runners_enabled") + if 'show_default_award_emojis' in values: + data['show_default_award_emojis'] = values.get("show_default_award_emojis") + if 'snippets_access_level' in values: + data['snippets_access_level'] = values.get("snippets_access_level") + if 'snippets_enabled' in values: + data['snippets_enabled'] = values.get("snippets_enabled") + if 'squash_commit_template' in values: + data['squash_commit_template'] = values.get("squash_commit_template") + if 'squash_option' in values: + data['squash_option'] = values.get("squash_option") + if 'suggestion_commit_message' in values: + data['suggestion_commit_message'] = values.get("suggestion_commit_message") + if 'tag_list' in values: + data['tag_list'] = values.get("tag_list") + if 'template_name' in values: + data['template_name'] = values.get("template_name") + if 'topics' in values: + data['topics'] = values.get("topics") + if 'use_custom_template' in values: + data['use_custom_template'] = values.get("use_custom_template") + if 'visibility' in values: + data['visibility'] = values.get("visibility") + if 'warn_about_potentially_unwanted_characters' in values: + data['warn_about_potentially_unwanted_characters'] = ( + values.get("warn_about_potentially_unwanted_characters")) + if 'wiki_access_level' in values: + data['wiki_access_level'] = values.get("wiki_access_level") + if 'wiki_enabled' in values: + data['wiki_enabled'] = values.get("wiki_enabled") + + # Remove None values + data = {k: v for k, v in data.items() if v is not None} + + values['data'] = data + return values @field_validator("analytics_access_level", "builds_access_level", "container_registry_access_level", @@ -1594,9 +1781,9 @@ def validate_access_level(cls, value): """ valid_access_levels = ['disabled', 'private', 'enabled'] - if value and value not in valid_access_levels: + if value and value.lower() not in valid_access_levels: raise ValueError("Invalid access level value") - return value + return value.lower() @field_validator("auto_cancel_pending_pipelines", "auto_devops_deploy_strategy", "mirror_overwrites_diverged_branches", @@ -1677,9 +1864,9 @@ def validate_order_by(cls, value): Raises: - ValueError: If the value is not a valid order_by. """ - if value not in ['id', 'name', 'username', 'created_at', 'updated_at']: + if value.lower() not in ['id', 'name', 'username', 'created_at', 'updated_at']: raise ValueError("Invalid order_by") - return value + return value.lower() class ProtectedBranchModel(BaseModel): @@ -1718,9 +1905,9 @@ class ProtectedBranchModel(BaseModel): merge_access_level: Optional[int] = None unprotect_access_level: Optional[int] = None allow_force_push: Optional[List[str]] = None - allowed_to_push: Optional[List[str]] = None - allowed_to_merge: Optional[List[str]] = None - allowed_to_unprotect: Optional[List[str]] = None + allowed_to_push: Optional[List[Dict]] = None + allowed_to_merge: Optional[List[Dict]] = None + allowed_to_unprotect: Optional[List[Dict]] = None code_owner_approval_required: Optional[bool] = None api_parameters: Optional[str] = "" data: Optional[Dict] = None @@ -1753,73 +1940,61 @@ def build_api_parameters(cls, values): if filters: api_parameters = "?" + "&".join(filters) values['api_parameters'] = api_parameters + + data = {} + + if 'allow_force_push' in values: + data['allow_force_push'] = values.get("allow_force_push") + if 'allowed_to_push' in values: + data['allowed_to_push'] = values.get("allowed_to_push") + if 'allowed_to_merge' in values: + data['allowed_to_merge'] = values.get("allowed_to_merge") + if 'allowed_to_unprotect' in values: + data['allowed_to_unprotect'] = values.get("allowed_to_unprotect") + if 'code_owner_approval_required' in values: + data['code_owner_approval_required'] = values.get("code_owner_approval_required") + + # Remove None values + data = {k: v for k, v in data.items() if v is not None} + + values['data'] = data return values - # @field_validator('project_id') - # def validate_project_id(cls, value): - # """ - # Validate project ID for non-None. - # - # Args: - # - value: Project ID to validate. - # - # Returns: - # - The validated project ID. - # - # Raises: - # - ValueError: If the project ID is None. - # """ - # if value is None: - # raise ValueError('Project ID cannot be None') - # return value - # - # @field_validator('project_id') - # def validate_project_id_type(cls, value): - # """ - # Validate project ID for type (int or str). - # - # Args: - # - value: Project ID to validate. - # - # Returns: - # - The validated project ID. - # - # Raises: - # - ValueError: If the project ID is not an integer or a string. - # """ - # if not isinstance(value, (int, str)): - # raise ValueError('Project ID must be an integer or a string') - # return value - - # @model_validator(mode="before") - # def construct_data_dict(cls, values): - # """ - # Construct data dictionary. - # - # Args: - # - values: Dictionary of values. - # - # Returns: - # - The constructed data dictionary. - # - # Raises: - # - ValueError: If no key is present in the data dictionary. - # """ - # data = { - # "allow_force_push": values.get("allow_force_push"), - # "allowed_to_push": values.get("allowed_to_push"), - # "allowed_to_merge": values.get("allowed_to_merge"), - # "allowed_to_unprotect": values.get("allowed_to_unprotect"), - # "code_owner_approval_required": values.get("code_owner_approval_required"), - # } - # - # # Remove None values - # data = {k: v for k, v in data.items() if v is not None} - # - # if not data: - # raise ValueError("At least one key is required in the data dictionary.") - # - # return data + @field_validator('project_id') + def validate_project_id(cls, value): + """ + Validate project ID for non-None. + + Args: + - value: Project ID to validate. + + Returns: + - The validated project ID. + + Raises: + - ValueError: If the project ID is None. + """ + if value is None: + raise ValueError('Project ID cannot be None') + return value + + @field_validator('project_id') + def validate_project_id_type(cls, value): + """ + Validate project ID for type (int or str). + + Args: + - value: Project ID to validate. + + Returns: + - The validated project ID. + + Raises: + - ValueError: If the project ID is not an integer or a string. + """ + if not isinstance(value, (int, str)): + raise ValueError('Project ID must be an integer or a string') + return value class ReleaseModel(BaseModel): @@ -1866,7 +2041,7 @@ class ReleaseModel(BaseModel): tag_name: Optional[str] = None description: Optional[str] = None tag_message: Optional[str] = None - ref: Optional[str] = None + reference: Optional[str] = None direct_asset_path: Optional[str] = None name: Optional[List[str]] = None milestones: Optional[str] = None @@ -1891,11 +2066,41 @@ def build_api_parameters(cls, values): filters = [] if 'simple' in values: - filters.append(f'simple={values["simple"]}') + filters.append(f'simple={values["simple"]}'.lower()) if filters: api_parameters = "?" + "&".join(filters) values['api_parameters'] = api_parameters + + data = {} + + if 'description' in values: + data['name'] = values.get("description") + if 'tag_name' in values: + data['tag_name'] = values.get("tag_name") + if 'tag_message' in values: + data['tag_message'] = values.get("tag_message") + if 'description' in values: + data['description'] = values.get("description") + if 'ref' in values: + data['ref'] = values.get("reference") + if 'milestones' in values: + data['milestones'] = values.get("milestones") + if 'assets:links' in values: + data['assets:links'] = values.get("assets:links") + if 'assets:links:name' in values: + data['assets:links:name'] = values.get("assets:links:name") + if 'assets:links:url' in values: + data['assets:links:url'] = values.get("assets:links:url") + if 'assets:links:direct_asset_path' in values: + data['assets:links:direct_asset_path'] = values.get("assets:links:direct_asset_path") + if 'released_at' in values: + data['released_at'] = values.get("released_at") + + data = {k: v for k, v in data.items() if v is not None} + + values['data'] = data + return values @field_validator("order_by") @@ -1971,42 +2176,6 @@ def validate_project_id_type(cls, value): raise ValueError('Project ID must be an integer or a string') return value - @field_validator("data") - def construct_data_dict(cls, values): - """ - Construct data dictionary. - - Args: - - values: Dictionary of values. - - Returns: - - The constructed data dictionary. - - Raises: - - ValueError: If no key is present in the data dictionary. - """ - data = { - "name": values.get("description"), - "tag_name": values.get("tag_name"), - "tag_message": values.get("tag_message"), - "description": values.get("description"), - "ref": values.get("ref"), - "milestones": values.get("milestones"), - "assets:links": values.get("assets:links"), - "assets:links:name": values.get("assets:links:name"), - "assets:links:url": values.get("assets:links:url"), - "assets:links:direct_asset_path": values.get("assets:links:direct_asset_path"), - "released_at": values.get("released_at"), - } - - # Remove None values - data = {k: v for k, v in data.items() if v is not None} - - if not data: - raise ValueError("At least one key is required in the data dictionary.") - - return data - class RunnerModel(BaseModel): """ @@ -2044,24 +2213,24 @@ class RunnerModel(BaseModel): - Example 1: How to use this Pydantic model. - Example 2: Another example of usage. """ - description: str = None - active: bool = None - paused: bool = None - tag_list: List[str] = None - run_untagged: bool = None - locked: bool = None - access_level: str = None - maintenance_note: str = None - info: str = None - token: str = None - project_id: Union[int, str] = None - group_id: Union[int, str] = None - maximum_timeout: int = None - runner_type: str = None - status: str = None - all_runners: bool = False + description: Optional[str] = None + active: Optional[bool] = None + paused: Optional[bool] = None + tag_list: Optional[List[str]] = None + run_untagged: Optional[bool] = None + locked: Optional[bool] = None + access_level: Optional[str] = None + maintenance_note: Optional[str] = None + info: Optional[str] = None + token: Optional[str] = None + project_id: Optional[Union[int, str]] = None + group_id: Optional[Union[int, str]] = None + maximum_timeout: Optional[int] = None + runner_type: Optional[str] = None + status: Optional[str] = None + all_runners: Optional[bool] = False api_parameters: Optional[str] = "" - data: Dict = None + data: Optional[Dict] = None @model_validator(mode="before") def build_api_parameters(cls, values): @@ -2077,30 +2246,51 @@ def build_api_parameters(cls, values): Raises: - None. """ - filters = [] - - if values.get("tag_list") is not None: - filters.append(f'tag_list={values["tag_list"]}') - - if values.get("runner_type") is not None: - filters.append(f'runner_type={values["runner_type"]}') - - if values.get("status") is not None: - filters.append(f'status={values["status"]}') - - if values.get("paused") is not None: - filters.append(f'paused={values["paused"]}') - - if values.get("tag_list") is not None: - filters.append(f'tag_list={values["tag_list"]}') - - if values.get("all_runners"): + if 'tag_list' in values: + filters.append(f'tag_list={values["tag_list"].lower()}') + if 'runner_type' in values: + filters.append(f'runner_type={values["runner_type"].lower()}') + if 'status' in values: + filters.append(f'status={values["status"].lower()}') + if 'paused' in values: + filters.append(f'paused={values["paused"].lower()}') + if 'all_runners' in values: filters = ['/all'] if filters: api_parameters = "?" + "&".join(filters) values['api_parameters'] = api_parameters + + data = {} + + if 'description' in values: + data['name'] = values.get("description") + if 'active' in values: + data['active'] = values.get("active") + if 'paused' in values: + data['paused'] = values.get("paused") + if 'tag_list' in values: + data['tag_list'] = values.get("tag_list") + if 'run_untagged' in values: + data['run_untagged'] = values.get("run_untagged") + if 'locked' in values: + data['locked'] = values.get("locked") + if 'access_level' in values: + data['access_level'] = values.get("access_level") + if 'maximum_timeout' in values: + data['maximum_timeout'] = values.get("maximum_timeout") + if 'info' in values: + data['info'] = values.get("info") + if 'maintenance_note' in values: + data['maintenance_note'] = values.get("maintenance_note") + if 'token' in values: + data['token'] = values.get("token") + + data = {k: v for k, v in data.items() if v is not None} + + values['data'] = data + return values @field_validator("runner_type") @@ -2117,9 +2307,9 @@ def validate_runner_type(cls, value): Raises: - ValueError: If the runner_type attribute is not valid. """ - if value not in ['instance_type', 'group_type', 'project_type']: + if value.lower() not in ['instance_type', 'group_type', 'project_type']: raise ValueError("Invalid runner_type") - return value + return value.lower() @field_validator("status") def validate_status(cls, value): @@ -2135,45 +2325,9 @@ def validate_status(cls, value): Raises: - ValueError: If the status attribute is not valid. """ - if value not in ['online', 'offline', 'stale', 'never_contacted', 'active', 'paused']: + if value.lower() not in ['online', 'offline', 'stale', 'never_contacted', 'active', 'paused']: raise ValueError("Invalid status") - return value - - @field_validator("data") - def construct_data_dict(cls, values): - """ - Construct data dictionary. - - Args: - - values: Dictionary of values. - - Returns: - - The constructed data dictionary. - - Raises: - - ValueError: If no key is present in the data dictionary. - """ - data = { - "description": values.get("description"), - "active": values.get("active"), - "paused": values.get("paused"), - "tag_list": values.get("tag_list"), - "run_untagged": values.get("run_untagged"), - "locked": values.get("locked"), - "access_level": values.get("access_level"), - "maximum_timeout": values.get("maximum_timeout"), - "info": values.get("info"), - "maintenance_note": values.get("maximum_timeout"), - "token": values.get("token"), - } - - # Remove None values - data = {k: v for k, v in data.items() if v is not None} - - if not data: - raise ValueError("At least one key is required in the data dictionary.") - - return data + return value.lower() class UserModel(BaseModel): @@ -2218,29 +2372,29 @@ class UserModel(BaseModel): - Example 1: How to use this Pydantic model. - Example 2: Another example of usage. """ - username: str = None - active: bool = None - blocked: bool = None - external: bool = None - exclude_internal: bool = None - exclude_external: bool = None - without_project_bots: bool = None - extern_uid: str = None - provider: str = None - created_before: str = None - created_after: str = None - with_custom_attributes: str = None - sort: str = None - order_by: str = None - two_factor: str = None - without_projects: bool = None - admins: bool = None - saml_provider_id: str = None - max_pages: int = 0 - page: int = 1 - per_page: int = 100 - sudo: bool = False - user_id: Union[str, int] = None + username: Optional[str] = None + active: Optional[bool] = None + blocked: Optional[bool] = None + external: Optional[bool] = None + exclude_internal: Optional[bool] = None + exclude_external: Optional[bool] = None + without_project_bots: Optional[bool] = None + extern_uid: Optional[str] = None + provider: Optional[str] = None + created_before: Optional[str] = None + created_after: Optional[str] = None + with_custom_attributes: Optional[str] = None + sort: Optional[str] = None + order_by: Optional[str] = None + two_factor: Optional[str] = None + without_projects: Optional[bool] = None + admins: Optional[bool] = None + saml_provider_id: Optional[str] = None + max_pages: Optional[int] = 0 + page: Optional[int] = 1 + per_page: Optional[int] = 100 + sudo: Optional[bool] = False + user_id: Optional[Union[str, int]] = None api_parameters: Optional[str] = "" @model_validator(mode="before") @@ -2257,76 +2411,58 @@ def build_api_parameters(cls, values): Raises: - None. """ - filters = [] - - if values.get("username") is not None: - filters.append(f'username={values["username"]}') - - if values.get("active") is not None: - filters.append(f'active={values["active"]}') - - if values.get("blocked") is not None: - filters.append(f'blocked={values["blocked"]}') - - if values.get("external") is not None: - filters.append(f'external={values["external"]}') - - if values.get("exclude_internal") is not None: - filters.append(f'exclude_internal={values["exclude_internal"]}') - - if values.get("exclude_external") is not None: - filters.append(f'exclude_external={values["exclude_external"]}') - - if values.get("without_project_bots") is not None: - filters.append(f'without_project_bots={values["without_project_bots"]}') - - if values.get("order_by") is not None: - filters.append(f'order_by={values["order_by"]}') - - if values.get("sort") is not None: - filters.append(f'sort={values["sort"]}') - if values.get("two_factor") is not None: - filters.append(f'two_factor={values["two_factor"]}') - - if values.get("without_projects") is not None: - filters.append(f'without_projects={values["without_projects"]}') - - if values.get("admins") is not None: - filters.append(f'admins={values["admins"]}') - - if values.get("saml_provider_id") is not None: - filters.append(f'saml_provider_id={values["saml_provider_id"]}') - - if values.get("extern_uid") is not None: - filters.append(f'extern_uid={values["extern_uid"]}') - - if values.get("provider") is not None: - filters.append(f'provider={values["provider"]}') - - if values.get("created_before") is not None: - filters.append(f'created_before={values["created_before"]}') - - if values.get("created_after") is not None: - filters.append(f'created_after={values["created_after"]}') - - if values.get("with_custom_attributes") is not None: - filters.append(f'with_custom_attributes={values["with_custom_attributes"]}') - - if values.get("sudo") is not None: - filters.append(f'sudo={values["user_id"]}') - elif values.get("user_id") is not None: - filters.append(f'{values["user_id"]}') - - if values.get("page") is not None: - filters.append(f'page={values["page"]}') + filters = [] - if values.get("per_page") is not None: - filters.append(f'per_page={values["per_page"]}') + if 'username' in values: + filters.append(f'username={values["username"]}'.lower()) + if 'active' in values: + filters.append(f'active={values["active"]}'.lower()) + if 'blocked' in values: + filters.append(f'blocked={values["blocked"]}'.lower()) + if 'external' in values: + filters.append(f'external={values["external"]}'.lower()) + if 'exclude_internal' in values: + filters.append(f'exclude_internal={values["exclude_internal"]}'.lower()) + if 'exclude_external' in values: + filters.append(f'exclude_external={values["exclude_external"]}'.lower()) + if 'without_project_bots' in values: + filters.append(f'without_project_bots={values["without_project_bots"]}'.lower()) + if 'order_by' in values: + filters.append(f'order_by={values["order_by"]}'.lower()) + if 'sort' in values: + filters.append(f'sort={values["sort"]}'.lower()) + if 'two_factor' in values: + filters.append(f'two_factor={values["two_factor"]}'.lower()) + if 'without_projects' in values: + filters.append(f'without_projects={values["without_projects"]}'.lower()) + if 'admins' in values: + filters.append(f'admins={values["admins"]}'.lower()) + if 'saml_provider_id' in values: + filters.append(f'saml_provider_id={values["saml_provider_id"]}'.lower()) + if 'extern_uid' in values: + filters.append(f'extern_uid={values["extern_uid"]}'.lower()) + if 'provider' in values: + filters.append(f'provider={values["provider"]}'.lower()) + if 'created_before' in values: + filters.append(f'created_before={values["created_before"]}'.lower()) + if 'created_after' in values: + filters.append(f'created_after={values["created_after"]}'.lower()) + if 'with_custom_attributes' in values: + filters.append(f'with_custom_attributes={values["with_custom_attributes"]}'.lower()) + if 'sudo' in values: + filters.append(f'sudo={values["user_id"]}'.lower()) + if 'user_id' in values: + filters.append(f'user_id={values["user_id"]}'.lower()) + if 'page' in values: + filters.append(f'page={values["page"]}'.lower()) + if 'per_page' in values: + filters.append(f'per_page={values["per_page"]}'.lower()) if filters: api_parameters = "?" + "&".join(filters) values['api_parameters'] = api_parameters + return values @field_validator("order_by") @@ -2343,9 +2479,9 @@ def validate_order_by(cls, value): Raises: - ValueError: If the order_by attribute is not valid. """ - if value not in ['id', 'name', 'username', 'created_at', 'updated_at']: + if value.lower() not in ['id', 'name', 'username', 'created_at', 'updated_at']: raise ValueError("Invalid order_by") - return value + return value.lower() @field_validator("sort") def validate_sort(cls, value): @@ -2362,9 +2498,9 @@ def validate_sort(cls, value): - ValueError: If the sort attribute is not valid. """ valid_sorts = ['asc', 'desc'] - if value and value not in valid_sorts: + if value and value.lower() not in valid_sorts: raise ValueError("Invalid sort value") - return value + return value.lower() @field_validator("two_factor") def validate_two_factor(cls, value): @@ -2381,9 +2517,9 @@ def validate_two_factor(cls, value): - ValueError: If the two_factor attribute is not valid. """ valid_two_factor = ['enabled', 'disabled'] - if value and value not in valid_two_factor: + if value and value.lower() not in valid_two_factor: raise ValueError("Invalid two_factor value") - return value + return value.lower() class WikiModel(BaseModel): @@ -2443,17 +2579,31 @@ def build_api_parameters(cls, values): filters = [] if 'with_content' in values: - filters.append(f'with_content={values["with_content"]}') + filters.append(f'with_content={values["with_content"]}'.lower()) if 'render_html' in values: - filters.append(f'render_html={values["render_html"]}') + filters.append(f'render_html={values["render_html"]}'.lower()) if 'version' in values: - filters.append(f'version={values["version"]}') + filters.append(f'version={values["version"]}'.lower()) if filters: api_parameters = "?" + "&".join(filters) values['api_parameters'] = api_parameters + + data = {} + + if 'content' in values: + data['content'] = values.get("content") + if 'title' in values: + data['title'] = values.get("title") + if 'format' in values: + data['format'] = values.get("format") + + data = {k: v for k, v in data.items() if v is not None} + + values['data'] = data + return values @field_validator('project_id') @@ -2491,31 +2641,3 @@ def validate_project_id_type(cls, value): if not isinstance(value, (int, str)): raise ValueError('Project ID must be an integer or a string') return value - - @field_validator("data") - def construct_data_dict(cls, values): - """ - Construct data dictionary. - - Args: - - values: Dictionary of values. - - Returns: - - The constructed data dictionary. - - Raises: - - ValueError: If no key is present in the data dictionary. - """ - data = { - "content": values.get("content"), - "title": values.get("title"), - "format": values.get("format"), - } - - # Remove None values - data = {k: v for k, v in data.items() if v is not None} - - if not data: - raise ValueError("At least one key is required in the data dictionary.") - - return data diff --git a/test/test_gitlab_models.py b/test/test_gitlab_models.py index c0f2ca8..0de22a7 100644 --- a/test/test_gitlab_models.py +++ b/test/test_gitlab_models.py @@ -115,8 +115,14 @@ def test_project_model(): ) def test_protected_branches_model(): project_id = 5679 - protected_branch = ProtectedBranchModel(project_id=project_id, branch="test") + branch="test" + protected_branch = ProtectedBranchModel(project_id=project_id, branch=branch, + allowed_to_push=[{"access_level": 40}], + allowed_to_merge=[{"access_level": 20}], + all_runners=True) assert project_id == protected_branch.project_id + assert protected_branch.allowed_to_push == [{"access_level": 40}] + assert protected_branch.allowed_to_merge == [{"access_level": 20}] @pytest.mark.skipif( @@ -127,7 +133,30 @@ def test_release_model(): project_id = 5679 release = ReleaseModel(project_id=project_id, simple=True) assert project_id == release.project_id - assert release.api_parameters == "?simple=True" + assert release.api_parameters == "?simple=true" + + +@pytest.mark.skipif( + sys.platform in ["darwin"] or skip, + reason=reason, + ) +def test_runner_model(): + project_id = 5679 + runner = RunnerModel(project_id=project_id, active=True, status="Online") + assert project_id == runner.project_id + assert runner.api_parameters == "?status=online" + + +@pytest.mark.skipif( + sys.platform in ["darwin"] or skip, + reason=reason, + ) +def test_user_model(): + username = "test_user" + user = UserModel(username=username, active=True) + assert user.username == username + assert user.active == True + assert user.api_parameters == "?username=test_user&active=true" @pytest.mark.skipif( @@ -138,7 +167,7 @@ def test_wiki_model(): project_id = 5679 wiki = WikiModel(project_id=project_id, with_content=True) assert project_id == wiki.project_id - assert wiki.api_parameters == "?with_content=True" + assert wiki.api_parameters == "?with_content=true" if __name__ == "__main__": @@ -150,4 +179,6 @@ def test_wiki_model(): test_project_model() test_protected_branches_model() test_release_model() + test_runner_model() + test_user_model() test_wiki_model() \ No newline at end of file