メインコンテンツへスキップ

セッションでジョブを実行する

Package versions

このページのコードは、以下の要件を使用して開発されました。 これらのバージョン以降を使用することをお勧めします。

qiskit[all]~=2.3.0
qiskit-ibm-runtime~=0.43.1
scipy~=1.16.3
Note

Open Plan のユーザーはセッションジョブを送信できません。ワークロードは ジョブモード または バッチモード で実行する必要があります。

QPU への専用かつ排他的なアクセスが必要な場合は、セッションを使用してください。

セッションを使用するための設定

セッションを開始する前に、Qiskit Runtime をセットアップしてサービスとして初期化する必要があります。

# Added by doQumentation — required packages for this notebook
!pip install -q numpy qiskit qiskit-ibm-runtime scipy
from qiskit_ibm_runtime import (
QiskitRuntimeService,
Session,
SamplerV2 as Sampler,
EstimatorV2 as Estimator,
)

service = QiskitRuntimeService()

セッションを開く

コンテキストマネージャー with Session(...) を使用するか、Session クラスを初期化することで、ランタイムセッションを開くことができます。セッションを開始する際は、backend オブジェクトを渡して QPU を指定する必要があります。セッションは最初のジョブが実行を開始したときに始まります。

備考

セッションを開いてから 30 分以内にジョブを送信しない場合、セッションは自動的に閉じられます。

Session クラス

注意

以下のコードブロックは、セッションを使用しているため、Open Plan のユーザーにはエラーが返されます。Open Plan のワークロードは、ジョブモード または バッチモード でのみ実行できます。

backend = service.least_busy(operational=True, simulator=False)
session = Session(backend=backend)
estimator = Estimator(mode=session)
sampler = Sampler(mode=session)
# Close the session because no context manager was used.
session.close()

コンテキストマネージャー

コンテキストマネージャーはセッションを自動的に開き、閉じます。

注意

以下のコードブロックは、セッションを使用しているため、Open Plan のユーザーにはエラーが返されます。Open Plan のワークロードは、ジョブモード または バッチモード でのみ実行できます。

from qiskit_ibm_runtime import (
Session,
SamplerV2 as Sampler,
EstimatorV2 as Estimator,
)

backend = service.least_busy(operational=True, simulator=False)
with Session(backend=backend):
estimator = Estimator()
sampler = Sampler()

セッションの長さ

セッションの最大存続時間(TTL)は、セッションが実行できる時間を決定します。この値は max_time パラメーターで設定できます。この値は最も長いジョブの実行時間を超えるように設定してください。

このタイマーはセッションが開始したときに始まります。値に達すると、セッションは閉じられます。実行中のジョブは完了しますが、キューに残っているジョブは失敗します。

注意

以下のコードブロックは、セッションを使用しているため、Open Plan のユーザーにはエラーが返されます。Open Plan のワークロードは、ジョブモード または バッチモード でのみ実行できます。

with Session(backend=backend, max_time="25m"):
...

設定できないインタラクティブ存続時間(interactive TTL)の値もあります。その時間内にセッションジョブがキューに入らない場合、セッションは一時的に非アクティブ化されます。

デフォルト値:

インスタンスタイプ(Open または Premium Plan)インタラクティブ TTL最大 TTL
Premium Plan60 秒*8 時間*
* 一部の Premium Plan インスタンスは、異なる値に設定されている場合があります。

セッションの最大 TTL またはインタラクティブ TTL を確認するには、セッションの詳細を確認する の手順に従い、それぞれ max_time または interactive_timeout の値を確認してください。

セッションを終了する

セッションは以下の状況で終了します。

  • 最大タイムアウト(TTL)値に達し、キューに入っているすべてのジョブがキャンセルされます。
  • セッションが手動でキャンセルされ、キューに入っているすべてのジョブがキャンセルされます。
  • セッションが手動で閉じられます。セッションは新しいジョブの受け付けを停止しますが、キューに入っているジョブは優先的に実行され続けます。
  • with Session() のようにセッションをコンテキストマネージャーとして使用している場合、コンテキストが終了するとセッションは自動的に閉じられます(session.close() を使用した場合と同じ動作です)。

セッションを閉じる

セッションはコンテキストマネージャーを抜けると自動的に閉じられます。セッションのコンテキストマネージャーを抜けると、セッションは「進行中、新しいジョブを受け付けていない」ステータスになります。これは、最大タイムアウト値に達するまで、実行中またはキューに入っているすべてのジョブの処理を完了することを意味します。すべてのジョブが完了すると、セッションは直ちに閉じられます。これにより、スケジューラーはセッションのインタラクティブタイムアウトを待たずに次のジョブを実行でき、平均ジョブキュー時間が短縮されます。閉じられたセッションにジョブを送信することはできません。

