Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cache for estimator expectation and fix an issue #197

Merged
merged 3 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 52 additions & 21 deletions quafu/algorithms/estimator.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def execute_circuit(circ: QuantumCircuit, observables: Hamiltonian):
return sum(expectations)


# TODO: cache measure results values and reuse for expectation calculation
class Estimator:
"""Estimate expectation for quantum circuits and observables"""

Expand Down Expand Up @@ -61,12 +62,18 @@ def __init__(
self._task = Task()
self._task.config(backend=self._backend, **task_options)

def _run_real_machine(self, observables: Hamiltonian):
# Caching expectation calculation results
self._exp_cache = {}

def _run_real_machine(
self, observables: Hamiltonian, cache_key: Optional[str] = None
):
"""
Execute the circuit with observable expectation measurement task.
Args:
qc (QuantumCircuit): Quantum circuit that need to be executed on backend.
obslist (list[str, list[int]]): List of pauli string and its position.
cache_key: if set, check if cache hit and use cached measurement results.

Returns:
List of executed results and list of measured observable
Expand All @@ -86,7 +93,7 @@ def _run_real_machine(self, observables: Hamiltonian):
# TODO(zhaoyilun):
# investigate the best implementation for calculating
# expectation on real devices.
obslist = observables.to_legacy_quafu_pauli_list()
obslist = observables.to_pauli_list()

# save input circuit
inputs = copy.deepcopy(self._circ.gates)
Expand All @@ -108,20 +115,29 @@ def _run_real_machine(self, observables: Hamiltonian):
print("Job start, need measured in ", measure_basis)

exec_res = []
lst_task_id = []
for measure_base in measure_basis:
res = self._measure_obs(self._circ, measure_base=measure_base)
self._circ.gates = copy.deepcopy(inputs)
lst_task_id.append(res.taskid)

for tid in lst_task_id:
# retrieve task results
while True:
res = self._task.retrieve(tid)
if res.task_status == "Completed":
exec_res.append(res)
break
time.sleep(0.2)
if cache_key is not None and cache_key in self._exp_cache:
# try to retrieve exe results from cache
exec_res = self._exp_cache[cache_key]
else:
# send tasks to cloud platform
lst_task_id = []
for measure_base in measure_basis:
res = self._measure_obs(self._circ, measure_base=measure_base)
self._circ.gates = copy.deepcopy(inputs)
lst_task_id.append(res.taskid)

for tid in lst_task_id:
# retrieve task results
while True:
res = self._task.retrieve(tid)
if res.task_status == "Completed":
exec_res.append(res)
break
time.sleep(0.2)

if cache_key is not None:
# put into cache
self._exp_cache[cache_key] = exec_res

measure_results = []
for obi in range(len(obslist)):
Expand Down Expand Up @@ -168,20 +184,35 @@ def _run_simulation(self, observables: Hamiltonian):
# return expectation
return execute_circuit(self._circ, observables)

def run(self, observables: Hamiltonian, params: List[float]):
def clear_cache(self):
"""clean expectation cache"""
self._exp_cache.clear()

def run(
self,
observables: Hamiltonian,
params: List[float],
cache_key: Optional[str] = None,
):
"""Calculate estimation for given observables

Args:
observables: observables to be estimated.
paras_list: list of parameters of self.circ.

params: list of parameters of self.circ.
cache_key: if this value is set, we will first look into the _exp_cache to see
if previous measurement results can be reused. Note that it is the user's duty
to guarantee correctness.
Returns:
Expectation value
"""
res = None
if params is not None:
self._circ._update_params(params)

if self._backend == "sim":
return self._run_simulation(observables)
res = self._run_simulation(observables)
else:
return self._run_real_machine(observables)
# currently cache only work for real machine (cloud systems)
res = self._run_real_machine(observables, cache_key=cache_key)

return res
20 changes: 15 additions & 5 deletions quafu/algorithms/gradients/param_shift.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.
"""Quafu parameter shift"""

from typing import List
from typing import List, Optional

import numpy as np

Expand All @@ -28,15 +28,17 @@ class ParamShift:
def __init__(self, estimator: Estimator) -> None:
self._est = estimator

def __call__(self, obs: Hamiltonian, params: List[float]):
def __call__(
self, obs: Hamiltonian, params: List[float], cache_key: Optional[str] = None
):
"""Calculate gradients using paramshift.

Args:
estimator (Estimator): estimator to calculate expectation values
params (List[float]): params to optimize
"""
if self._est._backend != "sim":
return self.grad(obs, params)
return self.grad(obs, params, cache_key=cache_key)
return self.new_grad(obs, params)

def _gen_param_shift_vals(self, params):
Expand All @@ -48,18 +50,26 @@ def _gen_param_shift_vals(self, params):
minus_params = params - offsets * np.pi / 2
return plus_params.tolist() + minus_params.tolist()

def grad(self, obs: Hamiltonian, params: List[float]):
def grad(
self, obs: Hamiltonian, params: List[float], cache_key: Optional[str] = None
):
"""grad.

Args:
obs (Hamiltonian): obs
params (List[float]): params
cache_key: cache prefix, currently the sample id in a batch
"""
shifted_params_lists = self._gen_param_shift_vals(params)

res = np.zeros(len(shifted_params_lists))
for i, shifted_params in enumerate(shifted_params_lists):
res[i] = self._est.run(obs, shifted_params)
final_cache_key = None
if cache_key is not None:
# parameters is uniquely determined by
# <sample-id-in-the-batch><order-in-shifted-parameters>
final_cache_key = cache_key + str(i)
res[i] = self._est.run(obs, shifted_params, cache_key=final_cache_key)

num_shift_params = len(res)
grads = (res[: num_shift_params // 2] - res[num_shift_params // 2 :]) / 2
Expand Down
36 changes: 24 additions & 12 deletions quafu/algorithms/gradients/vjp.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ def run_circ(
estimator = Estimator(circ, backend=backend)
if params is None:
params = [g.paras for g in circ.parameterized_gates]
output = [estimator.run(obs, params) for obs in obs_list]
output = [estimator.run(obs, params, cache_key="00") for obs in obs_list]
estimator.clear_cache()
return np.array(output)


Expand All @@ -66,6 +67,21 @@ def jacobian(
Args:
circ (QuantumCircuit): circ
params_input (np.ndarray): params_input, with shape [batch_size, num_params]
estimator (Estimator): estimator for calculating expectations.


Notes:
Since now we only use Z-axis expectations for all qubits as outputs
i.e., the observable is Z0,Z1,..., for the same circuit we only need
to send one task and the execution results can be used for calculating
expectations for all these Pauli-Z operators.

Thus we use cache here, to uniquely identity a circuit, we use the id
of parameters. Here we have batch_size * num_parameters * 2 lists of
parameters. Let batch_size be $M$, $N = num_parameters * 2$,

let $i\\in\\[0, M-1\\]$, $j\\in\\[0, M-1\\]$, the cache_key is then set
to be "{i}{j}"
"""
batch_size, num_params = params_input.shape
obs_list = _generate_expval_z(circ.num)
Expand All @@ -75,10 +91,16 @@ def jacobian(
calc_grad = ParamShift(estimator)
output = np.zeros((batch_size, num_outputs, num_params))
for i in range(batch_size):
# Same circuit, i.e., measurement results with the same parameters may be reused
cache_key_prefix = str(i)
grad_list = [
np.array(calc_grad(obs, params_input[i, :].tolist())) for obs in obs_list
np.array(
calc_grad(obs, params_input[i, :].tolist(), cache_key=cache_key_prefix)
)
for obs in obs_list
]
output[i, :, :] = np.stack(grad_list)
estimator.clear_cache()
return output


Expand Down Expand Up @@ -117,13 +139,3 @@ def compute_vjp(jac: np.ndarray, dy: np.ndarray):
vjp = np.einsum("ijk,ij->ik", jac, dy)

return vjp


# class QNode:
# """Quantum node which essentially wraps the execution of a quantum circuit"""
#
# def __init__(self, circ: QuantumCircuit) -> None:
# self._circ = circ
#
# def __call__(self):
# return execu
18 changes: 12 additions & 6 deletions quafu/algorithms/hamiltonian.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,20 @@ def get_matrix(self, qnum, big_endian=False):

return mat

# TODO(zhaoyilun): delete this in the future
def to_legacy_quafu_pauli_list(self):
"""Transform to legacy quafu pauli list format,
this is a temperal function and should be deleted later"""
def to_pauli_list(self):
"""
Transform to pauli list format for ease of
expectation calculation on cloud systems

Currently coeff does not make sense because expectation calculation
on cloud systems does not support it

Examples:
("Z0 Z1 Z2 Z3") -> ["ZZZZ", [0, 1, 2, 3]]
"""
res = []
for pauli_str in self.paulis:
for i, pos in enumerate(pauli_str.pos):
res.append([pauli_str.paulistr[i], [pos]])
res.append([pauli_str.paulistr, pauli_str.pos])
return res


Expand Down
3 changes: 2 additions & 1 deletion quafu/results/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ def __init__(self, input_dict):
0: "In Queue",
1: "Running",
2: "Completed",
"Canceled": 3,
3: "Canceled",
4: "Failed",
5: "Pending",
}
self.taskid = input_dict["task_id"]
self.taskname = input_dict["task_name"]
Expand Down
2 changes: 1 addition & 1 deletion tests/quafu/algorithms/estimator_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def test_run(self, mock_retrieve, mock_send, mock_backends, mock_load_account):
estimator = Estimator(circ, backend="ScQ-P10")
expectation = estimator.run(test_ising, None)
task = Task()
res_org, obsexp_org = task.submit(circ, test_ising.to_legacy_quafu_pauli_list())
res_org, obsexp_org = task.submit(circ, test_ising.to_pauli_list())
assert expectation == sum(obsexp_org)

@pytest.mark.skipif(
Expand Down
Loading