Distributed Training / Model Parallelism with sparse embeddings in Gluon

Hello,

I’m trying to build a recommender model in Gluon. The model learns user and item embeddings from a co-occurrence matrix, similar to this example:
https://github.com/awslabs/amazon-sagemaker-examples/blob/master/introduction_to_applying_machine_learning/gluon_recommender_system/recommender.py
The training data is in the format <UserID: int32, ItemID: int32, Score: float32>.

My user embedding matrix is too large to fit into the memory of a single GPU, so my plan is to partition co-occurrences by UserID and store the embeddings for a given user on only a subset of the GPUs (probably just one). My hope was to use sparse weights indexed by the original UserID. That would avoid managing a separate mapping from UserID to local GPU index, which dense embeddings on each GPU would require, and which I think would become even harder to manage if I wanted to assign a user to more than one GPU.
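To make concrete what I would like to avoid: with dense per-GPU tables I would need bookkeeping roughly like the sketch below. This is only an illustration under assumptions of mine. NUM_SHARDS and all helper names are made up, and modulo partitioning is just one possible scheme, not something Gluon provides.

```python
# Hypothetical modulo partitioning of UserIDs across GPUs.
# All names here are illustrative; nothing below is an MXNet API.
NUM_SHARDS = 4  # assumed number of GPUs

def shard_of(user_id: int) -> int:
    """Shard (GPU) that owns this user's embedding row."""
    return user_id % NUM_SHARDS

def local_index(user_id: int) -> int:
    """Row of the user inside the owning shard's dense table."""
    return user_id // NUM_SHARDS

def global_id(shard: int, local: int) -> int:
    """Inverse mapping: recover the original UserID."""
    return local * NUM_SHARDS + shard

def shard_rows(shard: int, total_users: int) -> int:
    """Number of rows the dense table on this shard needs."""
    return (total_users - shard + NUM_SHARDS - 1) // NUM_SHARDS
```

Every minibatch would have to go through shard_of and local_index before the lookup, and assigning a user to more than one GPU would turn shard_of into a one-to-many mapping.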

I have some proof-of-concept code that tries to use mxnet.gluon.contrib.nn.SparseEmbedding for this, but I can’t get it to work or figure out whether it’s even possible. If someone could review what I’m trying to do and provide guidance, that would be much appreciated!

import mxnet as mx
from mxnet.gluon.contrib.nn import SparseEmbedding

# Weight initializer from a pre-determined NDArray
class NDArrayWeightInitializer(mx.init.Initializer):
  def __init__(self, nd_array):
    super(NDArrayWeightInitializer, self).__init__()
    self._nd_array = nd_array
  def _init_weight(self, _, arr):
    self._nd_array.copyto(arr)

# Total number of users across all contexts. User IDs range from [0, total_users - 1]
total_users = 100000000
embedding_dimensionality = 100

# User IDs that are assigned to this context
# (I do not want their embeddings to be replicated to all contexts)
user_ids_in_context = mx.nd.array([0,2,4,6,8,10])

# Create initial sparse weights (ones for users embedded in this context)
dense_weights_for_users_in_context = mx.nd.ones((user_ids_in_context.shape[0], embedding_dimensionality))
sparse_weights_for_users_in_context = mx.nd.sparse.row_sparse_array((dense_weights_for_users_in_context, user_ids_in_context), shape=(total_users, embedding_dimensionality))

# Create sparse embedding layer and initialize weights for users in this context
layer = SparseEmbedding(total_users, embedding_dimensionality)
layer.initialize(init=NDArrayWeightInitializer(sparse_weights_for_users_in_context))

trainer = mx.gluon.Trainer(layer.collect_params(), 'sgd')

print(layer.weight.row_sparse_data(row_id=user_ids_in_context).indices)
print(layer.weight.row_sparse_data(row_id=user_ids_in_context).data)

# Minibatch of training data contains a subset of the users that are embedded in this context
user_ids_in_minibatch = mx.nd.array([0,4,8])
with mx.autograd.record():
  y = layer(user_ids_in_minibatch)
  y.backward()

trainer.step(user_ids_in_minibatch.shape[0])

# Check gradients are properly set for users in minibatch
print(layer.weight.grad().indices)
print(layer.weight.grad().data)

# Check weights are updated for users in minibatch 
print(layer.weight.row_sparse_data(row_id=user_ids_in_context).data)

When I run this, the last line fails with an error related to an implementation gap in SGDUpdate:

print(layer.weight.row_sparse_data(row_id=user_ids_in_context).data)
Traceback (most recent call last):
  File "<stdin>", line 2, in <module>
  File "/usr/local/lib/python3.6/dist-packages/mxnet/ndarray/sparse.py", line 730, in data
    return self._data()
  File "/usr/local/lib/python3.6/dist-packages/mxnet/ndarray/sparse.py", line 268, in _data
    self.wait_to_read()
  File "/usr/local/lib/python3.6/dist-packages/mxnet/ndarray/ndarray.py", line 1806, in wait_to_read
    check_call(_LIB.MXNDArrayWaitToRead(self.handle))
  File "/usr/local/lib/python3.6/dist-packages/mxnet/base.py", line 252, in check_call
    raise MXNetError(py_str(_LIB.MXGetLastError()))
mxnet.base.MXNetError: [16:50:32] src/operator/contrib/.././operator_common.h:477: Check failed: arr.storage_shape()[0] == arr.shape()[0] SGDUpdate for RowSparse weights is only implemented for RowSparse weights with all rows containing non-zeros. Expects weights.data.shape[0] (6) == weights.shape[0] (100000000).

I’ve edited my post to contain a more minimal example now that I’ve gotten past the initialization problem. I was able to initialize the sparse weights in the Gluon parameter without hitting the OOM issue, but I think I’m now completely blocked by the SGDUpdate check shown in the traceback above: “SGDUpdate for RowSparse weights is only implemented for RowSparse weights with all rows containing non-zeros. Expects weights.data.shape[0] (6) == weights.shape[0] (100000000).”

Hi @jschmitz28, how large is your embedding layer?
If you use nn.SparseEmbedding, the embedding parameters are stored in the kvstore, and during the forward pass a subset of the weights is pulled from the kvstore to the current GPU context.
The weight stored in the kvstore is assumed to be dense, which is why you’re seeing the OOM issue.
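As a rough back-of-the-envelope check, going by the values in your snippet (total_users = 100000000, embedding_dimensionality = 100) and assuming float32 weights, a fully dense copy of the table comes to about 37 GiB before counting gradients or optimizer state. The per_shard_gib helper and the choice of 8 shards are my own illustration:

```python
# Values taken from the snippet in the question.
total_users = 100_000_000
embedding_dimensionality = 100
bytes_per_float32 = 4  # assumption: weights are stored as float32

# Size of the table if every row is materialized (dense storage).
dense_bytes = total_users * embedding_dimensionality * bytes_per_float32
dense_gib = dense_bytes / 2**30

def per_shard_gib(num_shards):
    """Approximate size of one shard if rows are split evenly."""
    return dense_gib / num_shards

print(f"dense table: {dense_gib:.1f} GiB")
print(f"per shard with 8 shards: {per_shard_gib(8):.1f} GiB")
```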

Are you trying to avoid managing the mapping of users to GPUs? If you create multiple nn.SparseEmbedding blocks, each assigned a particular shard, the OOM issue can be circumvented, as kvstore tends to store parameters across devices evenly.
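A framework-free sketch of the sharding idea, only to show the routing logic: plain Python lists stand in for the per-shard embedding blocks, and the modulo assignment, the constants, and the lookup function are all assumptions of mine rather than anything MXNet does for you. In Gluon, each entry of tables would instead be its own embedding block.

```python
from collections import defaultdict

NUM_SHARDS = 2       # hypothetical number of embedding blocks
EMBEDDING_DIM = 3    # tiny value so the result is easy to inspect
USERS_PER_SHARD = 6  # rows in each shard's table

# One small table per shard. Row r of shard s holds the embedding of
# user r * NUM_SHARDS + s, filled with the UserID so lookups are checkable.
tables = [
    [[float(r * NUM_SHARDS + s)] * EMBEDDING_DIM for r in range(USERS_PER_SHARD)]
    for s in range(NUM_SHARDS)
]

def lookup(user_ids):
    """Route each UserID to its shard, look it up there, restore batch order."""
    # Group (position in batch, local row) pairs by owning shard.
    per_shard = defaultdict(list)
    for pos, uid in enumerate(user_ids):
        per_shard[uid % NUM_SHARDS].append((pos, uid // NUM_SHARDS))

    out = [None] * len(user_ids)
    for shard, entries in per_shard.items():
        # In a real model this would be one batched lookup per shard.
        for pos, row in entries:
            out[pos] = tables[shard][row]
    return out

batch = [0, 5, 8, 3]
embeddings = lookup(batch)
```

Each shard only ever sees local row indices, so no single table needs total_users rows.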