Distributed training questions

Dear all,

I am getting my hands dirty with asynchronous distributed training. All looks good, and this suggested tutorial is awesome. Based on this, I have few questions, perhaps someone can help.

This function splits the data:

import random

from mxnet import gluon


class SplitSampler(gluon.data.sampler.Sampler):
    """ Split the dataset into `num_parts` parts and sample from the part with index `part_index`
    Parameters
    ----------
    length: int
      Number of examples in the dataset
    num_parts: int
      Partition the data into multiple parts
    part_index: int
      The index of the part to read from
    """
    def __init__(self, length, num_parts=1, part_index=0):
        # Compute the length of each partition
        self.part_len = length // num_parts
        # Compute the start index for this partition
        self.start = self.part_len * part_index
        # Compute the end index for this partition
        self.end = self.start + self.part_len

    def __iter__(self):
        # Extract examples between `start` and `end`, shuffle and return them.
        indices = list(range(self.start, self.end))
        random.shuffle(indices)
        return iter(indices)

    def __len__(self):
        return self.part_len
  1. The case where the data cannot be evenly divided by the number of workers is not captured by this function. The DataLoader provides the option last_batch, but I don’t know how this works with a sampler in the distributed setting. I can always modify the self.start and self.end indices to accurately describe the dataset (as in the sketch after this list), but I don’t know how this will behave in the distributed setting. For example, from the function above I see that shuffling always happens within the partition of data that belongs to a specific machine (part_index). So I must always shuffle within the range of indices that belong to a particular worker.

  2. Since this is going to be multi-machine/multi-GPU training, I am going to use dist_async (and not dist_device_async - is this correct, based on the discussion here?)

  3. A general question about the modes dist_sync and dist_async: my understanding is that dist_sync is used as in single-machine training, but with multiple machines to increase the effective batch size (gradients are aggregated from all machines). So dist_sync is used when we want to increase the batch size. With dist_async, each worker updates the weights independently of the other machines, so there is no increase in batch size for the gradient evaluation; however, because there are many machines, the model trains faster (as many more updates as there are machines). Is this correct? Could someone please verify? I am a bit confused based on the guidelines here.
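
To make question 1 concrete, this is the kind of modification I have in mind (an untested sketch; it keeps the shuffling within each worker's own index range and simply lets the last part absorb the remainder):

class SplitSamplerWithRemainder(gluon.data.sampler.Sampler):
    """ Like SplitSampler above, but the last part also receives the leftover
    indices when `length` is not divisible by `num_parts` (untested sketch). """
    def __init__(self, length, num_parts=1, part_index=0):
        part_len = length // num_parts
        self.start = part_len * part_index
        # the last partition absorbs the remainder
        self.end = length if part_index == num_parts - 1 else self.start + part_len

    def __iter__(self):
        # shuffle only within this worker's own range of indices
        indices = list(range(self.start, self.end))
        random.shuffle(indices)
        return iter(indices)

    def __len__(self):
        return self.end - self.start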

Thank you very much for your time and apologies for silly questions (the inner geek speaks within me :slight_smile: )

  • You can let the last worker read the additional data in the end and set last_batch to rollover. That should work, though I haven’t tried it. But note that when other workers complete an epoch, the last worker wouldn’t have completed the epoch. You need to be careful about that if you are writing some logic at the end of every epoch.

  • There should be no difference in accuracy between dist_device_async and dist_async. In my experience I haven’t seen dist_device_* giving better performance than dist_*, probably because the parameters need to be sent over the ethernet anyway and that cost dominates. I would suggest sticking with 'dist_async' or 'dist_sync'. This has the added advantage of freeing up some space on the GPU, since you won’t be using GPU memory for the parameter server.

  • That is correct. Note also that in 'dist_async' some updates from a worker can get overwritten by other workers, since the updates are not synchronized. Studies have shown faster convergence when using 'dist_sync'. I’ve seen 'dist_sync' being used more frequently than 'dist_async'. If it were me, I would use 'dist_sync' unless there is a specific reason to use 'dist_async' (for example, if there are so many machines that the synchronization cost becomes the bottleneck; for a small number of machines this is not the case).
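
For reference, switching between the two modes is just a matter of the kvstore string; the rest of the training script is unchanged (a minimal sketch, assuming `net` is your Gluon model):

import mxnet as mx
from mxnet import gluon

store = mx.kv.create('dist_sync')     # gradients aggregated across all workers every step
# store = mx.kv.create('dist_async')  # each worker pushes its updates independently
trainer = gluon.Trainer(net.collect_params(), 'sgd',
                        {'learning_rate': 0.01}, kvstore=store)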


Thank you very much @indu for your reply. Extremely appreciated.

I am doing tests with dist_async, because I want to keep the batch size small. LeCun suggests:

(image: a quote from Yann LeCun recommending small minibatch sizes)

and this is why I want to experiment with a small batch size first. It seems that DCASGD is the best available optimizer for this at the moment.

Again, many thanks!
Foivos


Hi again, I have some more questions as I am trying to make this run. I am working on an HPC cluster managed by SLURM. I have implemented multi-GPU training on a single node, so I can efficiently split my data, train, etc.; this is well tested. I can also run distributed hyperparameter optimization using ray (works great). In that setup each node uses 4 GPUs, but the runs are (as far as mxnet is concerned) completely independent.

Now, I am trying to use mxnet/tools/launch.py to start distributed training of my model. I have the following problems/questions.

  1. The script I am using to launch my job is the following (currently testing):
#!/bin/bash -l

#SBATCH --job-name="DSTR"
#SBATCH -t 00:03:30
#SBATCH --nodes=4
#SBATCH --cpus-per-task=28
#SBATCH --gres=gpu:4
#SBATCH --mem=128gb


./get_nodes_ip.sh > workers_ip.txt


srun python /data/dia021/Software/mxnet/tools/launch.py  -n $(wc -l < workers_ip.txt) -s  $(wc -l < workers_ip.txt) -H workers_ip.txt --sync-dst-dir /home/dia021/Projects/isprs_potsdam/distributed  --launcher ssh "python main.py"

The script get_nodes_ip.sh is a small script that writes the workers’ IP addresses (or the node names, as suggested by the tutorial; I tried both) to a text file; a sketch of what it does follows the example below. For example, the contents of workers_ip.txt are:

b001
b002
b003
b004
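
For reference, the script essentially just expands the SLURM node list into one hostname per line; here is a sketch of the idea in python using scontrol (my actual script may differ):

import os
import subprocess

# expand e.g. "b[001-004]" into one hostname per line
nodelist = os.environ['SLURM_JOB_NODELIST']
hosts = subprocess.check_output(['scontrol', 'show', 'hostnames', nodelist])
print(hosts.decode(), end='')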

With this I am assigning 4 workers and 4 servers. My understanding is that with this configuration I will have 4 nodes, each running a single parameter server and a single worker. However, when I do that I end up with 16 instances of the code running. To make this clearer: each run of main.py creates a directory into which it writes output files for monitoring training. I see a variable number of these directories, not 4 as I expected (sometimes 16 = 4x4, other times 22 != 4x4). I suspect the problem may be the srun command, but I am not sure. So here is my question: the number of workers -n 4 refers to the total number of workers, not the number of workers per node, is this correct? The same for the servers: -s 4 is the total number of (parameter) servers, right?

  2. If I understand correctly, the tool mxnet/tools/launch.py is supposed to launch as many processes as there are workers. So in this particular example I should have 4 instances of my code (as I expect), each worker consuming 4 GPUs. So I suspect there is definitely something wrong going on between SLURM and launch.py, right?

  3. Following the above, the console output I get after launching the job, until it is killed, is:

2018-07-13 20:12:50,786 INFO rsync /home/dia021/Projects/isprs_potsdam/distributed/ -> b001:/home/dia021/Projects/isprs_potsdam/distributed
2018-07-13 20:12:50,855 INFO rsync /home/dia021/Projects/isprs_potsdam/distributed/ -> b001:/home/dia021/Projects/isprs_potsdam/distributed
2018-07-13 20:12:50,867 INFO rsync /home/dia021/Projects/isprs_potsdam/distributed/ -> b001:/home/dia021/Projects/isprs_potsdam/distributed
2018-07-13 20:12:50,868 INFO rsync /home/dia021/Projects/isprs_potsdam/distributed/ -> b001:/home/dia021/Projects/isprs_potsdam/distributed

What is weird in this message (to me) is that all 4 lines refer to the same node. However, this is not always the case (sometimes a line refers to a different node). Is this normal?

  4. Despite the fact that I do see directories being created, there is no content in them. Normally my script writes a txt file with all the information, but now nothing is happening. Any ideas?

  5. Is there a tutorial showing how to launch mxnet distributed training from within a Python environment? Any pointers are most appreciated. In addition, could I use another manager for distributed training - say spark or ray - with the same efficiency as dmlc? Or will mxnet performance drop, so it is best to use dmlc?

Thank you very much for your time.

update: Hi all, some more info as I am trying to understand this.

a) If I remove the srun command and use the default cifar10_dist.py file, everything runs fine; distributed training is working (woohooooooo!!). This is now my launch file:

#!/bin/bash -l

#SBATCH --job-name="DSTR"
#SBATCH -q express
#SBATCH -t 02:10:30
#SBATCH --nodes=4
#SBATCH --cpus-per-task=28
#SBATCH --gres=gpu:4
#SBATCH --mem=128gb


./get_nodes_names.sh > workers_ip.txt

python /data/dia021/Software/mxnet/tools/launch_slurm_foivos.py  -n $(wc -l < workers_ip.txt) -s  $(wc -l < workers_ip.txt) -H workers_ip.txt   --sync-dst-dir /home/dia021/Projects/isprs_potsdam/distributed   --launcher ssh "python cifar10_dist.py"

b) If I change line 29 from store = kv.create('dist') to store = kv.create('dist_async'), I get the following error (running on 4 nodes with 4 GPUs each):

2018-07-23 17:09:19,062 INFO rsync /home/dia021/Projects/isprs_potsdam/distributed/ -> b010:/home/dia021/Projects/isprs_potsdam/distributed
2018-07-23 17:09:19,407 INFO rsync /home/dia021/Projects/isprs_potsdam/distributed/ -> b022:/home/dia021/Projects/isprs_potsdam/distributed
Warning: Permanently added 'b022,10.141.1.88' (ECDSA) to the list of known hosts.
2018-07-23 17:09:19,763 INFO rsync /home/dia021/Projects/isprs_potsdam/distributed/ -> b033:/home/dia021/Projects/isprs_potsdam/distributed
Warning: Permanently added 'b033,10.141.1.99' (ECDSA) to the list of known hosts.
2018-07-23 17:09:20,125 INFO rsync /home/dia021/Projects/isprs_potsdam/distributed/ -> b083:/home/dia021/Projects/isprs_potsdam/distributed
[17:09:30] src/operator/nn/./cudnn/./cudnn_algoreg-inl.h:107: Running performance tests to find the best convolution algorithm, this can take a while... (setting env variable MXNET_CUDNN_AUTOTUNE_DEFAULT to 0 to disable)
[17:09:30] src/operator/nn/./cudnn/./cudnn_algoreg-inl.h:107: Running performance tests to find the best convolution algorithm, this can take a while... (setting env variable MXNET_CUDNN_AUTOTUNE_DEFAULT to 0 to disable)
[17:09:31] src/operator/nn/./cudnn/./cudnn_algoreg-inl.h:107: Running performance tests to find the best convolution algorithm, this can take a while... (setting env variable MXNET_CUDNN_AUTOTUNE_DEFAULT to 0 to disable)
[17:09:31] src/operator/nn/./cudnn/./cudnn_algoreg-inl.h:107: Running performance tests to find the best convolution algorithm, this can take a while... (setting env variable MXNET_CUDNN_AUTOTUNE_DEFAULT to 0 to disable)
terminate called after throwing an instance of 'dmlc::Error'
  what():  [17:09:33] src/kvstore/././kvstore_dist_server.h:295: Check failed: sync_mode_ Updater needs to be set for async mode

Stack trace returned 10 entries:
[bt] (0) /data/dia021/Software/mxnet/libmxnet.so(+0x36003a) [0x2aaab332703a]
[bt] (1) /data/dia021/Software/mxnet/libmxnet.so(+0x360671) [0x2aaab3327671]
[bt] (2) /data/dia021/Software/mxnet/libmxnet.so(+0x295a209) [0x2aaab5921209]
[bt] (3) /data/dia021/Software/mxnet/libmxnet.so(+0x295f598) [0x2aaab5926598]
[bt] (4) /data/dia021/Software/mxnet/libmxnet.so(+0x295fbb4) [0x2aaab5926bb4]
[bt] (5) /data/dia021/Software/mxnet/libmxnet.so(+0x293b61b) [0x2aaab590261b]
[bt] (6) /data/dia021/Software/mxnet/libmxnet.so(+0x2e47285) [0x2aaab5e0e285]
[bt] (7) /data/dia021/Software/anaconda3/bin/../lib/libstdc++.so.6(+0xafc5c) [0x2aaaf08b0c5c]
[bt] (8) /lib64/libpthread.so.0(+0x8724) [0x2aaaaacd6724]
[bt] (9) /lib64/libc.so.6(clone+0x6d) [0x2aaaaafd4c9d]


(the same dmlc::Error and stack trace is printed by the other three server processes)

Any idea what is going wrong? I installed mxnet with pip install -t . mxnet-cu91 --pre. I am mostly interested in the async distributed mode, so any help is most appreciated.

edit: I fixed this problem for the ‘dist_async’ case by following the information here; namely, I replaced the line:

trainer = gluon.Trainer(net.collect_params(), 'adam', {'learning_rate': .001}, kvstore=store)

with

optimizer = mx.optimizer.DCASGD(momentum=0.8, learning_rate=0.01)
store.set_optimizer(optimizer)
trainer = gluon.Trainer(net.collect_params(), optimizer, kvstore=store,  update_on_kvstore=True) # this looks like it's working ...

edit: see also issue #11910

c) I am doing some more tests to understand why my code was failing and I’ll get back with more feedback / questions. In my attempt I am wrapping everything (including the kv-store) in a class, so maybe this is the problem.

Thank you all in advance,
Foivos


Thanks @feevos for keeping us updated with the progress on the distributed training :+1:


Thank you guys for all your help, it would never be possible for me to go parallel/distributed without your support.
I’ll keep updating and will present a full solution under SLURM once I have both sync and async training finalized. For now, I think there is a bug in async mode (I can train sync with the example provided); I reported it here: #11965.

Adding this for reference: under the SLURM manager, when I start a distributed training (kvstore = kv.create('dist')) and for some reason the job gets killed (e.g. the calculation exceeds the allocated time, a cuda malloc failure, anything except a normal completion of the job), the python processes remain active and I need to ssh manually to each individual node and kill them (`kill -9 $(pgrep -u YourUserName python)`). This is the command for launching the distributed training:

python /SomeDir/Software/mxnet/tools/launch.py  -n $(wc -l < workers_ip.txt) -s  $(wc -l < workers_ip.txt)  -H workers_ip.txt   --sync-dst-dir /SomeDir/  --launcher ssh  "python main.py"

If I don’t, the GPU memory is not freed and I have problems running my code (I assume other users do too).
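
A small cleanup sketch of what I do (untested beyond my own setup; the node file name and user name are the ones from my job script above):

import subprocess

# kill leftover python processes on every node listed in workers_ip.txt
with open('workers_ip.txt') as f:
    nodes = [line.strip() for line in f if line.strip()]

for node in nodes:
    # -u restricts the kill to my own processes on that node
    subprocess.run(['ssh', node, 'pkill', '-9', '-u', 'YourUserName', 'python'])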

Hello again,

two more questions:

  1. When I am using the parameter server model (based on the official mxnet tutorial - my full code is here) and I print the test loss/accuracy etc., these are different on different machines:
Epoch 0: Test_mcc 0.386387: test_Tnmt: 0.671969
Epoch 0: Test_mcc 0.370691: test_Tnmt: 0.693026

Question: is this happening because each worker has a different set of initial weights? Having followed the horovod distributed training tutorial (I haven’t managed to make it work yet), they explicitly mention broadcasting the parameters to all workers. Do we need to manually broadcast all parameters to all workers before training (or give the same seed to all machines), or is this taken care of for us?

  2. In the same horovod tutorial, they mention that using a server/worker ratio of ~2 for parameter-server training gives better scaling performance. Is this universal (does it apply to most problems/training setups)?

Thank you for your time,
Foivos

  1. Hi @feevos, I think what you see is the result not of different parameters but of different data being sent to each machine, each producing different accuracy results. In each epoch, each worker processes a 1/n slice of the data; some slices will contain data that is, over the epoch, better classified by the local worker than others, hence the difference in the final epoch accuracy.

Using the same seed on each worker at the beginning of your training seems a reasonable thing to do (sketch below). I don’t know if the parameters are broadcast before the first batch - good question, I’ll dig into that.

  2. From all the benchmarks I have seen, the higher the PS/worker ratio the better the result; however, I have not seen benchmarks with extreme ratios like 10:1, where I think it could have a negative impact due to the communication overhead.
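
If you want to try the same-seed approach, something like this before building the network on every worker should do it (a sketch):

import random
import numpy as np
import mxnet as mx

seed = 42
random.seed(seed)
np.random.seed(seed)
mx.random.seed(seed)   # same seed on every worker -> identical weight initialization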

Thank you very much @ThomasDelteil !! :slight_smile:

@feevos, the collective hive mind solved it (thanks @thomelane for the pointer).

If you look at the kvstore comment for the init function: https://github.com/apache/incubator-mxnet/blob/master/include/mxnet/kvstore.h#L97

    For multiple workers, all workers must call \ref Init. But only worker 0 (get_rank() == 0)'s values are used for initialization. So others' values can be empty (but not keys). This function blocks until all workers are finished. That means, any worker can push and pull on the keys now.

And if you look at the _init_params() function of the trainer (https://github.com/apache/incubator-mxnet/blob/master/python/mxnet/gluon/trainer.py#L153), the init function is called at the beginning.
The weights are set by the worker of rank 0 and then pulled by the others.
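
A tiny example to illustrate the behaviour (hedged sketch; the key name and shape are arbitrary):

import mxnet as mx

kv = mx.kv.create('dist_sync')
shape = (2, 3)
# every worker must call init on the same key, but only worker 0's value is kept
kv.init('w', mx.nd.ones(shape) * (kv.rank + 1))
out = mx.nd.zeros(shape)
kv.pull('w', out=out)   # every worker now reads worker 0's value (all ones here)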


Hello again,

I finally managed to make mxnet+horovod work together (woohoooo - for now). Some questions:

  1. What is the operational difference, when it comes to trainer.step(SomeBatchSize), between horovod.mxnet.DistributedTrainer and mxnet.gluon.Trainer? I read in the documentation (but I cannot really understand how this affects usage) that:
# DistributedTrainer, a subclass of MXNet gluon.Trainer.
# There are two differences between DistributedTrainer and Trainer:
# 1. DistributedTrainer calculates gradients using Horovod allreduce
#    API while Trainer does it using kvstore push/pull APIs;
# 2. DistributedTrainer performs allreduce(summation) and average
#    while Trainer only performs allreduce(summation).

The first thing I notice, based on a comparison with the horovod mnist example, is that in horovod we use step(batch_per_worker_rank), i.e. the batch size passed to the step function is the one seen by a single process. In the "standard" case (with the kvstore), assuming e.g. a single node with 4 GPUs, we were using the total batch size (across all GPUs), not just the batch size per process. This is evident e.g. in the distributed training example with the parameter server:

# Train a batch using multiple GPUs
def train_batch(batch, ctx, net, trainer):

    # Split and load data into multiple GPUs
    data = batch[0]
    data = gluon.utils.split_and_load(data, ctx)
    
    # Split and load label into multiple GPUs
    label = batch[1]
    label = gluon.utils.split_and_load(label, ctx)

    # Run the forward and backward pass
    forward_backward(net, data, label)

    # Update the parameters
    this_batch_size = batch[0].shape[0]
    trainer.step(this_batch_size)

So, in comparison with the kvstore, I see that with hvd we use trainer.step(batch_size_per_gpu), while with the kvstore we use trainer.step(total_batch_size_over_all_gpus_for_a_single_node). Right?
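
To state the same thing in code (a minimal sketch of the horovod case; net, loss_fn, hvd_trainer, data_loader and ctx are placeholders for my own objects):

from mxnet import autograd

for data, label in data_loader:           # this loader only yields this process's shard
    data = data.as_in_context(ctx)
    label = label.as_in_context(ctx)
    with autograd.record():
        loss = loss_fn(net(data), label)
    loss.backward()
    hvd_trainer.step(data.shape[0])       # local (per-process) batch size, not the global one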

  2. Can I still use the method of delayed gradient updates (grad_req='add' etc.) in combination with horovod?

  3. If I want to load optimizer states with trainer.load_states(filename_states), do I need to broadcast them to all other workers the same way I do with the model parameters?

params = model.collect_params()
if params is not None:
    hvd.broadcast_parameters(params, root_rank=0)
  4. In the definition of the DataLoader, for pinned device memory, I assume I must use hvd.rank(), not hvd.local_rank(), right?
    Edit: in issue 14136 user yuxihu mentions that we should use hvd.local_rank(). Could someone please explain this a bit more? Does kv.store.rank() provide the "same" ID as hvd.local_rank()?
data_generator = gluon.data.DataLoader(some_dataset,
    batch_size = batch_per_gpu,
    sampler = SplitSampler(len(some_dataset),
                           hvd.size(),
                           hvd.rank()),
    pin_memory = True,
    pin_device_id = hvd.rank(),
    last_batch = 'discard',
    num_workers = 0)

Thank you very much for your time.


Hi @feevos,

Congrats :wink:

There is one thing though: with Horovod the canonical way is to start one process per GPU; that way you don’t split_and_load, but simply pass the right data straight to the GPU. You need to make sure you get the right data to the right GPU by using a combination of rank() and local_rank().

  1. I think you are correct, in horovod there is one process per GPU. That is why the step is only the local batch_size.
  2. Yes, it is independent of the optimizer and happens locally.
  3. Not sure, I think the states might be local to each process? Save and load them accordingly.
  4. See my intro: because of that you indeed want to use local_rank(), because each GPU will have its own process and needs to be pinned accordingly.
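
To make the rank()/local_rank() distinction concrete (a sketch, assuming 2 nodes with 4 GPUs each):

import mxnet as mx
import horovod.mxnet as hvd

hvd.init()
# hvd.rank()       -> 0..7, unique across the whole job: use it to pick the data shard
# hvd.local_rank() -> 0..3, unique within one node: use it for mx.gpu(...) and pin_device_id
ctx = mx.gpu(hvd.local_rank())
shard_index = hvd.rank()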

Thank you @ThomasDelteil. Indeed, the greatest difficulty was setting everything up properly on the HPC environment. Critically, there was a bug due to different gcc compilers that was resolved a few weeks ago and made the installation easy. I finally have a fully working version and it’s super fast. I think it is faster even on a single node, but I need to do proper benchmarks for this. The official mxnet_mnist.py example for horovod is super awesome. I also found this tutorial on ring-allreduce extremely beneficial.

Leaving here for reference some tips for running Horovod + mxnet under the SLURM manager (please correct me if you see anything weird/wrong). The modules to load may differ between HPC environments, but this is my take (assuming you have everything installed and running).

slurm job submit file (if your HPC environment supports it):

#!/bin/bash

#SBATCH --job-name="HVDRC"

#SBATCH --nodes=12
#SBATCH -t 23:30:30
#SBATCH --cpus-per-task=4
#SBATCH --gres=gpu:4
#SBATCH --ntasks-per-node=4 ##### This should be EQUAL to the number of GPUs for MPI; specifying only gres=gpu:4 doesn't work
#SBATCH --mem=32gb


##### Number of total processes 
echo " "
echo " Nodelist:= " $SLURM_JOB_NODELIST
echo " Number of nodes:= " $SLURM_JOB_NUM_NODES
echo " NGPUs per node:= " $SLURM_GPUS_PER_NODE 
echo " Ntasks per node:= "  $SLURM_NTASKS_PER_NODE
echo " "



#### Load modules that you used when installed horovod. 
module load cuda/9.2.88
module load nccl/2.3.7-cuda92 #### I am not sure I need it due to the allreduce mpi
module load cudnn/v7.5.0-cuda92
module load gcc/8.3.0
module load openmpi/4.0.0-simple-gcc ### Working 
module load hpc-x

# print on screen what you used
module list

####    Use MPI for communication with Horovod - this can be hard-coded during installation as well.
export HOROVOD_GPU_ALLREDUCE=MPI
export HOROVOD_GPU_ALLGATHER=MPI
export HOROVOD_GPU_BROADCAST=MPI

####   Produce a timeline for debugging purposes
####export HOROVOD_TIMELINE=./timeline.json ### Do not use for production runs, it produces very large files 
export NCCL_DEBUG=DEBUG


ulimit -s 20480 ####### Horovod recommends 10240 for this

echo "Running on multiple nodes and GPU devices"
echo ""
echo "Run started at:- "
date

##### Actual executable 
mpirun -np $SLURM_NTASKS  -bind-to none -map-by slot -x HOROVOD_TIMELINE  -x NCCL_DEBUG=INFO -x LD_LIBRARY_PATH -x PATH  -mca pml ob1 -mca btl ^openib python ./main.py ######## SUCCESSS 

echo "Run completed at:- "
date

mxnet/gluon specific things I tweaked.

  1. As you said @ThomasDelteil, there is no split_and_load anymore. The whole code reads as if it were running on a single context.
  2. DataLoader for the train and validation data (SplitSampler is the one from the parameter-server distributed tutorial - see above) with pinned memory (#14136):
data_generator = gluon.data.DataLoader(dataset,
    batch_size = self[C.C_BATCH_SIZE],
    sampler = SplitSampler(length=len(dataset),
                           num_parts=hvd.size(),
                           part_index=hvd.rank(),
                           shuffle=True),
    # ******* fixes a bug causing segm fault ******
    pin_memory = True,
    pin_device_id = hvd.local_rank(),  # See issue 14136
    # *********************************************
    last_batch = 'discard',
    num_workers = 0)  # in all my tests, num_workers=0 is fastest.
  3. All the horovod mxnet-related utility functions one can use are in horovod.mxnet.__init__.py and horovod.mxnet.mpi_tools.py. I found the allreduce function (from mpi_tools) super useful for calculating global/average statistics across all workers. So, assuming one calculates a validation loss per worker (after the split of the validation data), one can calculate the average of these losses with:
from horovod.mxnet.mpi_tools import allreduce

# This is per worker, returns nd.array of shape 1. 
valLoss = some_function_eval_val_loss()  
# the argument of allreduce must be an nd.array. 
# returns a scalar value, average of all losses.
valLoss = allreduce(valLoss,average=True).asscalar() 

You can then print this average only from the rank 0 process:

if hvd.rank() == 0:
    print ("avg validation loss:{}".format(valLoss))

And if you’ve defined a metric that uses _BinaryClassificationMetrics for binary classification, the same approach comes in handy for calculating global statistics. In my case I use MCC as the metric.

# This is specific for metrics derived from the class _BinaryClassificationMetrics
# (assumes `import numpy as np`, `import mxnet as mx` and `from mxnet import nd`)
# Construct tensor (i.e. nd.array) objects on the same local context to feed into allreduce
# Note the square brackets [ ], these give tp.shape = (1,). If you omit them - I did - you'll get an error.
tp =  nd.array([mcc._metrics.global_true_positives], ctx = mx.cpu(hvd.local_rank()) )
tn =  nd.array([mcc._metrics.global_true_negatives], ctx = mx.cpu(hvd.local_rank()) )
fp =  nd.array([mcc._metrics.global_false_positives], ctx = mx.cpu(hvd.local_rank()) )
fn =  nd.array([mcc._metrics.global_false_negatives], ctx = mx.cpu(hvd.local_rank()) )

# Sum over all true positives/negatives etc. to get global stats.
tp = allreduce(tp,average=False).asscalar()
tn = allreduce(tn,average=False).asscalar()
fp = allreduce(fp,average=False).asscalar()
fn = allreduce(fn,average=False).asscalar()

# The definition of accuracy, mcc, precision, recall can be found on wiki (among other places)
# these are functions you need to define, e.g. 
def accuracy(tp,tn,fp,fn):
    num = tp+tn
    denum = num+fp+fn
    return num/denum


def mcc_score(tp,tn,fp,fn):
    # renamed from `mcc` so it does not shadow the `mcc` metric object used above
    terms = [(tp+fp),(tp+fn),(tn+fp),(tn+fn)]
    denom = np.prod(terms)
    return (tp*tn - fp*fn)/np.sqrt(denom)

def precision(tp,tn,fp,fn):
    return tp/(tp+fp)

def recall(tp,tn,fp,fn):
    return tp/(tp+fn)
# put everything in a dict for saving history (at least, that's how I use it).
kwards = dict()
metric_name, metric_value = mcc.get()  # this provides per-worker statistics
kwards[metric_name]        = metric_value
kwards['global_acc']       = accuracy(tp,tn,fp,fn)   # global accuracy
kwards['global_mcc']       = mcc_score(tp,tn,fp,fn)  # global mcc
kwards['global_precision'] = precision(tp,tn,fp,fn)  # global precision
kwards['global_recall']    = recall(tp,tn,fp,fn)     # global recall

# Now kwards holds all statistics. 
  4. When loading parameters from a file (for a neural network), I currently load them on all workers. I think the recommended approach is to load once (on the rank 0 process - hvd.rank() == 0) and then broadcast, but it hasn’t worked for me (probably some silly bug). I don’t think it’s a huge issue (computationally).

  5. The definition of the context for heavy computation is (assuming we want to take advantage of the GPUs):

local_context = mx.gpu(hvd.local_rank()) if len( list( mx.test_utils.list_gpus()) ) > 0 else mx.cpu(hvd.local_rank())

For metric evaluations or other local operations (e.g. the true positives above), it is unnecessary to copy the labels (or tensors) to the GPU; use mx.cpu(hvd.local_rank()). So, in the accuracy evaluation of the official mxnet_mnist.py example:

def evaluate(model, data_iter, context):
    data_iter.reset()
    metric = mx.metric.Accuracy()
    for _, batch in enumerate(data_iter):
        data = batch.data[0].as_in_context(context)
        # label = batch.label[0].as_in_context(context)  # this is an unnecessary copy
        label = batch.label[0]
        output = model(data.astype(args.dtype, copy=False))
        metric.update([label], [output])  # label is copied back to cpu() internally
    return metric.get()

You can see this in the official mxnet accuracy function, in these lines:

def update(self, labels, preds):
        """Updates the internal evaluation result.

        Parameters
        ----------
        labels : list of `NDArray`
            The labels of the data with class indices as values, one per sample.

        preds : list of `NDArray`
            Prediction values for samples. Each prediction value can either be the class index,
            or a vector of likelihoods for all classes.
        """
        labels, preds = check_label_shapes(labels, preds, True)

        for label, pred_label in zip(labels, preds):
            if pred_label.shape != label.shape:
                pred_label = ndarray.argmax(pred_label, axis=self.axis)
            pred_label = pred_label.asnumpy().astype('int32')
            label = label.asnumpy().astype('int32') # <============ here ===

hope it’ll prove useful to someone out there :).

Many thanks to the community for all the help.


Thanks for sharing your learnings and code @feevos, much appreciated since we see a lot of users trying Horovod for their distributed training as well.

Here is a diagram by the great @thomelane that explains the ring allreduce on a setup of 2 nodes with 3 GPUs each. Horovod uses NCCL to communicate between GPUs within each node, and MPI across nodes.


@feevos @ThomasDelteil I don’t get why in Horovod the step would use only the local batch rather than the cluster-level batch, as with the parameter server. The average gradient computed in the ring-allreduce represents a full cluster-level batch, with chunks coming from all nodes, so shouldn't the step use the whole cluster-level batch?


There are a few things that play a role when you do the update:

  • Learning rate: Did you scale your learning rate linearly with your batch size?
  • Loss: Did you average your loss or summed it?
  • Gradient Aggregation: Are you averaging your gradients or summing them?

Depending on what you do here, the value of your step might change.

In this case we scale the learning rate by the overall number of workers: 'learning_rate':args.lr * hvd.size()

In this case we don’t average the local loss before calling backward (equivalent to summing the gradients), so we need to normalize it by the local batch_size. https://github.com/apache/incubator-mxnet/blob/master/python/mxnet/gluon/trainer.py#L317

So now each worker produces locally normalized gradients that are then averaged by horovod, and the optimizer uses a learning rate commensurate with the overall batch size and the "reliability" of the gradient update step.
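
Putting that together (a sketch based on the horovod mxnet_mnist.py example; args, net and local_batch_size are placeholders):

import horovod.mxnet as hvd

hvd.init()
# scale the learning rate by the number of workers
optimizer_params = {'learning_rate': args.lr * hvd.size()}
trainer = hvd.DistributedTrainer(net.collect_params(), 'sgd', optimizer_params)

# ... forward/backward on the local data shard ...
trainer.step(local_batch_size)   # gradients are rescaled by 1/local_batch_size here
                                 # and averaged across all workers by horovod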


Hi again,

I am exploring the option of using horovod for a mixture of model and data parallelism in adversarial training. The single-node version of the program would be:

  1. Define two models (and thus two trainer objects, one for each model): a Generator on (mx.gpu(0), mx.gpu(1)) and a Discriminator on (mx.gpu(2), mx.gpu(3)).
  2. For each node, use a slice of the data to evaluate gradients for G and D.
  3. Communicate/average gradients across different nodes (machines) with horovod.

Any tips on how I can achieve this? In particular:

  1. In the examples we currently use context = mx.gpu(hvd.local_rank()). How should one modify this context (we now need contextG and contextD) to encapsulate, say, splitting model G over the first two GPUs and D over the other two?
  2. What would be the appropriate launch command in this scenario?

E.g. how could one modify the mxnet_mnist.py example to run two independent models distributed across, say, 4 machines, where on each machine model1 uses 2 GPUs and model2 uses the other two, and both are trained on portions of the data with data parallelism?
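
Roughly, the layout I have in mind on a single node is the following (pure sketch; netG/netD are placeholders, and the horovod wiring is exactly the part I don't know how to do correctly):

import mxnet as mx
import horovod.mxnet as hvd

hvd.init()
ctxG = [mx.gpu(0), mx.gpu(1)]   # Generator split over the first two GPUs
ctxD = [mx.gpu(2), mx.gpu(3)]   # Discriminator over the other two

# two trainers, one per model - this is where I am unsure how horovod should be used
trainerG = hvd.DistributedTrainer(netG.collect_params(), 'adam', {'learning_rate': 1e-4})
trainerD = hvd.DistributedTrainer(netD.collect_params(), 'adam', {'learning_rate': 1e-4})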

If I get there first, I’ll post the solution here too. Again, thank you for your time.

edit: related reference (model parallelism + horovod): https://github.com/horovod/horovod/issues/96

Hi to all,

I am trying to train a GAN system (basically a domain adaptation system that is based on the GAN philosophy) with mxnet (nope, I don’t have 2nd order gradients yet …), and I am facing a problem with Horovod (DUPLICATE_NAME_ERROR). I document this error in issue 1679.

I am basically trying to use model parallelism together with distributed training. So, I define the Generator G on the even-numbered GPUs and the Discriminator T on the odd-numbered ones. There are two trainers, each living on a different set of contexts.

Any ideas how I can solve this? I think it has something to do with the naming (string) in net.collect_params() or something …

Thank you very much for your time.