注意

以下のコードブロックは、セッションを使用しているため、Open Plan のユーザーにはエラーが返されます。Open Plan のワークロードは、ジョブモード または バッチモード でのみ実行できます。

from qiskit.quantum_info import SparsePauliOp
from qiskit.circuit import QuantumCircuit, Parameter
from qiskit.transpiler import generate_preset_pass_manager
import numpy as np

# This cell is hidden from users
service = QiskitRuntimeService()
backend = service.least_busy()

# Define two circuits, each with one parameter with two parameters.
circuit = QuantumCircuit(2)
circuit.h(0)
circuit.cx(0, 1)
circuit.ry(Parameter("a"), 0)
circuit.cx(0, 1)
circuit.h(0)
circuit.measure_all()

pm = generate_preset_pass_manager(optimization_level=1, backend=backend)
transpiled_circuit = pm.run(circuit)
transpiled_circuit_sampler = transpiled_circuit
transpiled_circuit_sampler.measure_all()

# Create parameters and mapped observables to submit
params = np.random.uniform(size=(2, 3)).T
observables = [
SparsePauliOp(["XX", "IY"], [0.5, 0.5]),
SparsePauliOp("XX"),
SparsePauliOp("IY"),
]
mapped_observables = [
[observable.apply_layout(transpiled_circuit.layout)]
for observable in observables
]

sampler_pub = (transpiled_circuit_sampler, params)
estimator_pub = (transpiled_circuit_sampler, mapped_observables, params)
with Session(backend=backend) as session:
estimator = Estimator()
sampler = Sampler()
job1 = estimator.run([estimator_pub])
job2 = sampler.run([sampler_pub])

# The session is no longer accepting jobs but the submitted job will run to completion.
result = job1.result()
result2 = job2.result()
ヒント

コンテキストマネージャーを使用していない場合は、不要なコストを避けるために手動でセッションを閉じてください。ジョブの送信が完了したらすぐにセッションを閉じることができます。session.close() でセッションを閉じると、新しいジョブは受け付けられなくなりますが、すでに送信されたジョブは完了まで実行され、その結果を取得できます。

注意

以下のコードブロックは、セッションを使用しているため、Open Plan のユーザーにはエラーが返されます。Open Plan のワークロードは、ジョブモード または バッチモード でのみ実行できます。

session = Session(backend=backend)

# If using qiskit-ibm-runtime earlier than 0.24.0, change `mode=` to `session=`
estimator = Estimator(mode=session)
sampler = Sampler(mode=session)
job1 = estimator.run([estimator_pub])
job2 = sampler.run([sampler_pub])
print(f"Result1: {job1.result()}")
print(f"Result2: {job2.result()}")

# Manually close the session. Running and queued jobs will run to completion.
session.close()
Result1: PrimitiveResult([PubResult(data=DataBin(evs=np.ndarray(<shape=(3, 2), dtype=float64>), stds=np.ndarray(<shape=(3, 2), dtype=float64>), ensemble_standard_error=np.ndarray(<shape=(3, 2), dtype=float64>), shape=(3, 2)), metadata={'shots': 4096, 'target_precision': 0.015625, 'circuit_metadata': {}, 'resilience': {}, 'num_randomizations': 32})], metadata={'dynamical_decoupling': {'enable': False, 'sequence_type': 'XX', 'extra_slack_distribution': 'middle', 'scheduling_method': 'alap'}, 'twirling': {'enable_gates': False, 'enable_measure': True, 'num_randomizations': 'auto', 'shots_per_randomization': 'auto', 'interleave_randomizations': True, 'strategy': 'active-accum'}, 'resilience': {'measure_mitigation': True, 'zne_mitigation': False, 'pec_mitigation': False}, 'version': 2})
Result2: PrimitiveResult([SamplerPubResult(data=DataBin(meas=BitArray(<shape=(3, 2), num_shots=4096, num_bits=2>), meas0=BitArray(<shape=(3, 2), num_shots=4096, num_bits=133>), shape=(3, 2)), metadata={'circuit_metadata': {}})], metadata={'execution': {'execution_spans': ExecutionSpans([DoubleSliceSpan(<start='2026-01-15 07:53:15', stop='2026-01-15 07:53:21', size=24576>)])}, 'version': 2})

セッションのステータスを確認する

session.status() を使用するか、ワークロード ページを表示することで、セッションの現在の状態を確認できます。

