Qiskit Serverlessのコンピュートおよびデータリソースを管理する
パッケージバージョン
このページのコードは、以下の要件を使用して開発されました。 これらのバージョン以降を使用することを推奨します。
qiskit[all]~=2.0.0
qiskit-ibm-runtime~=0.37.0
qiskit-serverless~=0.22.0
Qiskit Serverlessを使用すると、CPU、QPU、その他のコンピュートアクセラレーターを含むQiskitパターン全体のコンピュートとデータを管理できます。
詳細なステータスを設定する
Serverlessのワークロードには、ワークフロー全体に わたって複数のステージがあります。デフォルトでは、job.status()で以下のステータスを確認できます。
QUEUED: ワークロードがクラシカルリソースのキューに入っていますINITIALIZING: ワークロードをセットアップ中ですRUNNING: ワークロードがクラシカルリソース上で実行中ですDONE: ワークロードが正常に完了しました
また、特定のワークフローステージをより詳しく説明するカスタムステータスを設定することもできます。
# Added by doQumentation — required packages for this notebook
!pip install -q qiskit qiskit-ibm-runtime qiskit-serverless
# This cell is hidden from users, it just creates a new folder
from pathlib import Path
Path("./source_files").mkdir(exist_ok=True)
%%writefile ./source_files/status_example.py
from qiskit_serverless import update_status, Job
## If your function has a mapping stage, particularly application functions, you can set the status to "RUNNING: MAPPING" as follows:
update_status(Job.MAPPING)
## While handling transpilation, error suppression, and so forth, you can set the status to "RUNNING: OPTIMIZING_FOR_HARDWARE":
update_status(Job.OPTIMIZING_HARDWARE)
## After you submit jobs to Qiskit Runtime, the underlying quantum job will be queued. You can set status to "RUNNING: WAITING_FOR_QPU":
update_status(Job.WAITING_QPU)
## When the Qiskit Runtime job starts running on the QPU, set the following status "RUNNING: EXECUTING_QPU":
update_status(Job.EXECUTING_QPU)
## Once QPU is completed and post-processing has begun, set the status "RUNNING: POST_PROCESSING":
update_status(Job.POST_PROCESSING)
Writing ./source_files/status_example.py
このワークロードが正常に完了すると(save_result()を使用)、ステータスは自動的にDONEに更新されます。
並列ワークフロー
並列化できるクラシカルタスクには、@distribute_taskデコレーターを使用してタスクの実行に必要なコンピュート要件を定義します。まず、Write your first Qiskit Serverless programのトピックにあるtranspile_remote.pyの例を以下のコードで参照してください。
以下のコードを使用するには、事前に認証情報を保存しておく必要があります。
%%writefile ./source_files/transpile_remote.py
from qiskit.transpiler import generate_preset_pass_manager
from qiskit_ibm_runtime import QiskitRuntimeService
from qiskit_serverless import distribute_task
service = QiskitRuntimeService()
@distribute_task(target={"cpu": 1})
def transpile_remote(circuit, optimization_level, backend):
"""Transpiles an abstract circuit (or list of circuits) into an ISA circuit for a given backend."""
pass_manager = generate_preset_pass_manager(
optimization_level=optimization_level,
backend=service.backend(backend)
)
isa_circuit = pass_manager.run(circuit)
return isa_circuit
Writing ./source_files/transpile_remote.py
この例では、transpile_remote()関数を@distribute_task(target={"cpu": 1})でデコレートしています。実行すると、単一CPUコアを持つ非同期の並列ワーカータスクが作成され、ワーカーを追跡するための参照が返されます。結果を取得するには、その参照をget()関数に渡します。これを使用して複数の並列タスクを実行できます。
%%writefile --append ./source_files/transpile_remote.py
from time import time
from qiskit_serverless import get, get_arguments, save_result, update_status, Job
# Get arguments
arguments = get_arguments()
circuit = arguments.get("circuit")
optimization_level = arguments.get("optimization_level")
backend = arguments.get("backend")
Appending to ./source_files/transpile_remote.py
%%writefile --append ./source_files/transpile_remote.py
# Start distributed transpilation
update_status(Job.OPTIMIZING_HARDWARE)
start_time = time()
transpile_worker_references = [
transpile_remote(circuit, optimization_level, backend)
for circuit in arguments.get("circuit_list")
]
transpiled_circuits = get(transpile_worker_references)
end_time = time()
Appending to ./source_files/transpile_remote.py
%%writefile --append ./source_files/transpile_remote.py
# Save result, with metadata
result = {
"circuits": transpiled_circuits,
"metadata": {
"resource_usage": {
"RUNNING: OPTIMIZING_FOR_HARDWARE": {
"CPU_TIME": end_time - start_time,
"QPU_TIME": 0,
},
}
},
}
save_result(result)
Appending to ./source_files/transpile_remote.py
# This cell is hidden from users.
# It uploads the serverless program and checks it runs.
def test_serverless_job(title, entrypoint):
# Import in function to stop them interfering with user-facing code
from qiskit.circuit.random import random_circuit
from qiskit_serverless import IBMServerlessClient, QiskitFunction
import time
import uuid
title += "_" + uuid.uuid4().hex[:8]
serverless = IBMServerlessClient()
transpile_remote_demo = QiskitFunction(
title=title,
entrypoint=entrypoint,
working_dir="./source_files/",
)
serverless.upload(transpile_remote_demo)
job = serverless.get(title).run(
circuit=random_circuit(3, 3),
circuit_list=[random_circuit(3, 3) for _ in range(3)],
backend="ibm_torino",
optimization_level=1,
)
for retry in range(25):
time.sleep(5)
status = job.status()
if status == "DONE":
print("Job completed successfully")
return
if status not in [
"QUEUED",
"INITIALIZING",
"RUNNING",
"RUNNING: OPTIMIZING_FOR_HARDWARE",
"DONE",
]:
raise Exception(
f"Unexpected job status '{status}'.\nHere's the logs:\n"
+ job.logs()
)
print(f"Waiting for job (status '{status}')")
raise Exception("Job did not complete in time")
test_serverless_job(
title="transpile_remote_serverless_test", entrypoint="transpile_remote.py"
)
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'RUNNING')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Job completed successfully
さまざまなタスク設定を試す
@distribute_task()を使用して、タスクのCPU、GPU、メモリを柔軟に割り当てることができます。IBM Quantum® Platform上のQiskit Serverlessでは、各プログラムに16個のCPUコアと32 GB RAMが搭載されており、必要に応じて動的に割り当てることができます。
CPUコアは、完全なCPUコアとして、またはフラクショナル割り当て(部分割り当て)として割り当てることができます。
メモリはバイト数で割り当てます。1キロバイトに1024バイト、1メガバイトに1024キロバイト、1ギガバイトに1024メガバイトがあることを覚えておいてください。ワーカーに2 GBのメモリを割り当てるには、"mem": 2 * 1024 * 1024 * 1024を指定する必要があります。
%%writefile --append ./source_files/transpile_remote.py
@distribute_task(target={
"cpu": 16,
"mem": 2 * 1024 * 1024 * 1024
})
def transpile_remote(circuit, optimization_level, backend):
return None
Appending to ./source_files/transpile_remote.py
# This cell is hidden from users.
# It checks the distributed program works.
test_serverless_job(
title="transpile_remote_serverless_test", entrypoint="transpile_remote.py"
)
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'RUNNING')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Job completed successfully
プログラム全体でデータを管理する
Qiskit Serverlessでは、すべてのプログラムにわたって/dataディレクトリ内のファイルを管理できます。いくつかの制限事項があります。
- 現在サポートされているのは
tarファイルとh5ファイルのみです - これはフラットな
/dataストレージのみであり、/data/folder/のようなサブディレクトリは使用できませ ん
以下では、ファイルのアップロード方法を示します。IBM Quantumアカウントを使用してQiskit Serverlessに認証されていることを確認してください(手順についてはDeploy to IBM Quantum Platformを参照してください)。
import tarfile
from qiskit_serverless import IBMServerlessClient
# Create a tar
filename = "transpile_demo.tar"
file = tarfile.open(filename, "w")
file.add("./source_files/transpile_remote.py")
file.close()
# Get a reference to a QiskitFunction
serverless = IBMServerlessClient()
transpile_remote_demo = next(
program
for program in serverless.list()
if program.title == "transpile_remote_serverless"
)
# Upload the tar to Serverless data directory
serverless.file_upload(file=filename, function=transpile_remote_demo)
'{"message":"/usr/src/app/media/5e1f442128cdf60018496a04/transpile_demo.tar"}'
次に、dataディレクトリ内のすべてのファイルを一覧表示できます。このデータはすべてのプログラムからアクセス可能です。
serverless.files(function=transpile_remote_demo)
['classifier_name.pkl.tar', 'output.json.tar', 'transpile_demo.tar']
これは、プログラムからfile_download()を使用してファイルをプログラム環境にダウンロードし、tarを解凍することで実行できます。
%%writefile ./source_files/extract_tarfile.py
import tarfile
from qiskit_serverless import IBMServerlessClient
serverless = IBMServerlessClient(token="<YOUR_API_KEY>") # Use the 44-character API_KEY you created and saved from the IBM Quantum Platform Home dashboard
files = serverless.files()
demo_file = files[0]
downloaded_tar = serverless.file_download(demo_file)
with tarfile.open(downloaded_tar, 'r') as tar:
tar.extractall()
この時点で、ローカル実験と同様にプログラムからファイルを操作できます。file_upload()、file_download()、file_delete()は、ローカル実験からも、アップロードされたプログラムからも呼び出すことができ、一貫性のある柔軟なデータ管理が実 現できます。
次のステップ
- 既存のコードをQiskit Serverlessに移植する完全な例をご覧ください。
- 研究者がQiskit Serverlessと量子中心スーパーコンピューティングを使用して量子化学を探求した論文をお読みください。