The comm_buf_copy[key] buffer is introduced to keep the updated parameters pulled from the server. You can check my last discussion; the figure there is just a mock-up.
Note: I used the first 600 batches for warming up. When I pull back the updated parameters every 3 iterations, training converges to the target accuracy. However, when I pull back the parameters every 4 iterations, the training accuracy drops to ~0.1, even though the accuracy does increase during the warm-up phase.
In my view, the accuracy should not decrease after the warm-up phase, so what is wrong with the values in comm_buf_copy[key]? When I pull the updated parameters every 4 iterations, does the comm_->Broadcast(key, comm_buf_copy[key], grouped_vals[i], priority) call fail to pass the correct values to the arg_list, or are the values of comm_buf_[key] being changed?
Can you share your full code and how you are updating the pull_firtst_time value? Are you sure you push all the parameters every 4 iterations? I also think there should be an easier way for you to achieve this without modifying the C++ code. For example, you could have two trainers, one with a device kvstore and one with a dist kvstore, and update the dist kvstore only every n iterations.
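The two-store idea can be sketched without any MXNet machinery. The code below is a minimal, self-contained simulation of the scheme (the `Store` struct and `train` function are my own illustrative names, not MXNet APIs): every step updates a fast local store, and only every n-th step syncs with a distributed store.

```cpp
#include <vector>

// Hypothetical stand-in for a parameter store; a real "device" kvstore
// lives on the GPU and a "dist" kvstore on the parameter servers.
struct Store {
  std::vector<double> params;
};

// Apply every gradient to the local store, but push to / pull from the
// distributed store only every `n` steps.
void train(Store& local, Store& dist, int steps, int n) {
  for (int step = 1; step <= steps; ++step) {
    for (auto& p : local.params) p -= 1.0;  // fake gradient update
    if (step % n == 0) {
      dist.params = local.params;  // push accumulated progress
      local.params = dist.params;  // pull back (a no-op with one worker)
    }
  }
}
```

With one worker the pull is trivially a copy-back, but the structure shows where the periodic sync would sit relative to the per-step local updates.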
pull_firtst_time starts at 0 and is incremented by 1 after each pull operation. In fact, the parameters are pushed every iteration; I only chose to pull back the updated parameters every 4 iterations. Pulling every 3 iterations works fine, but pulling every 4 iterations breaks the training.
for (size_t i = 0; i < uniq_keys.size(); ++i) {
  int key = uniq_keys[i];
  // Use the same array for merging to guarantee that pull always happens
  // after the previous push on this key.
  auto& recv_buf = comm_buf_[key];
  auto& recv_buf_copy = comm_buf_copy[key];  // xym edit 4-8
  const auto storage_type = grouped_vals[i][0]->storage_type();
  CHECK_EQ(storage_type, kDefaultStorage)
      << "Expected stype of value to be kDefaultStorage";
  if (recv_buf.is_none()) {
    // May happen the first time a non-rank-0 worker pulls the weight.
    recv_buf = NDArray(grouped_vals[i][0]->shape(), pinned_ctx_,
                       true, grouped_vals[i][0]->dtype());
  }
  if (recv_buf_copy.is_none()) {
    // May happen the first time a non-rank-0 worker pulls the weight.
    recv_buf_copy = NDArray(grouped_vals[i][0]->shape(), pinned_ctx_,
                            true, grouped_vals[i][0]->dtype());
  }
  auto pull_from_servers = [this, key, recv_buf](
      RunContext rctx, Engine::CallbackOnComplete cb) {
    // Convert to ps keys.
    size_t size = recv_buf.shape().Size();
    const int dtype = recv_buf.dtype();
    const int num_bytes = mshadow::mshadow_sizeof(dtype);
    PSKV& pskv = (gradient_compression_->get_type() == CompressionType::kNone) ?
        EncodeDefaultKey(key, size, num_bytes) :
        EncodeCompressedKey(key, size, false, num_bytes);
    char* data = static_cast<char*>(recv_buf.data().dptr_);
    // false means not to delete data when the SArray is deleted.
    auto vals = new ps::SArray<char>(data, size * num_bytes, false);
    // Issue the pull.
    RequestType mode = (gradient_compression_->get_type() != CompressionType::kNone) ?
        RequestType::kCompressedPushPull : RequestType::kDefaultPushPull;
    const int cmd = GetCommandType(mode, dtype);
    CHECK_NOTNULL(ps_worker_)->ZPull(
        pskv.keys, vals, &pskv.lens, cmd, [vals, cb]() { delete vals; cb(); });
  };
  if (pull_firtst_time / parameter_count < 600) {
    CHECK_NOTNULL(Engine::Get())->PushAsync(
        pull_from_servers,
        pinned_ctx_,
        {},
        {recv_buf.var()},
        FnProperty::kNormal,
        priority,
        "KVStoreDistDefaultStoragePull");
    CopyFromTo(recv_buf, &recv_buf_copy, priority);
  } else {
    if (pull_firtst_time / parameter_count % 3 == 0) {
      CHECK_NOTNULL(Engine::Get())->PushAsync(
          pull_from_servers,
          pinned_ctx_,
          {},
          {recv_buf.var()},
          FnProperty::kNormal,
          priority,
          "KVStoreDistDefaultStoragePull");
      CopyFromTo(recv_buf, &recv_buf_copy, priority);
    }
  }
  comm_->Broadcast(key, comm_buf_copy[key], grouped_vals[i], priority);
  pull_firtst_time += 1;
}
}
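To sanity-check the gating logic in isolation, the condition above can be extracted into a small standalone function (the function and variable names here are mine; `counter` plays the role of pull_firtst_time, `parameter_count` the number of keys, and `n` the pull interval, 3 or 4):

```cpp
// Returns true when the snippet above would issue a pull on this call.
bool pulls_this_call(long counter, long parameter_count, int n) {
  long step = counter / parameter_count;  // integer division, as in the snippet
  if (step < 600) return true;            // warm-up: pull every step
  return step % n == 0;                   // afterwards: pull every n-th step
}
```

Note that 600 is divisible by both 3 and 4, so in both settings the first post-warm-up step still pulls; the difference is only that on the skipped steps the Broadcast re-distributes whatever comm_buf_copy[key] held after the last CopyFromTo, which is 2 steps stale for n = 3 and up to 3 steps stale for n = 4.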
Thanks, Thomas.
I’m quite interested in your suggestion about setting up two trainers, one with a device kvstore and one with a dist kvstore. What would I need to modify? Could you please give me some more details?