Source code for qbraid.providers.ibm.device

# Copyright (C) 2023 qBraid
#
# This file is part of the qBraid-SDK
#
# The qBraid-SDK is free software released under the GNU General Public License v3
# or later. You can redistribute and/or modify it under the terms of the GPL v3.
# See the LICENSE file in the project root or <https://www.gnu.org/licenses/gpl-3.0.html>.
#
# THERE IS NO WARRANTY for the qBraid-SDK, as per Section 15 of the GPL v3.

"""
Module defining QiskitBackend Class

"""
import re
from typing import TYPE_CHECKING, List, Union

from qiskit import transpile
from qiskit.transpiler import TranspilerError

from qbraid.programs.libs.qiskit import QiskitCircuit
from qbraid.providers.device import QuantumDevice
from qbraid.providers.enums import DeviceStatus, DeviceType

from .job import QiskitJob

if TYPE_CHECKING:
    import qiskit
    import qiskit_ibm_provider
    import qiskit_ibm_runtime


[docs] class QiskitBackend(QuantumDevice): """Wrapper class for IBM Qiskit ``Backend`` objects."""
[docs] def __init__( self, ibm_device: Union["qiskit_ibm_provider.IBMBackend", "qiskit_ibm_runtime.IBMBackend"] ): """Create a QiskitBackend.""" super().__init__(ibm_device) self._vendor = "IBM" self._run_package = "qiskit"
def _get_device_name(self) -> str: """Get the name of the device.""" backend = self._device if hasattr(backend, "backend_name"): return backend.backend_name if hasattr(backend, "name"): return backend.name() if callable(backend.name) else backend.name match = re.search(r"<\w+\('([^']+)'\)>", str(backend)) return match.group(1) if match else str(backend) def _populate_metadata( self, device: Union["qiskit_ibm_provider.IBMBackend", "qiskit_ibm_runtime.IBMBackend"] ) -> None: """Populate device metadata using IBMBackend object.""" # pylint: disable=attribute-defined-outside-init device_name = self._get_device_name() self._vendor_id = device_name self._name = device_name self._provider = "IBM" lower_device_name = device_name.lower() if lower_device_name.startswith("fake"): self._device_type = DeviceType.FAKE elif "simulator" in lower_device_name or "aer" in lower_device_name: self._device_type = DeviceType.SIMULATOR else: self._device_type = ( DeviceType.SIMULATOR if getattr(device, "simulator", True) else DeviceType.QPU ) try: if self._device_type == DeviceType.FAKE: self._num_qubits = getattr(device, "configuration", lambda: device)().n_qubits else: self._num_qubits = device.num_qubits except (AttributeError, TranspilerError): self._num_qubits = ( 5000 if device.name == "simulator_stabilizer" else 63 if device.name == "simulator_extended_stabilizer" else None ) def _transpile(self, run_input): if self._device_type.name != "FAKE" and self._device.local: program = QiskitCircuit(run_input) program.remove_idle_qubits() run_input = program.program return transpile(run_input, backend=self._device) def _compile(self, run_input): return run_input def status(self): """Return the status of this Device. Returns: str: The status of this Device """ if self._device_type == DeviceType("FAKE"): return DeviceStatus.ONLINE backend_status = self._device.status() if not backend_status.operational or backend_status.status_msg != "active": return DeviceStatus.OFFLINE return DeviceStatus.ONLINE def queue_depth(self) -> int: """Return the number of jobs in the queue for the ibm backend""" if self._device_type == DeviceType("FAKE"): return 0 return self._device.status().pending_jobs def _run( self, run_input: "Union[qiskit.QuantumCircuit, List[qiskit.QuantumCircuit]]", *args, **kwargs, ): """Runs circuit(s) on qiskit backend via :meth:`~qiskit.execute` Uses the :meth:`~qiskit.execute` method to create a :class:`~qiskit.providers.QuantumJob` object, applies a :class:`~qbraid.providers.ibm.QiskitJob`, and return the result. Args: run_input: A circuit object to run on the IBM device. Keyword Args: shots (int): The number of times to run the task on the device. Default is 1024. Returns: qbraid.providers.ibm.QiskitJob: The job like object for the run. """ backend = self._device shots = kwargs.pop("shots", backend.options.get("shots")) memory = kwargs.pop("memory", True) # Needed to get measurements qiskit_job = backend.run(run_input, shots=shots, memory=memory) try: tags_lst = qiskit_job.update_tags(kwargs.get("tags", [])) except AttributeError: # BasicAerJob does not have update_tags tags_lst = [] tags = {tag: "*" for tag in tags_lst} qiskit_job_id = qiskit_job.job_id() return { "vendor_job_id": qiskit_job_id, "tags": tags, "shots": shots, "vendor_job_obj": qiskit_job, "qbraid_job_obj": QiskitJob, } def _run_batch(self, run_input: "List[qiskit.QuantumCircuit]", *args, **kwargs): """Runs circuit(s) on qiskit backend via :meth:`~qiskit.execute` Uses the :meth:`~qiskit.execute` method to create a :class:`~qiskit.providers.QuantumJob` object, applies a :class:`~qbraid.providers.ibm.QiskitJob`, and return the result. Args: run_input: A circuit object list to run on the wrapped device. Keyword Args: shots (int): The number of times to run the task on the device. Default is 1024. Returns: qbraid.providers.ibm.QiskitJob: The job like object for the run. """ return self._run(run_input, **kwargs)