0x00 Abstract

This is the third part of the parameter server series. It introduces the Customer module of PS-Lite.

Now that we have covered PostOffice and Van, the next step is to look at Customer.

Customer is SimpleApp's agent at the post office. Because the worker and server need to focus on the algorithm, their logically network-related message sending and receiving functions are delegated to Customer.

The other articles in this series are:

Machine learning parameter server PS-Lite (1) —– PostOffice

Machine learning parameter server PS-Lite (2) —– communication module Van

0x01 Source code

1.1 Overall

Let’s summarize the overall situation:

  • PostOffice: a singleton global management class, where a node has a PostOffice for its lifetime and relies on its class members to manage the node;

  • Van: communication module, responsible for network communication with other nodes and the actual sending and receiving of messages. PostOffice holds a Van member;

  • SimpleApp: the parent class of KVServer and KVWorker. It provides simple Request, Wait, Response and Process functions; KVServer and KVWorker override these to suit their respective roles;

  • Node: node information, which stores the information about a node. Each node is uniquely identified by hostname and port.

  • Customer: Each SimpleApp object holds a member of the Customer class that needs to be registered with PostOffice. This class is responsible for:

    • As a sender, track the responses to messages sent by SimpleApp;
    • As the receiver, it maintains a message queue for a Node and receives messages for the Node.

Knowing the context of a class gives us a better understanding of the class, so we first need to look at where Customer is used. We have analyzed two classes so far, so let’s look at how Customer is used in both classes.

1.2 Postoffice

In PostOffice, there are the following member variables:

// app_id -> (customer_id -> customer pointer) 
std::unordered_map<int, std::unordered_map<int, Customer*>> customers_;

And the following member functions that register a Customer into customers_ and look one up:

void Postoffice::AddCustomer(Customer* customer) {
  std::lock_guard<std::mutex> lk(mu_);
  int app_id = CHECK_NOTNULL(customer)->app_id();
  // check if the customer id has existed
  int customer_id = CHECK_NOTNULL(customer)->customer_id();
  customers_[app_id].insert(std::make_pair(customer_id, customer));
  std::unique_lock<std::mutex> ulk(barrier_mu_);
  barrier_done_[app_id].insert(std::make_pair(customer_id, false));
}

Customer* Postoffice::GetCustomer(int app_id, int customer_id, int timeout) const {
  Customer* obj = nullptr;
  for (int i = 0; i < timeout * 1000 + 1; ++i) {
    {
      std::lock_guard<std::mutex> lk(mu_);
      const auto it = customers_.find(app_id);
      if (it != customers_.end()) {
        std::unordered_map<int, Customer*> customers_in_app = it->second;
        obj = customers_in_app[customer_id];
        break;
      }
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
  return obj;
}

Therefore, we can see several points:

  • An app instance can have multiple Customers;
  • Customer needs to register with Postoffice;

1.3 Van

In the Van, we can see that when processing a data message, it will:

  • Determine the customer_id: a worker takes it from the message's meta, while a server uses the app_id;
  • Get the Customer from Postoffice based on app_id and customer_id;
  • Call the Customer's Accept method to process the message.
void Van::ProcessDataMsg(Message* msg) {
  // data msg
  int app_id = msg->meta.app_id;
  int customer_id =
      Postoffice::Get()->is_worker() ? msg->meta.customer_id : app_id;
  auto* obj = Postoffice::Get()->GetCustomer(app_id, customer_id, 5);
  obj->Accept(*msg);
}

So we know:

  • An app instance can have multiple Customers;
  • Customer needs to register with Postoffice;
  • When a message is received, the corresponding Customer is found according to the app id (and customer id) in the message, and its Accept method is called to process the data message;

1.4 Customer

Looking at Customer, we can see that Accept simply inserts the received message into Customer's queue.

ThreadsafePQueue recv_queue_;
 
inline void Accept(const Message& recved) {
   recv_queue_.Push(recved);
}

The Customer object itself also starts a receiving thread recv_thread_, which runs Customer::Receiving(); there the registered recv_handle_ function is called to process each message.

std::unique_ptr<std::thread> recv_thread_;
​
recv_thread_ = std::unique_ptr<std::thread>(new std::thread(&Customer::Receiving, this));
​
void Customer::Receiving() {
  while (true) {
    Message recv;
    recv_queue_.WaitAndPop(&recv);
    if (!recv.meta.control.empty() &&
        recv.meta.control.cmd == Control::TERMINATE) {
      break;
    }
    recv_handle_(recv);
    if (!recv.meta.request) {
      std::lock_guard<std::mutex> lk(tracker_mu_);
      tracker_[recv.meta.timestamp].second++;
      tracker_cond_.notify_all();
    }
  }
}

1.5 Current Logic

Therefore, the current (message-receiving) logic can be summarized as follows:

  • The worker node or server node executes Postoffice::start() at the beginning of the program.

  • Postoffice::start() initializes the node information and calls Van::start().

  • Van::start() starts a local thread using Van::Receiving() to continuously listen for incoming messages.

  • After Van::Receiving() receives a message, it performs different actions according to the command. For data messages that need further processing, ProcessDataMsg is called:

    • The Customer is found based on the app id in the message;
    • The message is passed to the Customer::Accept function.
  • The Customer::Accept() function adds the message to the queue recv_queue_;

  • The Customer object itself also starts a receiving thread recv_thread_, which runs Customer::Receiving():

    • Take a message from the recv_queue_ queue;
    • Call the registered recv_handle_ function to process the message.

We can also see that the Van, Postoffice and Customer classes are rather tightly coupled, so it helps to lay them out in a diagram:

DataMessage
     |
   1 |
     v
+---------------------------+           +--------------------------------+
| Van                       |           | Postoffice                     |
|                           |           |                                |
|    Receiving              |           |                                |
|        |                  |           |                                |
|      2 |                  |     3     |                                |
|        v                  | GetCustomer                                |
|    ProcessDataMsg  <---------------------->  unordered_map customers_  |
|        |                  |           |                                |
+--------|------------------+           +--------------------------------+
         |
       4 |
         v
+---------------------------+
| Customer                  |
|                           |
|    Accept                 |
|        |                  |
|      5 |                  |
|        v                  |
|    recv_queue_            |
|        |                  |
|      6 |                  |
|        v                  |
|    Receiving              |
|        |                  |
|      7 |                  |
|        v                  |
|    recv_handle_           |
|                           |
+---------------------------+

Let’s analyze the specific logic in detail.

0x02 Basic classes

We’ll start with some base classes.

2.1 SArray

SArray has the following features:

  • SArray is a smart array that shares data and provides functionality similar to std::vector.
  • An SArray can be constructed from a std::vector.
  • An SArray can be copied and assigned like a C pointer; when the reference count of an SArray drops to zero, its memory is automatically reclaimed.
  • We can think of it as a zero-copy vector, compatible with the vector data structure (a usage sketch follows this list).
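A minimal usage sketch of these properties (this assumes ps-lite's SArray from ps/sarray.h; constructor details may differ slightly across versions):

#include <vector>
#include "ps/sarray.h"

void SArrayDemo() {
  std::vector<int> vec = {1, 2, 3};
  ps::SArray<int> a(vec);   // built from a std::vector (one copy of the data)
  ps::SArray<int> b = a;    // copied like a C pointer: b shares a's buffer, no copy
  b[0] = 100;               // the change is visible through a as well
  // when the last SArray referencing the buffer is destroyed, the memory is freed
}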

2.2 KVPairs

In ps-lite, each server holds a range of keys and their corresponding values. Keys and values are stored separately, and each key may correspond to multiple values, so the number of values belonging to each key needs to be recorded; this is what KVPairs is for.

KVPairs features are as follows:

  • KVPairs encapsulates the key-value structure, together with a length option, in three arrays: keys, vals and lens.
  • KVPairs is a template class containing SArray<Key> keys, SArray<Val> vals and SArray<int> lens. Key is an alias for int64, and Val is a template parameter.
  • lens has the same length as keys; lens[i] is the number of values corresponding to keys[i].
  • lens can be empty, in which case vals is divided evenly among the keys.

For example:

  • If keys=[1,5] and lens=[2,3], then vals[0] and vals[1] belong to key 1, while vals[2], vals[3] and vals[4] belong to key 5.
  • If lens is empty, then vals.size() must be a multiple of keys.size() (here, 2), and keys[0] and keys[1] each correspond to half of the values.

The definition is as follows:

template <typename Val>
struct KVPairs {
  // /** \brief empty constructor */
  // KVPairs() {}
  /** \brief the list of keys */
  SArray<Key> keys;
  /** \brief the according values */
  SArray<Val> vals;
  /** \brief the according value lengths (could be empty) */
  SArray<int> lens;
  /** \brief priority */
  int priority = 0;
};
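A small sketch that fills a KVPairs matching the keys/lens example above (float values, illustrative data only):

#include <vector>
#include "ps/ps.h"

ps::KVPairs<float> MakeExample() {
  ps::KVPairs<float> kvs;
  kvs.keys = ps::SArray<ps::Key>(std::vector<ps::Key>{1, 5});
  // two values belong to key 1, three values belong to key 5
  kvs.vals = ps::SArray<float>(std::vector<float>{0.1f, 0.2f, 0.3f, 0.4f, 0.5f});
  kvs.lens = ps::SArray<int>(std::vector<int>{2, 3});
  return kvs;
}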

2.3 Node

Node encapsulates node information such as the role, IP address (hostname), port, and whether the node was created by failover (recovery).

struct Node {
  /** \brief the empty value */
  static const int kEmpty;
  /** \brief default constructor */
  Node() : id(kEmpty), port(kEmpty), is_recovery(false) {}
  /** \brief node roles */
  enum Role { SERVER, WORKER, SCHEDULER };
​
  /** \brief the role of this node */
  Role role;
  /** \brief node id */
  int id;
  /** \brief customer id */
  int customer_id;
  /** \brief hostname or ip */
  std::string hostname;
  /** \brief the port this node is binding */
  int port;
  /** \brief whether this node is created by failover */
  bool is_recovery;
};

2.4 Control

Control encapsulates the meta information of a control message: the command, barrier_group (used when cmd = BARRIER), node (a vector of Node identifying the nodes the control command applies to), and a message signature.

As you can see, Control contains the Node type described above.

struct Control {
  /** \brief empty constructor */
  Control() : cmd(EMPTY) { }
  /** \brief return true is empty */
  inline bool empty() const { return cmd == EMPTY; }
​
  /** \brief all commands */
  enum Command { EMPTY, TERMINATE, ADD_NODE, BARRIER, ACK, HEARTBEAT };
  /** \brief the command */
  Command cmd;
  /** \brief node infos */
  std::vector<Node> node;
  /** \brief the node group for a barrier, such as kWorkerGroup */
  int barrier_group;
  /** message signature */
  uint64_t msg_sig;
};

2.5 Meta

Meta is the metadata part of a message, including the timestamp, sender id, receiver id, control information, message type, and so on.

struct Meta {
  /** \brief the empty value */
  static const int kEmpty;
  /** \brief default constructor */
  Meta() : head(kEmpty), app_id(kEmpty), customer_id(kEmpty),
           timestamp(kEmpty), sender(kEmpty), recver(kEmpty),
           request(false), push(false), pull(false), simple_app(false) {}
  /** \brief an int head */
  int head;
  /** \brief the unique id of the application this message is for */
  int app_id;
  /** \brief customer id*/
  int customer_id;
  /** \brief the timestamp of this message */
  int timestamp;
  /** \brief the node id of the sender of this message */
  int sender;
  /** \brief the node id of the receiver of this message */
  int recver;
  /** \brief whether or not this is a request message*/
  bool request;
  /** \brief whether or not a push message */
  bool push;
  /** \brief whether or not a pull message */
  bool pull;
  /** \brief whether or not it's for SimpleApp */
  bool simple_app;
  /** \brief an string body */
  std::string body;
  /** \brief data type of message.data[i] */
  std::vector<DataType> data_type;
  /** \brief system control message */
  Control control;
  /** \brief the byte size */
  int data_size = 0;
  /** \brief message priority */
  int priority = 0;
};

2.6 Message

2.6.1 Structure

Message is the Message to be sent, as follows:

  • The meta field is the metadata (serialized with Protobuf) and includes:

    • A control field that indicates the meaning of the message (e.g. termination, ACK, barrier synchronization, etc.), including:

      • Command type;

      • A vector of nodes. The nodes include:

        • Roles of nodes
        • ip, port
        • id
        • Whether it is a recovery node
      • The group id that the control command applies to;

      • Message signature;

    • The sender;

    • The receiver;

    • The time stamp.

    • etc.

  • The data in the body of the message is the payload to be sent. It uses the custom SArray to share data and reduce copying.

2.6.2 Logical Relationships

The logical relationship between the classes is as follows:

Message contains a Meta, Meta contains a Control, and Control contains a vector of Node; this is the dependency chain among the structures above.

2.6.3 Message types

Message contains the following control command types (a dispatch sketch follows the list):

  • ADD_NODE: workers and servers register themselves with the scheduler
  • BARRIER: synchronization barrier between nodes
  • HEARTBEAT: heartbeat signal between nodes, used for liveness checks
  • TERMINATE: the node exits
  • EMPTY: an ordinary data message, such as a push or pull
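The sketch below shows how receiving code can branch on these commands by first checking control.empty(); it is a simplified illustration, not the actual Van::Receiving code:

#include "ps/internal/message.h"
using namespace ps;

void Dispatch(const Message& msg) {
  if (!msg.meta.control.empty()) {   // a control message
    switch (msg.meta.control.cmd) {
      case Control::ADD_NODE:  /* register the new node */             break;
      case Control::BARRIER:   /* count and answer barrier requests */ break;
      case Control::HEARTBEAT: /* record that the sender is alive */   break;
      case Control::TERMINATE: /* shut this node down */               break;
      default: break;
    }
  } else {
    // cmd == EMPTY: an ordinary data message (push/pull), handed to a Customer
  }
}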

2.6.4 Definition

Specific definitions are as follows:

struct Message {
  /** \brief the meta info of this message */
  Meta meta;
  /** \brief the large chunk of data of this message */
  std::vector<SArray<char> > data;
  /**
   * \brief push array into data, and add the data type
   */
  template <typename V>
  void AddData(const SArray<V>& val) {
    CHECK_EQ(data.size(), meta.data_type.size());
    meta.data_type.push_back(GetDataType<V>());
    SArray<char> bytes(val);
    meta.data_size += bytes.size();
    data.push_back(bytes);
  }
};

Each message that is sent is encapsulated in this format, and the class responsible for message bookkeeping (Customer) relies on the information in Meta to do its job.
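To make the format concrete, here is a sketch (not code from ps-lite itself) of how a push request could be assembled before being handed to Van::Send; the field choices mirror what KVWorker::Send does, shown later in this article:

#include "ps/ps.h"
using namespace ps;

Message BuildPushRequest(int app_id, int customer_id, int timestamp, int recver,
                         const SArray<Key>& keys, const SArray<float>& vals) {
  Message msg;
  msg.meta.app_id = app_id;            // which application the message belongs to
  msg.meta.customer_id = customer_id;  // which Customer should handle the response
  msg.meta.request = true;             // this is a request, not a response
  msg.meta.push = true;                // it carries key/value data to be pushed
  msg.meta.timestamp = timestamp;      // the request id obtained from Customer::NewRequest
  msg.meta.recver = recver;            // node id (or group id) of the receiver
  msg.AddData(keys);                   // appends the bytes and records the data type
  msg.AddData(vals);
  return msg;
}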

0x03 Customer

3.1 Overview

Customer actually has two functions:

  • As a sender, to track the Response of each Request sent by SimpleApp;
  • As a receiver, Customer effectively acts as a message-processing engine (or part of one), because it has its own receiving thread and receiving message queue.

Specific features are as follows:

  • Each SimpleApp object holds a member of class Customer, and the Customer needs to be registered with PostOffice.
  • Because Customer handles messages but does not itself own the network layer, the actual responses and messages have to be reported to it by external callers, so its functions and responsibilities are somewhat split.
  • Each connection corresponds to a Customer instance; each Customer is bound to a node id and communicates with the node of that id on behalf of the current node. The id of the connected party identifies the Customer instance.
  • A new request returns a timestamp, which serves as the id of that request. The timestamp increases by 1 for each new request, and responses are counted under the same timestamp. This id is what later operations such as Wait use to identify the request.

3.2 Definition

3.2.1 Member Variables

Let's first look at the member variables of Customer.

Note that the purpose of each member variable is easiest to understand from the message flow. When a message is received, the data flows through the stages below, so we go through Customer's member variables in the same order:

Van::ProcessDataMsg ---> Customer::Accept ---> Customer::recv_queue_ ---> Customer::recv_thread_ ---> Customer::recv_handle_ 

The main member variables are as follows:

  • ThreadsafePQueue recv_queue_ : thread-safe message queue;

  • std::unique_ptr<std::thread> recv_thread_ : reads messages from recv_queue_ and calls recv_handle_;

  • RecvHandle recv_handle_ : the message-handling function of the worker or server.

    • It is bound to the handler (e.g. SimpleApp::Process) that is invoked when a message is received;
    • Customer starts a new thread that, for the lifetime of the Customer, processes accepted messages with recv_handle_. A thread-safe queue is used, and Accept() pushes messages into it.
    • The received messages come from the Van receiving thread: after receiving a message, each node's Van object pushes it to the appropriate Customer object based on the message's metadata.
    • For a worker such as KVWorker, recv_handle_ stores the data carried by the pulled message;
    • For a server, set_request_handle is used to set the corresponding handler, such as KVServerDefaultHandle;
  • std::vector<std::pair<int, int>> tracker_ : request & response synchronization variable (an illustrative trace follows this list).

    • tracker_ records the status of each request and its responses: how many nodes a request (identified by its request id) was sent to, and how many responses have come back.
    • The index into tracker_ is the timestamp of the request, i.e. the request number.
    • tracker_[i].first is the number of nodes the request was sent to, i.e. the number of responses that should be received.
    • tracker_[i].second is the number of responses actually received so far.
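The following hypothetical trace shows how tracker_ evolves for one request sent to a group of three servers (the numbers are illustrative, not the output of an actual run):

#include "ps/ps.h"
#include "ps/internal/customer.h"

void TrackerTrace(ps::Customer* obj) {
  int ts = obj->NewRequest(ps::kServerGroup);  // tracker_[ts] becomes {3, 0}
  // ... two responses arrive and are handled by Customer::Receiving() ...
  //     at that point tracker_[ts] is {3, 2}
  obj->WaitRequest(ts);                        // blocks until tracker_[ts] == {3, 3}
}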

3.2.2 Specific definitions

Specific definitions are as follows:

class Customer {
 public:
  /**
   * \brief the handle for a received message
   * \param recved the received message
   */
  using RecvHandle = std::function<void(const Message& recved)>;

  /**
   * \brief constructor
   * \param app_id the globally unique id indicating the application the postoffice
   *        serving for
   * \param customer_id the locally unique id indicating the customer of a postoffice
   * \param recv_handle the function for processing a received message
   */
  Customer(int app_id, int customer_id, const RecvHandle& recv_handle);

  /** \brief destructor */
  ~Customer();

  /** \brief return the globally unique application id */
  inline int app_id() { return app_id_; }

  /** \brief return the locally unique customer id */
  inline int customer_id() { return customer_id_; }

  /**
   * \brief get a timestamp for a new request. threadsafe
   * \param recver the receive node id of this request
   * \return the timestamp of this request
   */
  int NewRequest(int recver);

  /**
   * \brief wait until the request is finished. threadsafe
   * \param timestamp the timestamp of the request
   */
  void WaitRequest(int timestamp);

  /**
   * \brief return the number of responses received for the request. threadsafe
   * \param timestamp the timestamp of the request
   */
  int NumResponse(int timestamp);

  /** \brief add a number of responses to timestamp */
  void AddResponse(int timestamp, int num = 1);

  /**
   * \brief accept a received message from \ref Van. threadsafe
   * \param recved the received message
   */
  inline void Accept(const Message& recved) {
    recv_queue_.Push(recved);
  }

 private:
  /** \brief the thread function */
  void Receiving();

  int app_id_;
  int customer_id_;
  RecvHandle recv_handle_;
  ThreadsafePQueue recv_queue_;
  std::unique_ptr<std::thread> recv_thread_;
  std::mutex tracker_mu_;
  std::condition_variable tracker_cond_;
  std::vector<std::pair<int, int>> tracker_;

  DISALLOW_COPY_AND_ASSIGN(Customer);
};

3.3 The receiving thread

In the constructor, the receiving thread is created.

recv_thread_ = std::unique_ptr<std::thread>(new std::thread(&Customer::Receiving, this));
​

The thread's processing function is shown below; its logic is:

  • Wait on the message queue and pop a message when one is available;
  • Use recv_handle_ to process the message;
  • If meta.request is false, the message is a response, so the corresponding count in tracker_ is increased.
void Customer::Receiving() {
  while (true) {
    Message recv;
    recv_queue_.WaitAndPop(&recv);
    if (!recv.meta.control.empty() &&
        recv.meta.control.cmd == Control::TERMINATE) {
      break;
    }
    recv_handle_(recv);
    if (!recv.meta.request) {
      std::lock_guard<std::mutex> lk(tracker_mu_);
      tracker_[recv.meta.timestamp].second++;
      tracker_cond_.notify_all();
    }
  }
}
​

Since recv_handle_ is used for the specific business logic, let’s take a look at how recv_handle_ is set up, and in fact, how Customer is built and used.

3.4 How is a Customer built?

We have to bring in some classes that will only be analyzed in detail later, because they are the users of Customer and the coupling between them is tight.

3.4.1 In SimpleApp

First let’s look at SimpleApp, which is the base class for the concrete logical function node.

Each SimpleApp object holds a member of class Customer, which needs to be registered with PostOffice.

Here a new Customer object is created to initialize the obj_ member.

inline SimpleApp::SimpleApp(int app_id, int customer_id) : SimpleApp() {
  using namespace std::placeholders;
  obj_ = new Customer(app_id, customer_id, std::bind(&SimpleApp::Process, this, _1));
}
​

Let’s look at two subclasses of SimpleApp.

3.4.2 KVServer (app_id)

The KVServer class is mainly used to store key-value data and perform business operations such as gradient updates. Its main methods are Process() and Response().

In its constructor it will:

  • Create a new Customer object to initialize the obj_ member;
  • Passing KVServer::Process into the Customer constructor assigns the Process method to Customer::recv_handle_;
  • For a server, app_id == customer_id (the server's app id);

The constructor is as follows:

  /**
   * \brief constructor
   * \param app_id the app id, should match with \ref KVWorker's id
   */
  explicit KVServer(int app_id) : SimpleApp() {
    using namespace std::placeholders;
    obj_ = new Customer(app_id, app_id, std::bind(&KVServer<Val>::Process, this, _1));
  }
​

3.4.3 KVWorker (app_id, customer_id)

The KVWorker class is primarily used to Push/Pull its own key-value data to the Server. Methods include: Push(), Pull(), Wait().

In its constructor it will:

  • Bind the slicer_ member with the default KVWorker::DefaultSlicer;
  • Create a new Customer object to initialize the obj_ member, passing KVWorker::Process into the Customer constructor, which assigns the Process method to Customer::recv_handle_;
  /**
   * \brief constructor
   *
   * \param app_id the app id, should match with \ref KVServer's id
   * \param customer_id the customer id which is unique locally
   */
  explicit KVWorker(int app_id, int customer_id) : SimpleApp() {
    using namespace std::placeholders;
    slicer_ = std::bind(&KVWorker<Val>::DefaultSlicer, this, _1, _2, _3);
    obj_ = new Customer(app_id, customer_id, std::bind(&KVWorker<Val>::Process, this, _1));
  }
​

3.4.4 Customer

The constructor logic is as follows:

  • Initialize the app_id_, customer_id_, and recv_handle_ members with the corresponding constructor arguments;

  • Call Postoffice::AddCustomer to register the current Customer with PostOffice;

    • In PostOffice's customers_ member, this customer_id is added under the entry for app_id;
    • In PostOffice's barrier_done_ member, the synchronization status for this customer_id is set to false.
  • Start a new receiving thread recv_thread_.

The constructor is as follows:

Customer::Customer(int app_id, int customer_id, const Customer::RecvHandle& recv_handle)
    : app_id_(app_id), customer_id_(customer_id), recv_handle_(recv_handle) {
  Postoffice::Get()->AddCustomer(this);
  recv_thread_ = std::unique_ptr<std::thread>(new std::thread(&Customer::Receiving, this));
}
​

3.4.5 Sorting out app_id and customer_id

3.4.5.1 Example code

You may have questions about app_id and customer_id, such as:

In the KVWorker constructor the comments say:

  • app_id the app id, should match with KVServer’s id
  • customer_id the customer id which is unique locally

In the KVServer constructor the comment says:

  • app_id the app id, should match with KVWorker’s id

We use tests/test_kv_app_multi_worker.cc to sort out the logical relationship between app_id and customer_id.

To give away the conclusion: the worker uses customer_id to identify itself, and in the worker code customer_id is used to determine the range of keys this worker is responsible for.

As the test script shows, the test is launched with:

find test_* -type f -executable -exec ./repeat.sh 4 ./local.sh 2 2 ./{} \;

A server and two workers are started in the file.

  • The server has app_id = 0 and customer_id = 0;

  • The workers both have app_id = 0; one has customer_id = 0 and the other customer_id = 1;

  • The workers are executed with std::thread, which means the two worker nodes run inside the same process. This explains the comment "the customer id which is unique locally" in the KVWorker constructor.

  • The std::unordered_map<int, std::unordered_map<int, Customer*>> customers_ member variable therefore contains:

    • [0, [0, Customer_0]], where the first 0 is the app id and the second 0 is the customer id;
    • [0, [1, Customer_1]], where the first 0 is the app id and the 1 is the customer id.

Therefore, we can find out:

  • The app ID is used to identify an application.
  • The Customer ID is used to determine a local worker or a server in the application (app ID).
  • So when the KVServer constructor says the app id should match KVWorker's id, it means that every node's app id must be the same.
  • The customer ID is used to determine the range of key corresponding to this worker in the following worker code.

The specific code is as follows:

#include <cmath>
#include "ps/ps.h"
using namespace ps;

void StartServer() {
  // start the server
  if (!IsServer()) return;
  auto server = new KVServer<float>(0);
  server->set_request_handle(KVServerDefaultHandle<float>());
  RegisterExitCallback([server](){ delete server; });
}

void RunWorker(int customer_id) {
  // start the worker
  Start(customer_id);
  if (!IsWorker()) {
    return;
  }
  KVWorker<float> kv(0, customer_id);

  // init
  int num = 10000;
  std::vector<Key> keys(num);
  std::vector<float> vals(num);

  int rank = MyRank();
  srand(rank + 7);
  for (int i = 0; i < num; ++i) {
    keys[i] = kMaxKey / num * i + customer_id;
    vals[i] = (rand() % 1000);
  }

  // push
  int repeat = 50;
  std::vector<int> ts;
  for (int i = 0; i < repeat; ++i) {
    ts.push_back(kv.Push(keys, vals));
    // to avoid too frequent pushes, which lead to huge memory usage
    if (i > 10) kv.Wait(ts[ts.size()-10]);
  }
  for (int t : ts) kv.Wait(t);

  // pull
  std::vector<float> rets;
  kv.Wait(kv.Pull(keys, &rets));

  // pushpull
  std::vector<float> outs;
  for (int i = 0; i < repeat; ++i) {
    kv.Wait(kv.PushPull(keys, vals, &outs));
  }

  float res = 0;
  float res2 = 0;
  for (int i = 0; i < num; ++i) {
    res += fabs(rets[i] - vals[i] * repeat);
    res += fabs(outs[i] - vals[i] * 2 * repeat);
  }
  CHECK_LT(res / repeat, 1e-5);
  CHECK_LT(res2 / (2 * repeat), 1e-5);
  LL << "error: " << res / repeat << ", " << res2 / (2 * repeat);

  // stop system
  Finalize(customer_id, true);
}

int main(int argc, char *argv[]) {
  // start system
  bool isWorker = (strcmp(argv[1], "worker") == 0);
  if (!isWorker) {
    Start(0);
    // setup server nodes, start the server node
    StartServer();
    Finalize(0, true);
    return 0;
  }
  // run worker nodes: start two worker nodes inside this process
  std::thread t0(RunWorker, 0);
  std::thread t1(RunWorker, 1);
  t0.join();
  t1.join();
  return 0;
}

3.4.5.2 Identification

Recall the Postoffice initialization, and you can see that the worker uses customer_id to identify itself at startup. Therefore, the customer ID is used to determine the range of key corresponding to this worker in the worker code.

void Postoffice::Start(int customer_id, const char* argv0, const bool do_barrier) {
  // init node info: assign node ids for all workers and servers
  for (int i = 0; i < num_workers_; ++i) {
    int id = WorkerRankToID(i);
    for (int g : {id, kWorkerGroup, kWorkerGroup + kServerGroup,
                  kWorkerGroup + kScheduler,
                  kWorkerGroup + kServerGroup + kScheduler}) {
      node_ids_[g].push_back(id);
    }
  }

  for (int i = 0; i < num_servers_; ++i) {
    int id = ServerRankToID(i);
    for (int g : {id, kServerGroup, kWorkerGroup + kServerGroup,
                  kServerGroup + kScheduler,
                  kWorkerGroup + kServerGroup + kScheduler}) {
      node_ids_[g].push_back(id);
    }
  }

  // set the scheduler node
  for (int g : {kScheduler, kScheduler + kServerGroup + kWorkerGroup,
                kScheduler + kWorkerGroup, kScheduler + kServerGroup}) {
    node_ids_[g].push_back(kScheduler);
  }
  init_stage_++;

  // start van, customer_id is passed down here
  van_->Start(customer_id);

  ......

  // do a barrier here, again with customer_id
  if (do_barrier) Barrier(customer_id, kWorkerGroup + kServerGroup + kScheduler);
}

Take a look at Van initialization, which also uses customer_id to identify itself.

void Van::Start(int customer_id) {
  if (init_stage == 0) {
    // get my node info
    if (is_scheduler_) {
      my_node_ = scheduler_;
    } else {
      my_node_.hostname = ip;
      my_node_.role = role;
      my_node_.port = port;
      my_node_.id = Node::kEmpty;
      my_node_.customer_id = customer_id;  // customer_id is recorded here
    }
  }

  if (!is_scheduler_) {
    // let the scheduler know myself
    Message msg;
    Node customer_specific_node = my_node_;
    customer_specific_node.customer_id = customer_id;  // customer_id is set here
    msg.meta.recver = kScheduler;
    msg.meta.control.cmd = Control::ADD_NODE;
    msg.meta.control.node.push_back(customer_specific_node);
    msg.meta.timestamp = timestamp_++;
    Send(msg);
  }
  ......
}

This also explains why app_id and customer_id are set when KVWorker sends a message.

template <typename Val>
void KVWorker<Val>::Send(int timestamp, bool push, bool pull, int cmd, const KVPairs<Val>& kvs) {
  ......
  for (size_t i = 0; i < sliced.size(); ++i) {
    Message msg;
    msg.meta.app_id = obj_->app_id();            // app_id is taken from the Customer
    msg.meta.customer_id = obj_->customer_id();  // customer_id is taken from the Customer
    msg.meta.request = true;
    ......
    Postoffice::Get()->van()->Send(msg);
  }
}

In KVServer, app_id and customer_id are also needed when responding to a message.

template <typename Val>
void KVServer<Val>::Response(const KVMeta& req, const KVPairs<Val>& res) {
  Message msg;
  msg.meta.app_id = obj_->app_id();        // app_id comes from this server's Customer
  msg.meta.customer_id = req.customer_id;  // customer_id comes from the request
  msg.meta.request = false;
  msg.meta.push = req.push;
  msg.meta.pull = req.pull;
  msg.meta.head = req.cmd;
  msg.meta.timestamp = req.timestamp;
  msg.meta.recver = req.sender;
  ......
  Postoffice::Get()->van()->Send(msg);
}

3.4.5.3 A question

So why is app_id equal to customer_id on the Server side?

Since the original ps code is not available, our guess is:

In the original ps code, the server side can also have multiple Customers, but this feature was removed in ps-lite for simplification, so in ps-lite app_id is equal to customer_id on the server side.

3.5 Current Logic

Let's go through the process (the message-receiving logic) once more:

  • The worker node or server node executes Postoffice::start() at the beginning of the program.

  • Postoffice::start() initializes the node information and calls Van::start().

  • Van::start() starts a local thread using Van::Receiving() to continuously listen for incoming messages.

  • After Van::Receiving() receives a message, it performs different actions according to the command. For data messages that need further processing, ProcessDataMsg is called:

    • The Customer is found based on the app id and customer id in the message, i.e. the Van receiving thread dispatches messages to different Customers according to their customer id;
    • The message is passed to the Customer::Accept function.
  • The Customer::Accept() function adds the message to the queue recv_queue_;

  • The Customer object itself also starts a receiving thread recv_thread_, which runs Customer::Receiving():

    • Take a message from the recv_queue_ queue;
    • If !recv.meta.request, i.e. the message is a response, then tracker_[recv.meta.timestamp].second++;
    • Call the registered recv_handle_ function to process the message.
  • For a worker, the registered recv_handle_ is the KVWorker::Process() function. Since the messages received by the worker's receiving thread are mainly the KV pairs pulled from the servers, Process() mainly stores the KV pairs carried by the message.

  • For a server, the registered recv_handle_ is the KVServer::Process() function. Process() in turn calls the request handler that the user sets through KVServer::set_request_handle(), such as KVServerDefaultHandle.

In step 8, recv_handle_ points to either KVServer::Process or KVWorker::Process.

DataMessage
     |
   1 |
     v
+---------------------------+           +--------------------------------+
| Van                       |           | Postoffice                     |
|                           |           |                                |
|    Receiving              |           |                                |
|        |                  |           |                                |
|      2 |                  |     3     |                                |
|        v                  | GetCustomer                                |
|    ProcessDataMsg  <---------------------->  unordered_map customers_  |
|        |                  |           |                                |
+--------|------------------+           +--------------------------------+
         |
       4 |
         v
+---------------------------+
| Customer                  |
|                           |
|    Accept                 |
|        |                  |
|      5 |                  |
|        v                  |
|    recv_queue_            |           +------------------+
|        |                  |     8     | KVWorker         |
|      6 |                  |   +------>|    Process       |
|        v                  |   |       +------------------+
|    Receiving              |   |
|        |                  |   |       +------------------+
|      7 |                  |   |   8   | KVServer         |
|        v                  |   |  +--->|    Process       |
|    recv_handle_ +-------------+--+    +------------------+
|                           |
+---------------------------+

0x04 Functions

The following Customer functions are called by other modules.

4.1 Customer::NewRequest

4.1.1 Implementation

This function is called when a request is sent, to register the request so that its responses can be counted. We use it whenever a request needs to be tracked.

Features are as follows:

  • Before each message is sent, the number of responses that should be received for it is recorded.
  • recver is the node id of the receiver. In ps-lite an id may correspond to multiple node ids (a group), so Postoffice is used to decode it into the actual list of node ids.
  • For example, if a message is sent to kServerGroup and there are three servers in kServerGroup, then num is 3, meaning three responses should be received. The corresponding tracker_ entry is (3, 0): three responses expected, zero received so far.
  • The return value of this function can be thought of as a timestamp, and this timestamp is used as the id of the request.
int Customer::NewRequest(int recver) {
  std::lock_guard<std::mutex> lk(tracker_mu_);
  // recver may represent a group, so decode it to get the real number of nodes
  int num = Postoffice::Get()->GetNodeIDs(recver).size();
  tracker_.push_back(std::make_pair(num, 0));
  // the index in tracker_ serves as the timestamp, i.e. the id of this request
  return tracker_.size() - 1;
}

4.1.2 Invocation

A concrete example of the invocation is when the worker pushes data to the servers.

int ZPush(const SArray<Key>& keys,
          const SArray<Val>& vals,
          const SArray<int>& lens = {},
          int cmd = 0,
          const Callback& cb = nullptr,
          int priority = 0) {
  int ts = obj_->NewRequest(kServerGroup);  // NewRequest is called here
  AddCallback(ts, cb);
  KVPairs<Val> kvs;
  kvs.keys = keys;
  kvs.vals = vals;
  kvs.lens = lens;
  kvs.priority = priority;
  Send(ts, true, false, cmd, kvs);
  return ts;
}

4.2 Customer::AddResponse

4.2.1 Implementation

This function increases the count of responses that have come back for a request.

Features are as follows:

  • When the external caller receives a response, it calls AddResponse to inform the Customer object.
  • It actively increases the number of responses counted as received for a request. It is mainly used when the client sends a request but can skip communicating with some servers (none of the keys of the request live on those servers); the client can then directly treat those responses as received.
  • In addition, in Customer::Receiving, after a non-request message (i.e. a response) is processed, the response count of the corresponding request is also increased: tracker_[recv.meta.timestamp].second++;
  • A weakness of this class is that expired request entries that will never be used again are never deleted, and the lifetime of a single Customer object is almost the lifetime of the process. A program based on ps-lite that runs for long enough will therefore eventually run out of memory.
void Customer::AddResponse(int timestamp, int num) {
  std::lock_guard<std::mutex> lk(tracker_mu_);
  tracker_[timestamp].second += num;
}
​

4.2.2 Invocation

It is called from KVWorker's Send method: in some cases (none of the keys of this request live on certain servers) the client can skip communicating with those servers and directly treat their responses as received.

template <typename Val>
void KVWorker<Val>::Send(int timestamp, bool push, bool pull, int cmd, const KVPairs<Val>& kvs) {
  // slice the message
  SlicedKVs sliced;
  slicer_(kvs, Postoffice::Get()->GetServerKeyRanges(), &sliced);

  // need to add response first, since it will not always trigger the callback
  int skipped = 0;
  for (size_t i = 0; i < sliced.size(); ++i) {
    if (!sliced[i].first) ++skipped;
  }
  obj_->AddResponse(timestamp, skipped);  // the skipped servers are directly counted as responded
  if ((size_t)skipped == sliced.size()) {
    RunCallback(timestamp);
  }

  for (size_t i = 0; i < sliced.size(); ++i) {
    const auto& s = sliced[i];
    if (!s.first) continue;
    Message msg;
    msg.meta.app_id = obj_->app_id();
    msg.meta.customer_id = obj_->customer_id();
    msg.meta.request = true;
    msg.meta.push = push;
    msg.meta.pull = pull;
    msg.meta.head = cmd;
    msg.meta.timestamp = timestamp;
    msg.meta.recver = Postoffice::Get()->ServerRankToID(i);
    msg.meta.priority = kvs.priority;
    const auto& kvs = s.second;
    if (kvs.keys.size()) {
      msg.AddData(kvs.keys);
      msg.AddData(kvs.vals);
      if (kvs.lens.size()) {
        msg.AddData(kvs.lens);
      }
    }
    Postoffice::Get()->van()->Send(msg);
  }
}

4.3 Customer::WaitRequest

4.3.1 Implementation

When we need to wait until all responses to a sent request have arrived, this function blocks until the number of responses that should be received equals the number actually received.

The wait blocks on tracker_cond_ until the number of expected responses equals the number of responses that have come back.

void Customer::WaitRequest(int timestamp) {
  std::unique_lock<std::mutex> lk(tracker_mu_);
  tracker_cond_.wait(lk, [this, timestamp]{
      return tracker_[timestamp].first == tracker_[timestamp].second;
    });
}

4.3.2 Invocation

The Wait function uses WaitRequest to ensure that the operation completes.

  /**
   * \brief Waits until a push or pull has been finished
   *
   * Sample usage:
   * \code
   *   int ts = w.Pull(keys, &vals);
   *   Wait(ts);
   *   // now vals is ready for use
   * \endcode
   *
   * \param timestamp the timestamp returned by the push or pull
   */
  void Wait(int timestamp) { obj_->WaitRequest(timestamp); }
​

However, it is up to the user to decide how to call it. For example:

  for (int i = 0; i < repeat; ++i) {
    kv.Wait(kv.Push(keys, vals));
  }

This brings us to the question of synchronization strategy.

0x05 Synchronization policy

When different workers perform parallel operations at the same time, the progress of different workers may be different due to external reasons such as network and machine configuration. How to control the synchronization mechanism of workers is an important topic.

5.1 Synchronization Protocols

In general there are three levels of asynchrony-control protocols: Bulk Synchronous Parallel (BSP), Staleness Synchronous Parallel (SSP) and Asynchronous Parallel (ASP), whose synchronization constraints are relaxed in that order. To pursue faster computation, an algorithm can choose a looser synchronization protocol.

To address the performance problem, the industry began exploring consistency models. The first version was the ASP model; after ASP, another, relatively extreme synchronization protocol, BSP, was proposed; later a compromise between ASP and BSP was proposed, namely SSP.

The three agreements are as follows:

  • ASP: tasks do not wait for each other at all. There is no ordering among workers; each worker proceeds at its own pace, updating after each iteration, and the tasks that finish first move straight on to the next round of training.

    • Advantages: it eliminates the time spent waiting for slow tasks and reduces GPU idle time, so hardware efficiency is higher than with BSP. Computation is fast and makes maximal use of the cluster's computing power, since no worker's machine has to wait for the others.

    • Disadvantages:

      • Gradients may be computed with outdated weights, which reduces statistical efficiency.
      • Poor applicability; in some cases convergence cannot be guaranteed.

  • BSP: the synchronization protocol used in general-purpose distributed computing. In each iteration all tasks must compute; every worker must be in the same iteration, and only after all workers have completed an iteration do the worker-server synchronization and sharded update take place.

    • BSP differs from the single-machine serial mode only in the batch size, so the convergence of the model is exactly the same. At the same time, because each worker can compute in parallel within a cycle, it has a certain degree of parallelism. This is what Spark does.

    • Advantages: wide applicability; high convergence quality per iteration.

    • Disadvantages: in each iteration, BSP requires every worker to wait for the gradients from the other workers, so it has to wait for the slowest task, which significantly reduces hardware efficiency and lengthens the overall computation time. The performance of the whole worker group is determined by the slowest worker, commonly referred to as a straggler.

  • SSP: a certain degree of inconsistency in task progress is allowed, but the inconsistency has a ceiling called the staleness value: the fastest task may be at most staleness iterations ahead of the slowest one.

    • It is a compromise between ASP and BSP. ASP allows the iteration gap between workers to be arbitrarily large, while BSP only allows it to be 0, so SSP takes a constant s in between: with SSP, BSP is obtained by setting s = 0, and ASP by setting s = infinity.

    • Advantages: it reduces the waiting time between tasks to some extent, and computation is fast.

    • Disadvantages: the convergence quality of each iteration is not as good as BSP, so more iterations may be needed to reach the same convergence, and its applicability is narrower than BSP; some algorithms are not suitable.

5.2 The paper

Mu Li mentions in the paper that the parameter server provides users with several task dependency modes:

  • Sequential: synchronous tasks; tasks are executed one after another, and the next task starts only after the previous one completes.

  • Eventual: the opposite of Sequential; there is no ordering among tasks, and each runs independently.

  • Bounded Delay: a trade-off between Sequential and Eventual. A maximum delay τ can be set: a new task may start only after all tasks issued more than τ steps earlier have completed. Extreme cases:

    • τ = 0 is Sequential;
    • τ = infinity is Eventual;

5.3 ps-lite

There are several areas in PS-Lite that involve waiting for synchronization:

  • A worker's pull is an asynchronous operation. If the result of a pull is needed, Wait can be called to make sure the request and response counts in Customer are equal, i.e. other operations run only after the pull has completed.
  • A worker can have multiple Customers. When one of them sends a BARRIER, the scheduler receives the message and identifies it as a barrier request; once requests have been received from all nodes in the barrier_group, the scheduler replies, and Postoffice::Get()->Manage(*msg) sets the bool corresponding to customer_id in barrier_done_ to true.
  • A barrier can also be performed when building a node connection;

More complex schemes such as ASP, BSP and SSP can be implemented by adding the corresponding commands; a bounded-delay sketch built only on the existing Push/Wait interface follows.
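For example, at the application level a bounded-delay (SSP-style) loop can be built from nothing more than Push and Wait; a minimal sketch (tau is an illustrative staleness bound, not a ps-lite parameter):

#include <vector>
#include "ps/ps.h"
using namespace ps;

// Iteration i only starts after the push issued at iteration i - tau has finished.
void BoundedDelayLoop(KVWorker<float>& kv, const std::vector<Key>& keys,
                      const std::vector<float>& vals, int iterations, int tau) {
  std::vector<int> ts;
  for (int i = 0; i < iterations; ++i) {
    ts.push_back(kv.Push(keys, vals));   // asynchronous push, returns a timestamp
    if (i >= tau) kv.Wait(ts[i - tau]);  // block until the push from tau steps ago is done
  }
  for (int t : ts) kv.Wait(t);           // drain all outstanding requests at the end
}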

0x06 Distributed optimization

6.1 Problem Definition

Suppose we want to solve the following problem
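(The formula image from the original post did not survive; a standard form of the objective, consistent with the description below, is)

$$ \min_{w} \; \sum_{i=1}^{n} \ell(x_i, y_i, w) $$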

where (x_i, y_i) is a sample pair and w is the model weight.

We consider using mini-batch stochastic gradient descent (SGD) with batch size b to solve the above problem. At step t, the algorithm first randomly selects b samples, and then updates the weight w with the following formula:
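(Again reconstructing the missing formula: with learning rate \(\eta_t\) and randomly chosen mini-batch \(B_t\) of size \(b\), the update is)

$$ w \leftarrow w - \eta_t \sum_{i \in B_t} \nabla \ell(x_i, y_i, w) $$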

We use two examples to show how to implement a distributed optimization algorithm in PS-Lite.

6.2 Asynchronous SGD

In the first example we extend SGD to asynchronous SGD. The servers maintain the model weight w, with server k holding the k-th segment of w, denoted w_k. Once server k receives a gradient from a worker, it updates the weights it maintains:

t = 0;
while (Received(&grad)) {
  w_k -= eta(t) * grad;
  t++;
}

On the worker side, each step does four things:

Read(&X, &Y);                  // read a minibatch of data
Pull(&w);                      // pull the latest weights from the servers
ComputeGrad(X, Y, w, &grad);   // compute the gradient
Push(grad);                    // push the gradient to the servers

ps-lite provides the Push and Pull functions; a worker only communicates with the servers holding the relevant part of the data.

Note that asynchronous SGD is algorithmically different from the single-machine version: since there is no communication between workers, other workers may update the weights on the servers while one worker is computing its gradient, so each worker may be working with delayed (stale) weights.
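The worker-side pseudocode above maps directly onto the ps-lite API; a sketch of one asynchronous step (ComputeGrad and the data reader are placeholders, not ps-lite functions):

#include <vector>
#include "ps/ps.h"
using namespace ps;

void AsyncWorkerStep(KVWorker<float>& kv, const std::vector<Key>& keys) {
  std::vector<float> w, grad;
  kv.Wait(kv.Pull(keys, &w));       // pull the latest weights for our keys
  // grad = ComputeGrad(X, Y, w);   // placeholder: compute the gradient on a minibatch
  grad.assign(w.size(), 0.0f);      // dummy gradient so that the sketch is self-contained
  kv.Push(keys, grad);              // push the gradient asynchronously (no Wait: ASP style)
}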

6.3 Synchronized SGD

Unlike the asynchronous version, the synchronous version is semantically identical to the stand-alone algorithm. That is, every iteration requires all workers to calculate the gradient and synchronize it to the server.

We use scheduler to manage data synchronization.

for (t = 0, t < num_iteration; ++t) {
  for (i = 0; i < num_worker; ++i) {
     IssueComputeGrad(i, t);
  }
  for (i = 0; i < num_server; ++i) {
     IssueUpdateWeight(i, t);
  }
  WaitAllFinished();
}

IssueComputeGrad and IssueUpdateWeight send commands to the workers and servers, and the scheduler then calls WaitAllFinished to wait until all issued commands have finished.

When a worker receives a command, it does the following:

ExecComputeGrad(i, t) {
  Read(&X, &Y);                // read a minibatch of size b / num_workers
  Pull(&w);                    // pull the latest weights from the servers
  ComputeGrad(X, Y, w, &grad); // compute the gradient
  Push(grad);                  // push the gradient to the servers
}

This algorithm is almost the same as ASGD, except that in each step only b/num_workers samples are processed per worker.

On the server node there is an additional aggregation step compared with ASGD: the gradients from all workers are accumulated first, and only then is the weight updated with the learning rate.

ExecUpdateWeight(i, t) {
   for (j = 0; j < num_workers; ++j) {
      Receive(&grad);
      aggregated_grad += grad;
   }
   w_i -= eta(t) * aggregated_grad;
}
​

0x07 Summary

  • PostOffice: a global management class in singleton mode. Each node (uniquely identified by hostname + port) has one PostOffice for its lifetime.

  • Van: communication module, responsible for network communication with other nodes and the actual sending and receiving of messages. PostOffice has a Van member, which literally means a cart that delivers mail;

  • SimpleApp: the parent class of KVServer and KVWorker. It provides simple Request, Wait, Response and Process functions; KVServer and KVWorker override these to suit their respective roles;

  • Customer: Each SimpleApp object holds a member of the Customer class that needs to be registered with PostOffice. This class is responsible for:

    • As a sender, track the responses to messages sent by SimpleApp;
    • As the receiver, it maintains the message queue of a Node and receives messages for the Node.

    Customer, as the name suggests, is a customer of the post office: SimpleApp's agent at the post office. Because the worker and server need to concentrate on the algorithm, their logically network-related message sending and receiving functions are delegated to Customer.

The logic diagram is shown below.

DataMessage
     |
   1 |
     v
+---------------------------+           +--------------------------------+
| Van                       |           | Postoffice                     |
|                           |           |                                |
|    Receiving              |           |                                |
|        |                  |           |                                |
|      2 |                  |     3     |                                |
|        v                  | GetCustomer                                |
|    ProcessDataMsg  <---------------------->  unordered_map customers_  |
|        |                  |           |                                |
+--------|------------------+           +--------------------------------+
         |
       4 |
         v
+---------------------------+
| Customer                  |
|                           |
|    Accept                 |
|        |                  |
|      5 |                  |
|        v                  |
|    recv_queue_            |           +------------------+
|        |                  |     8     | KVWorker         |
|      6 |                  |   +------>|    Process       |
|        v                  |   |       +------------------+
|    Receiving              |   |
|        |                  |   |       +------------------+
|      7 |                  |   |   8   | KVServer         |
|        v                  |   |  +--->|    Process       |
|    recv_handle_ +-------------+--+    +------------------+
|                           |
+---------------------------+

0xEE Personal information

Thoughts on life and technology

Wechat public account: Rosie’s Thinking

If you want updates on future articles, or to see the accompanying technical recommendations, please follow the account.

0xFF References

www.cs.cmu.edu/~muli/file/…

Ps-lite code parsing

PS – Lite

Ps-lite source code learning

Ps-lite Code notes

PS Lite notes

Ps-lite source code interpretation

Ps-lite deep source code interpretation

Introduction to distributed TensorFlow

Architecture design of distributed machine learning platform

The four-fold realm of large-scale machine learning frameworks

Sona: An introduction to Spark on Angel’s Massively distributed machine learning platform

Extensible distributed machine learning architecture based on Parameter Server

Mu Li. Scaling Distributed Machine Learning with the Parameter Server.

CMU. parameterserver.org/

Joseph E.Gonzalez. Emerging Systems For Large-scale Machine Learning.

MapReduce alternative -Parameter Server

Parameter_server architecture

Adam: Large-scale distributed machine Learning framework

Parameter Server for Distributed Machine Learning

PS-Lite Documents

PS-Lite

Ps-lite source code analysis

Blog.csdn.net/stdcoutzyx/…

Blog.csdn.net/cyh_24/arti…

www.zybuluo.com/Dounm/note/…

Blog.csdn.net/KangRoger/a…

www.cnblogs.com/heguanyou/p…

MXNet pS-Lite and Parameter Server principles

Set up the PS-Lite environment

Ps-lite learn one of the series —– Install PS-Lite for MAC

Ps-lite Notes (Dist – LR Analysis)

[Tech1] Simple parameter server: PS-Lite parsing

Introduction to distributed machine learning — implementation principle of logistic regression based on parameter server

www.zhihu.com/topic/20175…

Large Scale Machine Learning–An Engineering Perspective– Directory

Parallel logistic regression

Distributed Word2VEc based on PS-Lite

Ps-lite series 3 — Overview of pS-Lite

Blog.csdn.net/zkwdn/artic…