Skip to content

Tracking distributed training jobs with Neptune#

Use cases addressed in this tutorial

  • How to track experiments in DDP mode and avoid creating a new run for every process
  • How to track single and multi node distributed training jobs

When training models in a distributed environment – for example, over multiple GPUs or nodes – you can use Neptune to track all the metadata across multiple processes into a single run. This lets you evaluate the results of the entire model training, validation, or testing session.

In this tutorial, we show two separate examples: One for single-node training and one for multi-node.

View example scripts on GitHub 

Before you start#

  • Sign up at neptune.ai/register.
  • Create a project for storing your metadata.
  • Install Neptune:

    pip install neptune
    
    Passing your Neptune credentials

    Once you've registered and created a project, set your Neptune API token and full project name to the NEPTUNE_API_TOKEN and NEPTUNE_PROJECT environment variables, respectively.

    export NEPTUNE_API_TOKEN="h0dHBzOi8aHR0cHM.4kl0jvYh3Kb8...6Lc"
    

    To find your API token: In the bottom-left corner of the Neptune app, expand the user menu and select Get my API token.

    export NEPTUNE_PROJECT="ml-team/classification"
    

    Your full project name has the form workspace-name/project-name. You can copy it from the project settings: Click the menu in the top-right → Details & privacy.

    On Windows, navigate to SettingsEdit the system environment variables, or enter the following in Command Prompt: setx SOME_NEPTUNE_VARIABLE 'some-value'


    While it's not recommended especially for the API token, you can also pass your credentials in the code when initializing Neptune.

    run = neptune.init_run(
        project="ml-team/classification",  # your full project name here
        api_token="h0dHBzOi8aHR0cHM6Lkc78ghs74kl0jvYh...3Kb8",  # your API token here
    )
    

    For more help, see Set Neptune credentials.

Tracking a single-node multi-GPU job#

To configure Neptune to track your single-node, multi-GPU distributed training job:

  1. Create a new Neptune run only on the main process.

    if __name__ == "__main__":
        ...
        rank = int(os.environ["RANK"])
        local_rank = int(os.environ["LOCAL_RANK"])
        ...
        net = nn.parallel.DistributedDataParallel(net, device_ids=[local_rank])
        ...
        run = (
            neptune.init_run()
            if rank == 0
            else None
        )
    

    This ensures that you create and log metadata to a single run.

  2. Log metadata only from the main process.

    ...
    
    if __name__ == "__main__":
        run = (
            neptune.init_run()
            if rank == 0
            else None
        )
    
        ...
    
        if rank == 0:
            acc = ...
            run["metrics/valid/acc"] = acc
    

Expected outcome

All metadata of the training job is logged to the same run.

See example script 

Logging to multiple instances of the same run#

You can initialize multiple run instances that log metadata to the same run by creating a custom run identifier.

You can then pass the custom ID to the custom_run_id argument of the init_run() function across all processes. Initializing each run with the same ID ensures that all the metadata is ultimately logged to a single run.

if __name__ == "__main__":

    ...

    if rank == 0:
        custom_run_id = [hashlib.md5(str(time.time()).encode()).hexdigest()] # (1)!
        monitoring_namespace = "monitoring"
    else:
        custom_run_id = [None]
        monitoring_namespace = f"monitoring/{rank}"

    dist.broadcast_object_list(custom_run_id, src=0)
    custom_run_id = custom_run_id[0]

    run = neptune.init_run(
        custom_run_id=custom_run_id,
        monitoring_namespace=monitoring_namespace,
    )

    # Train model
    ...
  1. Automatically create and broadcast custom_run_id to all processes.

See example script 

Related

You can also export the custom run ID as an environment variable. To ensure a unique ID for every run, you can use the following example:

export NEPTUNE_CUSTOM_RUN_ID=`date +"%Y%m%d%H%M%s%N" | md5sum`
export NEPTUNE_CUSTOM_RUN_ID=`date +"%Y%m%d%H%M%s%N" | md5`
set NEPTUNE_CUSTOM_RUN_ID=`date +"%Y%m%d%H%M%s%N"`

For more instructions, see Set custom run ID.

Tracking a multi node DDP job#

This example shows how to configure Neptune for distributed training jobs that use multiple nodes or GPUs.

As for single-node training, the idea is to ensure that each run instance is initialized with the same custom run ID. This ensures that all the metadata is logged to the same run.

  1. In each node, export the same NEPTUNE_CUSTOM_RUN_ID environment variable.

    To ensure a unique ID for every run, you can use the following example:

    export NEPTUNE_CUSTOM_RUN_ID=`date +"%Y%m%d%H%M%s%N" | md5sum`
    
    export NEPTUNE_CUSTOM_RUN_ID=`date +"%Y%m%d%H%M%s%N" | md5`
    
    set NEPTUNE_CUSTOM_RUN_ID=`date +"%Y%m%d%H%M%s%N"`
    
  2. To log metadata across processes only from the main process, you can use the PyTorch distributed reduce() method. It reduces the tensor data across all machines and makes the final tensor available to the main process.

