Hi @VishaalKapoor, thank you very much for your answer. I have now written a new version of my code, built explicitly on top of the distributed cifar10 example. It runs on a single node, but when I go distributed everything hangs. For this debugging I am using batch_size_per_gpu = 4, and a total batch_size = 4*4 = 16. The batch size for the validation loss is 4, i.e. the batch_size_per_gpu. The code also runs successfully (single node) for batch_size_per_gpu = 6.

This is my code; it differs from the original in the following ways: a different network called from an external library (works great, debugged etc.), a different dataset (derived from a gluon dataset), different dataset transformations and normalization, and a custom loss function (works great, debugged etc.). The code below runs successfully on a single node (local HPC) with 4x P100 GPUs, with a batch size per GPU of either 4 or 6. With the same batch size per GPU (4 or 6), distributed training returns a cuda malloc error. In addition, if I use batch_size_per_gpu = 2, the code hangs, meaning that the GPUs are not working, although there is memory occupied (this is what I see when the code hangs).
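For reference, a run that uses kv.create('dist') needs the scheduler/server/worker roles to be set up on every node, which is normally done through a launcher. This is only a sketch of how such a job is typically started with MXNet's tools/launch.py (the node counts, the hosts file and the script name here are placeholders, not my exact setup):

# Start 2 workers and 2 servers over ssh; 'hosts' lists one machine per line
python ~/mxnet/tools/launch.py -n 2 -s 2 --launcher ssh -H hosts \
    python train_dist.py

The launcher sets the DMLC_* environment variables (role, scheduler address, number of workers/servers) that kv.create('dist') reads on each machine.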
This is the code:
#!/usr/bin/env python
from __future__ import print_function
import random, sys
import mxnet as mx
from mxnet import autograd, gluon, kv, nd
from mxnet.gluon.model_zoo import vision
import numpy as np
# @@@@@@@@@@@@@@@@ Foivos @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
from phaino.nn.loss.loss import *
from phaino.models.resunet_d7_causal_mtskcolor_ddist import *
import os
import sys
sys.path.append(r'/home/dia021/Projects/isprs_potsdam/src/')
import uuid
import pathlib
from ISPRSDataset import *
from ISPRSNormal import *
from phaino.datagen.semseg_aug_cv2 import * # dataset augmentor
# @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
# Create a distributed key-value store
store = kv.create('dist') ## ORIGINAL FOR DISTRIBUTED
#store = kv.create('device') ## FOR SINGLE MACHINE
# @@@@@@@@@@@@@@@@@@ Foivos @@@@@@@@@@@@@@@@@@@@@@@@@@
NClasses = 6
nfilters_init = 32
# @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
# How many epochs to run the training
epochs = 4
# batch_per_gpu
batch_size_per_gpu = 2 # 4 works - single machine, 6 works after calculating some scalar on cpu
# How many GPUs per machine
gpus_per_machine = 4
# Effective batch size across all GPUs - single machine
batch_size = batch_size_per_gpu * gpus_per_machine
# Number of cpus:
num_cpus = 8
# Create the context (a list of all GPUs to be used for training)
ctx = [mx.gpu(i) for i in range(gpus_per_machine)]
class SplitSampler(gluon.data.sampler.Sampler):
    """ Split the dataset into `num_parts` parts and sample from the part with index `part_index`

    Parameters
    ----------
    length: int
        Number of examples in the dataset
    num_parts: int
        Partition the data into multiple parts
    part_index: int
        The index of the part to read from
    """
    def __init__(self, length, num_parts=1, part_index=0):
        # Compute the length of each partition
        self.part_len = length // num_parts
        # Compute the start index for this partition
        self.start = self.part_len * part_index
        # Compute the end index for this partition
        self.end = self.start + self.part_len

    def __iter__(self):
        # Extract examples between `start` and `end`, shuffle and return them
        indices = list(range(self.start, self.end))
        random.shuffle(indices)
        return iter(indices)

    def __len__(self):
        return self.part_len
# @@@@@@@@@@@@@@@@@ Foivos Datasets/DataGens @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
params_range = ParamsRange() # Parameters range for CV data augmentation
tnorm = ISPRSNormal()
ttransform = SemSegAugmentor_CV(params_range = params_range, norm = None, one_hot = True) #Default for semseg problems
data_dir = r'/data/dia021/isprs_potsdam/Data/'
RUN_ID = str(uuid.uuid4())
dataset_train = ISPRSDataset(root=data_dir,
                             mode='train',
                             mtsk=True,
                             color=True,
                             norm=tnorm,
                             transform=ttransform)

dataset_val = ISPRSDataset(root=data_dir,
                           mode='val',
                           mtsk=True,
                           color=True,
                           norm=tnorm,
                           transform=None)
# @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
# Load the training data
train_data = gluon.data.DataLoader(dataset_train,
                                   batch_size,
                                   sampler=SplitSampler(len(dataset_train), store.num_workers, store.rank),
                                   last_batch='discard',
                                   num_workers=num_cpus)
# Load the test data
test_data = gluon.data.DataLoader(dataset_val,
                                  batch_size_per_gpu,
                                  shuffle=False,
                                  last_batch='discard',
                                  num_workers=num_cpus)
# Build the network (ResUNet_d7 from the external library)
net = ResUNet_d7(_nfilters_init = nfilters_init, _NClasses= NClasses)
# Initialize the parameters with Xavier initializer
net.collect_params().initialize(mx.init.Xavier(), ctx=ctx)
# ########################### Foivos Loss function
TrainLoss = Tanimoto_wth_dual()
def val_loss(pred, label):
    return 1. - TrainLoss(pred, label)

def loss_Tnmt(_prediction, _label):
    """
    The output of the loss is a tensor of dimensions (_label.shape[0],)
    """
    skip = 6
    pred_segm  = _prediction[0]
    pred_bound = _prediction[1]
    pred_dists = _prediction[2]
    pred_c     = _prediction[3]

    label_segm  = _label[:, :skip, :, :]
    label_bound = _label[:, skip:2*skip, :, :]
    label_dists = _label[:, 2*skip:-3, :, :]
    label_c     = _label[:, -3:, :, :]

    loss_segm  = 1.0 - TrainLoss(pred_segm, label_segm)
    loss_bound = 1.0 - TrainLoss(pred_bound, label_bound)
    loss_dists = 1.0 - TrainLoss(pred_dists, label_dists)  # self.loss2(pred_dists, label_dists)
    loss_c     = 1.0 - TrainLoss(pred_c, label_c)

    return (loss_segm + loss_bound + loss_dists + loss_c) / 4.0
# Use Adam optimizer. Ask trainer to use the distributed kv store.
trainer = gluon.Trainer(net.collect_params(), 'adam', {'learning_rate': .001}, kvstore=store)
# @@@@@@@@@@@@@@@@@@ Foivos @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
def eval_mcc(data_iterator, net):
    skip = 6
    mcc = mx.metric.MCC(average='micro')
    avgLoss = 0.0
    for i, data in enumerate(data_iterator):
        d, l = data
        d = nd.array(d)
        l = nd.array(l)
        l = l[:, :skip, :, :]
        # Use single GPU for validation
        data = d.as_in_context(ctx[0])
        label = l.as_in_context(ctx[0])
        with autograd.predict_mode():
            prediction, _, _, _ = net(data)
        tloss = val_loss(prediction, label)
        avgLoss += nd.mean(tloss).asscalar()
        if l.shape[1] == prediction.shape[1]:
            tl = nd.reshape(data=nd.flatten(l), shape=(-3))
        else:
            tl = nd.transpose(nd.one_hot(nd.squeeze(l), depth=NClasses), axes=[0, 3, 1, 2])
            tl = nd.reshape(data=nd.flatten(tl), shape=(-3))
        tp = nd.reshape(data=nd.flatten(prediction.as_in_context(mx.cpu())), shape=(-3))
        tp_2D = nd.stack(1. - tp, tp, axis=-1)
        mcc.update(labels=tl, preds=tp_2D)
    avgLoss /= (i + 1.0)
    return avgLoss, mcc.get()[1]
# @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
# Use the custom Tanimoto-based multi-task loss defined above
loss = loss_Tnmt  # gluon.loss.SoftmaxCrossEntropyLoss()
# Run one forward and backward pass on multiple GPUs
def forward_backward(net, data, label):
    # Ask autograd to remember the forward pass
    with autograd.record():
        # Compute the loss on all GPUs
        losses = [loss(net(X), Y) for X, Y in zip(data, label)]
    # Run the backward pass (calculate gradients) on all GPUs
    tsum = 0.0
    for l in losses:
        l.backward()
        # Calculate mean loss, only to avoid memory flooding
        tsum += nd.mean(l.as_in_context(mx.cpu())).asscalar()
    #return tsum / float(len(losses))
# Train a batch using multiple GPUs
def train_batch(batch, ctx, net, trainer):
    # Split and load data into multiple GPUs
    data = batch[0]
    data = gluon.utils.split_and_load(data, ctx)
    # Split and load label into multiple GPUs
    label = batch[1]
    label = gluon.utils.split_and_load(label, ctx)
    # Run the forward and backward pass
    forward_backward(net, data, label)
    # Update the parameters
    this_batch_size = batch[0].shape[0]
    trainer.step(this_batch_size)
# Run as many epochs as required
print ("Initiating training, RUN-ID:{}".format(RUN_ID))
#test_accuracy = evaluate_accuracy(test_data, net)
#test_accuracy, test_loss = eval_mcc(test_data, net)
#print("First test evaluation, Test_mcc %f: test_Tnmt: %f" % (test_accuracy,test_loss))
#sys.stdout.flush()
for epoch in range(epochs):
    # Iterate through batches and run training using multiple GPUs
    for batch in train_data:
        # Train the batch using multiple GPUs
        train_batch(batch, ctx, net, trainer)
    # Print validation loss and MCC after every epoch
    #test_accuracy = evaluate_accuracy(test_data, net)
    test_loss, test_mcc = eval_mcc(test_data, net)
    print("Epoch %d: Test_mcc %f: test_Tnmt: %f" % (epoch, test_mcc, test_loss))
    sys.stdout.flush()
A similar "hang" of the code happens even on a single node if I use the following definition for the average inside the forward_backward function:
# Run one forward and backward pass on multiple GPUs
def forward_backward(net, data, label):
    # Ask autograd to remember the forward pass
    with autograd.record():
        # Compute the loss on all GPUs
        losses = [loss(net(X), Y) for X, Y in zip(data, label)]
    # Run the backward pass (calculate gradients) on all GPUs
    tsum = 0.0
    for l in losses:
        l.backward()
        # Calculate mean loss, only to avoid memory flooding
        tsum += nd.mean(l.as_in_context(mx.cpu())).asscalar()  # This works OK
        #tsum += nd.mean(l).asscalar()  # This makes the code hang (GPUs are idle), with memory occupied.
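For what it's worth, .asscalar() is a blocking call that waits for the corresponding GPU computation to finish. A variant I could try is to keep the per-GPU mean losses as NDArrays inside forward_backward and only convert them to Python floats after trainer.step() has been issued. This is just a sketch under that assumption (the mean_losses name and the return values are mine, not part of the original example):

def forward_backward(net, data, label):
    with autograd.record():
        losses = [loss(net(X), Y) for X, Y in zip(data, label)]
    for l in losses:
        l.backward()
    # Keep the per-GPU means as NDArrays; no blocking call inside the loop
    return [nd.mean(l) for l in losses]

def train_batch(batch, ctx, net, trainer):
    data = gluon.utils.split_and_load(batch[0], ctx)
    label = gluon.utils.split_and_load(batch[1], ctx)
    mean_losses = forward_backward(net, data, label)
    trainer.step(batch[0].shape[0])
    # Synchronize only once, after the parameter update has been queued
    return sum(l.asscalar() for l in mean_losses) / len(mean_losses)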
Do you think this is a bug in MXNet (version 1.5.0)?
Thank you very much for your time,
Foivos