
Tracking distributed training jobs with Neptune

Use cases addressed in this tutorial

  • How to track experiments in DDP mode and avoid creating a new run for every process
  • How to track single- and multi-node distributed training jobs

When training models in a distributed environment, for example over multiple GPUs or nodes, you can use Neptune to log the metadata from all processes to a single run. This lets you evaluate the results of the entire model training, validation, or testing session in one place.

In this tutorial, we show two separate examples: one for single-node training and one for multi-node training.

View example scripts on GitHub 

Before you start

Tracking a single-node multi-GPU job

To configure Neptune to track your single-node, multi-GPU distributed training job:

  1. Create a new Neptune run only on the main process.

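    import os
    
    import neptune
    import torch.nn as nn
    
    if __name__ == "__main__":
        ...
        rank = int(os.environ["RANK"])  # global rank of this process
        local_rank = int(os.environ["LOCAL_RANK"])  # GPU index on this node
        ...
        net = nn.parallel.DistributedDataParallel(net, device_ids=[local_rank])
        ...
        # Create the Neptune run only on the main process (rank 0)
        run = (
            neptune.init_run()
            if rank == 0
            else None
        )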
    

    This ensures that you create and log metadata to a single run.

  2. Log metadata only from the main process.

    ...
    
    if __name__ == "__main__":
        ...
        run = (
            neptune.init_run()
            if rank == 0
            else None
        )
    
        ...
    
        # Log only from the main process: on all other ranks, run is None
        if rank == 0:
            acc = ...
            run["metrics/valid/acc"] = acc
    

Expected outcome

All metadata of the training job is logged to the same run.
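The snippets above read RANK directly from the environment, which raises a KeyError when the script is started with plain python instead of a launcher such as torchrun. A minimal sketch of a guard that treats a non-distributed run as the main process is shown below. The names get_rank() and is_main_process() are hypothetical helpers for illustration, not part of Neptune or PyTorch.

```python
import os


def get_rank() -> int:
    """Global rank set by the launcher (for example, torchrun), or 0 if unset."""
    return int(os.environ.get("RANK", "0"))


def is_main_process() -> bool:
    """True for the single process that should create and write to the Neptune run."""
    return get_rank() == 0


if __name__ == "__main__":
    # In the training script, this would replace the explicit rank check:
    # run = neptune.init_run() if is_main_process() else None
    print(f"rank={get_rank()}, main process: {is_main_process()}")
```

With this guard, the same script works both under a distributed launcher and in a quick single-process debugging session, where the only process is treated as rank 0.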

See example script 

Logging to multiple instances of the same run

You can initialize multiple run instances that log metadata to the same run by giving them a custom run identifier.

To do this, pass the same ID to the custom_run_id argument of init_run() in every process. Because each instance is initialized with the same ID, all the metadata is ultimately logged to a single run.

import hashlib
import os
import time

import neptune
import torch.distributed as dist

if __name__ == "__main__":

    ...

    rank = int(os.environ["RANK"])

    if rank == 0:
        # Create a unique custom run ID on the main process only
        custom_run_id = [hashlib.md5(str(time.time()).encode()).hexdigest()]
        monitoring_namespace = "monitoring"
    else:
        # Placeholder, overwritten by the broadcast below
        custom_run_id = [None]
        monitoring_namespace = f"monitoring/{rank}"

    # Send the ID from rank 0 to all other processes
    dist.broadcast_object_list(custom_run_id, src=0)
    custom_run_id = custom_run_id[0]

    # Every process connects to the same run
    run = neptune.init_run(
        custom_run_id=custom_run_id,
        monitoring_namespace=monitoring_namespace,
    )

    # Train model
    ...

See example script 

Related

You can also export the custom run ID as an environment variable. To ensure a unique ID for every run, you can use the following example:

Linux:

export NEPTUNE_CUSTOM_RUN_ID=`date | md5sum`

macOS:

export NEPTUNE_CUSTOM_RUN_ID=`date | md5`

Windows:

set NEPTUNE_CUSTOM_RUN_ID=`date | md5`

For more instructions, see Setting custom run ID.
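Exporting the variable in a shell means that every process started from that shell inherits the same ID. The same idea can be sketched in Python for a launcher script that starts its workers itself: generate the ID once, then pass it to each worker through its environment. This is a sketch under assumptions. make_custom_run_id() and launch_workers() are hypothetical helpers, the workers are started one after another for simplicity, and each worker only prints the ID it sees instead of calling neptune.init_run(), which would read NEPTUNE_CUSTOM_RUN_ID from its environment.

```python
import hashlib
import os
import subprocess
import sys
import time


def make_custom_run_id() -> str:
    # Same recipe as the in-script example: MD5 hex digest of the current time
    return hashlib.md5(str(time.time()).encode()).hexdigest()


def launch_workers(num_workers: int) -> list[str]:
    """Start workers that all inherit one NEPTUNE_CUSTOM_RUN_ID value.

    Returns the ID that each worker reported seeing in its environment.
    """
    # Generate the ID once, in the parent, so that no worker creates its own
    env = {**os.environ, "NEPTUNE_CUSTOM_RUN_ID": make_custom_run_id()}
    worker_code = "import os; print(os.environ['NEPTUNE_CUSTOM_RUN_ID'])"
    seen = []
    for _ in range(num_workers):
        result = subprocess.run(
            [sys.executable, "-c", worker_code],
            env=env,
            capture_output=True,
            text=True,
            check=True,
        )
        seen.append(result.stdout.strip())
    return seen


if __name__ == "__main__":
    print(launch_workers(3))
```

The key design choice is that the ID is created in the parent rather than in each worker; if every worker hashed the time itself, workers started in different seconds would end up in different runs.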

Tracking a multi-node DDP job

This example shows how to configure Neptune for multi-node or multi-GPU distributed training jobs.

As with single-node training, the idea is to initialize each run instance with the same custom run ID, so that all the metadata is logged to the same run.

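  1. On each node, export the same NEPTUNE_CUSTOM_RUN_ID environment variable.

    Generate the ID once and export that exact value on every node, because running the command separately on each node can produce a different ID per node. To generate a unique ID for every run, you can use the following example:

    Linux:

    export NEPTUNE_CUSTOM_RUN_ID=`date | md5sum`

    macOS:

    export NEPTUNE_CUSTOM_RUN_ID=`date | md5`

    Windows:

    set NEPTUNE_CUSTOM_RUN_ID=`date | md5`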
    
  2. To log metadata from all processes only through the main process, you can use the PyTorch distributed reduce() function. It combines a tensor across all processes (by default, by summing it) and stores the result only on the destination rank, here the main process (rank 0).
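Because reduce() sums by default, the main process receives a sum rather than a mean, and only quantities that stay meaningful when summed should be reduced. The pure-Python sketch below simulates what rank 0 receives; no PyTorch is involved, and reduce_sum() and elementwise_reduce_sum() are hypothetical stand-ins for dist.reduce(..., dst=0) with the default sum operation. It shows that a loss must be divided by the number of processes after the reduction, and that accuracy should be computed from summed counts of correct and total predictions: summing label and prediction tensors element-wise loses the information about which predictions were correct.

```python
def reduce_sum(per_rank_values):
    """Simulate what rank 0 receives from a SUM reduce of one scalar per rank."""
    return sum(per_rank_values)


def elementwise_reduce_sum(per_rank_tensors):
    """Simulate a SUM reduce of equally sized 1-D tensors, one per rank."""
    return [sum(column) for column in zip(*per_rank_tensors)]


def mean_loss(per_rank_losses):
    # The reduced value is a sum, so divide by the world size to get the mean
    return reduce_sum(per_rank_losses) / len(per_rank_losses)


def accuracy_from_counts(labels_by_rank, preds_by_rank):
    # Each rank counts its own correct predictions; only the counts are reduced
    correct = reduce_sum(
        sum(label == pred for label, pred in zip(labels, preds))
        for labels, preds in zip(labels_by_rank, preds_by_rank)
    )
    total = reduce_sum(len(labels) for labels in labels_by_rank)
    return 100 * correct // total


def accuracy_from_summed_tensors(labels_by_rank, preds_by_rank):
    # Anti-pattern: reduce labels and predictions, then compare the sums
    labels = elementwise_reduce_sum(labels_by_rank)
    preds = elementwise_reduce_sum(preds_by_rank)
    correct = sum(label == pred for label, pred in zip(labels, preds))
    return 100 * correct // len(labels)


if __name__ == "__main__":
    labels_by_rank = [[3, 5], [5, 3]]  # two ranks, two test images each
    preds_by_rank = [[5, 3], [3, 5]]  # every single prediction is wrong
    print(mean_loss([0.5, 0.7]))
    print(accuracy_from_counts(labels_by_rank, preds_by_rank))  # 0
    print(accuracy_from_summed_tensors(labels_by_rank, preds_by_rank))  # 100
```

In this example, every prediction is wrong, yet comparing the summed tensors reports 100% accuracy, because the per-position sums happen to match.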

import os

import neptune
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torchvision


def train(net, trainloader, run, rank, params):
    if rank == 0:
        # Log params
        run["parameters"] = params

    # Each process uses the GPU with its node-local index
    device = f"cuda:{int(os.environ['LOCAL_RANK'])}"

    print("Start training...")
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(
        net.parameters(), lr=params["lr"], momentum=params["momentum"]
    )
    num_of_batches = len(trainloader)
    for epoch in range(params["epochs"]):
        # Reshuffle the DistributedSampler differently in every epoch
        trainloader.sampler.set_epoch(epoch)
        running_loss = 0.0
        for i, data in enumerate(trainloader, 0):
            inputs, labels = data
            images, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = net(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # Sum the loss values of all processes on the main process for logging.
            # A detached copy keeps the reduction out of the autograd graph.
            loss_value = loss.detach().clone()
            dist.reduce(tensor=loss_value, dst=0)

            if rank == 0:
                # Divide the sum by the number of processes to get the mean loss
                running_loss += loss_value.item() / dist.get_world_size()

        if rank == 0:
            epoch_loss = running_loss / num_of_batches
            # Log metrics
            run["metrics/train/loss"].append(epoch_loss)
            print(f'[Epoch {epoch + 1}/{params["epochs"]}] loss: {epoch_loss:.3f}')

    print("Finished training.")
def test(net, testloader, run, rank):
    device = f"cuda:{int(os.environ['LOCAL_RANK'])}"

    # Per-process counters, kept as tensors so they can be reduced across processes
    correct = torch.zeros(1, dtype=torch.long, device=device)
    total = torch.zeros(1, dtype=torch.long, device=device)

    with torch.no_grad():
        for data in testloader:
            images, labels = data
            images, labels = images.to(device), labels.to(device)
            outputs = net(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum()

    # Sum the counts of all processes on the main process for logging
    dist.reduce(tensor=correct, dst=0)
    dist.reduce(tensor=total, dst=0)

    if rank == 0:
        acc = 100 * correct.item() // total.item()
        # Log metrics
        run["metrics/valid/acc"] = acc
        print(f"Accuracy of the network on the {total.item()} test images: {acc} %")
if __name__ == "__main__":

    # init_distributed() and create_data_loader_cifar10() are helpers
    # defined in the full example script
    init_distributed()

    rank = int(os.environ["RANK"])  # global rank across all nodes
    local_rank = int(os.environ["LOCAL_RANK"])  # GPU index on this node

    params = {"batch_size": 256, "lr": 0.001, "epochs": 2, "momentum": 0.9}

    trainloader, testloader = create_data_loader_cifar10(
        rank=rank, batch_size=params["batch_size"]
    )

    net = torchvision.models.resnet50(weights=None).to(f"cuda:{local_rank}")
    net = nn.SyncBatchNorm.convert_sync_batchnorm(net)
    net = nn.parallel.DistributedDataParallel(net, device_ids=[local_rank])

    # So that Neptune monitors the GPU used by this process
    os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
    os.environ["CUDA_VISIBLE_DEVICES"] = str(local_rank)

    # Creates one run instance per process. The custom_run_id argument is
    # omitted because Neptune reads it from the NEPTUNE_CUSTOM_RUN_ID
    # environment variable.
    run = neptune.init_run(monitoring_namespace=f"monitoring/rank/{rank}")

    # Train model
    train(net, trainloader, run, rank, params)

    # Test model
    test(net, testloader, run, rank)

Expected outcome

One run instance is created per process, but because the same NEPTUNE_CUSTOM_RUN_ID environment variable is exported on each node, all instances log metadata to the same run.
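Exporting the variable on each node means copying one value to every machine. A possible alternative, sketched here under assumptions, is to let each process derive the same ID from an identifier that all nodes already share. The sketch assumes the job is launched with torchrun and a unique --rdzv-id per job, which torchrun exposes to every worker as the TORCHELASTIC_RUN_ID environment variable. If that ID is left at a fixed default, different jobs may end up logging to the same run. custom_run_id_from_job() and resolve_custom_run_id() are hypothetical helpers, and an exported NEPTUNE_CUSTOM_RUN_ID still takes precedence.

```python
import hashlib
import os


def custom_run_id_from_job(job_id: str) -> str:
    """Map a job identifier shared by all nodes to a 32-character hex run ID."""
    return hashlib.md5(job_id.encode()).hexdigest()


def resolve_custom_run_id() -> str:
    # An explicitly exported ID wins, matching the workflow described above
    explicit = os.environ.get("NEPTUNE_CUSTOM_RUN_ID")
    if explicit:
        return explicit
    # Assumption: the launcher sets TORCHELASTIC_RUN_ID to a job-unique value
    # (torchrun fills it from the --rdzv-id argument)
    return custom_run_id_from_job(os.environ["TORCHELASTIC_RUN_ID"])


if __name__ == "__main__":
    # In the training script, every process on every node would then call:
    # run = neptune.init_run(custom_run_id=resolve_custom_run_id(), ...)
    print(custom_run_id_from_job("example-job-001"))
```

Because hashing is deterministic, every process that sees the same job identifier computes the same run ID without any communication between nodes.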

Logging from different ranks

If you're concerned about the performance cost of sending files to the main process (rank 0), you can modify your script to log metadata separately from each rank.

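The limitation is that metrics and other values logged as a series (resulting in a Series field) must still be logged from the main process to be meaningful. The exception is a series that each rank logs to its own field, such as the rank_{rank} fields in the examples below.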

...
loss.backward()
optimizer.step()

if i % 10 == 0:
    for img in images:
        # (Neptune) Save batch of images from every process
        # permute(1, 2, 0) converts a (C, H, W) tensor to the (H, W, C) image layout
        run[f"images/samples/rank_{rank}"].append(
            File.as_image(img.cpu().squeeze().permute(1, 2, 0).clip(0, 1))
        )

dist.reduce(tensor=loss, dst=0)
...

Note: To create an image series from image-like objects, as in the example above, import the File Neptune type:

from neptune.types import File

import os

import numpy as np
import torch
import torch.nn.functional as F


def test(net, testloader, run, rank):

    classes = [
        "airplane",
        "automobile",
        "bird",
        ...
    ]

    # Each process uses the GPU with its node-local index
    device = f"cuda:{int(os.environ['LOCAL_RANK'])}"

    correct = 0
    total = 0

    with torch.no_grad():
        for i, data in enumerate(testloader):
            images, labels = data
            images, labels = images.to(device), labels.to(device)
            outputs = net(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)

            if i % 10 == 0:
                probs = F.softmax(outputs, dim=1)
                probs = probs.cpu().numpy()

                # Use j so that the batch index i is not shadowed
                for j, ps in enumerate(probs):
                    pred = classes[np.argmax(ps)]
                    gt = classes[labels[j].item()]
                    # (Neptune) Save images and predictions from every process
                    run[f"images/predictions/rank_{rank}"].append(
                        File.as_image(
                            images[j].cpu().squeeze().permute(1, 2, 0).clip(0, 1)
                        ),
                        name=f"Predicted: {pred}, Ground Truth: {gt}",
                    )

            dist.reduce(tensor=labels, dst=0)
            ...
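Both snippets convert a PyTorch image tensor, which is stored channel-first as (C, H, W), before passing it to File.as_image(). The small NumPy sketch below shows why the axis order matters. NumPy's transpose(axes) reorders axes the same way as PyTorch's permute(dims), so (1, 2, 0) yields the (H, W, C) layout that image libraries expect, while (2, 1, 0) swaps height and width and therefore transposes the picture. The 3 x 2 x 4 array is a made-up example, not CIFAR-10 data.

```python
import numpy as np

# Made-up image in PyTorch's channel-first layout: C=3, H=2, W=4.
# A non-square shape makes swapped height and width easy to see.
chw = np.zeros((3, 2, 4), dtype=np.float32)
chw[0, 0, 3] = 1.0  # one red pixel in row 0, column 3 (top-right corner)

hwc = chw.transpose(1, 2, 0)  # same reordering as tensor.permute(1, 2, 0)
whc = chw.transpose(2, 1, 0)  # same reordering as tensor.permute(2, 1, 0)

print(hwc.shape)  # (2, 4, 3): height, width, channels
print(whc.shape)  # (4, 2, 3): height and width are swapped
print(hwc[0, 3, 0])  # 1.0: the red pixel is still in the top-right corner
print(whc[3, 0, 0])  # 1.0: the red pixel moved to row 3, column 0
```

With square 32 x 32 CIFAR-10 images, both orders produce a valid shape, which is why a swapped order is easy to miss: the logged images are silently transposed rather than rejected.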

For the complete code examples, see the example scripts on GitHub.