セッションのステータスは次のいずれかになります。

  • Pending(保留中): セッションがまだ開始されていないか、非アクティブ化されています。次のセッションジョブは、他のジョブと同様にキューで待機する必要があります。
  • In progress, accepting new jobs(進行中、新しいジョブを受け付けている): セッションがアクティブで新しいジョブを受け付けています。
  • In progress, not accepting new jobs(進行中、新しいジョブを受け付けていない): セッションはアクティブですが、新しいジョブを受け付けていません。セッションへのジョブ送信は拒否されますが、未完了のセッションジョブは完了まで実行されます。すべてのジョブが完了すると、セッションは自動的に閉じられます。
  • Closed(閉じられた): セッションの最大タイムアウト値に達したか、セッションが明示的に閉じられました。

セッションの詳細を確認する

セッションの設定とステータスの包括的な概要については、session.details() メソッドを使用してください。

注意

以下のコードブロックは、セッションを使用しているため、Open Plan のユーザーにはエラーが返されます。Open Plan のワークロードは、ジョブモード または バッチモード でのみ実行できます。

from qiskit_ibm_runtime import (
QiskitRuntimeService,
Session,
EstimatorV2 as Estimator,
)

service = QiskitRuntimeService()
backend = service.least_busy(operational=True, simulator=False)

with Session(backend=backend) as session:
print(session.details())
{'id': 'be84569d-86b5-4a7f-be5e-7d33e80dc220', 'backend_name': 'ibm_torino', 'interactive_timeout': 60, 'max_time': 28800, 'active_timeout': 28800, 'state': 'open', 'accepting_jobs': True, 'last_job_started': None, 'last_job_completed': None, 'started_at': None, 'closed_at': None, 'activated_at': None, 'mode': 'dedicated', 'usage_time': None}

使用パターン

セッションは、古典リソースと量子リソース間で頻繁な通信が必要なアルゴリズムに特に役立ちます。

例:古典的な SciPy オプティマイザーを使用してコスト関数を最小化する反復ワークロードを実行します。このモデルでは、SciPy はコスト関数の出力を使用して次の入力を計算します。

注意

以下のコードブロックは、セッションを使用しているため、Open Plan のユーザーにはエラーが返されます。Open Plan のワークロードは、ジョブモード または バッチモード でのみ実行できます。

from scipy.optimize import minimize
from qiskit.circuit.library import efficient_su2

def cost_func(params, ansatz, hamiltonian, estimator):
# Return estimate of energy from estimator

energy = sum(
estimator.run([(ansatz, hamiltonian, params)]).result()[0].data.evs
)
return energy

hamiltonian = SparsePauliOp.from_list(
[("YZ", 0.3980), ("ZI", -0.3980), ("ZZ", -0.0113), ("XX", 0.1810)]
)
su2_ansatz = efficient_su2(hamiltonian.num_qubits)
pm = generate_preset_pass_manager(backend=backend, optimization_level=3)
ansatz = pm.run(su2_ansatz)
mapped_hamiltonian = [
operator.apply_layout(ansatz.layout) for operator in hamiltonian
]

num_params = ansatz.num_parameters
x0 = 2 * np.pi * np.random.random(num_params)

session = Session(backend=backend)

# If using qiskit-ibm-runtime earlier than 0.24.0, change `mode=` to `session=`
estimator = Estimator(mode=session, options={"default_shots": int(1e4)})
res = minimize(
cost_func,
x0,
args=(ansatz, mapped_hamiltonian, estimator),
method="cobyla",
options={"maxiter": 25},
)

# Close the session because no context manager was used.
session.close()

スレッドを使用してセッションで 2 つの VQE アルゴリズムを実行する

複数のワークロードを同時に実行することで、セッションをより効果的に活用できます。次の例では、異なる古典的オプティマイザーを使用する 2 つの VQE アルゴリズムを、単一のセッション内で同時に実行する方法を示します。ジョブタグは各ワークロードのジョブを区別するためにも使用されます。

注意

以下のコードブロックは、セッションを使用しているため、Open Plan のユーザーにはエラーが返されます。Open Plan のワークロードは、ジョブモード または バッチモード でのみ実行できます。

from concurrent.futures import ThreadPoolExecutor
from qiskit_ibm_runtime import EstimatorV2 as Estimator

def minimize_thread(estimator, method):
return minimize(
cost_func,
x0,
args=(ansatz, mapped_hamiltonian, estimator),
method=method,
options={"maxiter": 25},
)

with Session(backend=backend), ThreadPoolExecutor() as executor:
estimator1 = Estimator()
estimator2 = Estimator()

# Use different tags to differentiate the jobs.
estimator1.options.environment.job_tags = ["cobyla"]
estimator2.options.environment.job_tags = ["nelder-mead"]

# Submit the two workloads.
cobyla_future = executor.submit(minimize_thread, estimator1, "cobyla")
nelder_mead_future = executor.submit(
minimize_thread, estimator2, "nelder-mead"
)

# Get workload results.
cobyla_result = cobyla_future.result()
nelder_mead_result = nelder_mead_future.result()

次のステップ

Recommendations