Tracking distributed training jobs with Neptune#
Use cases addressed in this tutorial
- How to track experiments in DDP mode and avoid creating a new run for every process
- How to track single-node and multi-node distributed training jobs
When training models in a distributed environment – for example, over multiple GPUs or nodes – you can use Neptune to track all the metadata across multiple processes into a single run. This lets you evaluate the results of the entire model training, validation, or testing session.
In this tutorial, we show two separate examples: one for single-node training and one for multi-node training.
View example scripts on GitHub 
Before you start#
- Sign up at neptune.ai/register.
- Create a project for storing your metadata.
- Install Neptune:
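A minimal install, assuming pip is available (the example scripts in this tutorial additionally use torch and torchvision):

```bash
# Install the Neptune client; torch and torchvision are needed for the example scripts
pip install -U neptune torch torchvision
```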
Passing your Neptune credentials
Once you've registered and created a project, set your Neptune API token and full project name to the NEPTUNE_API_TOKEN and NEPTUNE_PROJECT environment variables, respectively.
To find your API token: In the bottom-left corner of the Neptune app, expand the user menu and select Get my API token.
Your full project name has the form workspace-name/project-name. You can copy it from the project settings: Click the menu in the top-right → Details & privacy.
On Windows, navigate to Settings → Edit the system environment variables, or enter the following in Command Prompt:
setx SOME_NEPTUNE_VARIABLE "some-value"
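On Linux and macOS, the equivalent is to export the variables in your shell, for example (the values shown are placeholders):

```bash
# Replace the placeholder values with your own API token and full project name
export NEPTUNE_API_TOKEN="your-api-token"
export NEPTUNE_PROJECT="workspace-name/project-name"
```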
Although it's not recommended, especially for the API token, you can also pass your credentials in the code when initializing Neptune:
run = neptune.init_run(
    project="ml-team/classification",  # your full project name here
    api_token="h0dHBzOi8aHR0cHM6Lkc78ghs74kl0jvYh...3Kb8",  # your API token here
)
For more help, see Set Neptune credentials.
Tracking a single-node multi-GPU job#
To configure Neptune to track your single-node, multi-GPU distributed training job:
- Create a new Neptune run only on the main process:
if __name__ == "__main__":
    ...
    rank = int(os.environ["RANK"])
    local_rank = int(os.environ["LOCAL_RANK"])
    ...
    net = nn.parallel.DistributedDataParallel(net, device_ids=[local_rank])
    ...
    run = neptune.init_run() if rank == 0 else None
This ensures that you create and log metadata to a single run.
- Log metadata only from the main process.
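For example, a minimal sketch of guarded logging, assuming the run was created only on rank 0 as above (the metric name and the params and epoch_loss values are placeholders defined elsewhere in your script):

```python
# `run` is None on all non-zero ranks, so guard every logging call
if run is not None:  # equivalently: if rank == 0
    run["parameters"] = params
    run["metrics/train/loss"].append(epoch_loss)
```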
Expected outcome
All metadata of the training job is logged to the same run.
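The script above reads the RANK and LOCAL_RANK environment variables, which are set by the launcher. For reference, a hypothetical single-node launch with torchrun (the script name and GPU count are assumptions):

```bash
# Launch 4 worker processes on one machine; torchrun sets RANK and LOCAL_RANK for each
torchrun --standalone --nproc_per_node=4 train.py
```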
Logging to multiple instances of the same run#
You can initialize multiple run instances that log metadata to the same run by creating a custom run identifier. You can then pass the custom ID to the custom_run_id argument of the init_run() function across all processes. Initializing each run instance with the same ID ensures that all the metadata is ultimately logged to a single run.
if __name__ == "__main__":
    ...
    if rank == 0:
        custom_run_id = [hashlib.md5(str(time.time()).encode()).hexdigest()]  # (1)!
        monitoring_namespace = "monitoring"
    else:
        custom_run_id = [None]
        monitoring_namespace = f"monitoring/{rank}"

    dist.broadcast_object_list(custom_run_id, src=0)
    custom_run_id = custom_run_id[0]

    run = neptune.init_run(
        custom_run_id=custom_run_id,
        monitoring_namespace=monitoring_namespace,
    )

    # Train model
    ...
- Automatically create and broadcast custom_run_id to all processes.
Related
You can also export the custom run ID as an environment variable. To ensure a unique ID for every run, you can use the following example:
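One possible approach, assuming a Linux shell, is to hash the current time before launching the training script, so that all processes inherit the same value:

```bash
# Generate a pseudo-unique 32-character ID and export it for all child processes
export NEPTUNE_CUSTOM_RUN_ID=$(date +%s%N | md5sum | cut -c1-32)
```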
For more instructions, see Set custom run ID.
Tracking a multi-node DDP job#
This example shows how to configure Neptune for distributed training jobs that use multiple nodes or GPUs.
As with single-node training, the idea is to initialize each run instance with the same custom run ID. This ensures that all the metadata is logged to the same run.
- On each node, export the same NEPTUNE_CUSTOM_RUN_ID environment variable. To ensure a unique ID for every run, you can use the following example:
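One possible approach, assuming a Linux shell: derive the ID from a value that is identical on every node, such as a scheduler job ID (the use of Slurm's SLURM_JOB_ID here is an assumption), so that each node exports the same string:

```bash
# The hashed input must be identical on every node for the exported IDs to match
export NEPTUNE_CUSTOM_RUN_ID=$(echo "$SLURM_JOB_ID" | md5sum | cut -c1-32)
```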
- To log metadata only from the main process, you can use the PyTorch distributed reduce() method. It reduces tensor data across all processes and makes the result available on the main (destination) process.
def train(net, trainloader, run, rank, params):
    if rank == 0:
        # Log params
        run["parameters"] = params

    print("Start training...")
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(
        net.parameters(), lr=params["lr"], momentum=params["momentum"]
    )
    num_of_batches = len(trainloader)

    for epoch in range(params["epochs"]):
        trainloader.sampler.set_epoch(epoch)
        running_loss = 0.0
        for i, data in enumerate(trainloader, 0):
            inputs, labels = data
            images, labels = inputs.to(f"cuda:{rank}"), labels.to(f"cuda:{rank}")
            optimizer.zero_grad()
            outputs = net(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # Gather loss value from all processes on main process for logging
            dist.reduce(tensor=loss, dst=0)
            dist.barrier()  # synchronizes all processes
            if rank == 0:
                running_loss += loss.item() / dist.get_world_size()

        if rank == 0:
            epoch_loss = running_loss / num_of_batches
            # Log metrics
            run["metrics/train/loss"].append(epoch_loss)
            print(f'[Epoch {epoch + 1}/{params["epochs"]}] loss: {epoch_loss:.3f}')

    print("Finished training.")
def test(net, testloader, run, rank):
    correct = 0
    total = 0
    with torch.no_grad():
        for data in testloader:
            images, labels = data
            images, labels = images.to(f"cuda:{rank}"), labels.to(f"cuda:{rank}")
            outputs = net(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)

            # Gather labels and predicted tensors from all processes on main process for logging
            dist.reduce(tensor=labels, dst=0)
            dist.barrier()  # synchronizes all processes
            dist.reduce(tensor=predicted, dst=0)
            dist.barrier()  # synchronizes all processes
            if rank == 0:
                correct += (predicted == labels).sum().item()

    if rank == 0:
        acc = 100 * correct // total
        # Log metrics
        run["metrics/valid/acc"] = acc
        print(f"Accuracy of the network on the 10 000 test images: {acc} %")
if __name__ == "__main__":
    init_distributed()
    rank = int(os.environ["RANK"])
    local_rank = int(os.environ["LOCAL_RANK"])
    params = {"batch_size": 256, "lr": 0.001, "epochs": 2, "momentum": 0.9}

    trainloader, testloader = create_data_loader_cifar10(
        rank=rank, batch_size=params["batch_size"]
    )
    net = torchvision.models.resnet50(weights=None).to(f"cuda:{rank}")
    net = nn.SyncBatchNorm.convert_sync_batchnorm(net)
    net = nn.parallel.DistributedDataParallel(net, device_ids=[rank])

    # To correctly monitor each GPU usage
    os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
    os.environ["CUDA_VISIBLE_DEVICES"] = str(rank)

    # Creates multiple run instances
    # We can leave out the custom_run_id argument, as Neptune picks it up
    # from the environment variable
    run = neptune.init_run(monitoring_namespace=f"monitoring/rank/{rank}")

    # Train model
    train(net, trainloader, run, rank, params)

    # Test model
    test(net, testloader, run, rank)
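For reference, a hypothetical launch of this script on two nodes with four GPUs each, using torchrun (the script name, node count, GPU count, and rendezvous address are assumptions):

```bash
# On node 0 (replace <node0-address> with an address reachable from all nodes):
torchrun --nnodes=2 --nproc_per_node=4 --node_rank=0 \
    --rdzv_backend=c10d --rdzv_endpoint=<node0-address>:29500 train.py

# On node 1:
torchrun --nnodes=2 --nproc_per_node=4 --node_rank=1 \
    --rdzv_backend=c10d --rdzv_endpoint=<node0-address>:29500 train.py
```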
Expected outcome
Multiple run instances are created per node, but because the same NEPTUNE_CUSTOM_RUN_ID environment variable is exported on each node, all instances log metadata to the same run.
Logging from different ranks#
If you are concerned about the performance cost of sending files to the main process (rank 0), you can modify your script to track large metadata types, such as files and dataset samples, from each rank separately.
For a clear and accurate representation in charts, metrics or other values logged as a FloatSeries
must still be tracked from the main process.
...
loss.backward()
optimizer.step()

if i % 10 == 0:
    for img in images:
        # (Neptune) Save batch of images from every node
        run[f"images/samples/rank_{rank}"].append(
            File.as_image(img.cpu().squeeze().permute(2, 1, 0).clip(0, 1))
        )
...
Note: To create an image series from image-like objects, as in the example above, import the File Neptune type.
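With recent versions of the Neptune client, this import is typically:

```python
from neptune.types import File
```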
def test(net, testloader, run, rank):
    classes = [
        "airplane",
        "automobile",
        "bird",
        ...
    ]
    correct = 0
    total = 0
    with torch.no_grad():
        for i, data in enumerate(testloader):
            images, labels = data
            images, labels = images.to(f"cuda:{rank}"), labels.to(f"cuda:{rank}")
            outputs = net(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)

            if i % 10 == 0:
                probs = F.softmax(outputs, dim=1)
                probs = probs.cpu().numpy()
                # Use a separate loop variable (j) to avoid shadowing the batch index i
                for j, ps in enumerate(probs):
                    pred = classes[np.argmax(ps)]
                    gt = classes[labels[j]]
                    # (Neptune) Save images and predictions from every node
                    run[f"images/predictions/rank_{rank}"].append(
                        File.as_image(
                            images[j].cpu().squeeze().permute(2, 1, 0).clip(0, 1)
                        ),
                        name=f"Predicted: {pred}, Ground Truth: {gt}",
                    )
...
For the complete code examples, see the example scripts on GitHub.