diff --git a/.github/workflows/functional_tests.yaml b/.github/workflows/functional_tests.yaml index cde7e8ba..9b913d09 100644 --- a/.github/workflows/functional_tests.yaml +++ b/.github/workflows/functional_tests.yaml @@ -4,7 +4,7 @@ name: Python package on: - push: + push: pull_request: jobs: @@ -17,16 +17,16 @@ jobs: python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v3 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} - name: Install dependencies run: | python -m pip install --upgrade pip if [ -f requirements.txt ]; then pip install -r requirements.txt; fi - python -m pip install flake8 pytest qiskit-aer qiskit_ibm_runtime + python -m pip install flake8 pytest - name: Lint with flake8 run: | # stop the build if there are Python syntax errors or undefined names diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml index 4495c0e7..6af4cfc2 100644 --- a/.github/workflows/lint.yaml +++ b/.github/workflows/lint.yaml @@ -14,9 +14,9 @@ jobs: lint: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Setup Python 3.8 - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: ${{ env.PYTHON_VERSION }} - name: Update pip and install lint utilities diff --git a/.github/workflows/pull_request.yaml b/.github/workflows/pull_request.yaml index 5d423f1f..35746edd 100644 --- a/.github/workflows/pull_request.yaml +++ b/.github/workflows/pull_request.yaml @@ -9,8 +9,8 @@ jobs: pre-commit: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 - - uses: actions/setup-python@v4 + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 with: python-version: ${{ env.PYTHON_VERSION }} - uses: pre-commit/action@v2.0.3 diff --git a/requirements.txt b/requirements.txt index 8bf4d45c..fc2e954c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,10 +8,10 @@ opt_einsum pathos>=0.2.7 pylatexenc>=2.10 pyscf>=2.0.1 -qiskit>=0.39.0,<1.0.0 +qiskit>=1.0.0 recommonmark -qiskit_ibm_runtime==0.20.0 -qiskit-aer==0.13.3 +qiskit-ibm-runtime>=0.20.0 +qiskit-aer>=0.13.3 scipy>=1.5.2 setuptools>=52.0.0 diff --git a/test/functional/test_controlled_unitary.py b/test/functional/test_controlled_unitary.py index 652ece59..d7e78660 100644 --- a/test/functional/test_controlled_unitary.py +++ b/test/functional/test_controlled_unitary.py @@ -22,10 +22,12 @@ SOFTWARE. """ -import torchquantum as tq from test.utils import check_all_close + import numpy as np +import torchquantum as tq + def test_controlled_unitary(): state = tq.QuantumDevice(n_wires=2) diff --git a/test/functional/test_func_mat_exp.py b/test/functional/test_func_mat_exp.py index e2a2c293..ad8e17c1 100644 --- a/test/functional/test_func_mat_exp.py +++ b/test/functional/test_func_mat_exp.py @@ -22,9 +22,10 @@ SOFTWARE. """ +import numpy as np import torch + import torchquantum as tq -import numpy as np def test_func_mat_exp(): diff --git a/test/hadamard_grad/test_hadamard_grad.py b/test/hadamard_grad/test_hadamard_grad.py index 62fdb21e..2eb387b8 100644 --- a/test/hadamard_grad/test_hadamard_grad.py +++ b/test/hadamard_grad/test_hadamard_grad.py @@ -1,7 +1,9 @@ import numpy as np +import pytest + from examples.hadamard_grad.circ import Circ1, Circ2, Circ3 from examples.hadamard_grad.hadamard_grad import hadamard_grad -import pytest + @pytest.mark.skip def test_hadamard_grad(): @@ -38,4 +40,4 @@ def test_hadamard_grad(): if __name__ == "__main__": - test_hadamard_grad() \ No newline at end of file + test_hadamard_grad() diff --git a/test/layers/test_nlocal.py b/test/layers/test_nlocal.py index 62387190..83bd1f6e 100644 --- a/test/layers/test_nlocal.py +++ b/test/layers/test_nlocal.py @@ -1,12 +1,13 @@ -import torchquantum as tq from qiskit.circuit.library import ( - TwoLocal, EfficientSU2, ExcitationPreserving, PauliTwoDesign, RealAmplitudes, + TwoLocal, ) +import torchquantum as tq + def compare_tq_to_qiskit(tq_circuit, qiskit_circuit, instance_info=""): """ @@ -16,8 +17,8 @@ def compare_tq_to_qiskit(tq_circuit, qiskit_circuit, instance_info=""): qiskit_ops = [] for bit in qiskit_circuit.decompose(): wires = [] - for qu in bit.qubits: - wires.append(qu.index) + for qb in bit.qubits: + wires.append(qiskit_circuit.find_bit(qb).index) qiskit_ops.append( { "name": bit.operation.name, @@ -29,9 +30,9 @@ def compare_tq_to_qiskit(tq_circuit, qiskit_circuit, instance_info=""): tq_ops = [ { "name": op["name"], - "wires": (op["wires"],) - if isinstance(op["wires"], int) - else tuple(op["wires"]), + "wires": ( + (op["wires"],) if isinstance(op["wires"], int) else tuple(op["wires"]) + ), } for op in tq_circuit.op_history ] diff --git a/test/layers/test_rotgate.py b/test/layers/test_rotgate.py index 30f24b8a..593563c7 100644 --- a/test/layers/test_rotgate.py +++ b/test/layers/test_rotgate.py @@ -1,14 +1,10 @@ -import torchquantum as tq -import qiskit -from qiskit import Aer, execute - -from torchquantum.util import ( - switch_little_big_endian_matrix, - find_global_phase, -) - -from qiskit.circuit.library import GR, GRX, GRY, GRZ import numpy as np +from qiskit import transpile +from qiskit.circuit.library import GR, GRX, GRY, GRZ +from qiskit_aer import AerSimulator + +import torchquantum as tq +from torchquantum.util import find_global_phase, switch_little_big_endian_matrix all_pairs = [ {"qiskit": GR, "tq": tq.layer.GlobalR, "params": 2}, @@ -19,6 +15,7 @@ ITERATIONS = 2 + def test_rotgates(): # test each pair for pair in all_pairs: @@ -28,15 +25,18 @@ def test_rotgates(): for _ in range(ITERATIONS): # generate random parameters params = [ - np.random.uniform(-2 * np.pi, 2 * np.pi) for _ in range(pair["params"]) + np.random.uniform(-2 * np.pi, 2 * np.pi) + for _ in range(pair["params"]) ] # create the qiskit circuit qiskit_circuit = pair["qiskit"](num_wires, *params) # get the unitary from qiskit - backend = Aer.get_backend("unitary_simulator") - result = execute(qiskit_circuit, backend).result() + backend = AerSimulator(method="unitary") + qiskit_circuit = transpile(qiskit_circuit, backend) + qiskit_circuit.save_unitary() + result = backend.run(qiskit_circuit).result() unitary_qiskit = result.get_unitary(qiskit_circuit) # create tq circuit diff --git a/test/measurement/test_eval_observable.py b/test/measurement/test_eval_observable.py index 58245ee0..499c2ad1 100644 --- a/test/measurement/test_eval_observable.py +++ b/test/measurement/test_eval_observable.py @@ -22,19 +22,21 @@ SOFTWARE. """ -from qiskit import QuantumCircuit -import numpy as np import random -from qiskit.opflow import StateFn, X, Y, Z, I -import torchquantum as tq +import numpy as np +from qiskit import QuantumCircuit +from qiskit.quantum_info import Pauli, Statevector +import torchquantum as tq from torchquantum.measurement import expval_joint_analytical, expval_joint_sampling from torchquantum.plugin import op_history2qiskit from torchquantum.util import switch_little_big_endian_state -import torch - +X = Pauli("X") +Y = Pauli("Y") +Z = Pauli("Z") +I = Pauli("I") pauli_str_op_dict = { "X": X, "Y": Y, @@ -67,20 +69,19 @@ def test_expval_observable(): for ob in obs[1:]: # note here the order is reversed because qiskit is in little endian operator = pauli_str_op_dict[ob] ^ operator - psi = StateFn(qiskit_circ) - psi_evaled = psi.eval()._primitive._data + psi = Statevector(qiskit_circ) state_tq = switch_little_big_endian_state( qdev.get_states_1d().detach().numpy() )[0] - assert np.allclose(psi_evaled, state_tq, atol=1e-5) + assert np.allclose(psi.data, state_tq, atol=1e-5) - expval_qiskit = (~psi @ operator @ psi).eval().real + expval_qiskit = psi.expectation_value(operator).real # print(expval_tq, expval_qiskit) assert np.isclose(expval_tq, expval_qiskit, atol=1e-5) if ( n_wires <= 3 ): # if too many wires, the stochastic method is not accurate due to limited shots - assert np.isclose(expval_tq_sampling, expval_qiskit, atol=1e-2) + assert np.isclose(expval_tq_sampling, expval_qiskit, atol=0.015) print("expval observable test passed") @@ -92,25 +93,25 @@ def util0(): qc.x(0) operator = Z ^ I - psi = StateFn(qc) - expectation_value = (~psi @ operator @ psi).eval() + psi = Statevector(qc) + expectation_value = psi.expectation_value(operator) print(expectation_value.real) # result: 1.0, means measurement result is 0, so Z is on qubit 1 operator = I ^ Z - psi = StateFn(qc) - expectation_value = (~psi @ operator @ psi).eval() + psi = Statevector(qc) + expectation_value = psi.expectation_value(operator) print(expectation_value.real) # result: -1.0 means measurement result is 1, so Z is on qubit 0 operator = I ^ I - psi = StateFn(qc) - expectation_value = (~psi @ operator @ psi).eval() + psi = Statevector(qc) + expectation_value = psi.expectation_value(operator) print(expectation_value.real) operator = Z ^ Z - psi = StateFn(qc) - expectation_value = (~psi @ operator @ psi).eval() + psi = Statevector(qc) + expectation_value = psi.expectation_value(operator) print(expectation_value.real) qc = QuantumCircuit(3) @@ -118,8 +119,8 @@ def util0(): qc.x(0) operator = I ^ I ^ Z - psi = StateFn(qc) - expectation_value = (~psi @ operator @ psi).eval() + psi = Statevector(qc) + expectation_value = psi.expectation_value(operator) print(expectation_value.real) diff --git a/test/measurement/test_expval_joint_sampling_grouping.py b/test/measurement/test_expval_joint_sampling_grouping.py index 09492458..8a759518 100644 --- a/test/measurement/test_expval_joint_sampling_grouping.py +++ b/test/measurement/test_expval_joint_sampling_grouping.py @@ -22,15 +22,16 @@ SOFTWARE. """ +import random + +import numpy as np + import torchquantum as tq from torchquantum.measurement import ( expval_joint_analytical, expval_joint_sampling_grouping, ) -import numpy as np -import random - def test_expval_joint_sampling_grouping(): n_obs = 20 @@ -54,7 +55,7 @@ def test_expval_joint_sampling_grouping(): ) for obs in obs_all: # assert - assert np.isclose(expval_ana[obs], expval_sam[obs][0].item(), atol=1e-2) + assert np.isclose(expval_ana[obs], expval_sam[obs][0].item(), atol=0.015) print(obs, expval_ana[obs], expval_sam[obs][0].item()) diff --git a/test/measurement/test_measure.py b/test/measurement/test_measure.py index 38c45df6..5fafa180 100644 --- a/test/measurement/test_measure.py +++ b/test/measurement/test_measure.py @@ -22,11 +22,12 @@ SOFTWARE. """ -import torchquantum as tq +import numpy as np +from qiskit import transpile +from qiskit_aer import AerSimulator +import torchquantum as tq from torchquantum.plugin import op_history2qiskit -from qiskit import Aer, transpile -import numpy as np def test_measure(): @@ -42,7 +43,7 @@ def test_measure(): circ = op_history2qiskit(qdev.n_wires, qdev.op_history) circ.measure_all() - simulator = Aer.get_backend("aer_simulator") + simulator = AerSimulator() circ = transpile(circ, simulator) qiskit_res = simulator.run(circ, shots=n_shots).result() qiskit_counts = qiskit_res.get_counts() diff --git a/test/operator/test_ControlledU.py b/test/operator/test_ControlledU.py index 5bc01096..e80dee1d 100644 --- a/test/operator/test_ControlledU.py +++ b/test/operator/test_ControlledU.py @@ -25,14 +25,13 @@ # test the controlled unitary function -import torchquantum as tq -import torchquantum.functional as tqf from test.utils import check_all_close # import pdb # pdb.set_trace() import numpy as np +import torchquantum as tq flag = 4 diff --git a/test/plugin/test_qiskit2tq_op_history.py b/test/plugin/test_qiskit2tq_op_history.py index 67a67e80..b94fb7fd 100644 --- a/test/plugin/test_qiskit2tq_op_history.py +++ b/test/plugin/test_qiskit2tq_op_history.py @@ -22,11 +22,11 @@ SOFTWARE. """ -from torchquantum.plugin import qiskit2tq_op_history -import torchquantum as tq -from qiskit.circuit.random import random_circuit from qiskit import QuantumCircuit +import torchquantum as tq +from torchquantum.plugin import qiskit2tq_op_history + def test_qiskit2tp_op_history(): circ = QuantumCircuit(3, 3) diff --git a/test/plugin/test_qiskit_plugins.py b/test/plugin/test_qiskit_plugins.py index 684dbfc6..76d0a4db 100644 --- a/test/plugin/test_qiskit_plugins.py +++ b/test/plugin/test_qiskit_plugins.py @@ -22,26 +22,24 @@ SOFTWARE. """ -from qiskit import QuantumCircuit -import numpy as np import random -from qiskit.opflow import StateFn, X, Y, Z, I -import torchquantum as tq +import numpy as np +import pytest +from qiskit.quantum_info import Pauli, Statevector -from torchquantum.plugin import op_history2qiskit, QiskitProcessor +import torchquantum as tq +from torchquantum.plugin import QiskitProcessor, op_history2qiskit from torchquantum.util import switch_little_big_endian_state -import torch -import pytest - pauli_str_op_dict = { - "X": X, - "Y": Y, - "Z": Z, - "I": I, + "X": Pauli("X"), + "Y": Pauli("Y"), + "Z": Pauli("Z"), + "I": Pauli("I"), } + @pytest.mark.skip def test_expval_observable(): # seed = 0 @@ -67,19 +65,18 @@ def test_expval_observable(): for ob in obs[1:]: # note here the order is reversed because qiskit is in little endian operator = pauli_str_op_dict[ob] ^ operator - psi = StateFn(qiskit_circ) - psi_evaled = psi.eval()._primitive._data + psi = Statevector(qiskit_circ) state_tq = switch_little_big_endian_state( qdev.get_states_1d().detach().numpy() )[0] - assert np.allclose(psi_evaled, state_tq, atol=1e-5) + assert np.allclose(psi.data, state_tq, atol=1e-5) - expval_qiskit = (~psi @ operator @ psi).eval().real + expval_qiskit = psi.expectation_value(operator).real # print(expval_qiskit_processor, expval_qiskit) if ( n_wires <= 3 ): # if too many wires, the stochastic method is not accurate due to limited shots - assert np.isclose(expval_qiskit_processor, expval_qiskit, atol=1e-2) + assert np.isclose(expval_qiskit_processor, expval_qiskit, atol=0.015) print("expval observable test passed") diff --git a/test/qiskit_plugin_test.py b/test/qiskit_plugin_test.py index d8b7e94b..a5aed71a 100644 --- a/test/qiskit_plugin_test.py +++ b/test/qiskit_plugin_test.py @@ -24,21 +24,22 @@ import argparse import pdb -import torch -import torchquantum as tq -import numpy as np +from test.static_mode_test import QLayer as AllRandomLayer -from qiskit import Aer, execute +import numpy as np +import torch +from qiskit_aer import AerSimulator from torchpack.utils.logging import logger + +import torchquantum as tq +from torchquantum.macro import F_DTYPE +from torchquantum.plugin import tq2qiskit from torchquantum.util import ( + find_global_phase, + get_expectations_from_counts, switch_little_big_endian_matrix, switch_little_big_endian_state, - get_expectations_from_counts, - find_global_phase, ) -from test.static_mode_test import QLayer as AllRandomLayer -from torchquantum.plugin import tq2qiskit -from torchquantum.macro import F_DTYPE def unitary_tq_vs_qiskit_test(): @@ -59,8 +60,9 @@ def unitary_tq_vs_qiskit_test(): # qiskit circ = tq2qiskit(q_layer, x) - simulator = Aer.get_backend("unitary_simulator") - result = execute(circ, simulator).result() + simulator = AerSimulator(method="unitary") + circ.save_unitary() + result = simulator.run(circ).result() unitary_qiskit = result.get_unitary(circ) stable_threshold = 1e-5 @@ -115,10 +117,11 @@ def state_tq_vs_qiskit_test(): # qiskit circ = tq2qiskit(q_layer, x) # Select the StatevectorSimulator from the Aer provider - simulator = Aer.get_backend("statevector_simulator") + simulator = AerSimulator(method="statevector") + circ.save_statevector() # Execute and get counts - result = execute(circ, simulator).result() + result = simulator.run(circ).result() state_qiskit = result.get_statevector(circ) stable_threshold = 1e-5 @@ -175,11 +178,10 @@ def measurement_tq_vs_qiskit_test(): circ = tq2qiskit(q_layer, x) circ.measure(list(range(n_wires)), list(range(n_wires))) - # Select the QasmSimulator from the Aer provider - simulator = Aer.get_backend("qasm_simulator") + simulator = AerSimulator() # Execute and get counts - result = execute(circ, simulator, shots=1000000).result() + result = simulator.run(circ, shots=1000000).result() counts = result.get_counts(circ) measured_qiskit = get_expectations_from_counts(counts, n_wires=n_wires) diff --git a/torchquantum/functional/func_controlled_unitary.py b/torchquantum/functional/func_controlled_unitary.py index dc909815..f5d745c0 100644 --- a/torchquantum/functional/func_controlled_unitary.py +++ b/torchquantum/functional/func_controlled_unitary.py @@ -24,8 +24,9 @@ import numpy as np import torch + from torchquantum.functional.gate_wrapper import gate_wrapper -from torchquantum.macro import * +from torchquantum.macro import C_DTYPE def controlled_unitary( @@ -97,7 +98,7 @@ def controlled_unitary( n_wires = n_c_wires + n_t_wires # compute the new unitary, then permute - unitary = torch.tensor(torch.zeros(2**n_wires, 2**n_wires, dtype=C_DTYPE)) + unitary = torch.zeros(2**n_wires, 2**n_wires, dtype=C_DTYPE) for k in range(2**n_wires - 2**n_t_wires): unitary[k, k] = 1.0 + 0.0j diff --git a/torchquantum/functional/gate_wrapper.py b/torchquantum/functional/gate_wrapper.py index a266e0c7..cab7379f 100644 --- a/torchquantum/functional/gate_wrapper.py +++ b/torchquantum/functional/gate_wrapper.py @@ -1,15 +1,13 @@ import functools -import torch +from typing import TYPE_CHECKING, Callable + import numpy as np +import torch -from typing import Callable, Union, Optional, List, Dict, TYPE_CHECKING -from ..macro import C_DTYPE, F_DTYPE, ABC, ABC_ARRAY, INV_SQRT2 -from ..util.utils import pauli_eigs, diag -from torchpack.utils.logging import logger -from torchquantum.util import normalize_statevector +from ..macro import ABC, ABC_ARRAY, C_DTYPE, F_DTYPE if TYPE_CHECKING: - from torchquantum.device import QuantumDevice, NoiseDevice + from torchquantum.device import QuantumDevice else: QuantumDevice = None @@ -58,7 +56,7 @@ def apply_unitary_einsum(state, mat, wires): # All affected indices will be summed over, so we need the same number # of new indices - new_indices = ABC[total_wires: total_wires + len(device_wires)] + new_indices = ABC[total_wires : total_wires + len(device_wires)] # The new indices of the state are given by the old ones with the # affected indices replaced by the new_indices @@ -186,7 +184,7 @@ def apply_unitary_density_einsum(density, mat, wires): # All affected indices will be summed over, so we need the same number # of new indices - new_indices = ABC[total_wires: total_wires + len(device_wires)] + new_indices = ABC[total_wires : total_wires + len(device_wires)] # The new indices of the state are given by the old ones with the # affected indices replaced by the new_indices @@ -210,7 +208,7 @@ def apply_unitary_density_einsum(density, mat, wires): new_density = torch.einsum(einsum_indices, mat, density) - """ + r""" Compute U \rho U^\dagger """ @@ -224,7 +222,7 @@ def apply_unitary_density_einsum(density, mat, wires): # All affected indices will be summed over, so we need the same number # of new indices - new_indices = ABC[total_wires: total_wires + len(device_wires)] + new_indices = ABC[total_wires : total_wires + len(device_wires)] # The new indices of the state are given by the old ones with the # affected indices replaced by the new_indices @@ -273,7 +271,9 @@ def apply_unitary_density_bmm(density, mat, wires): permute_to = permute_to[:1] + devices_dims + permute_to[1:] permute_back = list(np.argsort(permute_to)) original_shape = density.shape - permuted = density.permute(permute_to).reshape([original_shape[0], mat.shape[-1], -1]) + permuted = density.permute(permute_to).reshape( + [original_shape[0], mat.shape[-1], -1] + ) if len(mat.shape) > 2: # both matrix and state are in batch mode @@ -284,8 +284,8 @@ def apply_unitary_density_bmm(density, mat, wires): expand_shape = [bsz] + list(mat.shape) new_density = mat.expand(expand_shape).bmm(permuted) new_density = new_density.view(original_shape).permute(permute_back) - """ - Compute \rho U^\dagger + r""" + Compute \rho U^\dagger """ matdag = mat.conj() @@ -302,7 +302,9 @@ def apply_unitary_density_bmm(density, mat, wires): del permute_to_dag[d] permute_to_dag = permute_to_dag + devices_dims_dag permute_back_dag = list(np.argsort(permute_to_dag)) - permuted_dag = new_density.permute(permute_to_dag).reshape([original_shape[0], -1, matdag.shape[-1]]) + permuted_dag = new_density.permute(permute_to_dag).reshape( + [original_shape[0], -1, matdag.shape[0]] + ) if len(matdag.shape) > 2: # both matrix and state are in batch mode @@ -323,17 +325,17 @@ def apply_unitary_density_bmm(density, mat, wires): def gate_wrapper( - name, - mat, - method, - q_device: QuantumDevice, - wires, - paramnum=0, - params=None, - n_wires=None, - static=False, - parent_graph=None, - inverse=False, + name, + mat, + method, + q_device: QuantumDevice, + wires, + paramnum=0, + params=None, + n_wires=None, + static=False, + parent_graph=None, + inverse=False, ): """Perform the phaseshift gate. @@ -389,9 +391,11 @@ def gate_wrapper( { "name": name, # type: ignore "wires": np.array(wires).squeeze().tolist(), - "params": params.squeeze().detach().cpu().numpy().tolist() - if params is not None - else None, + "params": ( + params.squeeze().detach().cpu().numpy().tolist() + if params is not None + else None + ), "inverse": inverse, "trainable": params.requires_grad if params is not None else False, } @@ -438,6 +442,7 @@ def gate_wrapper( else: matrix = matrix.permute(1, 0) assert np.log2(matrix.shape[-1]) == len(wires) + # TODO: There might be a better way to discriminate noisedevice and normal statevector device if q_device.device_name == "noisedevice": density = q_device.densities diff --git a/torchquantum/noise_model/noise_models.py b/torchquantum/noise_model/noise_models.py index 571314e9..2309c7e8 100644 --- a/torchquantum/noise_model/noise_models.py +++ b/torchquantum/noise_model/noise_models.py @@ -24,12 +24,11 @@ import numpy as np import torch -import torchquantum as tq - +from qiskit_aer.noise import NoiseModel from torchpack.utils.logging import logger -from qiskit.providers.aer.noise import NoiseModel -from torchquantum.util import get_provider +import torchquantum as tq +from torchquantum.util import get_provider __all__ = [ "NoiseModelTQ", @@ -50,31 +49,31 @@ def cos_adjust_noise( orig_noise_total_prob, ): """ - Adjust the noise probability based on the current epoch and a cosine schedule. + Adjust the noise probability based on the current epoch and a cosine schedule. + + Args: + current_epoch (int): The current epoch. + n_epochs (int): The total number of epochs. + prob_schedule (str): The probability schedule type. Possible values are: + - None: No schedule, use the original noise probability. + - "increase": Increase the noise probability using a cosine schedule. + - "decrease": Decrease the noise probability using a cosine schedule. + - "increase_decrease": Increase the noise probability until a separator epoch, + then decrease it using cosine schedules. + prob_schedule_separator (int): The epoch at which the schedule changes for + "increase_decrease" mode. + orig_noise_total_prob (float): The original noise probability. + + Returns: + float: The adjusted noise probability based on the schedule. + + Note: + The adjusted noise probability is returned as a float between 0 and 1. + + Raises: + None. - Args: - current_epoch (int): The current epoch. - n_epochs (int): The total number of epochs. - prob_schedule (str): The probability schedule type. Possible values are: - - None: No schedule, use the original noise probability. - - "increase": Increase the noise probability using a cosine schedule. - - "decrease": Decrease the noise probability using a cosine schedule. - - "increase_decrease": Increase the noise probability until a separator epoch, - then decrease it using cosine schedules. - prob_schedule_separator (int): The epoch at which the schedule changes for - "increase_decrease" mode. - orig_noise_total_prob (float): The original noise probability. - - Returns: - float: The adjusted noise probability based on the schedule. - - Note: - The adjusted noise probability is returned as a float between 0 and 1. - - Raises: - None. - - """ + """ if prob_schedule is None: noise_total_prob = orig_noise_total_prob @@ -134,31 +133,31 @@ def cos_adjust_noise( def apply_readout_error_func(x, c2p_mapping, measure_info): """ - Apply readout error to the measurement outcomes. - - Args: - x (torch.Tensor): The measurement outcomes, represented as a tensor of shape (batch_size, num_qubits). - c2p_mapping (dict): Mapping from qubit indices to physical wire indices. - measure_info (dict): Measurement information dictionary containing the probabilities for different outcomes. - - Returns: - torch.Tensor: The measurement outcomes after applying the readout error, represented as a tensor of the same shape as x. - - Note: - The readout error is applied based on the given mapping and measurement information. - The measurement information dictionary should have the following structure: - { - (wire_1,): {"probabilities": [[p_0, p_1], [p_0, p_1]]}, - (wire_2,): {"probabilities": [[p_0, p_1], [p_0, p_1]]}, - ... - } - where wire_1, wire_2, ... are the physical wire indices, and p_0 and p_1 are the probabilities of measuring 0 and 1, respectively, - for each wire. - - Raises: - None. + Apply readout error to the measurement outcomes. + + Args: + x (torch.Tensor): The measurement outcomes, represented as a tensor of shape (batch_size, num_qubits). + c2p_mapping (dict): Mapping from qubit indices to physical wire indices. + measure_info (dict): Measurement information dictionary containing the probabilities for different outcomes. + + Returns: + torch.Tensor: The measurement outcomes after applying the readout error, represented as a tensor of the same shape as x. + + Note: + The readout error is applied based on the given mapping and measurement information. + The measurement information dictionary should have the following structure: + { + (wire_1,): {"probabilities": [[p_0, p_1], [p_0, p_1]]}, + (wire_2,): {"probabilities": [[p_0, p_1], [p_0, p_1]]}, + ... + } + where wire_1, wire_2, ... are the physical wire indices, and p_0 and p_1 are the probabilities of measuring 0 and 1, respectively, + for each wire. + + Raises: + None. - """ + """ # add readout error noise_free_0_probs = (x + 1) / 2 noise_free_1_probs = 1 - (x + 1) / 2 @@ -196,21 +195,22 @@ def apply_readout_error_func(x, c2p_mapping, measure_info): class NoiseCounter: """ - A class for counting the occurrences of Pauli error gates. + A class for counting the occurrences of Pauli error gates. - Attributes: - counter_x (int): Counter for Pauli X errors. - counter_y (int): Counter for Pauli Y errors. - counter_z (int): Counter for Pauli Z errors. - counter_X (int): Counter for Pauli X errors (for two-qubit gates). - counter_Y (int): Counter for Pauli Y errors (for two-qubit gates). - counter_Z (int): Counter for Pauli Z errors (for two-qubit gates). + Attributes: + counter_x (int): Counter for Pauli X errors. + counter_y (int): Counter for Pauli Y errors. + counter_z (int): Counter for Pauli Z errors. + counter_X (int): Counter for Pauli X errors (for two-qubit gates). + counter_Y (int): Counter for Pauli Y errors (for two-qubit gates). + counter_Z (int): Counter for Pauli Z errors (for two-qubit gates). - Methods: - add(error): Adds a Pauli error to the counters based on the error type. - __str__(): Returns a string representation of the counters. + Methods: + add(error): Adds a Pauli error to the counters based on the error type. + __str__(): Returns a string representation of the counters. + + """ - """ def __init__(self): self.counter_x = 0 self.counter_y = 0 @@ -220,51 +220,51 @@ def __init__(self): self.counter_Z = 0 def add(self, error): - if error == 'x': + if error == "x": self.counter_x += 1 - elif error == 'y': + elif error == "y": self.counter_y += 1 - elif error == 'z': + elif error == "z": self.counter_z += 1 - if error == 'X': + if error == "X": self.counter_X += 1 - elif error == 'Y': + elif error == "Y": self.counter_Y += 1 - elif error == 'Z': + elif error == "Z": self.counter_Z += 1 else: pass - - def __str__(self) -> str: - return f'single qubit error: pauli x = {self.counter_x}, pauli y = {self.counter_y}, pauli z = {self.counter_z}\n' + \ - f'double qubit error: pauli x = {self.counter_X}, pauli y = {self.counter_Y}, pauli z = {self.counter_Z}' + def __str__(self) -> str: + return ( + f"single qubit error: pauli x = {self.counter_x}, pauli y = {self.counter_y}, pauli z = {self.counter_z}\n" + + f"double qubit error: pauli x = {self.counter_X}, pauli y = {self.counter_Y}, pauli z = {self.counter_Z}" + ) class NoiseModelTQ(object): """ - A class for applying gate insertion and readout errors. - - Attributes: - noise_model_name (str): Name of the noise model. - n_epochs (int): Number of epochs. - noise_total_prob (float): Total probability of noise. - ignored_ops (tuple): Operations to be ignored. - prob_schedule (list): Probability schedule. - prob_schedule_separator (str): Separator for probability schedule. - factor (float): Factor for adjusting probabilities. - add_thermal (bool): Flag indicating whether to add thermal relaxation. - - Methods: - adjust_noise(current_epoch): Adjusts the noise based on the current epoch. - clean_parsed_noise_model_dict(nm_dict, ignored_ops): Cleans the parsed noise model dictionary. - parse_noise_model_dict(nm_dict): Parses the noise model dictionary. - magnify_probs(probs): Magnifies the probabilities based on a factor. - sample_noise_op(op_in): Samples a noise operation based on the given operation. - apply_readout_error(x): Applies readout error to the input. - - """ + A class for applying gate insertion and readout errors. + + Attributes: + noise_model_name (str): Name of the noise model. + n_epochs (int): Number of epochs. + noise_total_prob (float): Total probability of noise. + ignored_ops (tuple): Operations to be ignored. + prob_schedule (list): Probability schedule. + prob_schedule_separator (str): Separator for probability schedule. + factor (float): Factor for adjusting probabilities. + add_thermal (bool): Flag indicating whether to add thermal relaxation. + + Methods: + adjust_noise(current_epoch): Adjusts the noise based on the current epoch. + clean_parsed_noise_model_dict(nm_dict, ignored_ops): Cleans the parsed noise model dictionary. + parse_noise_model_dict(nm_dict): Parses the noise model dictionary. + magnify_probs(probs): Magnifies the probabilities based on a factor. + sample_noise_op(op_in): Samples a noise operation based on the given operation. + apply_readout_error(x): Applies readout error to the input. + """ def __init__( self, @@ -295,7 +295,9 @@ def __init__( self.ignored_ops = ignored_ops self.parsed_dict = self.parse_noise_model_dict(self.noise_model_dict) - self.parsed_dict = self.clean_parsed_noise_model_dict(self.parsed_dict, ignored_ops) + self.parsed_dict = self.clean_parsed_noise_model_dict( + self.parsed_dict, ignored_ops + ) self.n_epochs = n_epochs self.prob_schedule = prob_schedule self.prob_schedule_separator = prob_schedule_separator @@ -313,39 +315,66 @@ def adjust_noise(self, current_epoch): @staticmethod def clean_parsed_noise_model_dict(nm_dict, ignored_ops): - # remove the ignored operation in the instructions and probs + # remove the ignored operation in the instructions and probs # --> only get the pauli-x,y,z errors. ignore the thermal relaxation errors (kraus operator) def filter_inst(inst_list: list) -> list: new_inst_list = [] for inst in inst_list: - if inst['name'] in ignored_ops: + if inst["name"] in ignored_ops: continue new_inst_list.append(inst) return new_inst_list - ignored_ops = set(ignored_ops) - single_depolarization = set(['x', 'y', 'z']) - double_depolarization = set(['IX', 'IY', 'IZ', 'XI', 'XX', 'XY', 'XZ', 'YI', 'YX', 'YY', 'YZ', 'ZI', 'ZX', 'ZY', 'ZZ']) # 16 - 1 = 15 combinations + ignored_ops = set(ignored_ops) + single_depolarization = set(["x", "y", "z"]) + double_depolarization = set( + [ + "IX", + "IY", + "IZ", + "XI", + "XX", + "XY", + "XZ", + "YI", + "YX", + "YY", + "YZ", + "ZI", + "ZX", + "ZY", + "ZZ", + ] + ) # 16 - 1 = 15 combinations for operation, operation_info in nm_dict.items(): for qubit, qubit_info in operation_info.items(): inst_all = [] prob_all = [] if qubit_info["type"] == "qerror": - for inst, prob in zip(qubit_info["instructions"], qubit_info["probabilities"]): - if operation in ['x', 'sx', 'id', 'reset']: # single qubit gate - if any([inst_one["name"] in single_depolarization for inst_one in inst]): + for inst, prob in zip( + qubit_info["instructions"], qubit_info["probabilities"] + ): + if operation in ["x", "sx", "id", "reset"]: # single qubit gate + if any( + [ + inst_one["name"] in single_depolarization + for inst_one in inst + ] + ): inst_all.append(filter_inst(inst)) prob_all.append(prob) - elif operation in ['cx']: # double qubit gate + elif operation in ["cx"]: # double qubit gate try: - if inst[0]['params'][0] in double_depolarization and (inst[1]['name'] == 'id' or inst[2]['name'] == 'id'): + if inst[0]["params"][0] in double_depolarization and ( + inst[1]["name"] == "id" or inst[2]["name"] == "id" + ): inst_all.append(filter_inst(inst)) prob_all.append(prob) except: pass # don't know how to deal with this case else: - raise Exception(f'{operation} not considered...') + raise Exception(f"{operation} not considered...") nm_dict[operation][qubit]["instructions"] = inst_all nm_dict[operation][qubit]["probabilities"] = prob_all return nm_dict @@ -364,8 +393,13 @@ def parse_noise_model_dict(nm_dict): } if info["operations"][0] not in parsed.keys(): - parsed[info["operations"][0]] = {tuple(info["gate_qubits"][0]): val_dict} - elif tuple(info["gate_qubits"][0]) not in parsed[info["operations"][0]].keys(): + parsed[info["operations"][0]] = { + tuple(info["gate_qubits"][0]): val_dict + } + elif ( + tuple(info["gate_qubits"][0]) + not in parsed[info["operations"][0]].keys() + ): parsed[info["operations"][0]][tuple(info["gate_qubits"][0])] = val_dict else: raise ValueError @@ -432,30 +466,36 @@ def sample_noise_op(self, op_in): ops = [] for instruction in instructions: - v_wires = [self.p_v_reg_mapping["p2v"][qubit] for qubit in instruction["qubits"]] + v_wires = [ + self.p_v_reg_mapping["p2v"][qubit] for qubit in instruction["qubits"] + ] if instruction["name"] == "x": ops.append(tq.PauliX(wires=v_wires)) - self.noise_counter.add('x') + self.noise_counter.add("x") elif instruction["name"] == "y": ops.append(tq.PauliY(wires=v_wires)) - self.noise_counter.add('y') + self.noise_counter.add("y") elif instruction["name"] == "z": ops.append(tq.PauliZ(wires=v_wires)) - self.noise_counter.add('z') + self.noise_counter.add("z") elif instruction["name"] == "reset": ops.append(tq.Reset(wires=v_wires)) elif instruction["name"] == "pauli": - twoqubit_depolarization = list(instruction['params'][0]) # ['XY'] --> ['X', 'Y'] - for singlequbit_deloparization, v_wire in zip(twoqubit_depolarization, v_wires): - if singlequbit_deloparization == 'X': + twoqubit_depolarization = list( + instruction["params"][0] + ) # ['XY'] --> ['X', 'Y'] + for singlequbit_deloparization, v_wire in zip( + twoqubit_depolarization, v_wires + ): + if singlequbit_deloparization == "X": ops.append(tq.PauliX(wires=[v_wire])) - self.noise_counter.add('X') - elif singlequbit_deloparization == 'Y': + self.noise_counter.add("X") + elif singlequbit_deloparization == "Y": ops.append(tq.PauliY(wires=[v_wire])) - self.noise_counter.add('Y') - elif singlequbit_deloparization == 'Z': + self.noise_counter.add("Y") + elif singlequbit_deloparization == "Z": ops.append(tq.PauliZ(wires=[v_wire])) - self.noise_counter.add('Z') + self.noise_counter.add("Z") else: pass # 'I' case else: @@ -474,25 +514,24 @@ def apply_readout_error(self, x): class NoiseModelTQActivation(object): """ - A class for adding noise to the activations. - - Attributes: - mean (tuple): Mean values of the noise. - std (tuple): Standard deviation values of the noise. - n_epochs (int): Number of epochs. - prob_schedule (list): Probability schedule. - prob_schedule_separator (str): Separator for probability schedule. - after_norm (bool): Flag indicating whether noise should be added after normalization. - factor (float): Factor for adjusting the noise. - - Methods: - adjust_noise(current_epoch): Adjusts the noise based on the current epoch. - sample_noise_op(op_in): Samples a noise operation. - apply_readout_error(x): Applies readout error to the input. - add_noise(x, node_id, is_after_norm): Adds noise to the activations. - - """ + A class for adding noise to the activations. + + Attributes: + mean (tuple): Mean values of the noise. + std (tuple): Standard deviation values of the noise. + n_epochs (int): Number of epochs. + prob_schedule (list): Probability schedule. + prob_schedule_separator (str): Separator for probability schedule. + after_norm (bool): Flag indicating whether noise should be added after normalization. + factor (float): Factor for adjusting the noise. + + Methods: + adjust_noise(current_epoch): Adjusts the noise based on the current epoch. + sample_noise_op(op_in): Samples a noise operation. + apply_readout_error(x): Applies readout error to the input. + add_noise(x, node_id, is_after_norm): Adds noise to the activations. + """ def __init__( self, @@ -560,23 +599,23 @@ def add_noise(self, x, node_id, is_after_norm=False): class NoiseModelTQPhase(object): """ - A class for adding noise to rotation parameters. - - Attributes: - mean (float): Mean value of the noise. - std (float): Standard deviation value of the noise. - n_epochs (int): Number of epochs. - prob_schedule (list): Probability schedule. - prob_schedule_separator (str): Separator for probability schedule. - factor (float): Factor for adjusting the noise. - - Methods: - adjust_noise(current_epoch): Adjusts the noise based on the current epoch. - sample_noise_op(op_in): Samples a noise operation. - apply_readout_error(x): Applies readout error to the input. - add_noise(phase): Adds noise to the rotation parameters. + A class for adding noise to rotation parameters. + + Attributes: + mean (float): Mean value of the noise. + std (float): Standard deviation value of the noise. + n_epochs (int): Number of epochs. + prob_schedule (list): Probability schedule. + prob_schedule_separator (str): Separator for probability schedule. + factor (float): Factor for adjusting the noise. + + Methods: + adjust_noise(current_epoch): Adjusts the noise based on the current epoch. + sample_noise_op(op_in): Samples a noise operation. + apply_readout_error(x): Applies readout error to the input. + add_noise(phase): Adds noise to the rotation parameters. - """ + """ def __init__( self, @@ -638,40 +677,43 @@ def add_noise(self, phase): class NoiseModelTQReadoutOnly(NoiseModelTQ): """ - A subclass of NoiseModelTQ that applies readout errors only. + A subclass of NoiseModelTQ that applies readout errors only. + + This class inherits from NoiseModelTQ and overrides the sample_noise_op method to exclude the insertion of any noise operations other than readout errors. It is designed for scenarios where only readout errors are considered, and all other noise sources are ignored. - This class inherits from NoiseModelTQ and overrides the sample_noise_op method to exclude the insertion of any noise operations other than readout errors. It is designed for scenarios where only readout errors are considered, and all other noise sources are ignored. + Methods: + sample_noise_op(op_in): Returns an empty list, indicating no noise operations are applied. + """ - Methods: - sample_noise_op(op_in): Returns an empty list, indicating no noise operations are applied. - """ def sample_noise_op(self, op_in): return [] class NoiseModelTQQErrorOnly(NoiseModelTQ): """ - A subclass of NoiseModelTQ that applies only readout errors. + A subclass of NoiseModelTQ that applies only readout errors. - This class inherits from NoiseModelTQ and overrides the apply_readout_error method to apply readout errors. It removes activation noise and only focuses on readout errors in the noise model. + This class inherits from NoiseModelTQ and overrides the apply_readout_error method to apply readout errors. It removes activation noise and only focuses on readout errors in the noise model. - Methods: - apply_readout_error(x): Applies readout error to the given activation values. + Methods: + apply_readout_error(x): Applies readout error to the given activation values. + + """ - """ def apply_readout_error(self, x): return x class NoiseModelTQActivationReadout(NoiseModelTQActivation): """ - A subclass of NoiseModelTQActivation that applies readout errors. + A subclass of NoiseModelTQActivation that applies readout errors. - This class inherits from NoiseModelTQActivation and overrides the apply_readout_error method to incorporate readout errors. It combines activation noise and readout errors into the noise model. + This class inherits from NoiseModelTQActivation and overrides the apply_readout_error method to incorporate readout errors. It combines activation noise and readout errors into the noise model. + + Methods: + apply_readout_error(x): Applies readout error to the given activation values + """ - Methods: - apply_readout_error(x): Applies readout error to the given activation values - """ def __init__( self, noise_model_name, @@ -713,13 +755,14 @@ def apply_readout_error(self, x): class NoiseModelTQPhaseReadout(NoiseModelTQPhase): """ - A subclass of NoiseModelTQPhase that applies readout errors to phase values. + A subclass of NoiseModelTQPhase that applies readout errors to phase values. - This class inherits from NoiseModelTQPhase and overrides the apply_readout_error method to apply readout errors specifically to phase values. It uses the noise model provided to introduce readout errors. + This class inherits from NoiseModelTQPhase and overrides the apply_readout_error method to apply readout errors specifically to phase values. It uses the noise model provided to introduce readout errors. + + Methods: + apply_readout_error(x): Applies readout error to the given phase values. + """ - Methods: - apply_readout_error(x): Applies readout error to the given phase values. - """ def __init__( self, noise_model_name, diff --git a/torchquantum/operator/standard_gates/qubit_unitary.py b/torchquantum/operator/standard_gates/qubit_unitary.py index 5f7fd9b1..4b087cd1 100644 --- a/torchquantum/operator/standard_gates/qubit_unitary.py +++ b/torchquantum/operator/standard_gates/qubit_unitary.py @@ -1,11 +1,12 @@ -from ..op_types import * from abc import ABCMeta -from torchquantum.macro import C_DTYPE -import torchquantum as tq + +import numpy as np import torch -from torchquantum.functional import mat_dict + import torchquantum.functional as tqf -import numpy as np +from torchquantum.macro import C_DTYPE + +from ..op_types import * class QubitUnitary(Operation, metaclass=ABCMeta): @@ -118,7 +119,7 @@ def from_controlled_operation( n_wires = n_c_wires + n_t_wires # compute the new unitary, then permute - unitary = torch.tensor(torch.zeros(2**n_wires, 2**n_wires, dtype=C_DTYPE)) + unitary = torch.zeros(2**n_wires, 2**n_wires, dtype=C_DTYPE) for k in range(2**n_wires - 2**n_t_wires): unitary[k, k] = 1.0 + 0.0j diff --git a/torchquantum/plugin/qiskit/qiskit_plugin.py b/torchquantum/plugin/qiskit/qiskit_plugin.py index bca3a7d2..385a0d0f 100644 --- a/torchquantum/plugin/qiskit/qiskit_plugin.py +++ b/torchquantum/plugin/qiskit/qiskit_plugin.py @@ -22,24 +22,25 @@ SOFTWARE. """ -import torch -import torchquantum as tq -import torchquantum.functional as tqf -import qiskit.circuit.library.standard_gates as qiskit_gate -import numpy as np +from typing import Iterable -from qiskit import QuantumCircuit, ClassicalRegister -from qiskit import Aer, execute +import numpy as np +import qiskit +import qiskit.circuit.library.standard_gates as qiskit_gate +import torch +from qiskit import ClassicalRegister, QuantumCircuit, transpile from qiskit.circuit import Parameter +from qiskit_aer import AerSimulator from torchpack.utils.logging import logger + +import torchquantum as tq +import torchquantum.functional as tqf +from torchquantum.functional import mat_dict from torchquantum.util import ( - switch_little_big_endian_matrix, find_global_phase, + switch_little_big_endian_matrix, switch_little_big_endian_state, ) -from typing import Iterable, List -from torchquantum.functional import mat_dict - __all__ = [ "tq2qiskit", @@ -63,7 +64,7 @@ def qiskit2tq_op_history(circ): if getattr(circ, "_layout", None) is not None: try: p2v_orig = circ._layout.final_layout.get_physical_bits().copy() - except: + except AttributeError: p2v_orig = circ._layout.get_physical_bits().copy() p2v = {} for p, v in p2v_orig.items(): @@ -79,13 +80,15 @@ def qiskit2tq_op_history(circ): ops = [] for gate in circ.data: op_name = gate[0].name - wires = list(map(lambda x: x.index, gate[1])) + wires = [circ.find_bit(qb).index for qb in gate.qubits] wires = [p2v[wire] for wire in wires] # sometimes the gate.params is ParameterExpression class init_params = ( list(map(float, gate[0].params)) if len(gate[0].params) > 0 else None ) - print(op_name,) + print( + op_name, + ) if op_name in [ "h", @@ -104,12 +107,12 @@ def qiskit2tq_op_history(circ): ]: ops.append( { - "name": op_name, # type: ignore - "wires": np.array(wires), - "params": None, - "inverse": False, - "trainable": False, - } + "name": op_name, # type: ignore + "wires": np.array(wires), + "params": None, + "inverse": False, + "trainable": False, + } ) elif op_name in [ "rx", @@ -138,12 +141,13 @@ def qiskit2tq_op_history(circ): ]: ops.append( { - "name": op_name, # type: ignore - "wires": np.array(wires), - "params": init_params, - "inverse": False, - "trainable": True - }) + "name": op_name, # type: ignore + "wires": np.array(wires), + "params": init_params, + "inverse": False, + "trainable": True, + } + ) elif op_name in ["barrier", "measure"]: continue else: @@ -206,7 +210,10 @@ def append_parameterized_gate(func, circ, input_idx, params, wires): ) elif func == "u2": from qiskit.circuit.library import U2Gate - circ.append(U2Gate(phi=params[input_idx[0]], lam=params[input_idx[1]]), wires, []) + + circ.append( + U2Gate(phi=params[input_idx[0]], lam=params[input_idx[1]]), wires, [] + ) # circ.u2(phi=params[input_idx[0]], lam=params[input_idx[1]], qubit=wires[0]) elif func == "u3": circ.u( @@ -224,7 +231,7 @@ def append_parameterized_gate(func, circ, input_idx, params, wires): ) else: raise NotImplementedError( - f"{func} cannot be converted to " f"parameterized Qiskit QuantumCircuit" + f"{func} cannot be converted to parameterized Qiskit QuantumCircuit" ) @@ -251,7 +258,7 @@ def append_fixed_gate(circ, func, params, wires, inverse): elif func == "sx": circ.sx(*wires) elif func in ["cnot", "cx"]: - circ.cnot(*wires) + circ.cx(*wires) elif func == "cz": circ.cz(*wires) elif func == "cy": @@ -297,6 +304,7 @@ def append_fixed_gate(circ, func, params, wires, inverse): circ.cu1(params, *wires) elif func == "u2": from qiskit.circuit.library import U2Gate + circ.append(U2Gate(phi=params[0], lam=params[1]), wires, []) # circ.u2(*list(params), *wires) elif func == "u3": @@ -336,7 +344,7 @@ def append_fixed_gate(circ, func, params, wires, inverse): def tq2qiskit_initialize(q_device: tq.QuantumDevice, all_states): - """Call the qiskit initialize funtion and encoder the current quantum state + """Call the qiskit initialize function and encoder the current quantum state using initialize and return circuits Args: @@ -436,7 +444,7 @@ def tq2qiskit( # generate only one qiskit QuantumCircuit assert module.params is None or module.params.shape[0] == 1 except AssertionError: - logger.exception(f"Cannot convert batch model tq module") + logger.exception("Cannot convert batch model tq module") n_removed_ops = 0 @@ -489,7 +497,7 @@ def tq2qiskit( elif module.name == "SX": circ.sx(*module.wires) elif module.name == "CNOT": - circ.cnot(*module.wires) + circ.cx(*module.wires) elif module.name == "CZ": circ.cz(*module.wires) elif module.name == "CY": @@ -535,7 +543,15 @@ def tq2qiskit( circ.cu1(module.params[0][0].item(), *module.wires) elif module.name == "U2": from qiskit.circuit.library import U2Gate - circ.append(U2Gate(phi=module.params[0].data.cpu().numpy()[0], lam=module.params[0].data.cpu().numpy()[0]), module.wires, []) + + circ.append( + U2Gate( + phi=module.params[0].data.cpu().numpy()[0], + lam=module.params[0].data.cpu().numpy()[0], + ), + module.wires, + [], + ) # circ.u2(*list(module.params[0].data.cpu().numpy()), *module.wires) elif module.name == "U3": circ.u3(*list(module.params[0].data.cpu().numpy()), *module.wires) @@ -579,7 +595,7 @@ def tq2qiskit( if n_removed_ops > 0: logger.warning( - f"Remove {n_removed_ops} operations with small " f"parameter magnitude." + f"Remove {n_removed_ops} operations with small parameter magnitude." ) return circ @@ -665,11 +681,9 @@ def op_history2qiskit_expand_params(n_wires, op_history, bsz): param = op["params"][i] else: param = None - - append_fixed_gate( - circ, op["name"], param, op["wires"], op["inverse"] - ) - + + append_fixed_gate(circ, op["name"], param, op["wires"], op["inverse"]) + circs_all.append(circ) return circs_all @@ -681,10 +695,10 @@ def qiskit2tq_Operator(circ: QuantumCircuit): if getattr(circ, "_layout", None) is not None: try: p2v_orig = circ._layout.final_layout.get_physical_bits().copy() - except: + except AttributeError: try: p2v_orig = circ._layout.get_physical_bits().copy() - except: + except AttributeError: p2v_orig = circ._layout.initial_layout.get_physical_bits().copy() p2v = {} for p, v in p2v_orig.items(): @@ -700,7 +714,7 @@ def qiskit2tq_Operator(circ: QuantumCircuit): ops = [] for gate in circ.data: op_name = gate[0].name - wires = list(map(lambda x: x.index, gate[1])) + wires = [circ.find_bit(qb).index for qb in gate.qubits] wires = [p2v[wire] for wire in wires] # sometimes the gate.params is ParameterExpression class init_params = ( @@ -762,7 +776,7 @@ def qiskit2tq_Operator(circ: QuantumCircuit): raise NotImplementedError( f"{op_name} conversion to tq is currently not supported." ) - + return ops @@ -789,11 +803,11 @@ def test_qiskit2tq(): circ.sx(3) circ.crx(theta=0.4, control_qubit=0, target_qubit=1) - circ.cnot(control_qubit=2, target_qubit=1) + circ.cx(control_qubit=2, target_qubit=1) circ.u3(theta=-0.1, phi=-0.2, lam=-0.4, qubit=3) - circ.cnot(control_qubit=3, target_qubit=0) - circ.cnot(control_qubit=0, target_qubit=2) + circ.cx(control_qubit=3, target_qubit=0) + circ.cx(control_qubit=0, target_qubit=2) circ.x(2) circ.x(3) circ.u2(phi=-0.2, lam=-0.9, qubit=3) @@ -801,8 +815,10 @@ def test_qiskit2tq(): m = qiskit2tq(circ) - simulator = Aer.get_backend("unitary_simulator") - result = execute(circ, simulator).result() + backend = AerSimulator(method="unitary") + circ = transpile(circ, backend) + circ.save_unitary() + result = backend.run(circ).result() unitary_qiskit = result.get_unitary(circ) unitary_tq = m.get_unitary(q_dev) @@ -966,8 +982,10 @@ def test_tq2qiskit(): circuit = tq2qiskit(test_module, inputs) - simulator = Aer.get_backend("unitary_simulator") - result = execute(circuit, simulator).result() + backend = AerSimulator(method="unitary") + circuit = transpile(circuit, backend) + circuit.save_unitary() + result = backend.run(circuit).result() unitary_qiskit = result.get_unitary(circuit) unitary_tq = test_module.get_unitary(q_dev, inputs) @@ -994,8 +1012,10 @@ def test_tq2qiskit_parameterized(): for k, x in enumerate(inputs[0]): binds[params[k]] = x.item() - simulator = Aer.get_backend("unitary_simulator") - result = execute(circuit, simulator, parameter_binds=[binds]).result() + backend = AerSimulator(method="unitary") + circuit = transpile(circuit, backend) + circuit.save_unitary() + result = backend.run(circuit, parameter_binds=[binds]).result() unitary_qiskit = result.get_unitary(circuit) # print(unitary_qiskit) diff --git a/torchquantum/plugin/qiskit/qiskit_processor.py b/torchquantum/plugin/qiskit/qiskit_processor.py index 2d91e7c3..1a484d7b 100644 --- a/torchquantum/plugin/qiskit/qiskit_processor.py +++ b/torchquantum/plugin/qiskit/qiskit_processor.py @@ -22,34 +22,29 @@ SOFTWARE. """ -import torch -import torchquantum as tq -import pathos.multiprocessing as multiprocessing +import datetime import itertools -from qiskit import Aer, execute, IBMQ, transpile, QuantumCircuit -from qiskit.providers.aer.noise import NoiseModel -from qiskit.tools.monitor import job_monitor +import numpy as np +import pathos.multiprocessing as multiprocessing +import torch +from qiskit import QuantumCircuit, transpile from qiskit.exceptions import QiskitError -from .qiskit_plugin import ( - tq2qiskit, - tq2qiskit_parameterized, - tq2qiskit_measurement, -) +from qiskit.transpiler import PassManager +from qiskit_aer import AerSimulator +from qiskit_aer.noise import NoiseModel +from torchpack.utils.logging import logger +from tqdm import tqdm + +import torchquantum as tq from torchquantum.util import ( + get_circ_stats, get_expectations_from_counts, - get_provider, get_provider_hub_group_project, - get_circ_stats, ) -from .qiskit_macros import IBMQ_NAMES -from tqdm import tqdm -from torchpack.utils.logging import logger -from qiskit.transpiler import PassManager -import numpy as np -import datetime -from .my_job_monitor import my_job_monitor +from .qiskit_macros import IBMQ_NAMES +from .qiskit_plugin import tq2qiskit, tq2qiskit_measurement, tq2qiskit_parameterized class EmptyPassManager(PassManager): @@ -60,13 +55,10 @@ def run(self, circuits, output_name: str = None, callback=None): def run_job_worker(data): while True: try: - job = execute(**(data[0])) - qiskit_verbose = data[1] - if qiskit_verbose: - job_monitor(job, interval=1) + job = AerSimulator(**(data)) result = job.result() counts = result.get_counts() - # circ_num = len(data[0]['parameter_binds']) + # circ_num = len(data['parameter_binds']) # logger.info( # f'run job worker successful, circuit number = {circ_num}') break @@ -191,7 +183,6 @@ def qiskit_init(self): if self.backend is None: # initialize now - IBMQ.load_account() self.provider = get_provider_hub_group_project( hub=self.hub, group=self.group, @@ -203,9 +194,7 @@ def qiskit_init(self): self.coupling_map = self.get_coupling_map(self.backend_name) else: # use simulator - self.backend = Aer.get_backend( - "qasm_simulator", max_parallel_experiments=0 - ) + self.backend = AerSimulator(max_parallel_experiments=0) self.noise_model = self.get_noise_model(self.noise_model_name) self.coupling_map = self.get_coupling_map(self.coupling_map_name) self.basis_gates = self.get_basis_gates(self.basis_gates_name) @@ -320,7 +309,6 @@ def process_parameterized( for i in range(0, len(binds_all), chunk_size) ] - qiskit_verbose = self.max_jobs <= 6 feed_dicts = [] for split_bind in split_binds: feed_dict = { @@ -332,7 +320,7 @@ def process_parameterized( "noise_model": self.noise_model, "parameter_binds": split_bind, } - feed_dicts.append([feed_dict, qiskit_verbose]) + feed_dicts.append(feed_dict) p = multiprocessing.Pool(self.max_jobs) results = p.map(run_job_worker, feed_dicts) @@ -345,16 +333,14 @@ def process_parameterized( results[-1] = [results[-1]] counts = list(itertools.chain(*results)) else: - job = execute( - experiments=transpiled_circ, - backend=self.backend, + job = self.backend.run( + transpiled_circ, pass_manager=self.empty_pass_manager, shots=self.n_shots, seed_simulator=self.seed_simulator, noise_model=self.noise_model, parameter_binds=binds_all, ) - job_monitor(job, interval=1) result = job.result() counts = result.get_counts() @@ -497,7 +483,6 @@ def process_parameterized_and_shift( for i in range(0, len(binds_all), chunk_size) ] - qiskit_verbose = self.max_jobs <= 6 feed_dicts = [] for split_bind in split_binds: feed_dict = { @@ -509,7 +494,7 @@ def process_parameterized_and_shift( "noise_model": self.noise_model, "parameter_binds": split_bind, } - feed_dicts.append([feed_dict, qiskit_verbose]) + feed_dicts.append(feed_dict) p = multiprocessing.Pool(self.max_jobs) results = p.map(run_job_worker, feed_dicts) @@ -533,16 +518,15 @@ def process_parameterized_and_shift( for circ in split_circs: while True: try: - job = execute( - experiments=circ, - backend=self.backend, + job = self.backend.run( + circ, pass_manager=self.empty_pass_manager, shots=self.n_shots, seed_simulator=self.seed_simulator, noise_model=self.noise_model, parameter_binds=binds_all, ) - job_monitor(job, interval=1) + result = ( job.result() ) # qiskit.providers.ibmq.job.exceptions.IBMQJobFailureError:Job has failed. Use the error_message() method to get more details @@ -555,7 +539,7 @@ def process_parameterized_and_shift( # total_cont += 1 # print(total_time_spent / total_cont) break - except (QiskitError) as e: + except QiskitError as e: logger.warning("Job failed, rerun now.") print(e.message) @@ -613,7 +597,6 @@ def process_multi_measure( circ_all[i : i + chunk_size] for i in range(0, len(circ_all), chunk_size) ] - qiskit_verbose = self.max_jobs <= 2 feed_dicts = [] for split_circ in split_circs: feed_dict = { @@ -624,7 +607,7 @@ def process_multi_measure( "seed_simulator": self.seed_simulator, "noise_model": self.noise_model, } - feed_dicts.append([feed_dict, qiskit_verbose]) + feed_dicts.append(feed_dict) p = multiprocessing.Pool(self.max_jobs) results = p.map(run_job_worker, feed_dicts) @@ -661,9 +644,8 @@ def process( transpiled_circs = self.transpile(circs) self.transpiled_circs = transpiled_circs - job = execute( - experiments=transpiled_circs, - backend=self.backend, + job = self.backend.run( + transpiled_circs, shots=self.n_shots, # initial_layout=self.initial_layout, seed_transpiler=self.seed_transpiler, @@ -673,7 +655,6 @@ def process( noise_model=self.noise_model, optimization_level=self.optimization_level, ) - job_monitor(job, interval=1) result = job.result() counts = result.get_counts() @@ -704,7 +685,6 @@ def process_ready_circs_get_counts(self, circs_all, parallel=True): for i in range(0, len(circs_all), chunk_size) ] - qiskit_verbose = self.max_jobs <= 6 feed_dicts = [] for split_circ in split_circs: feed_dict = { @@ -715,7 +695,7 @@ def process_ready_circs_get_counts(self, circs_all, parallel=True): "seed_simulator": self.seed_simulator, "noise_model": self.noise_model, } - feed_dicts.append([feed_dict, qiskit_verbose]) + feed_dicts.append(feed_dict) p = multiprocessing.Pool(self.max_jobs) results = p.map(run_job_worker, feed_dicts) @@ -728,15 +708,13 @@ def process_ready_circs_get_counts(self, circs_all, parallel=True): results[-1] = [results[-1]] counts = list(itertools.chain(*results)) else: - job = execute( - experiments=circs_all, - backend=self.backend, + job = self.backend.run( + circs_all, pass_manager=self.empty_pass_manager, shots=self.n_shots, seed_simulator=self.seed_simulator, noise_model=self.noise_model, ) - job_monitor(job, interval=1) result = job.result() counts = [result.get_counts()] @@ -758,9 +736,9 @@ def process_circs_get_joint_expval(self, circs_all, observable, parallel=True): for circ_ in circs_all: circ = circ_.copy() for k, obs in enumerate(observable): - if obs == 'X': + if obs == "X": circ.h(k) - elif obs == 'Y': + elif obs == "Y": circ.z(k) circ.s(k) circ.h(k) @@ -771,8 +749,10 @@ def process_circs_get_joint_expval(self, circs_all, observable, parallel=True): mask = np.ones(len(observable), dtype=bool) mask[np.array([*observable]) == "I"] = False - - counts = self.process_ready_circs_get_counts(circs_all_diagonalized, parallel=parallel) + + counts = self.process_ready_circs_get_counts( + circs_all_diagonalized, parallel=parallel + ) # here we need to switch the little and big endian of distribution bitstrings distributions = [] @@ -786,19 +766,25 @@ def process_circs_get_joint_expval(self, circs_all, observable, parallel=True): n_eigen_one = 0 n_eigen_minus_one = 0 for bitstring, n_count in distri.items(): - if np.dot(list(map(lambda x: eval(x), [*bitstring])), mask).sum() % 2 == 0: + if ( + np.dot(list(map(lambda x: eval(x), [*bitstring])), mask).sum() % 2 + == 0 + ): n_eigen_one += n_count else: n_eigen_minus_one += n_count - - expval = n_eigen_one / self.n_shots + (-1) * n_eigen_minus_one / self.n_shots + + expval = ( + n_eigen_one / self.n_shots + (-1) * n_eigen_minus_one / self.n_shots + ) expval_all.append(expval) return expval_all -if __name__ == '__main__': +if __name__ == "__main__": import pdb + pdb.set_trace() circ = QuantumCircuit(3) circ.h(0) @@ -806,11 +792,9 @@ def process_circs_get_joint_expval(self, circs_all, observable, parallel=True): circ.cx(1, 2) circ.rx(0.1, 0) - qiskit_processor = QiskitProcessor( - use_real_qc=False - ) + qiskit_processor = QiskitProcessor(use_real_qc=False) - qiskit_processor.process_circs_get_joint_expval([circ], 'XII') + qiskit_processor.process_circs_get_joint_expval([circ], "XII") qdev = tq.QuantumDevice(n_wires=3, bsz=1) qdev.h(0) @@ -819,5 +803,5 @@ def process_circs_get_joint_expval(self, circs_all, observable, parallel=True): qdev.rx(0, 0.1) from torchquantum.measurement import expval_joint_sampling - print(expval_joint_sampling(qdev, 'XII', n_shots=8192)) + print(expval_joint_sampling(qdev, "XII", n_shots=8192)) diff --git a/torchquantum/plugin/qiskit/qiskit_pulse.py b/torchquantum/plugin/qiskit/qiskit_pulse.py index b9c78760..ab28774f 100644 --- a/torchquantum/plugin/qiskit/qiskit_pulse.py +++ b/torchquantum/plugin/qiskit/qiskit_pulse.py @@ -22,12 +22,8 @@ SOFTWARE. """ -import torch -import torchquantum as tq -from qiskit import pulse, QuantumCircuit -from qiskit.pulse import library -from qiskit.test.mock import FakeQuito, FakeArmonk, FakeBogota -from qiskit.compiler import assemble, schedule +from qiskit import pulse + from .qiskit_macros import IBMQ_PNAMES diff --git a/torchquantum/plugin/qiskit/qiskit_unitary_gate.py b/torchquantum/plugin/qiskit/qiskit_unitary_gate.py index ce46ff04..b60427dd 100644 --- a/torchquantum/plugin/qiskit/qiskit_unitary_gate.py +++ b/torchquantum/plugin/qiskit/qiskit_unitary_gate.py @@ -15,19 +15,16 @@ """ from collections import OrderedDict -import numpy -from qiskit.circuit import Gate, ControlledGate -from qiskit.circuit import QuantumCircuit -from qiskit.circuit import QuantumRegister, Qubit -from qiskit.circuit.exceptions import CircuitError +import numpy +import qiskit +from qiskit.circuit import ControlledGate, Gate, QuantumCircuit, QuantumRegister, Qubit from qiskit.circuit._utils import _compute_control_matrix +from qiskit.circuit.exceptions import CircuitError from qiskit.circuit.library.standard_gates import U3Gate -from qiskit.quantum_info.operators.predicates import matrix_equal -from qiskit.quantum_info.operators.predicates import is_unitary_matrix -from qiskit.quantum_info import OneQubitEulerDecomposer -from qiskit.quantum_info.synthesis.two_qubit_decompose import two_qubit_cnot_decompose -from qiskit.extensions.exceptions import ExtensionError +from qiskit.quantum_info.operators.predicates import is_unitary_matrix, matrix_equal +from qiskit.synthesis import OneQubitEulerDecomposer +from qiskit.synthesis.two_qubit.two_qubit_decompose import two_qubit_cnot_decompose _DECOMPOSER1Q = OneQubitEulerDecomposer("U3") @@ -58,12 +55,12 @@ def __init__(self, data, label=None): data = numpy.array(data, dtype=complex) # Check input is unitary if not is_unitary_matrix(data, atol=1e-5): - raise ExtensionError("Input matrix is not unitary.") + raise ValueError("Input matrix is not unitary.") # Check input is N-qubit matrix input_dim, output_dim = data.shape num_qubits = int(numpy.log2(input_dim)) if input_dim != output_dim or 2**num_qubits != input_dim: - raise ExtensionError("Input matrix is not an N-qubit operator.") + raise ValueError("Input matrix is not an N-qubit operator.") self._qasm_name = None self._qasm_definition = None @@ -116,7 +113,9 @@ def _define(self): else: q = QuantumRegister(self.num_qubits, "q") qc = QuantumCircuit(q, name=self.name) - qc.append(qiskit.circuit.library.Isometry(self.to_matrix(), 0, 0), qargs=q[:]) + qc.append( + qiskit.circuit.library.Isometry(self.to_matrix(), 0, 0), qargs=q[:] + ) self.definition = qc def control(self, num_ctrl_qubits=1, label=None, ctrl_state=None): @@ -155,7 +154,7 @@ def control(self, num_ctrl_qubits=1, label=None, ctrl_state=None): pmat = Operator(iso.inverse()).data @ cmat diag = numpy.diag(pmat) if not numpy.allclose(diag, diag[0]): - raise ExtensionError("controlled unitary generation failed") + raise ValueError("controlled unitary generation failed") phase = numpy.angle(diag[0]) if phase: # need to apply to _definition since open controls creates temporary definition diff --git a/torchquantum/plugin/qiskit_pulse.py b/torchquantum/plugin/qiskit_pulse.py index 81775b0d..30a4b162 100644 --- a/torchquantum/plugin/qiskit_pulse.py +++ b/torchquantum/plugin/qiskit_pulse.py @@ -1,10 +1,6 @@ -import torch -import torchquantum as tq -from qiskit import pulse, QuantumCircuit -from qiskit.pulse import library -from qiskit.test.mock import FakeQuito, FakeArmonk, FakeBogota -from qiskit.compiler import assemble, schedule -from .qiskit_macros import IBMQ_PNAMES +from qiskit import pulse + +from .qiskit.qiskit_macros import IBMQ_PNAMES def circ2pulse(circuits, name): @@ -24,7 +20,7 @@ def circ2pulse(circuits, name): >>> qc.cx(0, 1) >>> circ2pulse(qc, 'ibmq_oslo') """ - + if name in IBMQ_PNAMES: backend = name() with pulse.build(backend) as pulse_tq: diff --git a/torchquantum/pulse/pulse_utils.py b/torchquantum/pulse/pulse_utils.py index 68c66568..51803ab0 100644 --- a/torchquantum/pulse/pulse_utils.py +++ b/torchquantum/pulse/pulse_utils.py @@ -23,55 +23,30 @@ """ import copy -import sched -import qiskit -import itertools -import numpy as np +from typing import Union -from itertools import repeat -from qiskit.providers import aer -from qiskit.providers.fake_provider import * -from qiskit.circuit import Gate +import numpy as np +import qiskit +from qiskit import QuantumCircuit, pulse from qiskit.compiler import assemble -from qiskit import pulse, QuantumCircuit, IBMQ +from qiskit.providers.fake_provider import * +from qiskit.pulse import Schedule from qiskit.pulse.instructions import Instruction from qiskit.pulse.transforms import block_to_schedule -from qiskit_nature.drivers import UnitsType, Molecule -from scipy.optimize import minimize, LinearConstraint -from qiskit_nature.converters.second_quantization import QubitConverter -from qiskit_nature.properties.second_quantization.electronic import ParticleNumber -from qiskit_nature.problems.second_quantization import ElectronicStructureProblem -from typing import List, Tuple, Iterable, Union, Dict, Callable, Set, Optional, Any -from qiskit.pulse import ( - Schedule, - GaussianSquare, - Drag, - Delay, - Play, - ControlChannel, - DriveChannel, -) -from qiskit_nature.mappers.second_quantization import ParityMapper, JordanWignerMapper -from qiskit_nature.transformers.second_quantization.electronic import ( - ActiveSpaceTransformer, -) -from qiskit_nature.drivers.second_quantization import ( - ElectronicStructureDriverType, - ElectronicStructureMoleculeDriver, -) +from scipy.optimize import LinearConstraint def is_parametric_pulse(t0, *inst: Union["Schedule", Instruction]): """ - Check if the instruction is a parametric pulse. + Check if the instruction is a parametric pulse. - Args: - t0 (tuple): Tuple containing the time and instruction. - inst (tuple): Tuple containing the instruction. + Args: + t0 (tuple): Tuple containing the time and instruction. + inst (tuple): Tuple containing the instruction. - Returns: - bool: True if the instruction is a parametric pulse, False otherwise. - """ + Returns: + bool: True if the instruction is a parametric pulse, False otherwise. + """ inst = t0[1] t0 = t0[0] if isinstance(inst, pulse.Play): @@ -82,14 +57,14 @@ def is_parametric_pulse(t0, *inst: Union["Schedule", Instruction]): def extract_ampreal(pulse_prog): """ - Extract the real part of pulse amplitudes from the pulse program. + Extract the real part of pulse amplitudes from the pulse program. - Args: - pulse_prog (Schedule): The pulse program. + Args: + pulse_prog (Schedule): The pulse program. - Returns: - np.array: Array of real parts of pulse amplitudes. - """ + Returns: + np.array: Array of real parts of pulse amplitudes. + """ # extract the real part of pulse amplitude, igonred the imaginary part. amp_list = list( map( @@ -104,14 +79,14 @@ def extract_ampreal(pulse_prog): def extract_amp(pulse_prog): """ - Extract the pulse amplitudes from the pulse program. + Extract the pulse amplitudes from the pulse program. - Args: - pulse_prog (Schedule): The pulse program. + Args: + pulse_prog (Schedule): The pulse program. - Returns: - np.array: Array of pulse amplitudes. - """ + Returns: + np.array: Array of pulse amplitudes. + """ # extract the pulse amplitdue. amp_list = list( map( @@ -132,15 +107,15 @@ def extract_amp(pulse_prog): def is_phase_pulse(t0, *inst: Union["Schedule", Instruction]): """ - Check if the instruction is a phase pulse. + Check if the instruction is a phase pulse. - Args: - t0 (tuple): Tuple containing the time and instruction. - inst (tuple): Tuple containing the instruction. + Args: + t0 (tuple): Tuple containing the time and instruction. + inst (tuple): Tuple containing the instruction. - Returns: - bool: True if the instruction is a phase pulse, False otherwise. - """ + Returns: + bool: True if the instruction is a phase pulse, False otherwise. + """ inst = t0[1] t0 = t0[0] if isinstance(inst, pulse.ShiftPhase): @@ -150,14 +125,14 @@ def is_phase_pulse(t0, *inst: Union["Schedule", Instruction]): def extract_phase(pulse_prog): """ - Extract the phase values from the pulse program. + Extract the phase values from the pulse program. - Args: - pulse_prog (Schedule): The pulse program. + Args: + pulse_prog (Schedule): The pulse program. - Returns: - list: List of phase values. - """ + Returns: + list: List of phase values. + """ for _, ShiftPhase in pulse_prog.filter(is_phase_pulse).instructions: # print(play.pulse.amp) @@ -175,15 +150,15 @@ def extract_phase(pulse_prog): def cir2pul(circuit, backend): """ - Transform a quantum circuit to a pulse schedule. + Transform a quantum circuit to a pulse schedule. - Args: - circuit (QuantumCircuit): The quantum circuit. - backend: The backend for the pulse schedule. + Args: + circuit (QuantumCircuit): The quantum circuit. + backend: The backend for the pulse schedule. - Returns: - Schedule: The pulse schedule. - """ + Returns: + Schedule: The pulse schedule. + """ # transform quantum circuit to pulse schedule with pulse.build(backend) as pulse_prog: pulse.call(circuit) @@ -192,15 +167,15 @@ def cir2pul(circuit, backend): def snp(qubit, backend): """ - Create a Schedule for the simultaneous z measurement of a qubit and a control qubit. + Create a Schedule for the simultaneous z measurement of a qubit and a control qubit. - Args: - qubit (int): The target qubit. - backend: The backend for the pulse schedule. + Args: + qubit (int): The target qubit. + backend: The backend for the pulse schedule. - Returns: - Schedule: The pulse schedule for simultaneous z measurement. - """ + Returns: + Schedule: The pulse schedule for simultaneous z measurement. + """ circuit = QuantumCircuit(qubit + 1) circuit.h(qubit) sched = cir2pul(circuit, backend) @@ -210,16 +185,16 @@ def snp(qubit, backend): def tnp(qubit, cqubit, backend): """ - Create a Schedule for the simultaneous controlled-x measurement of a qubit and a control qubit. + Create a Schedule for the simultaneous controlled-x measurement of a qubit and a control qubit. - Args: - qubit (int): The target qubit. - cqubit (int): The control qubit. - backend: The backend for the pulse schedule. + Args: + qubit (int): The target qubit. + cqubit (int): The control qubit. + backend: The backend for the pulse schedule. - Returns: - Schedule: The pulse schedule for simultaneous controlled-x measurement. - """ + Returns: + Schedule: The pulse schedule for simultaneous controlled-x measurement. + """ circuit = QuantumCircuit(cqubit + 1) circuit.cx(qubit, cqubit) sched = cir2pul(circuit, backend) @@ -229,30 +204,30 @@ def tnp(qubit, cqubit, backend): def pul_append(sched1, sched2): """ - Append two pulse schedules. + Append two pulse schedules. - Args: - sched1 (Schedule): The first pulse schedule. - sched2 (Schedule): The second pulse schedule. + Args: + sched1 (Schedule): The first pulse schedule. + sched2 (Schedule): The second pulse schedule. - Returns: - Schedule: The combined pulse schedule. - """ + Returns: + Schedule: The combined pulse schedule. + """ sched = sched1.append(sched2) return sched def map_amp(pulse_ansatz, modified_list): """ - Map modified pulse amplitudes to the pulse ansatz. + Map modified pulse amplitudes to the pulse ansatz. - Args: - pulse_ansatz (Schedule): The pulse ansatz. - modified_list (list): List of modified pulse amplitudes. + Args: + pulse_ansatz (Schedule): The pulse ansatz. + modified_list (list): List of modified pulse amplitudes. - Returns: - Schedule: The pulse schedule with modified amplitudes. - """ + Returns: + Schedule: The pulse schedule with modified amplitudes. + """ sched = Schedule() for inst, amp in zip( pulse_ansatz.filter(is_parametric_pulse).instructions, modified_list @@ -274,18 +249,18 @@ def get_from(d: dict, key: str): def run_pulse_sim(measurement_pulse): """ - Run pulse simulations for the given measurement pulses. + Run pulse simulations for the given measurement pulses. - Args: - measurement_pulse (list): List of measurement pulses. + Args: + measurement_pulse (list): List of measurement pulses. - Returns: - list: List of measurement results. - """ + Returns: + list: List of measurement results. + """ measure_result = [] for measure_pulse in measurement_pulse: shots = 1024 - pulse_sim = qiskit.providers.aer.PulseSimulator.from_backend(FakeJakarta()) + pulse_sim = qiskit_aer.PulseSimulator.from_backend(FakeJakarta()) pul_sim = assemble( measure_pulse, backend=pulse_sim, @@ -306,14 +281,14 @@ def run_pulse_sim(measurement_pulse): def gen_LC(parameters_array): """ - Generate linear constraints for the optimization. + Generate linear constraints for the optimization. - Args: - parameters_array (np.array): Array of parameters. + Args: + parameters_array (np.array): Array of parameters. - Returns: - LinearConstraint: Linear constraint for the optimization. - """ + Returns: + LinearConstraint: Linear constraint for the optimization. + """ dim_design = int(len(parameters_array)) Mid = int(len(parameters_array) / 2) bound = np.ones((dim_design, 2)) * np.array([0, 0.9]) @@ -327,15 +302,15 @@ def gen_LC(parameters_array): def observe_genearte(pulse_ansatz, backend): """ - Generate measurement pulses for observing the pulse ansatz. + Generate measurement pulses for observing the pulse ansatz. - Args: - pulse_ansatz (Schedule): The pulse ansatz. - backend: The backend for the pulse schedule. + Args: + pulse_ansatz (Schedule): The pulse ansatz. + backend: The backend for the pulse schedule. - Returns: - list: List of measurement pulses. - """ + Returns: + list: List of measurement pulses. + """ qubits = 0, 1 with pulse.build(backend) as pulse_measurez0: # z measurement of qubit 0 and 1 diff --git a/torchquantum/pulse/templates/pulse_utils.py b/torchquantum/pulse/templates/pulse_utils.py index bad2a9b5..30d4c2f7 100644 --- a/torchquantum/pulse/templates/pulse_utils.py +++ b/torchquantum/pulse/templates/pulse_utils.py @@ -1,40 +1,15 @@ import copy -import sched -import qiskit -import itertools -import numpy as np +from typing import Union -from itertools import repeat -from qiskit.providers import aer -from qiskit.providers.fake_provider import * -from qiskit.circuit import Gate +import numpy as np +import qiskit +from qiskit import QuantumCircuit, pulse from qiskit.compiler import assemble -from qiskit import pulse, QuantumCircuit, IBMQ +from qiskit.providers.fake_provider import * +from qiskit.pulse import Schedule from qiskit.pulse.instructions import Instruction from qiskit.pulse.transforms import block_to_schedule -from qiskit_nature.drivers import UnitsType, Molecule -from scipy.optimize import minimize, LinearConstraint -from qiskit_nature.converters.second_quantization import QubitConverter -from qiskit_nature.properties.second_quantization.electronic import ParticleNumber -from qiskit_nature.problems.second_quantization import ElectronicStructureProblem -from typing import List, Tuple, Iterable, Union, Dict, Callable, Set, Optional, Any -from qiskit.pulse import ( - Schedule, - GaussianSquare, - Drag, - Delay, - Play, - ControlChannel, - DriveChannel, -) -from qiskit_nature.mappers.second_quantization import ParityMapper, JordanWignerMapper -from qiskit_nature.transformers.second_quantization.electronic import ( - ActiveSpaceTransformer, -) -from qiskit_nature.drivers.second_quantization import ( - ElectronicStructureDriverType, - ElectronicStructureMoleculeDriver, -) +from scipy.optimize import LinearConstraint def is_parametric_pulse(t0, *inst: Union["Schedule", Instruction]): @@ -154,7 +129,7 @@ def run_pulse_sim(measurement_pulse): measure_result = [] for measure_pulse in measurement_pulse: shots = 1024 - pulse_sim = qiskit.providers.aer.PulseSimulator.from_backend(FakeJakarta()) + pulse_sim = qiskit_aer.PulseSimulator.from_backend(FakeJakarta()) pul_sim = assemble( measure_pulse, backend=pulse_sim, diff --git a/torchquantum/util/utils.py b/torchquantum/util/utils.py index db191647..23f67e5b 100644 --- a/torchquantum/util/utils.py +++ b/torchquantum/util/utils.py @@ -22,15 +22,16 @@ SOFTWARE. """ +from __future__ import annotations + import copy -from typing import Dict, Iterable, List, TYPE_CHECKING +from typing import TYPE_CHECKING, Iterable import numpy as np import torch import torch.nn as nn import torch.nn.functional as F from opt_einsum import contract -from qiskit_ibm_runtime import QiskitRuntimeService from qiskit.exceptions import QiskitError from torchpack.utils.config import Config from torchpack.utils.logging import logger @@ -38,10 +39,9 @@ import torchquantum as tq from torchquantum.macro import C_DTYPE - if TYPE_CHECKING: - from torchquantum.module import QuantumModule from torchquantum.device import QuantumDevice + from torchquantum.module import QuantumModule else: QuantumModule = None QuantumDevice = None @@ -97,14 +97,14 @@ def pauli_eigs(n) -> np.ndarray: def diag(x): """ - Compute the diagonal matrix from a given input tensor. + Compute the diagonal matrix from a given input tensor. - Args: - x (torch.Tensor): Input tensor. + Args: + x (torch.Tensor): Input tensor. - Returns: - torch.Tensor: Diagonal matrix with the diagonal elements from the input tensor. - """ + Returns: + torch.Tensor: Diagonal matrix with the diagonal elements from the input tensor. + """ # input tensor, output tensor with diagonal as the input # manual implementation because torch.diag does not support autograd of # complex number @@ -119,20 +119,21 @@ def diag(x): class Timer(object): """ - Timer class to measure the execution time of a code block. + Timer class to measure the execution time of a code block. - Args: - device (str): Device to use for timing. Can be "gpu" or "cpu". - name (str): Name of the task being measured. - times (int): Number of times the task will be executed. + Args: + device (str): Device to use for timing. Can be "gpu" or "cpu". + name (str): Name of the task being measured. + times (int): Number of times the task will be executed. - Example: - # Measure the execution time of a code block on the GPU - with Timer(device="gpu", name="MyTask", times=100): - # Code block to be measured - ... + Example: + # Measure the execution time of a code block on the GPU + with Timer(device="gpu", name="MyTask", times=100): + # Code block to be measured + ... + + """ - """ def __init__(self, device="gpu", name="", times=100): self.device = device self.name = name @@ -157,20 +158,20 @@ def __exit__(self, exc_type, exc_value, tb): def get_unitary_loss(model: nn.Module): """ - Calculate the unitary loss of a model. + Calculate the unitary loss of a model. - The unitary loss measures the deviation of the trainable unitary matrices - in the model from the identity matrix. + The unitary loss measures the deviation of the trainable unitary matrices + in the model from the identity matrix. - Args: - model (nn.Module): The model containing trainable unitary matrices. + Args: + model (nn.Module): The model containing trainable unitary matrices. - Returns: - torch.Tensor: The unitary loss. + Returns: + torch.Tensor: The unitary loss. - Example: - loss = get_unitary_loss(model) - """ + Example: + loss = get_unitary_loss(model) + """ loss = 0 for name, params in model.named_parameters(): if "TrainableUnitary" in name: @@ -186,21 +187,21 @@ def get_unitary_loss(model: nn.Module): def legalize_unitary(model: nn.Module): """ - Legalize the unitary matrices in the model. + Legalize the unitary matrices in the model. - The function modifies the trainable unitary matrices in the model by applying - singular value decomposition (SVD) and reassembling the matrices using the - reconstructed singular values. + The function modifies the trainable unitary matrices in the model by applying + singular value decomposition (SVD) and reassembling the matrices using the + reconstructed singular values. - Args: - model (nn.Module): The model containing trainable unitary matrices. + Args: + model (nn.Module): The model containing trainable unitary matrices. - Returns: - None + Returns: + None - Example: - legalize_unitary(model) - """ + Example: + legalize_unitary(model) + """ with torch.no_grad(): for name, params in model.named_parameters(): if "TrainableUnitary" in name: @@ -211,22 +212,22 @@ def legalize_unitary(model: nn.Module): def switch_little_big_endian_matrix(mat): """ - Switches the little-endian and big-endian order of a multi-dimensional matrix. + Switches the little-endian and big-endian order of a multi-dimensional matrix. - The function reshapes the input matrix to a 2D or multi-dimensional matrix with dimensions - that are powers of 2. It then switches the order of the dimensions, effectively changing - the little-endian order to big-endian, or vice versa. The function can handle both - batched and non-batched matrices. + The function reshapes the input matrix to a 2D or multi-dimensional matrix with dimensions + that are powers of 2. It then switches the order of the dimensions, effectively changing + the little-endian order to big-endian, or vice versa. The function can handle both + batched and non-batched matrices. - Args: - mat (numpy.ndarray): The input matrix. + Args: + mat (numpy.ndarray): The input matrix. - Returns: - numpy.ndarray: The matrix with the switched endian order. + Returns: + numpy.ndarray: The matrix with the switched endian order. - Example: - switched_mat = switch_little_big_endian_matrix(mat) - """ + Example: + switched_mat = switch_little_big_endian_matrix(mat) + """ if len(mat.shape) % 2 == 1: is_batch_matrix = True bsz = mat.shape[0] @@ -250,25 +251,25 @@ def switch_little_big_endian_matrix(mat): def switch_little_big_endian_state(state): """ - Switches the little-endian and big-endian order of a quantum state vector. + Switches the little-endian and big-endian order of a quantum state vector. - The function reshapes the input state vector to a 1D or multi-dimensional state vector with - dimensions that are powers of 2. It then switches the order of the dimensions, effectively - changing the little-endian order to big-endian, or vice versa. The function can handle both - batched and non-batched state vectors. + The function reshapes the input state vector to a 1D or multi-dimensional state vector with + dimensions that are powers of 2. It then switches the order of the dimensions, effectively + changing the little-endian order to big-endian, or vice versa. The function can handle both + batched and non-batched state vectors. - Args: - state (numpy.ndarray): The input state vector. + Args: + state (numpy.ndarray): The input state vector. - Returns: - numpy.ndarray: The state vector with the switched endian order. + Returns: + numpy.ndarray: The state vector with the switched endian order. - Raises: - ValueError: If the dimension of the state vector is not 1 or 2. + Raises: + ValueError: If the dimension of the state vector is not 1 or 2. - Example: - switched_state = switch_little_big_endian_state(state) - """ + Example: + switched_state = switch_little_big_endian_state(state) + """ if len(state.shape) > 1: is_batch_state = True @@ -278,7 +279,7 @@ def switch_little_big_endian_state(state): is_batch_state = False reshape = [2] * int(np.log2(state.size)) else: - logger.exception(f"Dimension of statevector should be 1 or 2") + logger.exception("Dimension of statevector should be 1 or 2") raise ValueError original_shape = state.shape @@ -309,25 +310,25 @@ def switch_little_big_endian_state_test(): def get_expectations_from_counts(counts, n_wires): """ - Calculate expectation values from counts. + Calculate expectation values from counts. - This function takes a counts dictionary or a list of counts dictionaries - and calculates the expectation values based on the probability of measuring - the state '1' on each wire. The expectation values are computed as the - flipped difference between the probability of measuring '1' and the probability - of measuring '0' on each wire. + This function takes a counts dictionary or a list of counts dictionaries + and calculates the expectation values based on the probability of measuring + the state '1' on each wire. The expectation values are computed as the + flipped difference between the probability of measuring '1' and the probability + of measuring '0' on each wire. - Args: - counts (dict or list[dict]): The counts dictionary or a list of counts dictionaries. - n_wires (int): The number of wires. + Args: + counts (dict or list[dict]): The counts dictionary or a list of counts dictionaries. + n_wires (int): The number of wires. - Returns: - numpy.ndarray: The expectation values. + Returns: + numpy.ndarray: The expectation values. - Example: - counts = {'000': 10, '100': 5, '010': 15} - expectations = get_expectations_from_counts(counts, 3) - """ + Example: + counts = {'000': 10, '100': 5, '010': 15} + expectations = get_expectations_from_counts(counts, 3) + """ exps = [] if isinstance(counts, dict): counts = [counts] @@ -348,29 +349,33 @@ def get_expectations_from_counts(counts, n_wires): def find_global_phase(mat1, mat2, threshold): """ - Find a numerical stable global phase between two matrices. - - This function compares the elements of two matrices `mat1` and `mat2` - and identifies a numerical stable global phase by finding the first - non-zero element pair with absolute values greater than the specified - threshold. The global phase is calculated as the ratio of the corresponding - elements in `mat2` and `mat1`. - - Args: - mat1 (numpy.ndarray): The first matrix. - mat2 (numpy.ndarray): The second matrix. - threshold (float): The threshold for identifying non-zero elements. - - Returns: - float or None: The global phase ratio if a numerical stable phase is found, - None otherwise. - - Example: - mat1 = np.array([[1+2j, 0+1j], [0-1j, 2+3j]]) - mat2 = np.array([[2+4j, 0+2j], [0-2j, 4+6j]]) - threshold = 0.5 - global_phase = find_global_phase(mat1, mat2, threshold) - """ + Find a numerical stable global phase between two matrices. + + This function compares the elements of two matrices `mat1` and `mat2` + and identifies a numerical stable global phase by finding the first + non-zero element pair with absolute values greater than the specified + threshold. The global phase is calculated as the ratio of the corresponding + elements in `mat2` and `mat1`. + + Args: + mat1 (numpy.ndarray): The first matrix. + mat2 (numpy.ndarray): The second matrix. + threshold (float): The threshold for identifying non-zero elements. + + Returns: + float or None: The global phase ratio if a numerical stable phase is found, + None otherwise. + + Example: + mat1 = np.array([[1+2j, 0+1j], [0-1j, 2+3j]]) + mat2 = np.array([[2+4j, 0+2j], [0-2j, 4+6j]]) + threshold = 0.5 + global_phase = find_global_phase(mat1, mat2, threshold) + """ + if not isinstance(mat1, np.ndarray): + mat1 = np.asarray(mat1) + if not isinstance(mat2, np.ndarray): + mat2 = np.asarray(mat2) for i in range(mat1.shape[0]): for j in range(mat1.shape[1]): # find a numerical stable global phase @@ -379,7 +384,7 @@ def find_global_phase(mat1, mat2, threshold): return None -def build_module_op_list(m: QuantumModule, x=None) -> List: +def build_module_op_list(m: QuantumModule, x=None) -> list: """ serialize all operations in the module and generate a list with [{'name': RX, 'has_params': True, 'trainable': True, 'wires': [0], @@ -434,39 +439,39 @@ def build_module_op_list(m: QuantumModule, x=None) -> List: def build_module_from_op_list( - op_list: List[Dict], remove_ops=False, thres=None + op_list: list[dict], remove_ops=False, thres=None ) -> QuantumModule: """ - Build a quantum module from an operation list. - - This function takes an operation list, which contains dictionaries representing - quantum operations, and constructs a quantum module from those operations. - The module can optionally remove operations based on certain criteria, such as - low parameter values. The removed operations can be counted and logged. - - Args: - op_list (List[Dict]): The operation list, where each dictionary represents - an operation with keys: "name", "has_params", "trainable", "wires", - "n_wires", and "params". - remove_ops (bool): Whether to remove operations based on certain criteria. - Defaults to False. - thres (float): The threshold for removing operations. If a parameter value - is smaller in absolute value than this threshold, the corresponding - operation is removed. Defaults to None, in which case a threshold of - 1e-5 is used. - - Returns: - QuantumModule: The constructed quantum module. - - Example: - op_list = [ - {"name": "RX", "has_params": True, "trainable": True, "wires": [0], "n_wires": 2, "params": [0.5]}, - {"name": "CNOT", "has_params": False, "trainable": False, "wires": [0, 1], "n_wires": 2, "params": None}, - {"name": "RY", "has_params": True, "trainable": True, "wires": [1], "n_wires": 2, "params": [1.2]}, - ] - module = build_module_from_op_list(op_list, remove_ops=True, thres=0.1) - """ - logger.info(f"Building module from op_list...") + Build a quantum module from an operation list. + + This function takes an operation list, which contains dictionaries representing + quantum operations, and constructs a quantum module from those operations. + The module can optionally remove operations based on certain criteria, such as + low parameter values. The removed operations can be counted and logged. + + Args: + op_list (List[Dict]): The operation list, where each dictionary represents + an operation with keys: "name", "has_params", "trainable", "wires", + "n_wires", and "params". + remove_ops (bool): Whether to remove operations based on certain criteria. + Defaults to False. + thres (float): The threshold for removing operations. If a parameter value + is smaller in absolute value than this threshold, the corresponding + operation is removed. Defaults to None, in which case a threshold of + 1e-5 is used. + + Returns: + QuantumModule: The constructed quantum module. + + Example: + op_list = [ + {"name": "RX", "has_params": True, "trainable": True, "wires": [0], "n_wires": 2, "params": [0.5]}, + {"name": "CNOT", "has_params": False, "trainable": False, "wires": [0, 1], "n_wires": 2, "params": None}, + {"name": "RY", "has_params": True, "trainable": True, "wires": [1], "n_wires": 2, "params": [1.2]}, + ] + module = build_module_from_op_list(op_list, remove_ops=True, thres=0.1) + """ + logger.info("Building module from op_list...") thres = 1e-5 if thres is None else thres n_removed_ops = 0 ops = [] @@ -498,38 +503,38 @@ def build_module_from_op_list( if n_removed_ops > 0: logger.warning(f"Remove in total {n_removed_ops} pruned operations.") else: - logger.info(f"Do not remove any operations.") + logger.info("Do not remove any operations.") return tq.QuantumModuleFromOps(ops) def build_module_description_test(): """ - Test function for building module descriptions. - - This function demonstrates the usage of `build_module_op_list` and `build_module_from_op_list` - functions to build module descriptions and create quantum modules from those descriptions. - - Example: - import pdb - from torchquantum.plugins import tq2qiskit - from examples.core.models.q_models import QFCModel12 - - pdb.set_trace() - q_model = QFCModel12({"n_blocks": 4}) - desc = build_module_op_list(q_model.q_layer) - print(desc) - q_dev = tq.QuantumDevice(n_wires=4) - m = build_module_from_op_list(desc) - tq2qiskit(q_dev, m, draw=True) - - desc = build_module_op_list( - tq.RandomLayerAllTypes(n_ops=200, wires=[0, 1, 2, 3], qiskit_compatible=True) - ) - print(desc) - m1 = build_module_from_op_list(desc) - tq2qiskit(q_dev, m1, draw=True) - """ + Test function for building module descriptions. + + This function demonstrates the usage of `build_module_op_list` and `build_module_from_op_list` + functions to build module descriptions and create quantum modules from those descriptions. + + Example: + import pdb + from torchquantum.plugins import tq2qiskit + from examples.core.models.q_models import QFCModel12 + + pdb.set_trace() + q_model = QFCModel12({"n_blocks": 4}) + desc = build_module_op_list(q_model.q_layer) + print(desc) + q_dev = tq.QuantumDevice(n_wires=4) + m = build_module_from_op_list(desc) + tq2qiskit(q_dev, m, draw=True) + + desc = build_module_op_list( + tq.RandomLayerAllTypes(n_ops=200, wires=[0, 1, 2, 3], qiskit_compatible=True) + ) + print(desc) + m1 = build_module_from_op_list(desc) + tq2qiskit(q_dev, m1, draw=True) + """ import pdb from torchquantum.plugin import tq2qiskit @@ -559,7 +564,7 @@ def get_p_v_reg_mapping(circ): """ try: p2v_orig = circ._layout.final_layout.get_physical_bits().copy() - except: + except AttributeError: p2v_orig = circ._layout.get_physical_bits().copy() mapping = { "p2v": {}, @@ -600,7 +605,7 @@ def get_v_c_reg_mapping(circ): """ try: p2v_orig = circ._layout.final_layout.get_physical_bits().copy() - except: + except AttributeError: p2v_orig = circ._layout.get_physical_bits().copy() p2v = {} for p, v in p2v_orig.items(): @@ -629,15 +634,15 @@ def get_v_c_reg_mapping(circ): def get_cared_configs(conf, mode) -> Config: """ - Get the relevant configurations based on the mode. + Get the relevant configurations based on the mode. - Args: - conf (Config): The configuration object. - mode (str): The mode indicating the desired configuration. + Args: + conf (Config): The configuration object. + mode (str): The mode indicating the desired configuration. - Returns: - Config: The modified configuration object with only the relevant configurations preserved. - """ + Returns: + Config: The modified configuration object with only the relevant configurations preserved. + """ conf = copy.deepcopy(conf) ignores = [ @@ -716,23 +721,28 @@ def get_success_rate(properties, transpiled_circ): """ raise NotImplementedError + def get_provider(backend_name, hub=None): """ - Get the provider object for a specific backend from IBM Quantum. + Get the provider object for a specific backend from IBM Quantum. - Args: - backend_name (str): Name of the backend. - hub (str): Optional hub name. + Args: + backend_name (str): Name of the backend. + hub (str): Optional hub name. - Returns: - IBMQProvider: The provider object. - """ + Returns: + IBMQProvider: The provider object. + """ # mass-inst-tech-1 or MIT-1 if backend_name in ["ibmq_casablanca", "ibmq_rome", "ibmq_bogota", "ibmq_jakarta"]: if hub == "mass" or hub is None: - provider = QiskitRuntimeService(channel = "ibm_quantum", instance = "ibm-q-research/mass-inst-tech-1/main") + provider = QiskitRuntimeService( + channel="ibm_quantum", instance="ibm-q-research/mass-inst-tech-1/main" + ) elif hub == "mit": - provider = QiskitRuntimeService(channel = "ibm_quantum", instance = "ibm-q-research/MIT-1/main") + provider = QiskitRuntimeService( + channel="ibm_quantum", instance="ibm-q-research/MIT-1/main" + ) else: raise ValueError(f"not supported backend {backend_name} in hub " f"{hub}") elif backend_name in [ @@ -742,38 +752,51 @@ def get_provider(backend_name, hub=None): "ibmq_guadalupe", "ibmq_montreal", ]: - provider = QiskitRuntimeService(channel = "ibm_quantum", instance = "ibm-q-ornl/anl/csc428") + provider = QiskitRuntimeService( + channel="ibm_quantum", instance="ibm-q-ornl/anl/csc428" + ) else: if hub == "mass" or hub is None: try: - provider = QiskitRuntimeService(channel = "ibm_quantum", instance = "ibm-q-research/mass-inst-tech-1/main") + provider = QiskitRuntimeService( + channel="ibm_quantum", + instance="ibm-q-research/mass-inst-tech-1/main", + ) except QiskitError: # logger.warning(f"Cannot use MIT backend, roll back to open") - logger.warning(f"Use the open backend") - provider = QiskitRuntimeService(channel = "ibm_quantum", instance = "ibm-q/open/main") + logger.warning("Use the open backend") + provider = QiskitRuntimeService( + channel="ibm_quantum", instance="ibm-q/open/main" + ) elif hub == "mit": - provider = QiskitRuntimeService(channel = "ibm_quantum", instance = "ibm-q-research/MIT-1/main") + provider = QiskitRuntimeService( + channel="ibm_quantum", instance="ibm-q-research/MIT-1/main" + ) else: - provider = QiskitRuntimeService(channel = "ibm_quantum", instance = "ibm-q/open/main") + provider = QiskitRuntimeService( + channel="ibm_quantum", instance="ibm-q/open/main" + ) return provider def get_provider_hub_group_project(hub="ibm-q", group="open", project="main"): - provider = QiskitRuntimeService(channel = "ibm_quantum", instance = f"{hub}/{group}/{project}") + provider = QiskitRuntimeService( + channel="ibm_quantum", instance=f"{hub}/{group}/{project}" + ) return provider def normalize_statevector(states): """ - Normalize a statevector to ensure the square magnitude of the statevector sums to 1. + Normalize a statevector to ensure the square magnitude of the statevector sums to 1. - Args: - states (torch.Tensor): The statevector tensor. + Args: + states (torch.Tensor): The statevector tensor. - Returns: - torch.Tensor: The normalized statevector tensor. - """ + Returns: + torch.Tensor: The normalized statevector tensor. + """ # make sure the square magnitude of statevector sum to 1 # states = states.contiguous() original_shape = states.shape @@ -804,7 +827,7 @@ def get_circ_stats(circ): for gate in circ.data: op_name = gate[0].name - wires = list(map(lambda x: x.index, gate[1])) + wires = [circ.find_bit(qb).index for qb in gate.qubits] if op_name in n_gates_dict.keys(): n_gates_dict[op_name] += 1 else: @@ -832,7 +855,7 @@ def get_circ_stats(circ): def partial_trace( q_device: QuantumDevice, - keep_indices: List[int], + keep_indices: list[int], ) -> torch.Tensor: """Returns a density matrix with only some qubits kept. Args: @@ -935,22 +958,22 @@ def dm_to_mixture_of_state(dm: torch.Tensor, atol=1e-10): def partial_trace_test(): """ - Test function for performing partial trace on a quantum device. + Test function for performing partial trace on a quantum device. - This function demonstrates how to use the `partial_trace` function from `torchquantum.functional` - to perform partial trace on a quantum device. + This function demonstrates how to use the `partial_trace` function from `torchquantum.functional` + to perform partial trace on a quantum device. - The function applies Hadamard gate on the first qubit and a CNOT gate between the first and second qubits. - Then, it performs partial trace on the first qubit and converts the resulting density matrices into - mixtures of states. + The function applies Hadamard gate on the first qubit and a CNOT gate between the first and second qubits. + Then, it performs partial trace on the first qubit and converts the resulting density matrices into + mixtures of states. - Prints the resulting mixture of states. + Prints the resulting mixture of states. - Note: This function assumes that you have already imported the necessary modules and functions. + Note: This function assumes that you have already imported the necessary modules and functions. - Returns: - None - """ + Returns: + None + """ import torchquantum.functional as tqf n_wires = 4 @@ -965,7 +988,8 @@ def partial_trace_test(): print(mixture) -def pauli_string_to_matrix(pauli: str, device=torch.device('cpu')) -> torch.Tensor: + +def pauli_string_to_matrix(pauli: str, device=torch.device("cpu")) -> torch.Tensor: mat_dict = { "paulix": torch.tensor([[0, 1], [1, 0]], dtype=C_DTYPE), "pauliy": torch.tensor([[0, -1j], [1j, 0]], dtype=C_DTYPE), @@ -986,68 +1010,82 @@ def pauli_string_to_matrix(pauli: str, device=torch.device('cpu')) -> torch.Tens matrix = torch.kron(matrix, pauli_dict[op].to(device)) return matrix + if __name__ == "__main__": build_module_description_test() switch_little_big_endian_matrix_test() switch_little_big_endian_state_test() -def parameter_shift_gradient(model, input_data, expectation_operator, shift_rate=np.pi*0.5, shots=1024): - ''' - This function calculates the gradient of a parametrized circuit using the parameter shift rule to be fed into - a classical optimizer, its formula is given by - gradient for the ith parameter =( expectation_value(the_ith_parameter + shift_rate)-expectation_value(the_ith_parameter - shift_rate) ) *0.5 - Args: +def parameter_shift_gradient( + model, input_data, expectation_operator, shift_rate=np.pi * 0.5, shots=1024 +): + """ + This function calculates the gradient of a parametrized circuit using the parameter shift rule to be fed into + a classical optimizer, its formula is given by + gradient for the ith parameter =( expectation_value(the_ith_parameter + shift_rate)-expectation_value(the_ith_parameter - shift_rate) ) *0.5 + Args: model(tq.QuantumModule): the model that you want to use, which includes the quantum device and the parameters input(torch.tensor): the input data that you are using - expectation_operator(str): the observable that you want to calculate the expectation value of, usually the Z operator + expectation_operator(str): the observable that you want to calculate the expectation value of, usually the Z operator (i.e 'ZZZ' for 3 qubits or 3 wires) shift_rate(float , optional): the rate that you would like to shift the parameter with at every iteration, by default pi*0.5 shots(int , optional): the number of shots to use per parameter ,(for 10 parameters and 1024 shots = 10240 shots in total) by default = 1024. Returns: - torch.tensor : An array of the gradients of all the parameters in the circuit. - ''' + torch.tensor : An array of the gradients of all the parameters in the circuit. + """ par_num = [] - for p in model.parameters():#since the model.parameters() Returns an iterator over module parameters,to get the number of parameter i have to iterate over all of them + for ( + p + ) in ( + model.parameters() + ): # since the model.parameters() Returns an iterator over module parameters,to get the number of parameter i have to iterate over all of them par_num.append(p) gradient_of_par = torch.zeros(len(par_num)) - - def clone_model(model_to_clone):#i have to note:this clone_model function was made with GPT + + def clone_model( + model_to_clone, + ): # i have to note:this clone_model function was made with GPT cloned_model = type(model_to_clone)() # Create a new instance of the same class - cloned_model.load_state_dict(model_to_clone.state_dict()) # Copy the state dictionary + cloned_model.load_state_dict( + model_to_clone.state_dict() + ) # Copy the state dictionary return cloned_model # Clone the models - model_plus_shift = clone_model(model) + model_plus_shift = clone_model(model) model_minus_shift = clone_model(model) - state_dict_plus_shift = model_plus_shift.state_dict() + state_dict_plus_shift = model_plus_shift.state_dict() state_dict_minus_shift = model_minus_shift.state_dict() ##################### for idx, key in enumerate(state_dict_plus_shift): - if idx < 2: # Skip the first two keys because they are not paramters + if idx < 2: # Skip the first two keys because they are not parameters continue - state_dict_plus_shift[key] += shift_rate - state_dict_minus_shift[key] -= shift_rate - - model_plus_shift.load_state_dict(state_dict_plus_shift ) + state_dict_plus_shift[key] += shift_rate + state_dict_minus_shift[key] -= shift_rate + + model_plus_shift.load_state_dict(state_dict_plus_shift) model_minus_shift.load_state_dict(state_dict_minus_shift) - + model_plus_shift.forward(input_data) model_minus_shift.forward(input_data) - - state_dict_plus_shift = model_plus_shift.state_dict() + + state_dict_plus_shift = model_plus_shift.state_dict() state_dict_minus_shift = model_minus_shift.state_dict() - - - - expectation_plus_shift = tq.expval_joint_sampling(model_plus_shift.q_device, observable=expectation_operator, n_shots=shots) - expectation_minus_shift = tq.expval_joint_sampling(model_minus_shift.q_device, observable=expectation_operator, n_shots=shots) + expectation_plus_shift = tq.expval_joint_sampling( + model_plus_shift.q_device, observable=expectation_operator, n_shots=shots + ) + expectation_minus_shift = tq.expval_joint_sampling( + model_minus_shift.q_device, observable=expectation_operator, n_shots=shots + ) + + state_dict_plus_shift[key] -= shift_rate + state_dict_minus_shift[key] += shift_rate - state_dict_plus_shift[key] -= shift_rate - state_dict_minus_shift[key] += shift_rate - - gradient_of_par[idx-2] = (expectation_plus_shift - expectation_minus_shift) * 0.5 + gradient_of_par[idx - 2] = ( + expectation_plus_shift - expectation_minus_shift + ) * 0.5 return gradient_of_par