Skip to content

Commit

Permalink
Merge branch 'main' into support-twirling
Browse files Browse the repository at this point in the history
  • Loading branch information
ptristan3 authored Apr 8, 2024
2 parents a8367fe + 64f6e66 commit 933d38e
Show file tree
Hide file tree
Showing 10 changed files with 258 additions and 71 deletions.
9 changes: 6 additions & 3 deletions qiskit_ibm_runtime/base_runtime_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"""Base runtime job class."""

from abc import ABC, abstractmethod
from typing import Any, Optional, Callable, Dict, Type, Union, Sequence, List
from typing import Any, Optional, Callable, Dict, Type, Union, Sequence, List, Tuple
import json
import logging
from concurrent import futures
Expand Down Expand Up @@ -55,6 +55,9 @@ class BaseRuntimeJob(ABC):

_executor = futures.ThreadPoolExecutor(thread_name_prefix="runtime_job")

JOB_FINAL_STATES: Tuple[Any, ...] = ()
ERROR: Union[str, RuntimeJobStatus] = None

def __init__(
self,
backend: Backend,
Expand Down Expand Up @@ -220,7 +223,7 @@ def error_message(self) -> Optional[str]:

def _set_status_and_error_message(self) -> None:
"""Fetch and set status and error message."""
if not self.in_final_state():
if self._status not in self.JOB_FINAL_STATES:
response = self._api_client.job_get(job_id=self.job_id())
self._set_status(response)
self._set_error_message(response)
Expand Down Expand Up @@ -255,7 +258,7 @@ def _set_error_message(self, job_response: Dict) -> None:
Args:
job_response: Job response from runtime API.
"""
if self.errored():
if self._status == self.ERROR:
self._error_message = self._error_msg_from_job_response(job_response)
else:
self._error_message = None
Expand Down
58 changes: 44 additions & 14 deletions qiskit_ibm_runtime/options/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
"""Primitive options."""

from abc import abstractmethod
from typing import Optional, Union, ClassVar, Any
from dataclasses import dataclass, fields, field, asdict
from typing import Iterable, Optional, Tuple, Union, ClassVar, Any
from dataclasses import dataclass, fields, field, asdict, is_dataclass
import copy
import warnings

Expand All @@ -39,6 +39,43 @@
from ..runtime_options import RuntimeOptions


def _make_data_row(indent: int, name: str, value: Any, is_section: bool) -> Iterable[str]:
"""Yield HTML table rows to format an options entry."""
tag = "th" if is_section else "td"

weight = " font-weight: bold;" if is_section else ""
style = f"style='text-align: left; vertical-align: top;{weight}'"

marker = "▸" if is_section else ""
spacer_style = "display: inline-block; text-align: right; margin-right: 10px;"
spacer = f"<div style='width: {20*(1 + indent)}px; {spacer_style}'>{marker}</div>"

yield " <tr>"
yield f" <{tag} {style}>{spacer}{name}</{tag}>"
yield f" <{tag} {style}>{type(value).__name__ if is_section else repr(value)}</{tag}>"
yield " </tr>"


def _iter_all_fields(
data_cls: Any, indent: int = 0, dict_form: Union[dict, None] = None
) -> Iterable[Tuple[int, str, Any, bool]]:
"""Recursively iterate over a dataclass, yielding (indent, name, value, is_dataclass) fields."""
# we pass dict_form through recursion simply to avoid calling asdict() more than once
dict_form = dict_form or asdict(data_cls)

suboptions = []
for name, val in dict_form.items():
if is_dataclass(subopt := getattr(data_cls, name)):
suboptions.append((name, subopt))
elif name != "_VERSION":
yield (indent, name, val, False)

# put all of the nested options at the bottom
for name, subopt in suboptions:
yield (indent, name, subopt, True)
yield from _iter_all_fields(subopt, indent + 1, dict_form[name])


@dataclass
class BaseOptions:
"""Base options class."""
Expand Down Expand Up @@ -74,18 +111,11 @@ def _get_runtime_options(options: dict) -> dict:

def _repr_html_(self) -> str:
"""Return a string that formats this instance as an HTML table."""
html_str = "<table style='font-size: 14px; width: 300px;'>"
for key, value in asdict(self).items():
if isinstance(value, dict):
html_str += f"<tr><th style='text-align: left;'>{key}</th><td></td></tr>"
for subkey, subvalue in value.items():
html_str += (
f"<tr><td style='text-align: left; padding-left: 20px;'>{subkey}</td>"
f"<td>{subvalue}</td></tr>"
)
else:
html_str += f"<tr><th style='text-align: left;'>{key}</th><td>{value}</td></tr>"
return html_str + "</table>"
table_html = [f"<pre>{type(self).__name__}<{hex(id(self))}></pre>", "<table>"]
for row in _iter_all_fields(self):
table_html.extend(_make_data_row(*row))
table_html.append("</table>")
return "\n".join(table_html)


@primitive_dataclass
Expand Down
53 changes: 33 additions & 20 deletions qiskit_ibm_runtime/options/zne_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ class ZneOptions:
noise_factors: Union[UnsetType, Sequence[float]] = Unset
extrapolator: Union[UnsetType, ExtrapolatorType, Sequence[ExtrapolatorType]] = Unset

@classmethod
def _default_noise_factors(cls) -> Sequence[float]:
return (1, 3, 5)

@classmethod
def _default_extrapolator(cls) -> Sequence[ExtrapolatorType]:
return ("exponential", "linear")

@field_validator("noise_factors")
@classmethod
@skip_unset_validation
Expand All @@ -65,24 +73,29 @@ def _validate_zne_noise_factors(cls, factors: Sequence[float]) -> Sequence[float
@model_validator(mode="after")
def _validate_options(self) -> "ZneOptions":
"""Check that there are enough noise factors for all extrapolators."""
if self.extrapolator and self.noise_factors:
required_factors = {
"linear": 2,
"exponential": 2,
"double_exponential": 4,
}
for idx in range(1, 8):
required_factors[f"polynomial_degree_{idx}"] = idx + 1

extrapolators: Sequence = (
[self.extrapolator] # type: ignore[assignment]
if isinstance(self.extrapolator, str)
else self.extrapolator
)
for extrap in extrapolators: # pylint: disable=not-an-iterable
if len(self.noise_factors) < required_factors[extrap]: # type: ignore[arg-type]
raise ValueError(
f"{extrap} requires at least {required_factors[extrap]} noise_factors"
)

noise_factors = (
self.noise_factors if self.noise_factors != Unset else self._default_noise_factors()
)
extrapolator = (
self.extrapolator if self.extrapolator != Unset else self._default_extrapolator()
)

required_factors = {
"linear": 2,
"exponential": 2,
"double_exponential": 4,
}
for idx in range(1, 8):
required_factors[f"polynomial_degree_{idx}"] = idx + 1

extrapolators: Sequence = (
[extrapolator] # type: ignore[assignment]
if isinstance(extrapolator, str)
else extrapolator
)
for extrap in extrapolators: # pylint: disable=not-an-iterable
if len(noise_factors) < required_factors[extrap]: # type: ignore[arg-type]
raise ValueError(
f"{extrap} requires at least {required_factors[extrap]} noise_factors"
)
return self
21 changes: 12 additions & 9 deletions qiskit_ibm_runtime/runtime_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ class RuntimeJob(Job, BaseRuntimeJob):
the results at a later time, but before the job finishes.
"""

JOB_FINAL_STATES = JOB_FINAL_STATES
ERROR = JobStatus.ERROR

def __init__(
self,
backend: Backend,
Expand Down Expand Up @@ -153,7 +156,7 @@ def result( # pylint: disable=arguments-differ
"""
_decoder = decoder or self._final_result_decoder
self.wait_for_final_state(timeout=timeout)
if self._status == JobStatus.ERROR:
if self._status == self.ERROR:
error_message = self._reason if self._reason else self._error_message
if self._reason == "RAN TOO LONG":
raise RuntimeJobMaxTimeoutError(error_message)
Expand Down Expand Up @@ -196,11 +199,11 @@ def status(self) -> JobStatus:

def in_final_state(self) -> bool:
"""Return whether the job is in a final job state such as ``DONE`` or ``ERROR``."""
return self._status in JOB_FINAL_STATES
return self.status() in self.JOB_FINAL_STATES

def errored(self) -> bool:
"""Return whether the job has failed."""
return self._status == JobStatus.ERROR
return self.status() == self.ERROR

def _status_from_job_response(self, response: Dict) -> str:
"""Returns the job status from an API response.
Expand All @@ -213,7 +216,7 @@ def _status_from_job_response(self, response: Dict) -> str:
"""
mapped_job_status = API_TO_JOB_STATUS[response["state"]["status"].upper()]
if mapped_job_status == JobStatus.CANCELLED and self._reason == "RAN TOO LONG":
mapped_job_status = JobStatus.ERROR
mapped_job_status = self.ERROR
return mapped_job_status

def submit(self) -> None:
Expand Down Expand Up @@ -303,7 +306,7 @@ def logs(self) -> str:
Raises:
IBMRuntimeError: If a network error occurred.
"""
if self.status() not in JOB_FINAL_STATES:
if self.status() not in self.JOB_FINAL_STATES:
logger.warning("Job logs are only available after the job finishes.")
try:
return self._api_client.job_logs(self.job_id())
Expand All @@ -329,14 +332,14 @@ def wait_for_final_state( # pylint: disable=arguments-differ
"""
try:
start_time = time.time()
if self._status not in JOB_FINAL_STATES and not self._is_streaming():
if self._status not in self.JOB_FINAL_STATES and not self._is_streaming():
self._ws_client_future = self._executor.submit(self._start_websocket_client)
if self._is_streaming():
self._ws_client_future.result(timeout)
# poll for status after stream has closed until status is final
# because status doesn't become final as soon as stream closes
status = self.status()
while status not in JOB_FINAL_STATES:
while status not in self.JOB_FINAL_STATES:
elapsed_time = time.time() - start_time
if timeout is not None and elapsed_time >= timeout:
raise RuntimeJobTimeoutError(
Expand Down Expand Up @@ -383,7 +386,7 @@ def stream_results(
RuntimeInvalidStateError: If a callback function is already streaming results or
if the job already finished.
"""
if self._status in JOB_FINAL_STATES:
if self._status in self.JOB_FINAL_STATES:
raise RuntimeInvalidStateError("Job already finished.")
if self._is_streaming():
raise RuntimeInvalidStateError("A callback function is already streaming results.")
Expand Down Expand Up @@ -411,6 +414,6 @@ def interim_results(self, decoder: Optional[Type[ResultDecoder]] = None) -> Any:
_decoder = decoder or self._interim_result_decoder
interim_results_raw = self._api_client.job_interim_results(job_id=self.job_id())
self._interim_results = _decoder.decode(interim_results_raw)
if self.status() in JOB_FINAL_STATES:
if self.status() in self.JOB_FINAL_STATES:
self._final_interim_results = True
return self._interim_results
18 changes: 10 additions & 8 deletions qiskit_ibm_runtime/runtime_job_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
logger = logging.getLogger(__name__)

JobStatus = Literal["INITIALIZING", "QUEUED", "RUNNING", "CANCELLED", "DONE", "ERROR"]
JOB_FINAL_STATES: Tuple[JobStatus, ...] = ("DONE", "CANCELLED", "ERROR")
API_TO_JOB_STATUS: Dict[str, JobStatus] = {
"QUEUED": "QUEUED",
"RUNNING": "RUNNING",
Expand All @@ -53,6 +52,9 @@
class RuntimeJobV2(BasePrimitiveJob[PrimitiveResult, JobStatus], BaseRuntimeJob):
"""Representation of a runtime V2 primitive exeuction."""

JOB_FINAL_STATES: Tuple[JobStatus, ...] = ("DONE", "CANCELLED", "ERROR")
ERROR = "ERROR"

def __init__(
self,
backend: Backend,
Expand Down Expand Up @@ -199,11 +201,11 @@ def done(self) -> bool:

def errored(self) -> bool:
"""Return whether the job has failed."""
return self._status == "ERROR"
return self.status() == "ERROR"

def in_final_state(self) -> bool:
"""Return whether the job is in a final job state such as ``DONE`` or ``ERROR``."""
return self._status in JOB_FINAL_STATES
return self.status() in self.JOB_FINAL_STATES

def running(self) -> bool:
"""Return whether the job is actively running."""
Expand All @@ -221,7 +223,7 @@ def logs(self) -> str:
Raises:
IBMRuntimeError: If a network error occurred.
"""
if self.status() not in JOB_FINAL_STATES:
if self.status() not in self.JOB_FINAL_STATES:
logger.warning("Job logs are only available after the job finishes.")
try:
return self._api_client.job_logs(self.job_id())
Expand All @@ -247,14 +249,14 @@ def wait_for_final_state( # pylint: disable=arguments-differ
"""
try:
start_time = time.time()
if self._status not in JOB_FINAL_STATES and not self._is_streaming():
if self._status not in self.JOB_FINAL_STATES and not self._is_streaming():
self._ws_client_future = self._executor.submit(self._start_websocket_client)
if self._is_streaming():
self._ws_client_future.result(timeout)
# poll for status after stream has closed until status is final
# because status doesn't become final as soon as stream closes
status = self.status()
while status not in JOB_FINAL_STATES:
while status not in self.JOB_FINAL_STATES:
elapsed_time = time.time() - start_time
if timeout is not None and elapsed_time >= timeout:
raise RuntimeJobTimeoutError(
Expand Down Expand Up @@ -301,7 +303,7 @@ def stream_results(
RuntimeInvalidStateError: If a callback function is already streaming results or
if the job already finished.
"""
if self._status in JOB_FINAL_STATES:
if self._status in self.JOB_FINAL_STATES:
raise RuntimeInvalidStateError("Job already finished.")
if self._is_streaming():
raise RuntimeInvalidStateError("A callback function is already streaming results.")
Expand Down Expand Up @@ -329,6 +331,6 @@ def interim_results(self, decoder: Optional[Type[ResultDecoder]] = None) -> Any:
_decoder = decoder or self._interim_result_decoder
interim_results_raw = self._api_client.job_interim_results(job_id=self.job_id())
self._interim_results = _decoder.decode(interim_results_raw)
if self.status() in JOB_FINAL_STATES:
if self.status() in self.JOB_FINAL_STATES:
self._final_interim_results = True
return self._interim_results
3 changes: 3 additions & 0 deletions qiskit_ibm_runtime/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,8 @@ def details(self) -> Optional[Dict[str, Any]]:
started_at: Timestamp of when the session was started.
closed_at: Timestamp of when the session was closed.
activated_at: Timestamp of when the session state was changed to active.
usage_time: The usage time, in seconds, of this Session or Batch.
Usage is defined as the time a quantum system is committed to complete a job.
"""
if self._session_id and isinstance(self._service, QiskitRuntimeService):
response = self._service._api_client.session_details(self._session_id)
Expand All @@ -303,6 +305,7 @@ def details(self) -> Optional[Dict[str, Any]]:
"started_at": response.get("started_at"),
"closed_at": response.get("closed_at"),
"activated_at": response.get("activated_at"),
"usage_time": response.get("elapsed_time"),
}
return None

Expand Down
2 changes: 2 additions & 0 deletions release-notes/unreleased/1547.bug.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fixed an issue where the `in_final_state()` method in `RuntimeJobV2` would not
update the status when called.
3 changes: 3 additions & 0 deletions release-notes/unreleased/1567.feat.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Session `details() <https://docs.quantum.ibm.com/api/qiskit-ibm-runtime/qiskit_ibm_runtime.Session#details>`__
now includes a new field, `usage_time`. Usage is defined as the time a quantum system
is committed to complete a job.
Loading

0 comments on commit 933d38e

Please sign in to comment.