def train(net, trainloader, run, rank, params):
    if rank == 0:
        # Log params
        run["parameters"] = params

    print("Start training...")
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(
        net.parameters(), lr=params["lr"], momentum=params["momentum"]
    )
    num_of_batches = len(trainloader)
    for epoch in range(params["epochs"]):
        trainloader.sampler.set_epoch(epoch)
        running_loss = 0.0
        for i, data in enumerate(trainloader, 0):
            inputs, labels = data
            images, labels = inputs.to(f"cuda:{rank}"), labels.to(f"cuda:{rank}")
            optimizer.zero_grad()
            outputs = net(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # Gather loss value from all processes on main process for logging
            dist.reduce(tensor=loss, dst=0)
            dist.barrier()  # synchronizes all the threads

            if rank == 0:
                running_loss += loss.item() / dist.get_world_size()

        if rank == 0:
            epoch_loss = running_loss / num_of_batches
            # Log metrics
            run["metrics/train/loss"].append(epoch_loss)
            print(f'[Epoch {epoch + 1}/{params["epochs"]}] loss: {epoch_loss:.3f}')

    print("Finished training.")
def test(net, testloader, run, rank):

    correct = 0
    total = 0

    with torch.no_grad():
        for data in testloader:
            images, labels = data
            images, labels = images.to(f"cuda:{rank}"), labels.to(f"cuda:{rank}")
            outputs = net(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)

            # Gather labels and predicted tensors from all processes on main process for logging
            dist.reduce(tensor=labels, dst=0)
            dist.barrier()  # synchronizes all the threads

            dist.reduce(tensor=predicted, dst=0)
            dist.barrier()  # synchronizes all the threads

            if rank == 0:
                correct += (predicted == labels).sum().item()

    if rank == 0:
        acc = 100 * correct // total
        # Log metrics
        run["metrics/valid/acc"] = acc
        print(f"Accuracy of the network on the 10 000 test images: {acc} %")
if __name__ == "__main__":

    init_distributed()

    rank = int(os.environ["RANK"])
    local_rank = int(os.environ["LOCAL_RANK"])

    params = {"batch_size": 256, "lr": 0.001, "epochs": 2, "momentum": 0.9}

    trainloader, testloader = create_data_loader_cifar10(
        rank=rank, batch_size=params["batch_size"]
    )

    net = torchvision.models.resnet50(weights=None).to(f"cuda:{rank}")
    net = nn.SyncBatchNorm.convert_sync_batchnorm(net)
    net = nn.parallel.DistributedDataParallel(net, device_ids=[rank])

    # To correctly monitor each GPU usage
    os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
    os.environ["CUDA_VISIBLE_DEVICES"] = str(rank)

    # Creates multiple run instances
    run = neptune.init_run(monitoring_namespace=f"monitoring/rank/{rank}")
    # We can leave out the custom_run_id argument, as Neptune picks it up
    # from the environment variable

    # Train model
    train(net, trainloader, run, rank, params)

    # Test model
    test(net, testloader, run, rank)

Expected outcome

Multiple run instances are created per node, but by exporting the custom_run_id as an env argument on each node terminal, you ensure that all instances log metadata to the same run.

Logging from different ranks#

If you are concerned about the performance of sending files to the main process (rank 0), you can modify your script to separately track large metadata types, such as files and dataset samples, from each node.

For a clear and accurate representation in charts, metrics or other values logged as a FloatSeries must still be tracked from the main process.

...
loss.backward()
optimizer.step()

if i % 10 == 0:
    for img in images:
        # (Neptune) Save batch of images from every node
        run[f"images/samples/rank_{rank}"].append(
            File.as_image(img.cpu().squeeze().permute(2, 1, 0).clip(0, 1))
        )

...

Note: In order to create an image series from image-like objects, as in the example above, import the File Neptune type:

from neptune.types import File
def test(net, testloader, run, rank):

    classes = [
        "airplane",
        "automobile",
        "bird",
        ...
    ]

    correct = 0
    total = 0

    with torch.no_grad():
        for i, data in enumerate(testloader):
            images, labels = data
            images, labels = images.to(f"cuda:{rank}"), labels.to(f"cuda:{rank}")
            outputs = net(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)

            if i % 10 == 0:
                probs = F.softmax(outputs, dim=1)
                probs = probs.cpu().numpy()

                for i, ps in enumerate(probs):
                    pred = classes[np.argmax(ps)]
                    gt = classes[labels[i]]
                    # (Neptune) Save images and predictions from every node
                    run[f"images/predictions/rank_{rank}"].append(
                        File.as_image(
                            images[i].cpu().squeeze().permute(2, 1, 0).clip(0, 1)
                        ),
                        name=f"Predicted: {pred}, Ground Truth: {gt}",
                    )

            ...
if __name__ == "__main__":
    ...

    run = neptune.init_run(monitoring_namespace=f"monitoring/rank/{rank}")

    train(...)

    test(...)

For the complete code examples, see the example scripts on GitHub.

See example script