0x00 Abstract

The parameter server is a paradigm for machine learning training: a programming framework designed to solve distributed machine learning, mainly consisting of servers, clients, and a scheduler. Compared with other paradigms, its core job is to store and update the model parameters on the server side, and it uses a variety of techniques to improve throughput.

This article is the first part of the parameter server series; it introduces the overall design of PS-Lite and its basic module, Postoffice.

0x01 Overview

1.1 What is a Parameter Server

By analogy, a parameter server is a distributed in-memory database for machine learning, storing and updating models.

Let's first look at the steps of a machine learning training iteration; these steps repeat over and over:

  1. Prepare data: the training process obtains the weights and the data (data + label);
  2. Forward computation: the training process uses the data to compute loss = f(weight, data & label);
  3. Backward pass: differentiating the loss gives the gradient grad = b(loss, weight, data & label);
  4. Update weights: weight -= grad * lr;
  5. Go back to 1 for the next iteration.

If a parameter server is used for training, we can map the above steps as follows:

  1. Parameter delivery: the parameter server (server side) sends the weights to each worker (or each worker pulls them); the worker is the parameter server's client;
  2. Parallel computation: each worker performs its own computation (including the forward pass and the backward pass);
  3. Gradient collection: the server obtains the grad from each worker and merges them (or each worker pushes its grad);
  4. Update weights: the server applies the merged grad to the weights;
  5. Go back to 1 for the next iteration.

The details are as follows:

weight 1 --FP/BP--> grad 1 (each worker) --Gather/Sum--> total grad 1 --Update--> weight 2 = weight 1 - total grad 1
weight 2 --FP/BP--> grad 2 (each worker) --Gather/Sum--> total grad 2 --Update--> weight 3 = weight 2 - total grad 2
...
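To make the division of labor concrete, here is a toy, single-process sketch (plain C++17, not PS-Lite code): one in-memory "server" weight plus two simulated "workers" that each compute a gradient on their own data shard, after which the merged gradient is applied, mirroring steps 1-4 above.

#include <iostream>
#include <utility>
#include <vector>

int main() {
  // Toy model y = w * x; the "server" owns the single weight.
  float server_weight = 0.0f;
  const float lr = 0.05f;
  // Each "worker" owns a shard of (x, y) samples.
  std::vector<std::vector<std::pair<float, float>>> shards = {
      {{1, 2}, {2, 4}},   // worker 0
      {{3, 6}, {4, 8}}};  // worker 1

  for (int iter = 0; iter < 100; ++iter) {
    float total_grad = 0.0f;
    int total_samples = 0;
    for (const auto& shard : shards) {
      float w = server_weight;          // 1. parameter delivery: worker pulls the weight
      float grad = 0.0f;                // 2. parallel computation: forward + backward
      for (const auto& [x, y] : shard) grad += (w * x - y) * x;
      total_grad += grad;               // 3. gradient collection: server merges pushed grads
      total_samples += static_cast<int>(shard.size());
    }
    server_weight -= lr * total_grad / total_samples;  // 4. update on the server
  }
  std::cout << "w = " << server_weight << "\n";  // converges to about 2
  return 0;
}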


Therefore, we can deduce the function of each module in the parameter server:

  • Server: Stores machine learning model parameters, receives the gradient sent by the client, completes merging, and updates local model parameters.

  • Client (or Worker):

    • Pull the latest parameters from the server;
    • Compute the gradients of the training parameters from the loss function, using the training data and the predictions made with the latest parameters;
    • Push the gradients to the server;
  • Scheduler: Manages server/client nodes, completes data synchronization between nodes, and adds/deletes nodes.

1.2 Tracing the history

The parameter server is a paradigm of machine learning training that can be divided into three generations (large companies presumably have their own newer in-house implementations, which could be considered a fourth generation).

Prior to parameter servers, most distributed machine learning algorithms were implemented through periodic synchronization, such as all-reduce in collective communication or the reduce step of MapReduce-style systems. Periodic synchronization has two problems:

  • During synchronization, nodes can only synchronize, not train.
  • Straggler problem: because of various hardware and software factors, nodes often differ in computing power. For iterative algorithms, at the end of each round the fast nodes have to wait for the slow nodes to finish before proceeding to the next iteration. This wait becomes particularly noticeable as the number of nodes grows, slowing overall performance.

So, when asynchronous SGD appeared, parameter servers were proposed.

The concept of the parameter server first came from the parallel LDA framework proposed by Alex Smola in 2010. It used a distributed Memcached as the storage of shared parameters, which provided an effective mechanism for synchronizing model parameters among the workers of a distributed system; each worker only needs to keep the small subset of parameters it computes on, and processes no longer all stop to synchronize at one point in time. However, the independent KV pairs bring a lot of communication overhead, and the server side is hard to program.

The second generation came from Jeff Dean at Google, who went further with DistBelief, the first-generation Google Brain system. DistBelief shards a huge deep learning model across a global parameter server, and compute nodes exchange information through the parameter server, which nicely solves distributed training for the SGD and L-BFGS algorithms.

Then came the parameter server designed by the DMLC group, which Mu Li belongs to. According to the paper, it is the third generation of parameter server, i.e., it provides a more general design: the architecture consists of a server group and several worker groups.

1.3 Architecture in the Paper

Let's first look at the architecture of the system using the diagram from Mu Li's paper.

Each module in the overall architecture is explained below:

  • Resource Manager: resource allocation and management. The parameter server reuses existing industry resource management systems, such as YARN and Kubernetes.

  • Training data: tens of billions of training samples are generally stored in a distributed file system (such as HDFS); the resource manager distributes them evenly to the workers.

  • The nodes of the parameter server are divided into one server group and multiple worker groups.

  • Server group: the servers used by a training task, responsible for updating model parameters and answering pull requests.

    • Each server in the server group is only responsible for its own partition of the globally shared parameters (the servers jointly maintain all of the globally shared parameters); the optimizer is generally implemented here.
    • Servers communicate with each other for parameter backup/migration.
    • The server group has a server manager node, which is responsible for keeping server metadata consistent, such as node status and parameter allocation. It generally carries no logic and only adjusts the consistent hashing when a server node joins or exits.
  • Worker group: the workers used by a training task, responsible for the forward pass and gradient computation.

    • Each worker group runs a computing task, and each worker in the worker group uses part of the data for training.
    • Workers are divided into multiple groups to support parallel execution of multiple tasks.
    • Each worker group has a Task Scheduler, which is responsible for assigning tasks to workers and monitoring their running status. When a worker enters or exits, the Task Scheduler reassigns unfinished tasks.
    • Workers do not communicate with each other; each worker only talks to the corresponding servers for parameter updates.

When the gradient is computed in a distributed manner, the data flow of the system is as follows:

The functions of each step in the figure are:

  1. Worker node calculates the gradient of model weight based on the samples in the batch.
  2. The worker pushes the gradient to the server in the form of key-value;
  3. The server updates the model weights using the gradients, according to the specified optimizer;
  4. The worker pulls the latest model weight from the server;

The above two figures correspond to the original parameter_server code. PS-Lite is a later, lite version of that code, so some features are not provided in PS-Lite.

1.4 Development history of PS-Lite

Some of PS-Lite's development history, found online, shows how its design evolved.

The first generation parameter server was designed and optimized for specific algorithms (such as logistic regression and LDA) to meet large-scale industrial machine learning tasks (tens of billions of examples and 10 TB to 100 TB of data).

Later, an attempt was made to build an open-source, general-purpose framework for machine learning algorithms. The project is located at dmlc/parameter_server.

As the needs of other projects grew, PS-Lite was created. It provides a clean data communication API and a lightweight implementation. The implementation is based on dmlc/parameter_server, but it moves the job launcher, file IO, and machine learning algorithm code out into separate projects such as dmlc-core and Wormhole.

Based on experience gained during the development of DMLC/MXNet, the API and implementation were further refactored relative to v1. Key changes include:

  • Fewer library dependencies;
  • More flexible user-defined callbacks for binding to other languages;
  • Allowing users (such as MxNet’s dependency engine) to manage data consistency;

1.5 Overall PS-Lite system

PS-Lite is a framework for implementing a parameter server; users need to implement their own parameter-handling policies on top of it.

Parameter Server consists of three roles: Worker, Server, and Scheduler. The specific relationship is as follows:

Specific role functions are as follows:

  • Worker: several of them; each runs the data pipeline, the forward pass, and gradient computation, pushes model weight gradients to the server nodes as key-value pairs, and pulls the latest model weights from them;
  • Server (service node): several of them; they respond to workers' push and pull requests and store, maintain, and update the model weights for the workers (each server maintains only part of the model);
  • Scheduler (control node): there is exactly one; it performs heartbeat monitoring of all nodes, assigns node ids, and establishes communication between workers and servers; it can also be used to send control signals to other nodes and to collect their progress.

The advantages of introducing scheduler are as follows:

  • Introducing a scheduler module yields the classic three-role distributed system architecture. The roles and responsibilities of worker and server stay the same, while the scheduler module gains more options:

    • It interacts only with lower-layer resource scheduling systems such as YARN and Mesos;
    • It takes on the additional functions of heartbeat monitoring and process control for workers and servers;
  • Another benefit of introducing a Scheduler module is that it leaves room for implementation model parallelism;

  • The scheduler module is useful not only for implementing the model-parallel training paradigm; it has other benefits too. For example, fine-grained scheduling of the parameter training process, informed by the correlations among specific model parameters, can further speed up model convergence and may even improve model metrics.

Readers familiar with distributed systems may worry about the scheduler being a single point of failure; this can be handled well with Raft, ZAB, or other Paxos-family protocols.

1.6 Basic Modules

Some basic modules in the PS-Lite system are as follows:

  • Environment: a singleton environment-variable class; it maintains a std::unordered_map<std::string, std::string> kvs holding all environment variable names and values;

  • PostOffice: a singleton global management class; each node has exactly one PostOffice for its lifetime and relies on its members to manage the node;

  • Van: communication module, responsible for network communication with other nodes and the actual sending and receiving of messages. PostOffice holds a Van member;

  • SimpleApp: the parent class of KVServer and KVWorker; it provides simple Request, Wait, Response, and Process functions. KVServer and KVWorker override these functions to suit their respective tasks;

  • Customer: each SimpleApp object holds a Customer member, which must be registered with PostOffice. This class is responsible for:

    • Track responses to messages sent by SimpleApp;
    • Maintains a message queue for a Node and receives messages for the Node.
  • Node: node information; it stores the information of a node. Each node is uniquely identified by its hostname and port.

0x02 System Startup

2.1 How Can I Start the System?

As the example in the source code shows, the whole system can be started with the local.sh script provided by PS-Lite, where test_connection is the compiled executable.

./local.sh 2 3 ./test_connection

2.2 Startup Script

The local.sh code is shown below. Note that the shell script contains three shift commands, so $1 refers to a different argument each time it is used.

For our example, the script parameters correspond to:

  • DMLC_NUM_SERVER is 2;
  • DMLC_NUM_WORKER is 3;
  • bin is ./test_connection;

As you can see from the script, this script does two things:

  • Before launching each program, it sets the environment variables according to the role that program will play. Apart from DMLC_ROLE, which differs per role, the variables are the same on every node.

  • It runs several different roles locally. In this way, PS-Lite uses several different processes (programs) working together.

    • The Scheduler node is started first; the numbers of servers and workers are fixed, and the Scheduler node manages the addresses of all nodes.
    • Worker and Server nodes are started next. Each of them needs to know the IP address and port of the Scheduler node; on startup it connects to the Scheduler node, binds a local port, and registers itself with the Scheduler node (reporting its own IP address and port).
    • The Scheduler waits for all Worker and Server nodes to register, assigns each node an id, and passes the node information around (for example, Worker nodes need to know the IPs and ports of the Server nodes, and Server nodes need to know the IPs and ports of the Worker nodes). The Scheduler node is then ready.
    • After receiving the information sent by the Scheduler, each Worker or Server establishes connections to the relevant nodes. At this point the Worker or Server is ready and formally starts working.

The details are as follows:

#!/bin/bash
# set -x
if [ $# -lt 3 ]; then
    echo "usage: $0 num_servers num_workers bin [args..]"
    exit -1;
fi

# configure the environment variables from the command-line arguments
export DMLC_NUM_SERVER=$1
shift
export DMLC_NUM_WORKER=$1
shift
bin=$1
shift
arg="$@"

# start the scheduler
export DMLC_PS_ROOT_URI='127.0.0.1'
export DMLC_PS_ROOT_PORT=8000
export DMLC_ROLE='scheduler'
${bin} ${arg} &

# start servers
export DMLC_ROLE='server'
for ((i=0; i<${DMLC_NUM_SERVER}; ++i)); do
    export HEAPPROFILE=./S${i}
    ${bin} ${arg} &
done

# start workers
export DMLC_ROLE='worker'
for ((i=0; i<${DMLC_NUM_WORKER}; ++i)); do
    export HEAPPROFILE=./W${i}
    ${bin} ${arg} &
done

wait

2.3 Sample Program

Let's now look at the official example.

PS-Lite is written in C++, and the worker, server, and scheduler all share the same code. This can feel awkward at first for readers used to Java or Python, and it takes some getting used to.

Looking at this sample program, it may at first be confusing why the code seems to start a scheduler, a worker, and a server every time it runs. In fact, as the comments below show, what actually executes is decided by the environment variables: if the environment variables set the role to server, the scheduler and worker parts are not executed.

#include <cmath>
#include "ps/ps.h"
using namespace ps;

void StartServer() {
  if (!IsServer()) { return; }
  auto server = new KVServer<float>(0);
  server->set_request_handle(KVServerDefaultHandle<float>());  // register the functor
  RegisterExitCallback([server]() { delete server; });
}

void RunWorker() {
  if (!IsWorker()) return;
  KVWorker<float> kv(0, 0);

  // init
  int num = 10000;
  std::vector<Key> keys(num);
  std::vector<float> vals(num);
  int rank = MyRank();
  srand(rank + 7);
  for (int i = 0; i < num; ++i) {
    keys[i] = kMaxKey / num * i + rank;
    vals[i] = (rand() % 1000);
  }

  // push
  int repeat = 50;
  std::vector<int> ts;
  for (int i = 0; i < repeat; ++i) {
    ts.push_back(kv.Push(keys, vals));
    // kv.Push() returns the timestamp of the request;
    // wait periodically to avoid pushes piling up and causing huge memory usage
    if (i > 10) kv.Wait(ts[ts.size()-10]);
  }
  for (int t : ts) kv.Wait(t);

  // pull
  std::vector<float> rets;
  kv.Wait(kv.Pull(keys, &rets));

  // pushpull
  std::vector<float> outs;
  for (int i = 0; i < repeat; ++i) {
    // PushPull on the same keys should be called serially
    kv.Wait(kv.PushPull(keys, vals, &outs));
  }

  float res = 0;
  float res2 = 0;
  for (int i = 0; i < num; ++i) {
    res += std::fabs(rets[i] - vals[i] * repeat);
    res2 += std::fabs(outs[i] - vals[i] * 2 * repeat);
  }
  CHECK_LT(res / repeat, 1e-5);
  CHECK_LT(res2 / (2 * repeat), 1e-5);
  LL << "error: " << res / repeat << ", " << res2 / (2 * repeat);
}

int main(int argc, char *argv[]) {
  // start system
  Start(0);          // Postoffice::Start(); every node calls this
  // setup server nodes
  StartServer();     // only does real work on server nodes
  // run worker nodes
  RunWorker();       // only does real work on worker nodes
  // stop system
  Finalize(0, true); // every node must call this at the end
  return 0;
}

KVServerDefaultHandle is a functor used to process the requests that the server receives from workers. Its definition is as follows:

/**
 * \brief an example handle adding pushed kv into store
 */
template <typename Val>
struct KVServerDefaultHandle {
  // Functor used to process a request received by the server from a worker.
  // req_meta holds meta information about the request, e.g. which node it came from.
  // req_data is the data that was sent.
  // server is a pointer to the current server object.
  void operator()(const KVMeta& req_meta,
                  const KVPairs<Val>& req_data,
                  KVServer<Val>* server) {
    size_t n = req_data.keys.size();
    KVPairs<Val> res;
    if (!req_meta.pull) {   // push request
      CHECK_EQ(n, req_data.vals.size());
    } else {                // pull request
      res.keys = req_data.keys;
      res.vals.resize(n);
    }
    for (size_t i = 0; i < n; ++i) {
      Key key = req_data.keys[i];
      if (req_meta.push) {  // push request: accumulate the pushed value
        store[key] += req_data.vals[i];
      }
      if (req_meta.pull) {  // pull request: read the stored value
        res.vals[i] = store[key];
      }
    }
    server->Response(req_meta, res);
  }
  std::unordered_map<Key, Val> store;
};
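The handle is where server-side processing logic (typically the optimizer, as noted earlier) lives, and it is swapped in through set_request_handle as in the sample above. Below is a hedged sketch of a custom handle that applies a plain SGD step on push instead of just accumulating values; KVServerSGDHandle and lr are illustrative names, and the same headers as the sample program are assumed.

// A sketch only: treats each pushed value as a gradient and applies w -= lr * g.
template <typename Val>
struct KVServerSGDHandle {
  void operator()(const KVMeta& req_meta, const KVPairs<Val>& req_data,
                  KVServer<Val>* server) {
    size_t n = req_data.keys.size();
    KVPairs<Val> res;
    if (req_meta.pull) {              // a pull carries the current weights back
      res.keys = req_data.keys;
      res.vals.resize(n);
    }
    for (size_t i = 0; i < n; ++i) {
      Key key = req_data.keys[i];
      if (req_meta.push) {            // push: apply one SGD step per key
        weights[key] -= lr * req_data.vals[i];
      }
      if (req_meta.pull) {            // pull: read the latest weight
        res.vals[i] = weights[key];
      }
    }
    server->Response(req_meta, res);  // every request must be answered
  }
  Val lr = 0.01;
  std::unordered_map<Key, Val> weights;
};
// Registered just like the default handle:
//   server->set_request_handle(KVServerSGDHandle<float>());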

0x03 Postoffice

Postoffice is a singleton global management class that maintains the node's view of global system information. It has the following characteristics:

  • All three Node roles depend on Postoffice for management, and each Node has a singleton Postoffice for its lifetime.

  • As mentioned earlier, a PS-Lite trait is that worker, server, and scheduler all run the same code; the same holds for Postoffice, so it is best to describe its responsibilities per role.

  • On the Scheduler side, Postoffice is, as its name suggests, a post office. It can be thought of as an address book plus a control center that records the information of every node in the system (the system consisting of the scheduler, servers, and workers). Its specific functions are:

    • Maintain a Van object, which is responsible for bringing up the whole network, for communication, and for command management such as adding nodes, removing nodes, and recovering nodes;
    • Manage the basic information of the whole cluster, such as the numbers of workers and servers, the addresses of all nodes, the server key-range distribution, the conversion between worker/server rank and node id, and node role identity;
    • Provide the Barrier function;
  • On the Server/Worker side, Postoffice is responsible for:

    • Configure information about the current node, such as what type of node the current node is (server, worker), what the nodeID is, and worker/server rank to node ID conversion.
    • Routing: Responsible for the mapping between keys and servers.
    • The Barrier function;

Note that all of this code lives inside the Postoffice class; it is not split into separate modules by role.

3.1 Definition

The class UML diagram is as follows:

Below we only show the key variables and member functions. Since every node contains a PostOffice, the PostOffice data structure holds the variables needed by all kinds of nodes, which makes it fairly complex.

The main variables function as follows:

  • van_ : the underlying communication object;
  • customers_ : the customers currently on this node;
  • node_ids_ : the node id mapping table;
  • server_key_ranges_ : the server key-range objects;
  • is_worker_, is_server_, is_scheduler_ : flags for the node type;
  • heartbeats_ : the heartbeat records of the nodes;
  • barrier_done_ : the barrier synchronization variables;

The main functions are as follows:

  • InitEnvironment: initializes the environment variables and creates the Van object;
  • Start: establishes communication and performs initialization;
  • Finalize: blocks and exits the node;
  • Manage: exits the barrier blocking state;
  • Barrier: enters the barrier blocking state;
  • UpdateHeartbeat: updates the heartbeat record of a node;
  • GetDeadNodes: obtains the dead nodes based on heartbeats_;

The details are as follows:

class Postoffice {
  /**
   * \brief start the system
   *
   * This function will block until every nodes are started.
   * \param argv0 the program name, used for logging.
   * \param do_barrier whether to block until every nodes are started.
   */
  void Start(int customer_id, const char* argv0, const bool do_barrier);
  /**
   * \brief terminate the system
   *
   * All nodes should call this function before existing.
   * \param do_barrier whether to do block until every node is finalized, default true.
   */
  void Finalize(const int customer_id, const bool do_barrier = true);
  /**
   * \brief barrier
   * \param node_id the barrier group id
   */
  void Barrier(int customer_id, int node_group);
  /**
   * \brief process a control message, called by van
   * \param the received message
   */
  void Manage(const Message& recv);
  /**
   * \brief update the heartbeat record map
   * \param node_id the \ref Node id
   * \param t the last received heartbeat time
   */
  void UpdateHeartbeat(int node_id, time_t t) {
    std::lock_guard<std::mutex> lk(heartbeat_mu_);
    heartbeats_[node_id] = t;
  }
  /**
   * \brief get node ids that haven't reported heartbeats for over t seconds
   * \param t timeout in sec
   */
  std::vector<int> GetDeadNodes(int t = 60);  
 private:  
 void InitEnvironment();  
  Van* van_;
  mutable std::mutex mu_;
  // app_id -> (customer_id -> customer pointer)
  std::unordered_map<int, std::unordered_map<int, Customer*>> customers_;
  std::unordered_map<int, std::vector<int>> node_ids_;
  std::mutex server_key_ranges_mu_;
  std::vector<Range> server_key_ranges_;
  bool is_worker_, is_server_, is_scheduler_;
  int num_servers_, num_workers_;
  std::unordered_map<int, std::unordered_map<int, bool> > barrier_done_;
  int verbose_;
  std::mutex barrier_mu_;
  std::condition_variable barrier_cond_;
  std::mutex heartbeat_mu_;
  std::mutex start_mu_;
  int init_stage_ = 0;
  std::unordered_map<int, time_t> heartbeats_;
  Callback exit_callback_;
  /** \brief Holding a shared_ptr to prevent it from being destructed too early */
  std::shared_ptr<Environment> env_ref_;
  time_t start_time_;
  DISALLOW_COPY_AND_ASSIGN(Postoffice);
}; 

3.2 ID Mapping

First we introduce the node id mapping: how logical nodes map to physical nodes, how physical nodes are grouped into logical groups, and how a message can be sent to all physical nodes in a group in a simple way.

  • The ids 1, 2, and 4 identify the Scheduler, the ServerGroup, and the WorkerGroup respectively;
  • A single worker has node id rank * 2 + 9; a single server has node id rank * 2 + 8;
  • Any combination of node groups can be identified by a single id equal to the sum of the group ids.

3.2.1 Concepts

  • Rank is a logical concept: the unique logical identifier within each kind of node (scheduler, worker, server).

  • Node id is the unique identifier of a physical node; it corresponds one-to-one with a (hostname, port) pair.

  • Node group is a logical concept; each group can contain multiple node ids. PS-Lite has three groups: the scheduler group, the server group, and the worker group.

  • The Node group ID is the unique identifier of a Node group.

    • PS-Lite uses the numbers 1, 2, and 4 to identify the Scheduler, the ServerGroup, and the WorkerGroup respectively. Each number stands for a group of nodes, namely the set of all nodes of that type. For example, 2 represents the server group, i.e., the collection of all server nodes.

    • Why these three numbers? Because in binary they are 001, 010, and 100, so to send a message to several groups at once you can simply OR (or add) the node group ids.

    • Any number between 1 and 7 represents some combination of Scheduler/ServerGroup/WorkerGroup.

      • If you want to send a request to all worker nodes, set the request target node ID to 4.

      • Suppose that a worker wants to send requests to all server nodes and scheduler nodes at the same time, it only needs to set the id of the requested target node to 3, because 3 = 2 + 1 = kServerGroup + kScheduler.

      • If you want to send messages to all nodes, set this parameter to 7.

3.2.2 Implementation of logical groups

The three logical groups are defined as follows:

/** \brief node ID for the scheduler */
static const int kScheduler = 1;
/**
 * \brief the server node group ID
 *
 * group id can be combined:
 * - kServerGroup + kScheduler means all server nodes and the scheduler
 * - kServerGroup + kWorkerGroup means all server and worker nodes
 */
static const int kServerGroup = 2;
/** \brief the worker node group ID */
static const int kWorkerGroup = 4;
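Since the three ids occupy distinct bits (001, 010, 100), adding group ids is equivalent to OR-ing them, and the sum directly names the combined destination. A standalone sanity check (not PS-Lite code):

#include <iostream>

int main() {
  const int kScheduler = 1, kServerGroup = 2, kWorkerGroup = 4;
  // 001 + 010 == 001 | 010: a message whose receiver id is 3 reaches
  // every server node plus the scheduler.
  int servers_and_scheduler = kServerGroup + kScheduler;          // 3
  int everyone = kScheduler + kServerGroup + kWorkerGroup;        // 7
  std::cout << servers_and_scheduler << " " << everyone << "\n";  // prints "3 7"
  return 0;
}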

3.2.3 Rank vs Node ID

The node id is the unique identifier of a physical node, while the rank is the unique identifier within each logical kind (scheduler, worker, server). Both are produced by a fixed algorithm.

As shown in the following code, if three workers are configured, their ranks range from 0 to 2, and the actual physical node ids of these workers are computed with WorkerRankToID.

for (int i = 0; i < num_workers_; ++i) {
  int id = WorkerRankToID(i);
  for (int g : {id, kWorkerGroup, kWorkerGroup + kServerGroup,
                kWorkerGroup + kScheduler,
                kWorkerGroup + kServerGroup + kScheduler}) {
    node_ids_[g].push_back(id);
  }
}

The specific calculation rules are as follows:

  /**
   * \brief convert from a worker rank into a node id
   * \param rank the worker rank
   */
  static inline int WorkerRankToID(int rank) {
    return rank * 2 + 9;
  }
  /**
   * \brief convert from a server rank into a node id
   * \param rank the server rank
   */
  static inline int ServerRankToID(int rank) {
    return rank * 2 + 8;
  }
  /**
   * \brief convert from a node id into a server or worker rank
   * \param id the node id
   */
  static inline int IDtoRank(int id) {
#ifdef _MSC_VER
#undef max
#endif
    return std::max((id - 8) / 2, 0);
  }
  • A single worker: rank * 2 + 9;
  • A single server: rank * 2 + 8;

The algorithm also guarantees that server ids are even and worker ids are odd. For example, with 2 servers and 3 workers, the servers get ids 8 and 10 and the workers get ids 9, 11, and 13.

In this way, we know that the ids from 1 to 7 are node group, and the ids of individual nodes start from 8.


3.2.4 Group vs. Node

Because requests sometimes need to be sent to more than one node, PS-Lite uses a map to store the actual set of nodes corresponding to each node group / single node, i.e., the set of node ids corresponding to each id value:

std::unordered_map<int, std::vector<int>> node_ids_;

How is node_ids_ populated? We need to look again at the code shown earlier:

for (int i = 0; i < num_workers_; ++i) {
  int id = WorkerRankToID(i);
  for (int g : {id, kWorkerGroup, kWorkerGroup + kServerGroup,
                kWorkerGroup + kScheduler,
                kWorkerGroup + kServerGroup + kScheduler}) {
    node_ids_[g].push_back(id);
  }
}

Let's first recall the node id scheme:

  • Ids 1 to 7 represent node groups;
  • Subsequent ids (8, 9, 10, 11, ...) represent single nodes, where the even ids 8, 10, 12, ... are server 0, server 1, server 2, ... (2n + 8), and the odd ids 9, 11, 13, ... are worker 0, worker 1, worker 2, ... (2n + 9).

Therefore, to realize the function "any number from 1 to 7 addresses all of the corresponding nodes", each new node has to be registered under several ids (its own node id plus the node group ids it belongs to); these are the ids through which this node can be reached. For example, worker 2 has node id 2 * 2 + 9 = 13, so it corresponds to five ids:

  • 13 (itself);
  • 4 (kWorkerGroup);
  • 4 + 1 (kWorkerGroup + kScheduler);
  • 4 + 2 (kWorkerGroup + kServerGroup);
  • 4 + 1 + 2 (kWorkerGroup + kServerGroup + kScheduler).

These five ids are exactly the condition of the inner for loop above, and they correspond to five entries of the node_ids_ map: 13 must be appended to the vectors of node_ids_[4], node_ids_[5], node_ids_[6], node_ids_[7], and node_ids_[13].
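For a concrete picture, here is a standalone sketch (not PS-Lite code) that replays the rank-to-id rules and the registration loops for 2 servers and 3 workers, and prints a few entries of the resulting node_ids_ layout:

#include <initializer_list>
#include <iostream>
#include <unordered_map>
#include <vector>

int main() {
  const int kScheduler = 1, kServerGroup = 2, kWorkerGroup = 4;
  const int num_servers = 2, num_workers = 3;
  std::unordered_map<int, std::vector<int>> node_ids;

  for (int i = 0; i < num_workers; ++i) {
    int id = i * 2 + 9;  // WorkerRankToID: workers get 9, 11, 13
    for (int g : {id, kWorkerGroup, kWorkerGroup + kServerGroup,
                  kWorkerGroup + kScheduler,
                  kWorkerGroup + kServerGroup + kScheduler}) {
      node_ids[g].push_back(id);
    }
  }
  for (int i = 0; i < num_servers; ++i) {
    int id = i * 2 + 8;  // ServerRankToID: servers get 8, 10
    for (int g : {id, kServerGroup, kWorkerGroup + kServerGroup,
                  kServerGroup + kScheduler,
                  kWorkerGroup + kServerGroup + kScheduler}) {
      node_ids[g].push_back(id);
    }
  }
  for (int g : {kScheduler, kScheduler + kServerGroup + kWorkerGroup,
                kScheduler + kWorkerGroup, kScheduler + kServerGroup}) {
    node_ids[g].push_back(kScheduler);
  }

  // node_ids[2]:  8 10            (all servers)
  // node_ids[4]:  9 11 13         (all workers)
  // node_ids[7]:  9 11 13 8 10 1  (every node, in insertion order)
  // node_ids[13]: 13              (worker 2 alone)
  for (int g : {2, 4, 7, 13}) {
    std::cout << "node_ids[" << g << "]:";
    for (int id : node_ids[g]) std::cout << " " << id;
    std::cout << "\n";
  }
  return 0;
}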

3.3 Parameter Representation

Workers and servers communicate through push and pull: a worker pushes its computed gradients to the servers and pulls the updated parameters from them.

3.3.1 KV format

In a parameter server, the parameters can all be represented as (key, value) pairs. For example, in a loss-minimization problem, the key is a feature id and the value is its weight. For sparse parameters, a key with no stored value can be treated as having value 0.

Representing the parameters as key-value pairs is natural and easy to understand and program.

3.3.2 Key-Value Vectors

Distributed algorithms incur two extra costs: the cost of data communication, and the synchronization cost caused by imperfect load balancing and by differences in machine performance.

In high-dimensional machine learning training, high-frequency features are updated very often, which puts great pressure on the network. If every parameter had its own key and updates were sent key by key, communication would become frequent and inefficient. To mitigate this, a compromise is needed: exploiting the structure of machine learning algorithms, each key is given a vector or matrix as its value, so that many parameters can be transferred in one shot, trading convergence against synchronization cost.

The premise is that the parameters are ordered. The drawback is that for sparse models the vectors or matrices inevitably contain zero entries that would not need to be stored in the single-parameter representation, causing data redundancy.

But there are two benefits:

  • Reduced network traffic
  • Vector-level operations become feasible, so the many optimizations of linear algebra libraries such as BLAS, LAPACK, and ATLAS can be exploited.

3.3.3 Range operation

To improve computing performance and bandwidth efficiency, the parameter server also batches updates to reduce the pressure caused by high-frequency keys, for example by merging the high-frequency keys within a minibatch into a single update.

PS-Lite accordingly lets users perform range push and range pull operations.
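For example, with the KVWorker API used in the earlier sample program, each key can carry a small vector of values by passing a lens array, so one push moves many parameters at once. This is a hedged sketch assuming the same setup as the sample program; it only runs inside a launched PS-Lite job.

// 3 keys, each owning a length-4 value vector: 12 floats move in one request.
KVWorker<float> kv(0, 0);
std::vector<Key> keys = {1, 3, 5};            // must be in ascending order
std::vector<float> vals(12, 1.0f);            // values for key 1, then 3, then 5
std::vector<int> lens = {4, 4, 4};            // how many values belong to each key
kv.Wait(kv.Push(keys, vals, lens));

// Pulling returns the values and, optionally, the per-key lengths.
std::vector<float> recv_vals;
std::vector<int> recv_lens;
kv.Wait(kv.Pull(keys, &recv_vals, &recv_lens));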

3.4 Routing Function (Keyslice)

The routing function determines which servers a worker should send its messages to when it performs a push or pull.

As we know, PS-Lite has a multi-server architecture, so an important question is how the parameters are distributed: given a parameter's key, how do we decide which server stores it? There must therefore be routing logic that establishes a mapping from keys to servers.

PS-Lite puts the routing logic on the worker side and uses a range-partitioning strategy: each server owns a fixed key range, determined when the worker starts. The details are as follows:

  • The data type of a parameter's key is determined by whether the macro USE_KEY32 is set when PS-Lite is compiled; it is either a 32-bit or a 64-bit unsigned integer.
  • The upper bound of the key range is determined by the key's data type; for example, the upper bound for uint32_t is 4294967295.
  • The key space is divided using this upper bound and the number of servers obtained at startup (the value of the environment variable DMLC_NUM_SERVER).
  • Each server maintains one key range; the uint32_t/uint64_t key space is partitioned into equal intervals. Given the upper bound MAX and the number of servers N, the i-th server is responsible for [MAX/N*i, MAX/N*(i+1)).
  • Keys should be hashed when they are constructed to avoid key skew across servers (for example, by swapping the high and low 32, 16, 8, 4, and 2 bits).
  • The keys in a worker's push or pull must be sorted in ascending order so that slicing can be done with zero copy.

Note that when the key space is not exactly divisible, a small portion at the top of the key space is discarded.

The specific implementation is as follows:

First, PS-Lite only supports integer keys.

#if USE_KEY32
/*! \brief Use unsigned 32-bit int as the key type */
using Key = uint32_t;
#else
/*! \brief Use unsigned 64-bit int as the key type */
using Key = uint64_t;
#endif
/*! \brief The maximal allowed key value */
static const Key kMaxKey = std::numeric_limits<Key>::max();

Second, the integer key space is divided equally:

const std::vector<Range>& Postoffice::GetServerKeyRanges() {
  if (server_key_ranges_.empty()) {
    for (int i = 0; i < num_servers_; ++i) {
      server_key_ranges_.push_back(Range(
          kMaxKey / num_servers_ * i,
          kMaxKey / num_servers_ * (i+1)));
    }
  }
  return server_key_ranges_;
}
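As a rough illustration of the worker-side routing decision, the sketch below (standalone, not PS-Lite code; the helper ServerForKey is a made-up name) finds the owner of a key by dividing by the same range width:

#include <cstdint>
#include <iostream>
#include <limits>

using Key = uint64_t;
static const Key kMaxKey = std::numeric_limits<Key>::max();

// Mirrors GetServerKeyRanges(): server i owns [kMaxKey / N * i, kMaxKey / N * (i + 1)).
int ServerForKey(Key key, int num_servers) {
  Key width = kMaxKey / num_servers;
  return static_cast<int>(key / width);
}

int main() {
  int n = 4;  // pretend DMLC_NUM_SERVER = 4
  std::cout << ServerForKey(0, n) << "\n";                // 0: first server
  std::cout << ServerForKey(kMaxKey / n * 2, n) << "\n";  // 2: third server
  std::cout << ServerForKey(kMaxKey - 1, n) << "\n";      // 4: the discarded sliver at the top
  return 0;
}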

3.5 Initializing the Environment

From the previous analysis we know that PS-Lite controls what a specific node does through environment variables.

Which role a node takes depends on which environment variables are set, and on their values, before the node is started.

The environment variables include the node role, the numbers of workers and servers, the IP address, and the port.

The InitEnvironment function creates the Van, obtains the numbers of workers and servers, and determines the type of this node.

void Postoffice::InitEnvironment() {
  const char* val = NULL;
  std::string van_type = GetEnv("DMLC_PS_VAN_TYPE", "zmq");
  van_ = Van::Create(van_type);
  val = CHECK_NOTNULL(Environment::Get()->find("DMLC_NUM_WORKER"));
  num_workers_ = atoi(val);
  val =  CHECK_NOTNULL(Environment::Get()->find("DMLC_NUM_SERVER"));
  num_servers_ = atoi(val);
  val = CHECK_NOTNULL(Environment::Get()->find("DMLC_ROLE"));
  std::string role(val);
  is_worker_ = role == "worker";
  is_server_ = role == "server";
  is_scheduler_ = role == "scheduler";
  verbose_ = GetEnv("PS_VERBOSE", 0);
}

3.6 Start

The Start function mainly does the following:

  • Calls InitEnvironment() to initialize the environment and create the Van object;
  • Initializes node_ids_: based on the numbers of worker and server nodes, it determines the set of node ids corresponding to each id value (the logic analyzed above);
  • Starts the Van, where various interactions happen (there is a synchronous wait on ADD_NODE, which is different from the later barrier wait);
  • If this is the first call to Postoffice::Start, initializes the start_time_ member;
  • If do_barrier is set, calls Barrier to wait, so that the whole system starts in unison; that is, every node gets ready and sends a first synchronization Message to the Scheduler.

The specific code is as follows:

void Postoffice::Start(int customer_id, const char* argv0, const bool do_barrier) {
  start_mu_.lock();
  if (init_stage_ == 0) {
    InitEnvironment();

    // Init node info.
    // set worker nodes
    for (int i = 0; i < num_workers_; ++i) {
      int id = WorkerRankToID(i);
      for (int g : {id, kWorkerGroup, kWorkerGroup + kServerGroup,
                    kWorkerGroup + kScheduler,
                    kWorkerGroup + kServerGroup + kScheduler}) {
        node_ids_[g].push_back(id);
      }
    }
    // set server nodes
    for (int i = 0; i < num_servers_; ++i) {
      int id = ServerRankToID(i);
      for (int g : {id, kServerGroup, kWorkerGroup + kServerGroup,
                    kServerGroup + kScheduler,
                    kWorkerGroup + kServerGroup + kScheduler}) {
        node_ids_[g].push_back(id);
      }
    }
    // set scheduler node
    for (int g : {kScheduler, kScheduler + kServerGroup + kWorkerGroup,
                  kScheduler + kWorkerGroup, kScheduler + kServerGroup}) {
      node_ids_[g].push_back(kScheduler);
    }
    init_stage_++;
  }
  start_mu_.unlock();

  // start van
  van_->Start(customer_id);

  start_mu_.lock();
  if (init_stage_ == 1) {
    // record start time
    start_time_ = time(NULL);
    init_stage_++;
  }
  start_mu_.unlock();

  // do a barrier here
  if (do_barrier) Barrier(customer_id, kWorkerGroup + kServerGroup + kScheduler);
}

3.7 Barrier

3.7.1 Synchronization

In general, the scheduler node synchronizes the other nodes by counting. Specifically:

  • After finishing its own work, each node sends a Control::BARRIER request to the scheduler node and blocks until it receives the scheduler's reply;
  • When the scheduler node receives such a request, it increments a local counter. Once the count equals the number of nodes in the barrier_group, meaning that every machine has finished the specified work, the scheduler sends a reply to each machine in the barrier_group and thereby unblocks them.

3.7.2 Initialization

PS-Lite uses a barrier to control the initialization of the system: everyone waits until all are ready before moving forward. This step is optional. The details are as follows:

  • The Scheduler waits for BARRIER messages from all workers and servers;
  • After completing ADD_NODE, each node enters the barrier blocking state of the specified group (by sending a BARRIER message to the Scheduler), ensuring that the preceding steps have finished on every node;
  • All nodes (workers and servers, and the scheduler itself) block until the scheduler, having received the BARRIER messages of all nodes, sends back a response;
  • Finally, all nodes exit the blocking state once they receive the BARRIER reply from the Scheduler.
3.7.2.1 Waiting for BARRIER messages

A node calls the Barrier function to tell the Scheduler that it has entered the waiting state.

Note that the call looks like this:

if (do_barrier) Barrier(customer_id, kWorkerGroup + kServerGroup + kScheduler);
void Postoffice::Barrier(int customer_id, int node_group) {
  if (GetNodeIDs(node_group).size() <= 1) return;
  auto role = van_->my_node().role;
  if (role == Node::SCHEDULER) {
    CHECK(node_group & kScheduler);
  } else if (role == Node::WORKER) {
    CHECK(node_group & kWorkerGroup);
  } else if (role == Node::SERVER) {
    CHECK(node_group & kServerGroup);
  }

  std::unique_lock<std::mutex> ulk(barrier_mu_);
  barrier_done_[0][customer_id] = false;
  Message req;
  req.meta.recver = kScheduler;
  req.meta.request = true;
  req.meta.control.cmd = Control::BARRIER;
  req.meta.app_id = 0;
  req.meta.customer_id = customer_id;
  req.meta.control.barrier_group = node_group;
  req.meta.timestamp = van_->GetTimestamp();
  van_->Send(req);  // send the BARRIER request to the scheduler
  // then block until barrier_done_ becomes true
  barrier_cond_.wait(ulk, [this, customer_id] {
    return barrier_done_[0][customer_id];
  });
}

That is, the barrier covers all groups, so the Scheduler node also sends a BARRIER message to itself.

3.7.2.2 Processing BARRIER Messages

The logic that handles the waiting lives in the Van class; we show it here ahead of the Van analysis.

The ProcessBarrierCommand logic is as follows:

  • If msg->meta.request is true, the scheduler has received a BARRIER request and processes it:

    • The Scheduler increments the count of barrier requests for that group;
    • When the Scheduler receives the last request (the count equals the total number of nodes in the group), it clears the counter and sends out the message that ends the barrier, with meta.request set to false;
    • It sends a BARRIER message with request == false to every node in the group.
  • If msg->meta.request is false, a barrier response has been received, and Manage is called:

    • The Manage function sets barrier_done_ to true for every customer of the app_id, then wakes all waiters via barrier_cond_.notify_all().
void Van::ProcessBarrierCommand(Message* msg) {
  auto& ctrl = msg->meta.control;
  if (msg->meta.request) {
    // The scheduler received a BARRIER request
    // (Postoffice::Barrier sets request to true when sending).
    if (barrier_count_.empty()) {
      barrier_count_.resize(8, 0);
    }
    int group = ctrl.barrier_group;
    ++barrier_count_[group];  // increment the count for this barrier group
    if (barrier_count_[group] ==
        static_cast<int>(Postoffice::Get()->GetNodeIDs(group).size())) {
      // The last request has arrived, so the barrier can be released.
      barrier_count_[group] = 0;
      Message res;
      res.meta.request = false;  // this is a response
      res.meta.app_id = msg->meta.app_id;
      res.meta.customer_id = msg->meta.customer_id;
      res.meta.control.cmd = Control::BARRIER;
      for (int r : Postoffice::Get()->GetNodeIDs(group)) {
        int recver_id = r;
        if (shared_node_mapping_.find(r) == shared_node_mapping_.end()) {
          res.meta.recver = recver_id;
          res.meta.timestamp = timestamp_++;
          Send(res);
        }
      }
    }
  } else {
    // A barrier response was received; remove the barrier.
    // See the request = false setting above.
    Postoffice::Get()->Manage(*msg);
  }
}

The Manage function removes the barrier.

void Postoffice::Manage(const Message& recv) {
  CHECK(!recv.meta.control.empty());
  const auto& ctrl = recv.meta.control;
  if (ctrl.cmd == Control::BARRIER && !recv.meta.request) {
    barrier_mu_.lock();
    auto size = barrier_done_[recv.meta.app_id].size();
    for (size_t customer_id = 0; customer_id < size; customer_id++) {
      barrier_done_[recv.meta.app_id][customer_id] = true;
    }
    barrier_mu_.unlock();
    barrier_cond_.notify_all();  // wake up every thread waiting on the barrier
  }
}

The detailed diagram is as follows:

                                                    +
    Scheduler                                       |                  Worker
        +                                           |                     +
        |                                           |                     |
        |                                           |                     |
        +--------------------------------+          |                     +-----------------+
        |                                |          |                     |                 |
        |                                |          |                     |                 |
        |                                |          |                     |                 |
        |                                v          |                     |                 v
        |                         receiver_thread_  |                     |           receiver_thread_
        |                                +          |                     |                 |
        |                                |          |                     |                 |
        v              BARRIER           |          |   BARRIER           v                 |
Postoffice::Barrier +----------------->  | <---------------------+ Postoffice::Barrier      |
        +                                |          |                     +                 |
        |                                |          |                     |                 |
        |                                |          |                     |                 |
        |                                |          |                     |                 |
        |                                v          |                     |                 |
        v                                           |                     v                 |
 barrier_cond_.wait          ProcessBarrierCommand  |               barrier_cond_.wait      |
        |                                +          |                     |                 |
        |                                |          |                     |                 |
        |                  All Nodes OK  |          |                     |                 |
        |                                |          |                     |                 |
        |                 +--------------+          |   BARRIER           |                 |
        |                 |              +---------------------------------------------->   |
        |                 |  BARRIER     |          |                     |                 |
        |                 +------------> |          |                     |                 |
        |                                |          |                     |                 |
        |                                |          |                     |                 |
        +<-------------------------------<          |                     | <---------------+
        |          barrier_cond_.notify_all         |                     |    barrier_cond_.notify_all
        v                                           |                     v
                                                    +
​
​


So far we have completed a preliminary analysis of Postoffice; its remaining functions will be analyzed in later articles, together with Van and Customer.

0xEE Personal information

Thoughts on life and technology

WeChat official account: Rosie's Thinking

If you want updates on future articles, or want to see my technical recommendations, please follow the account.

0xFF References

Introduction to MXNet design and implementation

The most comprehensive PS-Lite explanation ever

PS-Lite deep source code interpretation

PS-Lite source code analysis

Extensible distributed machine learning architecture based on Parameter Server

PS-Lite code parsing

PS-Lite code notes

Introduction to distributed TensorFlow

Distributed Machine Learning (Part 1) – Parallel computing and machine learning

Distributed Machine Learning (Part 2) – Parallel computing and machine learning

Distributed Machine Learning (Part 3) – Federated learning

PS-Lite source code analysis

Notes on the talk "Scaling Distributed Machine Learning with System and Algorithm Co-design"