From e56ab231a7b3c7bdf85c6c5a7e90fff475018664 Mon Sep 17 00:00:00 2001 From: Kevin Tian Date: Fri, 5 Apr 2024 13:14:43 -0400 Subject: [PATCH 1/5] Use status() for related methods (#1568) * Update status for related methods * Fix infinite recurssion * Add release note * Address comments * Update release note --- qiskit_ibm_runtime/base_runtime_job.py | 9 ++++++--- qiskit_ibm_runtime/runtime_job.py | 21 ++++++++++++--------- qiskit_ibm_runtime/runtime_job_v2.py | 18 ++++++++++-------- release-notes/unreleased/1547.bug.rst | 2 ++ 4 files changed, 30 insertions(+), 20 deletions(-) create mode 100644 release-notes/unreleased/1547.bug.rst diff --git a/qiskit_ibm_runtime/base_runtime_job.py b/qiskit_ibm_runtime/base_runtime_job.py index bc1979f6d..6e3c89438 100644 --- a/qiskit_ibm_runtime/base_runtime_job.py +++ b/qiskit_ibm_runtime/base_runtime_job.py @@ -13,7 +13,7 @@ """Base runtime job class.""" from abc import ABC, abstractmethod -from typing import Any, Optional, Callable, Dict, Type, Union, Sequence, List +from typing import Any, Optional, Callable, Dict, Type, Union, Sequence, List, Tuple import json import logging from concurrent import futures @@ -55,6 +55,9 @@ class BaseRuntimeJob(ABC): _executor = futures.ThreadPoolExecutor(thread_name_prefix="runtime_job") + JOB_FINAL_STATES: Tuple[Any, ...] = () + ERROR: Union[str, RuntimeJobStatus] = None + def __init__( self, backend: Backend, @@ -220,7 +223,7 @@ def error_message(self) -> Optional[str]: def _set_status_and_error_message(self) -> None: """Fetch and set status and error message.""" - if not self.in_final_state(): + if self._status not in self.JOB_FINAL_STATES: response = self._api_client.job_get(job_id=self.job_id()) self._set_status(response) self._set_error_message(response) @@ -255,7 +258,7 @@ def _set_error_message(self, job_response: Dict) -> None: Args: job_response: Job response from runtime API. """ - if self.errored(): + if self._status == self.ERROR: self._error_message = self._error_msg_from_job_response(job_response) else: self._error_message = None diff --git a/qiskit_ibm_runtime/runtime_job.py b/qiskit_ibm_runtime/runtime_job.py index de4ac3866..7e6917392 100644 --- a/qiskit_ibm_runtime/runtime_job.py +++ b/qiskit_ibm_runtime/runtime_job.py @@ -75,6 +75,9 @@ class RuntimeJob(Job, BaseRuntimeJob): the results at a later time, but before the job finishes. """ + JOB_FINAL_STATES = JOB_FINAL_STATES + ERROR = JobStatus.ERROR + def __init__( self, backend: Backend, @@ -153,7 +156,7 @@ def result( # pylint: disable=arguments-differ """ _decoder = decoder or self._final_result_decoder self.wait_for_final_state(timeout=timeout) - if self._status == JobStatus.ERROR: + if self._status == self.ERROR: error_message = self._reason if self._reason else self._error_message if self._reason == "RAN TOO LONG": raise RuntimeJobMaxTimeoutError(error_message) @@ -196,11 +199,11 @@ def status(self) -> JobStatus: def in_final_state(self) -> bool: """Return whether the job is in a final job state such as ``DONE`` or ``ERROR``.""" - return self._status in JOB_FINAL_STATES + return self.status() in self.JOB_FINAL_STATES def errored(self) -> bool: """Return whether the job has failed.""" - return self._status == JobStatus.ERROR + return self.status() == self.ERROR def _status_from_job_response(self, response: Dict) -> str: """Returns the job status from an API response. 
@@ -213,7 +216,7 @@ def _status_from_job_response(self, response: Dict) -> str: """ mapped_job_status = API_TO_JOB_STATUS[response["state"]["status"].upper()] if mapped_job_status == JobStatus.CANCELLED and self._reason == "RAN TOO LONG": - mapped_job_status = JobStatus.ERROR + mapped_job_status = self.ERROR return mapped_job_status def submit(self) -> None: @@ -303,7 +306,7 @@ def logs(self) -> str: Raises: IBMRuntimeError: If a network error occurred. """ - if self.status() not in JOB_FINAL_STATES: + if self.status() not in self.JOB_FINAL_STATES: logger.warning("Job logs are only available after the job finishes.") try: return self._api_client.job_logs(self.job_id()) @@ -329,14 +332,14 @@ def wait_for_final_state( # pylint: disable=arguments-differ """ try: start_time = time.time() - if self._status not in JOB_FINAL_STATES and not self._is_streaming(): + if self._status not in self.JOB_FINAL_STATES and not self._is_streaming(): self._ws_client_future = self._executor.submit(self._start_websocket_client) if self._is_streaming(): self._ws_client_future.result(timeout) # poll for status after stream has closed until status is final # because status doesn't become final as soon as stream closes status = self.status() - while status not in JOB_FINAL_STATES: + while status not in self.JOB_FINAL_STATES: elapsed_time = time.time() - start_time if timeout is not None and elapsed_time >= timeout: raise RuntimeJobTimeoutError( @@ -383,7 +386,7 @@ def stream_results( RuntimeInvalidStateError: If a callback function is already streaming results or if the job already finished. """ - if self._status in JOB_FINAL_STATES: + if self._status in self.JOB_FINAL_STATES: raise RuntimeInvalidStateError("Job already finished.") if self._is_streaming(): raise RuntimeInvalidStateError("A callback function is already streaming results.") @@ -411,6 +414,6 @@ def interim_results(self, decoder: Optional[Type[ResultDecoder]] = None) -> Any: _decoder = decoder or self._interim_result_decoder interim_results_raw = self._api_client.job_interim_results(job_id=self.job_id()) self._interim_results = _decoder.decode(interim_results_raw) - if self.status() in JOB_FINAL_STATES: + if self.status() in self.JOB_FINAL_STATES: self._final_interim_results = True return self._interim_results diff --git a/qiskit_ibm_runtime/runtime_job_v2.py b/qiskit_ibm_runtime/runtime_job_v2.py index d254c8544..cbbbda651 100644 --- a/qiskit_ibm_runtime/runtime_job_v2.py +++ b/qiskit_ibm_runtime/runtime_job_v2.py @@ -40,7 +40,6 @@ logger = logging.getLogger(__name__) JobStatus = Literal["INITIALIZING", "QUEUED", "RUNNING", "CANCELLED", "DONE", "ERROR"] -JOB_FINAL_STATES: Tuple[JobStatus, ...] = ("DONE", "CANCELLED", "ERROR") API_TO_JOB_STATUS: Dict[str, JobStatus] = { "QUEUED": "QUEUED", "RUNNING": "RUNNING", @@ -53,6 +52,9 @@ class RuntimeJobV2(BasePrimitiveJob[PrimitiveResult, JobStatus], BaseRuntimeJob): """Representation of a runtime V2 primitive exeuction.""" + JOB_FINAL_STATES: Tuple[JobStatus, ...] 
= ("DONE", "CANCELLED", "ERROR") + ERROR = "ERROR" + def __init__( self, backend: Backend, @@ -199,11 +201,11 @@ def done(self) -> bool: def errored(self) -> bool: """Return whether the job has failed.""" - return self._status == "ERROR" + return self.status() == "ERROR" def in_final_state(self) -> bool: """Return whether the job is in a final job state such as ``DONE`` or ``ERROR``.""" - return self._status in JOB_FINAL_STATES + return self.status() in self.JOB_FINAL_STATES def running(self) -> bool: """Return whether the job is actively running.""" @@ -221,7 +223,7 @@ def logs(self) -> str: Raises: IBMRuntimeError: If a network error occurred. """ - if self.status() not in JOB_FINAL_STATES: + if self.status() not in self.JOB_FINAL_STATES: logger.warning("Job logs are only available after the job finishes.") try: return self._api_client.job_logs(self.job_id()) @@ -247,14 +249,14 @@ def wait_for_final_state( # pylint: disable=arguments-differ """ try: start_time = time.time() - if self._status not in JOB_FINAL_STATES and not self._is_streaming(): + if self._status not in self.JOB_FINAL_STATES and not self._is_streaming(): self._ws_client_future = self._executor.submit(self._start_websocket_client) if self._is_streaming(): self._ws_client_future.result(timeout) # poll for status after stream has closed until status is final # because status doesn't become final as soon as stream closes status = self.status() - while status not in JOB_FINAL_STATES: + while status not in self.JOB_FINAL_STATES: elapsed_time = time.time() - start_time if timeout is not None and elapsed_time >= timeout: raise RuntimeJobTimeoutError( @@ -301,7 +303,7 @@ def stream_results( RuntimeInvalidStateError: If a callback function is already streaming results or if the job already finished. """ - if self._status in JOB_FINAL_STATES: + if self._status in self.JOB_FINAL_STATES: raise RuntimeInvalidStateError("Job already finished.") if self._is_streaming(): raise RuntimeInvalidStateError("A callback function is already streaming results.") @@ -329,6 +331,6 @@ def interim_results(self, decoder: Optional[Type[ResultDecoder]] = None) -> Any: _decoder = decoder or self._interim_result_decoder interim_results_raw = self._api_client.job_interim_results(job_id=self.job_id()) self._interim_results = _decoder.decode(interim_results_raw) - if self.status() in JOB_FINAL_STATES: + if self.status() in self.JOB_FINAL_STATES: self._final_interim_results = True return self._interim_results diff --git a/release-notes/unreleased/1547.bug.rst b/release-notes/unreleased/1547.bug.rst new file mode 100644 index 000000000..ef60299cb --- /dev/null +++ b/release-notes/unreleased/1547.bug.rst @@ -0,0 +1,2 @@ +Fixed an issue where the `in_final_state()` method in `RuntimeJobV2` would not +update the status when called. 
\ No newline at end of file

From 84e2e1976380f6ba047b32b62d820cf494251cb1 Mon Sep 17 00:00:00 2001
From: Ian Hincks
Date: Fri, 5 Apr 2024 16:27:48 -0400
Subject: [PATCH 2/5] Add recursive nesting to HTML option table rendering (#1586)

* Add recursion to HTML option table rendering
* lint
* lint again
* appease python 3.8
---
 qiskit_ibm_runtime/options/options.py | 58 ++++++++++++++++++++-------
 1 file changed, 44 insertions(+), 14 deletions(-)

diff --git a/qiskit_ibm_runtime/options/options.py b/qiskit_ibm_runtime/options/options.py
index daf24a326..87f7094f1 100644
--- a/qiskit_ibm_runtime/options/options.py
+++ b/qiskit_ibm_runtime/options/options.py
@@ -13,8 +13,8 @@
 """Primitive options."""

 from abc import abstractmethod
-from typing import Optional, Union, ClassVar, Any
-from dataclasses import dataclass, fields, field, asdict
+from typing import Iterable, Optional, Tuple, Union, ClassVar, Any
+from dataclasses import dataclass, fields, field, asdict, is_dataclass
 import copy
 import warnings

@@ -39,6 +39,43 @@
 from ..runtime_options import RuntimeOptions


+def _make_data_row(indent: int, name: str, value: Any, is_section: bool) -> Iterable[str]:
+    """Yield HTML table rows to format an options entry."""
+    tag = "th" if is_section else "td"
+
+    weight = " font-weight: bold;" if is_section else ""
+    style = f"style='text-align: left; vertical-align: top;{weight}'"
+
+    marker = "▸" if is_section else ""
+    spacer_style = "display: inline-block; text-align: right; margin-right: 10px;"
+    spacer = f"<span style='{spacer_style} width: {20 * (1 + indent)}px;'>{marker}</span>"
+
+    yield "  <tr>"
+    yield f"    <{tag} {style}>{spacer}{name}</{tag}>"
+    yield f"    <{tag} {style}>{type(value).__name__ if is_section else repr(value)}</{tag}>"
+    yield "  </tr>"
+
+
+def _iter_all_fields(
+    data_cls: Any, indent: int = 0, dict_form: Union[dict, None] = None
+) -> Iterable[Tuple[int, str, Any, bool]]:
+    """Recursively iterate over a dataclass, yielding (indent, name, value, is_dataclass) fields."""
+    # we pass dict_form through recursion simply to avoid calling asdict() more than once
+    dict_form = dict_form or asdict(data_cls)
+
+    suboptions = []
+    for name, val in dict_form.items():
+        if is_dataclass(subopt := getattr(data_cls, name)):
+            suboptions.append((name, subopt))
+        elif name != "_VERSION":
+            yield (indent, name, val, False)
+
+    # put all of the nested options at the bottom
+    for name, subopt in suboptions:
+        yield (indent, name, subopt, True)
+        yield from _iter_all_fields(subopt, indent + 1, dict_form[name])
+
+
 @dataclass
 class BaseOptions:
     """Base options class."""
@@ -74,18 +111,11 @@ def _get_runtime_options(options: dict) -> dict:

     def _repr_html_(self) -> str:
         """Return a string that formats this instance as an HTML table."""
-        html_str = "<table>"
-        for key, value in asdict(self).items():
-            if isinstance(value, dict):
-                html_str += f"<tr><th>{key}</th><td></td></tr>"
-                for subkey, subvalue in value.items():
-                    html_str += (
-                        f"<tr><td>{subkey}</td>"
-                        f"<td>{subvalue}</td></tr>"
-                    )
-            else:
-                html_str += f"<tr><th>{key}</th><td>{value}</td></tr>"
-        return html_str + "</table>"
+        table_html = [f"<pre>{type(self).__name__}<{hex(id(self))}></pre>", "<table>"]
+        for row in _iter_all_fields(self):
+            table_html.extend(_make_data_row(*row))
+        table_html.append("</table>")
+        return "\n".join(table_html)


 @primitive_dataclass

From d6c74f2284c93676e56803b729d95c7659b899dc Mon Sep 17 00:00:00 2001
From: Kevin Tian
Date: Sat, 6 Apr 2024 00:24:59 -0400
Subject: [PATCH 3/5] Add elapsed_time to session details (#1581)

* Add elapsed_time to session details
* Add release note
* change to usage_time
* Docs build
* Update release note
---
 qiskit_ibm_runtime/session.py          | 3 +++
 release-notes/unreleased/1567.feat.rst | 3 +++
 2 files changed, 6 insertions(+)
 create mode 100644 release-notes/unreleased/1567.feat.rst

diff --git a/qiskit_ibm_runtime/session.py b/qiskit_ibm_runtime/session.py
index 269d24381..c68bcc823 100644
--- a/qiskit_ibm_runtime/session.py
+++ b/qiskit_ibm_runtime/session.py
@@ -286,6 +286,8 @@ def details(self) -> Optional[Dict[str, Any]]:
             started_at: Timestamp of when the session was started.
             closed_at: Timestamp of when the session was closed.
             activated_at: Timestamp of when the session state was changed to active.
+            usage_time: The usage time, in seconds, of this Session or Batch.
+                Usage is defined as the time a quantum system is committed to complete a job.
         """
         if self._session_id and isinstance(self._service, QiskitRuntimeService):
             response = self._service._api_client.session_details(self._session_id)
@@ -303,6 +305,7 @@
                 "started_at": response.get("started_at"),
                 "closed_at": response.get("closed_at"),
                 "activated_at": response.get("activated_at"),
+                "usage_time": response.get("elapsed_time"),
             }
         return None

diff --git a/release-notes/unreleased/1567.feat.rst b/release-notes/unreleased/1567.feat.rst
new file mode 100644
index 000000000..83553e36d
--- /dev/null
+++ b/release-notes/unreleased/1567.feat.rst
@@ -0,0 +1,3 @@
+Session `details() `__
+now includes a new field, `usage_time`. Usage is defined as the time a quantum system
+is committed to complete a job.
\ No newline at end of file From 833b9d244663aaa5129638ab3ffbf9185bc9c7ad Mon Sep 17 00:00:00 2001 From: Kevin Tian Date: Sat, 6 Apr 2024 01:06:23 -0400 Subject: [PATCH 4/5] Add V2 primitive Q-Ctrl tests (#1534) * add initial v2 tests * Add simple sampler & estimator test * Address comments * use correct bell, backend --- test/qctrl/test_qctrl.py | 94 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 92 insertions(+), 2 deletions(-) diff --git a/test/qctrl/test_qctrl.py b/test/qctrl/test_qctrl.py index 08bfd9a8d..b04f28835 100644 --- a/test/qctrl/test_qctrl.py +++ b/test/qctrl/test_qctrl.py @@ -16,8 +16,19 @@ from qiskit.quantum_info import Statevector, hellinger_fidelity from qiskit.providers.jobstatus import JobStatus from qiskit.quantum_info import SparsePauliOp - -from qiskit_ibm_runtime import Sampler, Session, Options, Estimator, QiskitRuntimeService +from qiskit.circuit.library import RealAmplitudes +from qiskit.primitives.containers import PrimitiveResult, PubResult, DataBin, BitArray +from qiskit.transpiler.preset_passmanagers import generate_preset_pass_manager + +from qiskit_ibm_runtime import ( + Sampler, + SamplerV2, + EstimatorV2, + Session, + Options, + Estimator, + QiskitRuntimeService, +) from qiskit_ibm_runtime.exceptions import IBMNotAuthorizedError from ..ibm_test_case import IBMIntegrationTestCase @@ -28,6 +39,85 @@ DIFFERENCE_THRESHOLD = 0.35 +class TestV2PrimitivesQCTRL(IBMIntegrationTestCase): + """Integration tests for V2 primitives using QCTRL.""" + + def setUp(self) -> None: + super().setUp() + self.bell = bell() + self.backend = self.service.least_busy(simulator=False) + + @run_integration_test + def test_sampler_v2_qctrl(self, service): + """Test qctrl bell state with samplerV2""" + shots = 1 + + pm = generate_preset_pass_manager(backend=self.backend, optimization_level=1) + isa_circuit = pm.run(self.bell) + + with Session(service, backend=self.backend): + sampler = SamplerV2() + + result = sampler.run([isa_circuit], shots=shots).result() + self._verify_sampler_result(result, num_pubs=1) + + @run_integration_test + def test_estimator_v2_qctrl(self, service): + """Test simple circuit with estimatorV2 using qctrl.""" + pass_mgr = generate_preset_pass_manager(backend=self.backend, optimization_level=1) + + psi1 = pass_mgr.run(RealAmplitudes(num_qubits=2, reps=2)) + psi2 = pass_mgr.run(RealAmplitudes(num_qubits=2, reps=3)) + + # pylint: disable=invalid-name + H1 = SparsePauliOp.from_list([("II", 1), ("IZ", 2), ("XI", 3)]).apply_layout(psi1.layout) + H2 = SparsePauliOp.from_list([("IZ", 1)]).apply_layout(psi2.layout) + H3 = SparsePauliOp.from_list([("ZI", 1), ("ZZ", 1)]).apply_layout(psi1.layout) + + theta1 = [0, 1, 1, 2, 3, 5] + theta2 = [0, 1, 1, 2, 3, 5, 8, 13] + theta3 = [1, 2, 3, 4, 5, 6] + + with Session(service, self.backend) as session: + estimator = EstimatorV2(session=session) + + job = estimator.run([(psi1, H1, [theta1])]) + result = job.result() + self._verify_estimator_result(result, num_pubs=1, shapes=[(1,)]) + + job2 = estimator.run([(psi1, [H1, H3], [theta1, theta3]), (psi2, H2, theta2)]) + result2 = job2.result() + self._verify_estimator_result(result2, num_pubs=2, shapes=[(2,), (1,)]) + + job3 = estimator.run([(psi1, H1, theta1), (psi2, H2, theta2), (psi1, H3, theta3)]) + result3 = job3.result() + self._verify_estimator_result(result3, num_pubs=3, shapes=[(1,), (1,), (1,)]) + + def _verify_sampler_result(self, result, num_pubs, targets=None): + """Verify result type.""" + self.assertIsInstance(result, PrimitiveResult) + 
self.assertIsInstance(result.metadata, dict) + self.assertEqual(len(result), num_pubs) + for idx, pub_result in enumerate(result): + self.assertIsInstance(pub_result, PubResult) + self.assertIsInstance(pub_result.data, DataBin) + self.assertIsInstance(pub_result.metadata, dict) + if targets: + self.assertIsInstance(result[idx].data.meas, BitArray) + self._assert_allclose(result[idx].data.meas, targets[idx]) + + def _verify_estimator_result(self, result, num_pubs, shapes): + """Verify result type.""" + self.assertIsInstance(result, PrimitiveResult) + self.assertEqual(len(result), num_pubs) + for idx, pub_result in enumerate(result): + self.assertIsInstance(pub_result, PubResult) + self.assertIsInstance(pub_result.data, DataBin) + self.assertTrue(pub_result.metadata) + self.assertEqual(pub_result.data.evs.shape, shapes[idx]) + self.assertEqual(pub_result.data.stds.shape, shapes[idx]) + + class TestQCTRL(IBMIntegrationTestCase): """Integration tests for QCTRL integration.""" From 64f6e661b4fda1efe906da455ecc9bccb27ab5a3 Mon Sep 17 00:00:00 2001 From: gadial Date: Mon, 8 Apr 2024 19:35:50 +0300 Subject: [PATCH 5/5] ZNE options validation fix (#1588) * ZNE options validation fix * Linting * Linting --------- Co-authored-by: Kevin Tian --- qiskit_ibm_runtime/options/zne_options.py | 53 +++++++++++------- test/unit/test_estimator_options.py | 68 ++++++++++++++++++----- 2 files changed, 86 insertions(+), 35 deletions(-) diff --git a/qiskit_ibm_runtime/options/zne_options.py b/qiskit_ibm_runtime/options/zne_options.py index ac1f9bfbc..00bb4fd98 100644 --- a/qiskit_ibm_runtime/options/zne_options.py +++ b/qiskit_ibm_runtime/options/zne_options.py @@ -53,6 +53,14 @@ class ZneOptions: noise_factors: Union[UnsetType, Sequence[float]] = Unset extrapolator: Union[UnsetType, ExtrapolatorType, Sequence[ExtrapolatorType]] = Unset + @classmethod + def _default_noise_factors(cls) -> Sequence[float]: + return (1, 3, 5) + + @classmethod + def _default_extrapolator(cls) -> Sequence[ExtrapolatorType]: + return ("exponential", "linear") + @field_validator("noise_factors") @classmethod @skip_unset_validation @@ -65,24 +73,29 @@ def _validate_zne_noise_factors(cls, factors: Sequence[float]) -> Sequence[float @model_validator(mode="after") def _validate_options(self) -> "ZneOptions": """Check that there are enough noise factors for all extrapolators.""" - if self.extrapolator and self.noise_factors: - required_factors = { - "linear": 2, - "exponential": 2, - "double_exponential": 4, - } - for idx in range(1, 8): - required_factors[f"polynomial_degree_{idx}"] = idx + 1 - - extrapolators: Sequence = ( - [self.extrapolator] # type: ignore[assignment] - if isinstance(self.extrapolator, str) - else self.extrapolator - ) - for extrap in extrapolators: # pylint: disable=not-an-iterable - if len(self.noise_factors) < required_factors[extrap]: # type: ignore[arg-type] - raise ValueError( - f"{extrap} requires at least {required_factors[extrap]} noise_factors" - ) - + noise_factors = ( + self.noise_factors if self.noise_factors != Unset else self._default_noise_factors() + ) + extrapolator = ( + self.extrapolator if self.extrapolator != Unset else self._default_extrapolator() + ) + + required_factors = { + "linear": 2, + "exponential": 2, + "double_exponential": 4, + } + for idx in range(1, 8): + required_factors[f"polynomial_degree_{idx}"] = idx + 1 + + extrapolators: Sequence = ( + [extrapolator] # type: ignore[assignment] + if isinstance(extrapolator, str) + else extrapolator + ) + for extrap in extrapolators: # pylint: 
disable=not-an-iterable + if len(noise_factors) < required_factors[extrap]: # type: ignore[arg-type] + raise ValueError( + f"{extrap} requires at least {required_factors[extrap]} noise_factors" + ) return self diff --git a/test/unit/test_estimator_options.py b/test/unit/test_estimator_options.py index 628c29c1a..e06627d43 100644 --- a/test/unit/test_estimator_options.py +++ b/test/unit/test_estimator_options.py @@ -37,24 +37,62 @@ class TestEstimatorOptions(IBMTestCase): """Class for testing the EstimatorOptions class.""" @data( - {"optimization_level": 99}, - {"resilience_level": -1}, - {"default_precision": 0}, - {"dynamical_decoupling": "foo"}, - {"execution": {"init_qubits": 2}}, - {"twirling": {"strategy": "foo"}}, - {"resilience": {"zne": {"noise_factors": [0.5]}}}, - {"noise_factors": [1, 3, 5]}, - {"zne_mitigation": True, "pec_mitigation": True}, - {"simulator": {"noise_model": "foo"}}, - {"resilience": {"measure_noise_learning": {"num_randomizations": 1}}}, - {"resilience": {"zne": {"noise_factors": [1]}}}, + ({"optimization_level": 99}, "optimization_level must be <=1"), + ({"resilience_level": -1}, "resilience_level must be >=0"), + ({"default_precision": 0}, "default_precision must be >0"), + ( + {"dynamical_decoupling": "foo"}, + "Input should be a dictionary or an instance of DynamicalDecouplingOptions", + ), + ({"execution": {"init_qubits": 2}}, "Input should be a valid boolean"), + ( + {"twirling": {"strategy": "foo"}}, + "Input should be 'active', 'active-accum', 'active-circuit' or 'all'", + ), + ( + {"resilience": {"zne": {"noise_factors": [0.5]}}}, + "noise_factors` option value must all be >= 1", + ), + ({"noise_factors": [1, 3, 5]}, "Unexpected keyword argument"), + ( + {"resilience": {"zne_mitigation": True, "pec_mitigation": True}}, + "pec_mitigation and zne_mitigation`options cannot be simultaneously enabled", + ), + ( + {"simulator": {"noise_model": "foo"}}, + "'noise_model' can only be a dictionary or qiskit_aer.noise.NoiseModel", + ), + ( + {"resilience": {"measure_noise_learning": {"num_randomizations": 1}}}, + "'measure_noise_learning' options are set, but 'measure_mitigation' is not set to True", + ), + ( + { + "resilience": { + "measure_mitigation": True, + "measure_noise_learning": {"num_randomizations": 0}, + } + }, + "num_randomizations must be >=1", + ), + ( + {"resilience": {"zne_mitigation": True, "zne": {"noise_factors": [1]}}}, + "exponential requires at least 2 noise_factors", + ), + ( + {"resilience": {"zne_mitigation": True, "zne": {"noise_factors": []}}}, + "exponential requires at least 2 noise_factors", + ), + ( + {"resilience": {"zne": {"noise_factors": [1, 3, 5]}}}, + "'zne' options are set, but 'zne_mitigation' is not set to True", + ), ) def test_bad_inputs(self, val): """Test invalid inputs.""" - with self.assertRaises(ValidationError) as exc: - EstimatorOptions(**val) - self.assertIn(list(val.keys())[0], str(exc.exception)) + bad_input, error_msg = val + with self.assertRaisesRegex(ValidationError, error_msg): + EstimatorOptions(**bad_input) def test_program_inputs(self): """Test converting to program inputs from estimator options."""
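A sketch of the validation behavior the last patch introduces (illustrative only; it mirrors the new test cases, and ValidationError is pydantic's, on which these options classes are built): when only `noise_factors` is supplied, it is now checked against the default extrapolators, and vice versa, instead of the check being skipped for unset fields.

# Illustrative sketch only -- mirrors the new test cases above.
from pydantic import ValidationError

from qiskit_ibm_runtime.options import EstimatorOptions

# Only noise_factors is given; before the fix this slipped through because the
# unset extrapolator was skipped. It is now checked against the default
# extrapolators ("exponential", "linear"), which need at least two factors.
try:
    EstimatorOptions(resilience={"zne_mitigation": True, "zne": {"noise_factors": [1]}})
except ValidationError as err:
    print(err)  # exponential requires at least 2 noise_factors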