.rich-text-blog p figure,.rich-text-blog figure { margin-bottom:20px !important;} .rich-text-blog ul{ margin-bottom:20px !important;} Rayはオープンソースの統合コンピュートフレームワークで、分散環境におけるAIとPythonワークロードのスケーリングを簡素化します。 Databricks上でのRayの実行サポートを導入して以来、予測や深層強化学習からLLMの微調整に至るまで、数多くのお客様が機械学習のユースケースの導入に成功しています。 Rayバージョン2.8.0のリリースに伴い、Ray on Databricksのオートスケーリングサポートが追加されました。 オートスケーリングは、変動する需要に対してリソースを動的に調整することができるため、不可欠です。 処理のニーズは時間と共に大きく変化する可能性があるため、オートスケーリングにより、最適なパフォーマンスとコスト効率を保証し、手動介入を必要とせずに計算能力と費用のバランスを維持するのに役立ちます。 Databricks上のRayオートスケーリングは、必要に応じてワーカーノードを追加または削除することができ、Sparkフレームワークを活用して分散コンピューティング環境におけるスケーラビリティ、コスト効率、応答性を向上させます。 この統合されたアプローチは、複雑な権限、クラウドの初期化スクリプト、ログの設定を定義する必要がないため、OSSオートスケールを実装する代替案よりもはるかにシンプルです。 完全に管理され、本番稼動可能な統合オートスケーリングソリューションにより、Rayワークロードの複雑さとコストを大幅に削減することができます。 オートスケールを有効にしてDatabricks上にRayクラスタを作成する Rayの最新バージョンをインストールするだけで開始できます。 # Install Ray with the 'default','tune' extensions for # Ray dashboard, and tuning support %pip install ray[default,tune]>=2.8.0 次のステップでは、`ray.util.spark.setup_ray_cluster() `関数を使用して、これから開始するRayクラスタの設定を行います。 オートスケール機能を利用するには、Rayクラスタが使用できるワーカーノードの最大数を指定し、割り当てられたコンピュートリソースを定義し、オートスケールフラグをTrueに設定します。 さらに、Databricksクラスタが自動スケーリングを有効にして起動されていることを確認することが重要です。 詳しくはドキュメントをご覧ください。 これらのパラメータを設定した後、Rayクラスタを初期化すると、オートスケーリングはDatabricksのオートスケーリングと全く同じように機能します。 以下は、オートスケール機能を持つRayクラスタのセットアップ例です。 from ray.util.spark import setup_ray_cluster setup_ray_cluster( num_worker_nodes,#set to max number of nodes to Autoscale num_cpus_head_node,# set to the cores used in the driver node num_gpus_head_node, # set for GPU enabled cluster num_cpus_per_node,# cores added from each worker node num_gpus_per_node, #set for GPU enabled cluster autoscale = True #set only for clusters with Auto Scaling Enabled ) この機能は、Databricks Runtimeバージョン14.0以上を実行しているDatabricksクラスタと互換性があります。 Spark上でRayクラスタを構成するために利用可能なパラメータの詳細については、setup_ray_clusterドキュメントを参照してください。 Rayクラスタが初期化されると、RayヘッドノードがRayダッシュボードに表示されます。 from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster ray_conf = setup_ray_cluster( num_worker_nodes= 4, num_cpus_head_node=3, num_cpus_per_node=4, autoscale = True ) ジョブがRayクラスタにサブミットされると、Ray Autoscaler APIは必要なCPUとGPUコンピュート要件のタスクをサブミットしてSparkクラスタにリソースを要求します。 Sparkスケジューラは、現在のクラスタリソースがタスクの計算要求を満たすことができない場合にワーカーノードをスケールアップします。タスクが完了し、保留中の追加タスクがない場合にはクラスタをスケールダウンします。 autoscale_upscaling_speedと autoscale_idle_timeout_minutesパラメータを調整することで、スケールアップとスケールダウンの速度を制御できます。 これらの制御パラメータの詳細については、マニュアルを参照してください。 処理が完了すると、Rayは割り当てられたリソースをすべてSparkクラスタに戻し、他のタスクやダウンスケーリングのために使用することで、リソースの効率的な利用を保証します。 オートスケーリングプロセスを示すために、ハイパーパラメータチューニングの例を見てみましょう。 この例では、CIFAR10データセットでPyTorchモデルを学習します。コードはRayのドキュメントを参考にしました。 チューニングしたいPyTorchモデルを定義することから始めます。 import torch.nn as nn import torch.nn.functional as F class Net(nn.Module): def __init__(self, l1=120, l2=84): super(Net, self).__init__() self.conv1 = nn.Conv2d(3, 6, 5) self.pool = nn.MaxPool2d(2, 2) self.conv2 = nn.Conv2d(6, 16, 5) self.fc1 = nn.Linear(16 * 5 * 5, l1) self.fc2 = nn.Linear(l1, l2) self.fc3 = nn.Linear(l2, 10) def forward(self, x): x = self.pool(F.relu(self.conv1(x))) x = self.pool(F.relu(self.conv2(x))) x = x.view(-1, 16 * 5 * 5) x = F.relu(self.fc1(x)) x = F.relu(self.fc2(x)) x = self.fc3(x) return x データローダーを独自の関数でラップし、グローバルデータディレクトリを渡します。 こうすることで、異なる試験間でデータディレクトリを共有することができます。 import torchvision import torchvision.transforms as transforms from filelock import FileLock def load_data(data_dir="./data"): transform = transforms.Compose( [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))] ) # We add FileLock here because multiple workers will want to # download data, and this may cause overwrites since # DataLoader is not threadsafe. with FileLock(os.path.expanduser("~/.data.lock")): trainset = torchvision.datasets.CIFAR10( root=data_dir, train=True, download=True, transform=transform ) testset = torchvision.datasets.CIFAR10( root=data_dir, train=False, download=True, transform=transform ) return trainset, testset 次に、コンフィグを取り込み、トーチモデルのトレーニングループを1回実行する関数を定義します。 各トライアルが終了すると、重みをチェックポイントし、`train, report` APIを使用して評価された損失を報告します。 モデルの損失特性を改善しない効果のない試行をスケジューラが停止できるようにするためです。 import os import torch import torch.optim as optim from torch.utils.data import random_split import ray from ray import train, tune from ray.train import Checkpoint def train_cifar(config, loc): # location to store the checkpoints net = Net(config["l1"], config["l2"]) # check whether to load in CPU or GPU device = "cpu" if torch.cuda.is_available(): device = "cuda:0" net.to(device) criterion = nn.CrossEntropyLoss() optimizer = optim.SGD(net.parameters(), lr=config["lr"], momentum=0.9) # load the Dataset data_dir = os.path.abspath("./data") trainset, testset = load_data(data_dir) test_abs = int(len(trainset) * 0.8) train_subset, val_subset = random_split( trainset, [test_abs, len(trainset) - test_abs] ) trainloader = torch.utils.data.DataLoader( train_subset, batch_size=int(config["batch_size"]), shuffle=True, num_workers=8 ) valloader = torch.utils.data.DataLoader( val_subset, batch_size=int(config["batch_size"]), shuffle=True, num_workers=8 ) 次に、設定ファイルで指定された合計エポック数だけ実行される学習ループを定義します: トレーニングループ - 学習データセットを繰り返し、最適なパラメータに収束させようとします。 検証/テストループ - テストデータセットを繰り返し、モデルのパフォーマンスが向上しているかどうかをチェックします。 for epoch in range(config["max_epoch"]): # loop over the dataset multiple times running_loss = 0.0 epoch_steps = 0 for i, data in enumerate(trainloader, 0): # get the inputs; data is a list of [inputs, labels] inputs, labels = data inputs, labels = inputs.to(device), labels.to(device) # zero the parameter gradients optimizer.zero_grad() # forward + backward + optimize outputs = net(inputs) loss = criterion(outputs, labels) loss.backward() optimizer.step() # print statistics running_loss += loss.item() epoch_steps += 1 if i % 2000 == 1999: # print every 2000 mini-batches print( "[%d, %5d] loss: %.3f" % (epoch + 1, i + 1, running_loss / epoch_steps) ) running_loss = 0.0 # Validation loss val_loss = 0.0 val_steps = 0 total = 0 correct = 0 for i, data in enumerate(valloader, 0): with torch.no_grad(): inputs, labels = data inputs, labels = inputs.to(device), labels.to(device) outputs = net(inputs) _, predicted = torch.max(outputs.data, 1) total += labels.size(0) correct += (predicted == labels).sum().item() loss = criterion(outputs, labels) val_loss += loss.cpu().numpy() val_steps += 1 最後に、まずチェックポイントを保存し、いくつかのメトリクスをRay Tuneに報告します。 具体的には、検証の損失と精度をRay Tuneに送り返します。 Ray Tuneは、これらのメトリクスを使用して、どのハイパーパラメータ構成が最良の結果につながるかを決定します。 # Here we save a checkpoint. It is automatically registered with # Ray Tune and can be accessed through `train.get_checkpoint()` # API in future iterations. import os import torch import ray from ray import train from ray.train import Checkpoint os.makedirs(f"{loc}/mymodel", exist_ok=True) torch.save((net.state_dict(), optimizer.state_dict()), f"{loc}/mymodel/checkpoint.pt") checkpoint = Checkpoint.from_directory(f"{loc}/mymodel/") train.report( {"loss": (val_loss / val_steps), "try_gpu": False, "accuracy": correct / total}, checkpoint=checkpoint, ) print("Finished Training") 次に、与えられたハイパーパラメータに対してオプティマイザが選択する探索空間を指定することで、チューニング作業を開始するための主要コンポーネントを定義します。 探索空間の定義 以下の設定は、ハイパーパラメータとその検索選択範囲を辞書として表現しています。 指定されたパラメータタイプごとに、適切なセレクタアルゴリズム(定義されるパラメータの性質に応じて、sample_from、loguniform、choiceなど)を使用します。 from ray import tune config = { "l1": tune.sample_from(lambda _: 2 ** np.random.randint(2, 9)), "l2": tune.sample_from(lambda _: 2 ** np.random.randint(2, 9)), "lr": tune.loguniform(1e-4, 1e-1), "batch_size": tune.choice([2, 4, 8, 16]), "max_epoch":20 } 各試行で、Ray Tuneはこれらの探索空間からパラメーターの組み合わせをランダムにサンプリングします。 上で定義したコンフィグの範囲内で各パラメータの値を選択した後、グループの中で最もパフォーマンスの高いモデルを見つけるために、複数のモデルを並行して訓練します。 うまく機能していないパラメータ選択の反復を短絡させるために、ASHASchedulerを使用します。このASHASchedulerは、効果のないトライアルを早期に終了させます。 from ray.tune.schedulers import ASHAScheduler scheduler = ASHAScheduler( max_t=config['max_epoch'], grace_period=5, reduction_factor=2 ) APIを調整する 最後に、Tuner APIを呼び出して実行を開始します。 トレーニング開始メソッドを呼び出す際に、トライアルごとにRay Tuneに使用を許可するリソース、チェックポイントのデフォルトの保存場所、反復最適化中に最適化するターゲットメトリックを定義する追加設定オプションを渡します。 Ray Tuneで使用可能な各種パラメーターの詳細については、こちらをご参照ください。 import os from ray import train, tune tuner = tune.Tuner( tune.with_resources( tune.with_parameters(train_cifar, loc=loc), resources={"cpu": cpus_per_trial, "gpu": gpus_per_trial}, ), tune_config=tune.TuneConfig( metric="loss", mode="min", scheduler=scheduler, num_samples=num_samples, # total trails to run given the search space ), run_config=train.RunConfig( storage_path=os.path.expanduser(loc), name="tune_checkpointing_location" ), param_space=config, ) results = tuner.fit() このコードを特定のリソース制約を宣言して実行するとどうなるかを確認するため、cpus_per_trial = 3、gpu = 0、total_epochs = 20を使用して、CPUのみで実行をトリガーしてみましょう。 上図のようにオートスケーラーがリソースのリクエストを開始し、下図のように保留中のリソースがUIに記録されるのがわかります。 Rayクラスタによるリソースの現在の需要を満たすことができない場合、同様にdatabricksクラスタのオートスケーリングを開始します。 最後に、ジョブの出力が終了すると、いくつかの悪い試行が早期に終了され、計算量の節約につながったことがわかります。 GPUリソースを使用しても、コードを変更することなく、同じ処理を実行できます。 ノートブックをクローンして、あなたの環境で自由に実行してください: 次の記事 Rayワークロードの自動スケーリングをサポートすることで、RayとDatabricksの統合をさらに強化し、ダイナミックなワークロードのスケーリングを支援します。 この統合のロードマップは、さらにエキサイティングな展開をお約束します。 続報にご期待ください!